Android 的 MessageQueue 很复杂(native poll/epoll、barrier、idle handler…)
但它的核心思想非常简单:
✅ 一个队列存消息
✅ 一个循环不断取消息执行
✅ 线程安全(加锁/条件变量)我们用 C 写一个可跑的简化版:
支持
post(msg)支持
loop()阻塞取消息支持
quit()退出
1)数据结构:消息节点 + 队列
typedef struct Message { int what; void (*callback)(int what); // 收到消息后执行的回调 struct Message* next; } Message; typedef struct { Message* head; Message* tail; int quit; pthread_mutex_t mutex; pthread_cond_t cond; } MessageQueue;2)队列初始化 / 销毁
void mq_init(MessageQueue* q){ q->head = q->tail = NULL; q->quit = 0; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); } void mq_destroy(MessageQueue* q){ pthread_mutex_lock(&q->mutex); Message* cur = q->head; while(cur){ Message* next = cur->next; free(cur); cur = next; } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mutex); pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->cond); }3)post:入队 + 唤醒 loop
void mq_post(MessageQueue* q, int what, void (*cb)(int)){ Message* m = (Message*)malloc(sizeof(Message)); m->what = what; m->callback = cb; m->next = NULL; pthread_mutex_lock(&q->mutex); if(q->tail == NULL){ q->head = q->tail = m; }else{ q->tail->next = m; q->tail = m; } pthread_cond_signal(&q->cond); // 通知消费者 pthread_mutex_unlock(&q->mutex); }4)next:阻塞取消息(队列空就等)
Message* mq_next(MessageQueue* q){ pthread_mutex_lock(&q->mutex); while(!q->quit && q->head == NULL){ pthread_cond_wait(&q->cond, &q->mutex); } if(q->quit){ pthread_mutex_unlock(&q->mutex); return NULL; } Message* m = q->head; q->head = m->next; if(q->head == NULL) q->tail = NULL; pthread_mutex_unlock(&q->mutex); return m; }5)loop:像 Looper 一样执行消息
void mq_loop(MessageQueue* q){ while(1){ Message* m = mq_next(q); if(m == NULL) break; if(m->callback){ m->callback(m->what); } free(m); } }6)quit:让 loop 退出
void mq_quit(MessageQueue* q){ pthread_mutex_lock(&q->mutex); q->quit = 1; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->mutex); }7)完整可运行 Demo(Linux / macOS)
编译:
gcc mq.c -o mq -lpthread
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> typedef struct Message { int what; void (*callback)(int what); struct Message* next; } Message; typedef struct { Message* head; Message* tail; int quit; pthread_mutex_t mutex; pthread_cond_t cond; } MessageQueue; void mq_init(MessageQueue* q){ q->head = q->tail = NULL; q->quit = 0; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); } void mq_post(MessageQueue* q, int what, void (*cb)(int)){ Message* m = (Message*)malloc(sizeof(Message)); m->what = what; m->callback = cb; m->next = NULL; pthread_mutex_lock(&q->mutex); if(q->tail == NULL){ q->head = q->tail = m; }else{ q->tail->next = m; q->tail = m; } pthread_cond_signal(&q->cond); pthread_mutex_unlock(&q->mutex); } Message* mq_next(MessageQueue* q){ pthread_mutex_lock(&q->mutex); while(!q->quit && q->head == NULL){ pthread_cond_wait(&q->cond, &q->mutex); } if(q->quit){ pthread_mutex_unlock(&q->mutex); return NULL; } Message* m = q->head; q->head = m->next; if(q->head == NULL) q->tail = NULL; pthread_mutex_unlock(&q->mutex); return m; } void mq_quit(MessageQueue* q){ pthread_mutex_lock(&q->mutex); q->quit = 1; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->mutex); } void mq_loop(MessageQueue* q){ while(1){ Message* m = mq_next(q); if(m == NULL) break; if(m->callback) m->callback(m->what); free(m); } } void mq_destroy(MessageQueue* q){ pthread_mutex_lock(&q->mutex); Message* cur = q->head; while(cur){ Message* next = cur->next; free(cur); cur = next; } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mutex); pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->cond); } void on_msg(int what){ printf("handle msg what=%d (thread=%lu)\n", what, (unsigned long)pthread_self()); } typedef struct { MessageQueue* q; } ProducerArg; void* producer(void* arg){ ProducerArg* pa = (ProducerArg*)arg; for(int i=1;i<=5;i++){ mq_post(pa->q, i, on_msg); usleep(200 * 1000); } mq_quit(pa->q); return NULL; } int main(){ MessageQueue q; mq_init(&q); pthread_t t; ProducerArg pa = { .q = &q }; pthread_create(&t, NULL, producer, &pa); mq_loop(&q); pthread_join(t, NULL); mq_destroy(&q); return 0; }8)这跟 Android MessageQueue 的对应关系
mq_post≈enqueueMessagemq_next≈next()mq_loop≈Looper.loop()cond_wait≈ “没有消息就阻塞等待”quit≈Looper.quit()