a_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package utils
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/apache/rocketmq-client-go/v2"
  7. "github.com/apache/rocketmq-client-go/v2/primitive"
  8. "github.com/apache/rocketmq-client-go/v2/producer"
  9. )
  10. // RocketMQProducer RocketMQ生产者结构体
  11. type RocketMQProducer struct {
  12. producer rocketmq.Producer
  13. options []producer.Option
  14. }
  15. // NewRocketMQProducer 创建生产者
  16. func NewRocketMQProducer(nameServer []string, groupName string) (*RocketMQProducer, error) {
  17. // 生产者配置选项
  18. opts := []producer.Option{
  19. producer.WithNameServer(nameServer),
  20. producer.WithRetry(2), // 重试次数
  21. producer.WithGroupName(groupName),
  22. }
  23. // 创建生产者实例
  24. p, err := rocketmq.NewProducer(opts...)
  25. if err != nil {
  26. return nil, fmt.Errorf("创建生产者失败: %v", err)
  27. }
  28. return &RocketMQProducer{
  29. producer: p,
  30. options: opts,
  31. }, nil
  32. }
  33. // Start 启动生产者
  34. func (p *RocketMQProducer) Start() error {
  35. if err := p.producer.Start(); err != nil {
  36. return fmt.Errorf("启动生产者失败: %v", err)
  37. }
  38. fmt.Println("RocketMQ生产者启动成功")
  39. return nil
  40. }
  41. // Shutdown 关闭生产者
  42. func (p *RocketMQProducer) Shutdown() error {
  43. if err := p.producer.Shutdown(); err != nil {
  44. return fmt.Errorf("关闭生产者失败: %v", err)
  45. }
  46. fmt.Println("RocketMQ生产者已关闭")
  47. return nil
  48. }
  49. // SendSyncMessage 同步发送消息
  50. func (p *RocketMQProducer) SendSyncMessage(topic, body string, tags ...string) (*primitive.SendResult, error) {
  51. tag := "default"
  52. if len(tags) > 0 {
  53. tag = tags[0]
  54. }
  55. msg := &primitive.Message{
  56. Topic: topic,
  57. Body: []byte(body),
  58. }
  59. msg.WithTag(tag)
  60. // 设置消息属性(可选)
  61. msg.WithProperties(map[string]string{
  62. "source": "go-producer",
  63. "time": time.Now().Format("2006-01-02 15:04:05"),
  64. })
  65. // 同步发送消息
  66. res, err := p.producer.SendSync(context.Background(), msg)
  67. if err != nil {
  68. return nil, fmt.Errorf("发送消息失败: %v", err)
  69. }
  70. return res, nil
  71. }
  72. // SendAsyncMessage 异步发送消息
  73. func (p *RocketMQProducer) SendAsyncMessage(topic, body string, callback func(*primitive.SendResult, error), tags ...string) error {
  74. tag := "default"
  75. if len(tags) > 0 {
  76. tag = tags[0]
  77. }
  78. msg := &primitive.Message{
  79. Topic: topic,
  80. Body: []byte(body),
  81. }
  82. msg.WithTag(tag)
  83. // 异步发送消息
  84. err := p.producer.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
  85. if callback != nil {
  86. callback(result, err)
  87. }
  88. }, msg)
  89. if err != nil {
  90. return fmt.Errorf("异步发送消息失败: %v", err)
  91. }
  92. return nil
  93. }
  94. // SendDelayMessage 发送延迟消息
  95. func (p *RocketMQProducer) SendDelayMessage(topic, body string, delayLevel int, tags ...string) (*primitive.SendResult, error) {
  96. tag := "default"
  97. if len(tags) > 0 {
  98. tag = tags[0]
  99. }
  100. msg := &primitive.Message{
  101. Topic: topic,
  102. Body: []byte(body),
  103. }
  104. msg.WithTag(tag)
  105. // 设置延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
  106. msg.WithDelayTimeLevel(delayLevel)
  107. res, err := p.producer.SendSync(context.Background(), msg)
  108. if err != nil {
  109. return nil, fmt.Errorf("发送延迟消息失败: %v", err)
  110. }
  111. return res, nil
  112. }
  113. // SendOrderMessage 发送顺序消息
  114. func (p *RocketMQProducer) SendOrderMessage(topic, body, shardingKey string, tags ...string) (*primitive.SendResult, error) {
  115. tag := "default"
  116. if len(tags) > 0 {
  117. tag = tags[0]
  118. }
  119. msg := &primitive.Message{
  120. Topic: topic,
  121. Body: []byte(body),
  122. }
  123. msg.WithTag(tag)
  124. msg.WithShardingKey(shardingKey)
  125. // 添加顺序相关属性
  126. properties := msg.GetProperties()
  127. if properties == nil {
  128. properties = make(map[string]string)
  129. }
  130. properties["ORDER_KEY"] = shardingKey
  131. msg.WithProperties(properties)
  132. // 使用队列选择器保证消息顺序
  133. // res, err := p.producer.SendSync(context.Background(), msg,
  134. // producer.WithQueueSelector(p.selector))
  135. res, err := p.producer.SendSync(context.Background(), msg)
  136. if err != nil {
  137. return nil, fmt.Errorf("发送顺序消息失败: %v", err)
  138. }
  139. return res, nil
  140. }