c_test.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package utils
  2. import (
  3. "fmt"
  4. "log"
  5. "testing"
  6. "time"
  7. "context"
  8. "github.com/apache/rocketmq-client-go/v2/primitive"
  9. "github.com/apache/rocketmq-client-go/v2/consumer"
  10. )
  11. func bbbb(t *testing.T) {
  12. // 示例1:创建并发消费者
  13. fmt.Println("=== 示例1:并发消费者 ===")
  14. exampleConcurrentConsumer()
  15. // 示例2:创建顺序消费者
  16. // fmt.Println("\n=== 示例2:顺序消费者 ===")
  17. // exampleOrderlyConsumer()
  18. // 示例3:使用消费者管理器
  19. // fmt.Println("\n=== 示例3:消费者管理器 ===")
  20. // exampleConsumerManager()
  21. // 等待中断信号
  22. // waitForInterruptB()
  23. }
  24. // 示例1:并发消费者
  25. func exampleConcurrentConsumer() {
  26. // 直接使用rocketmq-client-go的API创建消费者
  27. c, err := consumer.NewPushConsumer(
  28. // consumer.WithGroupName(fmt.Sprintf("consumer-group-%d", time.Now().Unix())), // 使用唯一组名
  29. consumer.WithGroupName(fmt.Sprintf("consumer-group")), // 使用唯一组名
  30. consumer.WithNameServer([]string{"172.16.1.150:9876"}),
  31. consumer.WithConsumerModel(consumer.Clustering),
  32. consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从第一条消息开始消费
  33. consumer.WithConsumerOrder(false),
  34. )
  35. if err != nil {
  36. log.Fatalf("创建消费者失败: %v", err)
  37. }
  38. // 创建信号量来控制消费者生命周期
  39. ctx, cancel := context.WithCancel(context.Background())
  40. defer cancel()
  41. // 消息计数器
  42. messageCount := 0
  43. // 订阅主题
  44. err = c.Subscribe("TestTopic", consumer.MessageSelector{
  45. Type: consumer.TAG,
  46. Expression: "*", // 接收所有标签的消息
  47. }, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
  48. for _, msg := range msgs {
  49. messageCount++
  50. fmt.Printf("[%s] 收到消息[%d]: Topic=%s, Tag=%s, Body=%s\n",
  51. time.Now().Format("15:04:05"), messageCount,
  52. msg.Topic, msg.GetTags(), string(msg.Body))
  53. }
  54. return consumer.ConsumeSuccess, nil
  55. })
  56. if err != nil {
  57. log.Fatalf("订阅失败: %v", err)
  58. }
  59. // 启动消费者
  60. err = c.Start()
  61. if err != nil {
  62. log.Fatalf("启动失败: %v", err)
  63. }
  64. defer c.Shutdown()
  65. fmt.Println("消费者已启动...")
  66. // 等待上下文取消或超时
  67. select {
  68. case <-ctx.Done():
  69. fmt.Println("上下文取消,退出消费者")
  70. case <-time.After(120 * time.Second): // 等待2分钟
  71. fmt.Printf("超时退出,共收到 %d 条消息\n", messageCount)
  72. }
  73. }