package utils import ( "fmt" "log" "testing" "time" "context" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/consumer" ) func bbbb(t *testing.T) { // 示例1:创建并发消费者 fmt.Println("=== 示例1:并发消费者 ===") exampleConcurrentConsumer() // 示例2:创建顺序消费者 // fmt.Println("\n=== 示例2:顺序消费者 ===") // exampleOrderlyConsumer() // 示例3:使用消费者管理器 // fmt.Println("\n=== 示例3:消费者管理器 ===") // exampleConsumerManager() // 等待中断信号 // waitForInterruptB() } // 示例1:并发消费者 func exampleConcurrentConsumer() { // 直接使用rocketmq-client-go的API创建消费者 c, err := consumer.NewPushConsumer( // consumer.WithGroupName(fmt.Sprintf("consumer-group-%d", time.Now().Unix())), // 使用唯一组名 consumer.WithGroupName(fmt.Sprintf("consumer-group")), // 使用唯一组名 consumer.WithNameServer([]string{"172.16.1.150:9876"}), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从第一条消息开始消费 consumer.WithConsumerOrder(false), ) if err != nil { log.Fatalf("创建消费者失败: %v", err) } // 创建信号量来控制消费者生命周期 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 消息计数器 messageCount := 0 // 订阅主题 err = c.Subscribe("TestTopic", consumer.MessageSelector{ Type: consumer.TAG, Expression: "*", // 接收所有标签的消息 }, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, msg := range msgs { messageCount++ fmt.Printf("[%s] 收到消息[%d]: Topic=%s, Tag=%s, Body=%s\n", time.Now().Format("15:04:05"), messageCount, msg.Topic, msg.GetTags(), string(msg.Body)) } return consumer.ConsumeSuccess, nil }) if err != nil { log.Fatalf("订阅失败: %v", err) } // 启动消费者 err = c.Start() if err != nil { log.Fatalf("启动失败: %v", err) } defer c.Shutdown() fmt.Println("消费者已启动...") // 等待上下文取消或超时 select { case <-ctx.Done(): fmt.Println("上下文取消,退出消费者") case <-time.After(120 * time.Second): // 等待2分钟 fmt.Printf("超时退出,共收到 %d 条消息\n", messageCount) } }