Go语言消息队列最佳实践:性能优化与生产部署
1. 生产者优化
type OptimizedProducer struct { producer *KafkaProducer batchSize int lingerMs int bufferSize int } func NewOptimizedProducer(brokers []string, topic string) (*OptimizedProducer, error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Compression = sarama.CompressionSnappy config.Producer.Flush.Messages = 100 config.Producer.Flush.Frequency = 100 * time.Millisecond config.Producer.Return.Successes = true config.Producer.Return.Errors = true config.Net.WriteTimeout = 10 * time.Second config.Net.ReadTimeout = 10 * time.Second producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { return nil, err } return &OptimizedProducer{ producer: producer, }, nil } func (p *OptimizedProducer) SendAsync(msg *ProducerMessage) { p.producer.Input() <- msg }2. 消费者优化
type OptimizedConsumer struct { consumer *KafkaConsumer prefetch int maxWait time.Duration } func NewOptimizedConsumer(brokers []string, groupID, topic string) (*OptimizedConsumer, error) { config := sarama.NewConfig() config.Consumer.Fetch.Min = 1 config.Consumer.Fetch.Max = 10 * 1024 * 1024 config.Consumer.MaxWaitTime = 500 * time.Millisecond config.Consumer.MaxProcessingTime = 5 * time.Second config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { return nil, err } return &OptimizedConsumer{ consumer: consumer, }, nil }3. 连接池管理
type ProducerPool struct { producers []*KafkaProducer index int mu sync.Mutex } func NewProducerPool(brokers []string, size int) (*ProducerPool, error) { pool := &ProducerPool{ producers: make([]*KafkaProducer, size), } for i := 0; i < size; i++ { producer, err := NewProducer(brokers) if err != nil { for j := 0; j < i; j++ { pool.producers[j].Close() } return nil, err } pool.producers[i] = producer } return pool, nil } func (p *ProducerPool) Get() *KafkaProducer { p.mu.Lock() defer p.mu.Unlock() producer := p.producers[p.index] p.index = (p.index + 1) % len(p.producers) return producer } func (p *ProducerPool) Close() { for _, producer := range p.producers { producer.Close() } }4. 总结
本文介绍了消息队列的性能优化技巧,包括生产者批量发送、消费者预取、连接池管理等方法。