package utils import ( "fmt" "log" "os" "os/signal" "syscall" "testing" "time" "github.com/apache/rocketmq-client-go/v2/primitive" ) func aaaaa(t *testing.T) { // 配置RocketMQ NameServer地址 nameServer := []string{"172.16.1.150:9876"} // 替换为实际地址 groupName := "test-producer-group" // 创建生产者 producer, err := NewRocketMQProducer(nameServer, groupName) if err != nil { log.Fatalf("创建生产者失败: %v", err) } // 启动生产者 if err := producer.Start(); err != nil { log.Fatalf("启动生产者失败: %v", err) } defer producer.Shutdown() // 示例1:同步发送消息 fmt.Println("=== 同步发送消息 ===") result, err := producer.SendSyncMessage("TestTopic", "Hello RocketMQ!", "TestTag") if err != nil { log.Printf("同步发送失败: %v", err) } else { fmt.Printf("同步发送成功: %+v\n", result) } // 示例2:异步发送消息 fmt.Println("\n=== 异步发送消息 ===") err = producer.SendAsyncMessage("TestTopic", "Hello Async RocketMQ!", func(result *primitive.SendResult, err error) { if err != nil { fmt.Printf("异步发送失败: %v\n", err) } else { fmt.Printf("异步发送成功: %+v\n", result) } }, "AsyncTag") if err != nil { log.Printf("启动异步发送失败: %v", err) } // 示例3:发送延迟消息(延迟10秒) fmt.Println("\n=== 发送延迟消息 ===") result, err = producer.SendDelayMessage("DelayTopic", "This is a delayed message", 3, "DelayTag") if err != nil { log.Printf("发送延迟消息失败: %v", err) } else { fmt.Printf("延迟消息发送成功: %+v\n", result) } // 示例4:发送顺序消息 fmt.Println("\n=== 发送顺序消息 ===") for i := 1; i <= 5; i++ { body := fmt.Sprintf("Order message %d", i) result, err = producer.SendOrderMessage("OrderTopic", body, "order-key-1", "OrderTag") if err != nil { log.Printf("发送顺序消息失败: %v", err) } else { fmt.Printf("顺序消息发送成功 [%d]: %+v\n", i, result) } time.Sleep(100 * time.Millisecond) } // 等待中断信号 waitForInterruptA() } // waitForInterrupt 等待中断信号 func waitForInterruptA() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan fmt.Println("\n接收到中断信号,程序退出...") }