Go语言消息队列设计模式:发布订阅与工作队列

发布时间:2026/7/2 12:20:25

Go语言消息队列设计模式:发布订阅与工作队列 Go语言消息队列设计模式发布订阅与工作队列1. 发布订阅模式发布订阅是一种常见的消息模式发布者发送消息到主题所有订阅该主题的消费者都能收到消息。type PubSubManager struct { mu sync.RWMutex topics map[string]*Topic client *redis.Client } type Topic struct { name string subscribers map[string]chan []byte mu sync.RWMutex } func NewPubSubManager(client *redis.Client) *PubSubManager { return PubSubManager{ topics: make(map[string]*Topic), client: client, } } func (m *PubSubManager) Subscribe(topicName string) (-chan []byte, func()) { m.mu.Lock() defer m.mu.Unlock() topic, exists : m.topics[topicName] if !exists { topic Topic{ name: topicName, subscribers: make(map[string]chan []byte), } m.topics[topicName] topic } id : fmt.Sprintf(subscriber-%d, time.Now().UnixNano()) ch : make(chan []byte, 100) topic.mu.Lock() topic.subscribers[id] ch topic.mu.Unlock() cancel : func() { topic.mu.Lock() delete(topic.subscribers, id) close(ch) topic.mu.Unlock() } return ch, cancel } func (m *PubSubManager) Publish(topicName string, message []byte) { m.mu.RLock() topic, exists : m.topics[topicName] m.mu.RUnlock() if !exists { return } topic.mu.RLock() for _, ch : range topic.subscribers { select { case ch - message: default: } } topic.mu.RUnlock() }2. 工作队列模式工作队列用于将耗时的任务分发给多个Worker处理实现负载均衡。type WorkQueue struct { queueName string client *redis.Client mu sync.Mutex } func NewWorkQueue(client *redis.Client, queueName string) *WorkQueue { return WorkQueue{ queueName: queueName, client: client, } } func (wq *WorkQueue) Enqueue(ctx context.Context, task []byte) error { return wq.client.RPush(ctx, wq.queueName, task).Err() } func (wq *WorkQueue) Dequeue(ctx context.Context, timeout time.Duration) ([]byte, error) { result, err : wq.client.BLPop(ctx, timeout, wq.queueName).Result() if err ! nil { return nil, err } return []byte(result[1]), nil } type WorkerPool struct { queues []*WorkQueue workers int handler func([]byte) error } func NewWorkerPool(queues []*WorkQueue, workers int, handler func([]byte) error) *WorkerPool { return WorkerPool{ queues: queues, workers: workers, handler: handler, } } func (wp *WorkerPool) Start(ctx context.Context) { var wg sync.WaitGroup for i : 0; i wp.workers; i { wg.Add(len(wp.queues)) for _, queue : range wp.queues { go func(q *WorkQueue) { defer wg.Done() wp.processQueue(ctx, q) }(q) } } wg.Wait() } func (wp *WorkerPool) processQueue(ctx context.Context, queue *WorkQueue) { for { select { case -ctx.Done(): return default: task, err : queue.Dequeue(ctx, time.Second) if err ! nil { if err redis.Nil { continue } time.Sleep(time.Second) continue } if err : wp.handler(task); err ! nil { queue.Enqueue(ctx, task) } } } }3. 路由模式根据消息的路由键将消息分发到不同的队列。type RoutingPublisher struct { exchange string client *amqp.Channel } func NewRoutingPublisher(client *amqp.Channel, exchange string) *RoutingPublisher { return RoutingPublisher{ exchange: exchange, client: client, } } func (p *RoutingPublisher) Publish(routingKey string, body []byte) error { return p.client.PublishWithContext( context.Background(), p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: application/json, Body: body, }, ) } type RoutingConsumer struct { queue string bindingKeys []string client *amqp.Channel } func NewRoutingConsumer(client *amqp.Channel, queue, exchange string, bindingKeys []string) error { q, err : client.QueueDeclare(queue, true, false, false, false, nil) if err ! nil { return err } for _, key : range bindingKeys { err client.QueueBind(q.Name, key, exchange, false, nil) if err ! nil { return err } } return nil }4. 总结本文介绍了三种常用的消息队列设计模式发布订阅、工作队列和路由模式每种模式都有其适用场景。

相关新闻