| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package utils
- import (
- "fmt"
- "log"
- "testing"
- "time"
- "context"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- )
- func bbbb(t *testing.T) {
- // 示例1:创建并发消费者
- fmt.Println("=== 示例1:并发消费者 ===")
- exampleConcurrentConsumer()
-
- // 示例2:创建顺序消费者
- // fmt.Println("\n=== 示例2:顺序消费者 ===")
- // exampleOrderlyConsumer()
-
- // 示例3:使用消费者管理器
- // fmt.Println("\n=== 示例3:消费者管理器 ===")
- // exampleConsumerManager()
-
- // 等待中断信号
- // waitForInterruptB()
- }
- // 示例1:并发消费者
- func exampleConcurrentConsumer() {
- // 直接使用rocketmq-client-go的API创建消费者
- c, err := consumer.NewPushConsumer(
- // consumer.WithGroupName(fmt.Sprintf("consumer-group-%d", time.Now().Unix())), // 使用唯一组名
- consumer.WithGroupName(fmt.Sprintf("consumer-group")), // 使用唯一组名
- consumer.WithNameServer([]string{"172.16.1.150:9876"}),
- consumer.WithConsumerModel(consumer.Clustering),
- consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从第一条消息开始消费
- consumer.WithConsumerOrder(false),
- )
- if err != nil {
- log.Fatalf("创建消费者失败: %v", err)
- }
-
- // 创建信号量来控制消费者生命周期
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // 消息计数器
- messageCount := 0
-
- // 订阅主题
- err = c.Subscribe("TestTopic", consumer.MessageSelector{
- Type: consumer.TAG,
- Expression: "*", // 接收所有标签的消息
- }, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
- for _, msg := range msgs {
- messageCount++
- fmt.Printf("[%s] 收到消息[%d]: Topic=%s, Tag=%s, Body=%s\n",
- time.Now().Format("15:04:05"), messageCount,
- msg.Topic, msg.GetTags(), string(msg.Body))
- }
- return consumer.ConsumeSuccess, nil
- })
- if err != nil {
- log.Fatalf("订阅失败: %v", err)
- }
-
- // 启动消费者
- err = c.Start()
- if err != nil {
- log.Fatalf("启动失败: %v", err)
- }
-
- defer c.Shutdown()
- fmt.Println("消费者已启动...")
-
- // 等待上下文取消或超时
- select {
- case <-ctx.Done():
- fmt.Println("上下文取消,退出消费者")
- case <-time.After(120 * time.Second): // 等待2分钟
- fmt.Printf("超时退出,共收到 %d 条消息\n", messageCount)
- }
- }
|