package utils import ( "context" "fmt" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // RocketMQProducer RocketMQ生产者结构体 type RocketMQProducer struct { producer rocketmq.Producer options []producer.Option } // NewRocketMQProducer 创建生产者 func NewRocketMQProducer(nameServer []string, groupName string) (*RocketMQProducer, error) { // 生产者配置选项 opts := []producer.Option{ producer.WithNameServer(nameServer), producer.WithRetry(2), // 重试次数 producer.WithGroupName(groupName), } // 创建生产者实例 p, err := rocketmq.NewProducer(opts...) if err != nil { return nil, fmt.Errorf("创建生产者失败: %v", err) } return &RocketMQProducer{ producer: p, options: opts, }, nil } // Start 启动生产者 func (p *RocketMQProducer) Start() error { if err := p.producer.Start(); err != nil { return fmt.Errorf("启动生产者失败: %v", err) } fmt.Println("RocketMQ生产者启动成功") return nil } // Shutdown 关闭生产者 func (p *RocketMQProducer) Shutdown() error { if err := p.producer.Shutdown(); err != nil { return fmt.Errorf("关闭生产者失败: %v", err) } fmt.Println("RocketMQ生产者已关闭") return nil } // SendSyncMessage 同步发送消息 func (p *RocketMQProducer) SendSyncMessage(topic, body string, tags ...string) (*primitive.SendResult, error) { tag := "default" if len(tags) > 0 { tag = tags[0] } msg := &primitive.Message{ Topic: topic, Body: []byte(body), } msg.WithTag(tag) // 设置消息属性(可选) msg.WithProperties(map[string]string{ "source": "go-producer", "time": time.Now().Format("2006-01-02 15:04:05"), }) // 同步发送消息 res, err := p.producer.SendSync(context.Background(), msg) if err != nil { return nil, fmt.Errorf("发送消息失败: %v", err) } return res, nil } // SendAsyncMessage 异步发送消息 func (p *RocketMQProducer) SendAsyncMessage(topic, body string, callback func(*primitive.SendResult, error), tags ...string) error { tag := "default" if len(tags) > 0 { tag = tags[0] } msg := &primitive.Message{ Topic: topic, Body: []byte(body), } msg.WithTag(tag) // 异步发送消息 err := p.producer.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) { if callback != nil { callback(result, err) } }, msg) if err != nil { return fmt.Errorf("异步发送消息失败: %v", err) } return nil } // SendDelayMessage 发送延迟消息 func (p *RocketMQProducer) SendDelayMessage(topic, body string, delayLevel int, tags ...string) (*primitive.SendResult, error) { tag := "default" if len(tags) > 0 { tag = tags[0] } msg := &primitive.Message{ Topic: topic, Body: []byte(body), } msg.WithTag(tag) // 设置延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) msg.WithDelayTimeLevel(delayLevel) res, err := p.producer.SendSync(context.Background(), msg) if err != nil { return nil, fmt.Errorf("发送延迟消息失败: %v", err) } return res, nil } // SendOrderMessage 发送顺序消息 func (p *RocketMQProducer) SendOrderMessage(topic, body, shardingKey string, tags ...string) (*primitive.SendResult, error) { tag := "default" if len(tags) > 0 { tag = tags[0] } msg := &primitive.Message{ Topic: topic, Body: []byte(body), } msg.WithTag(tag) msg.WithShardingKey(shardingKey) // 添加顺序相关属性 properties := msg.GetProperties() if properties == nil { properties = make(map[string]string) } properties["ORDER_KEY"] = shardingKey msg.WithProperties(properties) // 使用队列选择器保证消息顺序 // res, err := p.producer.SendSync(context.Background(), msg, // producer.WithQueueSelector(p.selector)) res, err := p.producer.SendSync(context.Background(), msg) if err != nil { return nil, fmt.Errorf("发送顺序消息失败: %v", err) } return res, nil }