| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package utils
- import (
- "context"
- "fmt"
- "time"
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/producer"
- )
- // RocketMQProducer RocketMQ生产者结构体
- type RocketMQProducer struct {
- producer rocketmq.Producer
- options []producer.Option
- }
- // NewRocketMQProducer 创建生产者
- func NewRocketMQProducer(nameServer []string, groupName string) (*RocketMQProducer, error) {
- // 生产者配置选项
- opts := []producer.Option{
- producer.WithNameServer(nameServer),
- producer.WithRetry(2), // 重试次数
- producer.WithGroupName(groupName),
- }
- // 创建生产者实例
- p, err := rocketmq.NewProducer(opts...)
- if err != nil {
- return nil, fmt.Errorf("创建生产者失败: %v", err)
- }
- return &RocketMQProducer{
- producer: p,
- options: opts,
- }, nil
- }
- // Start 启动生产者
- func (p *RocketMQProducer) Start() error {
- if err := p.producer.Start(); err != nil {
- return fmt.Errorf("启动生产者失败: %v", err)
- }
- fmt.Println("RocketMQ生产者启动成功")
- return nil
- }
- // Shutdown 关闭生产者
- func (p *RocketMQProducer) Shutdown() error {
- if err := p.producer.Shutdown(); err != nil {
- return fmt.Errorf("关闭生产者失败: %v", err)
- }
- fmt.Println("RocketMQ生产者已关闭")
- return nil
- }
- // SendSyncMessage 同步发送消息
- func (p *RocketMQProducer) SendSyncMessage(topic, body string, tags ...string) (*primitive.SendResult, error) {
- tag := "default"
- if len(tags) > 0 {
- tag = tags[0]
- }
- msg := &primitive.Message{
- Topic: topic,
- Body: []byte(body),
- }
- msg.WithTag(tag)
- // 设置消息属性(可选)
- msg.WithProperties(map[string]string{
- "source": "go-producer",
- "time": time.Now().Format("2006-01-02 15:04:05"),
- })
- // 同步发送消息
- res, err := p.producer.SendSync(context.Background(), msg)
- if err != nil {
- return nil, fmt.Errorf("发送消息失败: %v", err)
- }
- return res, nil
- }
- // SendAsyncMessage 异步发送消息
- func (p *RocketMQProducer) SendAsyncMessage(topic, body string, callback func(*primitive.SendResult, error), tags ...string) error {
- tag := "default"
- if len(tags) > 0 {
- tag = tags[0]
- }
- msg := &primitive.Message{
- Topic: topic,
- Body: []byte(body),
- }
- msg.WithTag(tag)
- // 异步发送消息
- err := p.producer.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
- if callback != nil {
- callback(result, err)
- }
- }, msg)
- if err != nil {
- return fmt.Errorf("异步发送消息失败: %v", err)
- }
- return nil
- }
- // SendDelayMessage 发送延迟消息
- func (p *RocketMQProducer) SendDelayMessage(topic, body string, delayLevel int, tags ...string) (*primitive.SendResult, error) {
- tag := "default"
- if len(tags) > 0 {
- tag = tags[0]
- }
- msg := &primitive.Message{
- Topic: topic,
- Body: []byte(body),
- }
- msg.WithTag(tag)
-
- // 设置延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
- msg.WithDelayTimeLevel(delayLevel)
- res, err := p.producer.SendSync(context.Background(), msg)
- if err != nil {
- return nil, fmt.Errorf("发送延迟消息失败: %v", err)
- }
- return res, nil
- }
- // SendOrderMessage 发送顺序消息
- func (p *RocketMQProducer) SendOrderMessage(topic, body, shardingKey string, tags ...string) (*primitive.SendResult, error) {
- tag := "default"
- if len(tags) > 0 {
- tag = tags[0]
- }
- msg := &primitive.Message{
- Topic: topic,
- Body: []byte(body),
- }
- msg.WithTag(tag)
- msg.WithShardingKey(shardingKey)
- // 添加顺序相关属性
- properties := msg.GetProperties()
- if properties == nil {
- properties = make(map[string]string)
- }
- properties["ORDER_KEY"] = shardingKey
- msg.WithProperties(properties)
- // 使用队列选择器保证消息顺序
- // res, err := p.producer.SendSync(context.Background(), msg,
- // producer.WithQueueSelector(p.selector))
- res, err := p.producer.SendSync(context.Background(), msg)
-
- if err != nil {
- return nil, fmt.Errorf("发送顺序消息失败: %v", err)
- }
- return res, nil
- }
|