news 2026/4/9 3:09:24

用 C 实现一个简化版 MessageQueue

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
用 C 实现一个简化版 MessageQueue

Android 的 MessageQueue 很复杂(native poll/epoll、barrier、idle handler…)
但它的核心思想非常简单
✅ 一个队列存消息
✅ 一个循环不断取消息执行
✅ 线程安全(加锁/条件变量)

我们用 C 写一个可跑的简化版

  • 支持post(msg)

  • 支持loop()阻塞取消息

  • 支持quit()退出

1)数据结构:消息节点 + 队列

typedef struct Message { int what; void (*callback)(int what); // 收到消息后执行的回调 struct Message* next; } Message; typedef struct { Message* head; Message* tail; int quit; pthread_mutex_t mutex; pthread_cond_t cond; } MessageQueue;

2)队列初始化 / 销毁

void mq_init(MessageQueue* q){ q->head = q->tail = NULL; q->quit = 0; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); } void mq_destroy(MessageQueue* q){ pthread_mutex_lock(&q->mutex); Message* cur = q->head; while(cur){ Message* next = cur->next; free(cur); cur = next; } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mutex); pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->cond); }

3)post:入队 + 唤醒 loop

void mq_post(MessageQueue* q, int what, void (*cb)(int)){ Message* m = (Message*)malloc(sizeof(Message)); m->what = what; m->callback = cb; m->next = NULL; pthread_mutex_lock(&q->mutex); if(q->tail == NULL){ q->head = q->tail = m; }else{ q->tail->next = m; q->tail = m; } pthread_cond_signal(&q->cond); // 通知消费者 pthread_mutex_unlock(&q->mutex); }

4)next:阻塞取消息(队列空就等)

Message* mq_next(MessageQueue* q){ pthread_mutex_lock(&q->mutex); while(!q->quit && q->head == NULL){ pthread_cond_wait(&q->cond, &q->mutex); } if(q->quit){ pthread_mutex_unlock(&q->mutex); return NULL; } Message* m = q->head; q->head = m->next; if(q->head == NULL) q->tail = NULL; pthread_mutex_unlock(&q->mutex); return m; }

5)loop:像 Looper 一样执行消息

void mq_loop(MessageQueue* q){ while(1){ Message* m = mq_next(q); if(m == NULL) break; if(m->callback){ m->callback(m->what); } free(m); } }

6)quit:让 loop 退出

void mq_quit(MessageQueue* q){ pthread_mutex_lock(&q->mutex); q->quit = 1; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->mutex); }

7)完整可运行 Demo(Linux / macOS)

编译:gcc mq.c -o mq -lpthread

#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> typedef struct Message { int what; void (*callback)(int what); struct Message* next; } Message; typedef struct { Message* head; Message* tail; int quit; pthread_mutex_t mutex; pthread_cond_t cond; } MessageQueue; void mq_init(MessageQueue* q){ q->head = q->tail = NULL; q->quit = 0; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); } void mq_post(MessageQueue* q, int what, void (*cb)(int)){ Message* m = (Message*)malloc(sizeof(Message)); m->what = what; m->callback = cb; m->next = NULL; pthread_mutex_lock(&q->mutex); if(q->tail == NULL){ q->head = q->tail = m; }else{ q->tail->next = m; q->tail = m; } pthread_cond_signal(&q->cond); pthread_mutex_unlock(&q->mutex); } Message* mq_next(MessageQueue* q){ pthread_mutex_lock(&q->mutex); while(!q->quit && q->head == NULL){ pthread_cond_wait(&q->cond, &q->mutex); } if(q->quit){ pthread_mutex_unlock(&q->mutex); return NULL; } Message* m = q->head; q->head = m->next; if(q->head == NULL) q->tail = NULL; pthread_mutex_unlock(&q->mutex); return m; } void mq_quit(MessageQueue* q){ pthread_mutex_lock(&q->mutex); q->quit = 1; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->mutex); } void mq_loop(MessageQueue* q){ while(1){ Message* m = mq_next(q); if(m == NULL) break; if(m->callback) m->callback(m->what); free(m); } } void mq_destroy(MessageQueue* q){ pthread_mutex_lock(&q->mutex); Message* cur = q->head; while(cur){ Message* next = cur->next; free(cur); cur = next; } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mutex); pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->cond); } void on_msg(int what){ printf("handle msg what=%d (thread=%lu)\n", what, (unsigned long)pthread_self()); } typedef struct { MessageQueue* q; } ProducerArg; void* producer(void* arg){ ProducerArg* pa = (ProducerArg*)arg; for(int i=1;i<=5;i++){ mq_post(pa->q, i, on_msg); usleep(200 * 1000); } mq_quit(pa->q); return NULL; } int main(){ MessageQueue q; mq_init(&q); pthread_t t; ProducerArg pa = { .q = &q }; pthread_create(&t, NULL, producer, &pa); mq_loop(&q); pthread_join(t, NULL); mq_destroy(&q); return 0; }

8)这跟 Android MessageQueue 的对应关系

  • mq_postenqueueMessage
  • mq_nextnext()
  • mq_loopLooper.loop()
  • cond_wait≈ “没有消息就阻塞等待”
  • quitLooper.quit()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/30 16:11:00

PyTorch安装后如何加载Qwen3-32B?常见问题汇总

PyTorch安装后如何加载Qwen3-32B&#xff1f;常见问题汇总 在当前大模型快速落地的背景下&#xff0c;越来越多开发者尝试将高性能语言模型部署到实际业务系统中。尤其是像 Qwen3-32B 这样具备 320亿参数、支持128K上下文长度的开源强模型&#xff0c;正成为构建智能问答、代码…

作者头像 李华
网站建设 2026/3/29 8:29:21

Qwen3 + NPU 仿真实战 二. MAC 单元设计

Qwen3 NPU 仿真实战 第二节&#xff1a;单个 MAC 单元设计&#xff08;1616 阵列&#xff0c;支持 INT8/BF16&#xff09;1. MAC 单元在 LLM 推理中的作用 Qwen3 推理的主要计算量来自矩阵乘法&#xff08;MatMul/Linear&#xff09;&#xff0c;涵盖 QKV 投影、Attention Sco…

作者头像 李华
网站建设 2026/4/6 12:50:32

vivo Celeborn PB级Shuffle优化处理实践

一、背景近年来&#xff0c;随着vivo大数据平台的数据量和任务量持续快速增长&#xff0c;新增的计算机资源已无法满足不断扩大的存储和计算需求。同时&#xff0c;我们观察到互联网和算法等在线业务在白天流量高峰&#xff0c;而在夜间流量显著下降&#xff0c;导致部分服务器…

作者头像 李华
网站建设 2026/4/8 23:34:34

33、拼写检查:从Unix原型到awk实现

拼写检查:从Unix原型到awk实现 1. 拼写检查概述 拼写检查是一个有趣且具有挑战性的问题,有超过300篇研究论文和书籍都围绕它展开。在处理文本时,拼写检查能帮助我们发现并纠正错误,提高文本质量。下面我们将从不同角度探讨拼写检查的实现方式。 2. 原始Unix拼写检查原型…

作者头像 李华
网站建设 2026/3/15 2:00:46

38、Shell 可移植性问题与扩展

Shell 可移植性问题与扩展 1. 概述 POSIX 定义的 shell 语言比原始的 V7 Bourne shell 大得多,但比 ksh93 和 bash 这两种最常用的 Bourne shell 扩展版本所实现的语言小得多。如果你要进行利用 shell 语言扩展的重型脚本编写,很可能会使用这两种 shell 中的一种或两种。因…

作者头像 李华