1.14 生产者消费者模式实战:用Go实现高并发消息队列系统
引言
生产者消费者模式是并发编程中的经典模式,在Go语言中通过channel可以优雅地实现。本文将手把手教你实现一个高并发的消息队列系统,涵盖生产者、消费者、任务分发等核心功能。
一、基础生产者消费者
1.1 简单实现
packagemainimport("fmt""time")funcproducer(chchan<-int){fori:=0;i<10;i++{fmt.Printf("生产: %d\n",i)ch<-i time.Sleep(100*time.Millisecond)}close(ch)}funcconsumer(ch<-chanint,idint){forvalue:=rangech{fmt.Printf("消费者%d消费: %d\n",id,value)time.Sleep(200*time.Millisecond)}}funcmain(){ch:=make(chanint,5)goproducer(ch)// 多个消费者fori:=1;i<=3;i++{goconsumer(ch,i)}time.Sleep(5*time.Second)}二、任务队列系统
2.1 任务定义
packagemainimport("fmt""time")typeTaskstruct{IDintDatastringResultchanstring}typeTaskQueuestruct{taskschanTask workersint}funcNewTaskQueue(workersint,queueSizeint)*TaskQueue{return&TaskQueue{tasks:make(chanTask,queueSize),workers:workers,}}func(tq*TaskQueue)Start(){fori:=0;i<tq.workers;i++{gotq.worker(i)}}func(tq*TaskQueue)worker(idint){fortask:=rangetq.tasks{fmt.Printf("Worker %d 处理任务 %d: %s\n",id,task.ID,task.Data)// 模拟处理时间time.Sleep(500*time.Millisecond)// 返回结果task.Result<-fmt.Sprintf("任务 %d 完成",task.ID)}}func(tq*TaskQueue)Submit(task Task){tq.tasks<-task}func(tq*TaskQueue)Close(){close(tq.tasks)}funcmain(){queue:=NewTaskQueue(3,10)queue.Start()// 提交任务fori:=1;i<=10;i++{resultCh:=make(chanstring,1)task:=Task{ID:i,Data:fmt.Sprintf("数据%d",i),Result:resultCh,}queue.Submit(task)// 异步获取结果gofunc(idint,ch<-chanstring){result:=<-ch fmt.Printf("任务 %d 结果: %s\n",id