b_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package utils
  2. import (
  3. "fmt"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. "testing"
  9. "time"
  10. "github.com/apache/rocketmq-client-go/v2/primitive"
  11. )
  12. func aaaaa(t *testing.T) {
  13. // 配置RocketMQ NameServer地址
  14. nameServer := []string{"172.16.1.150:9876"} // 替换为实际地址
  15. groupName := "test-producer-group"
  16. // 创建生产者
  17. producer, err := NewRocketMQProducer(nameServer, groupName)
  18. if err != nil {
  19. log.Fatalf("创建生产者失败: %v", err)
  20. }
  21. // 启动生产者
  22. if err := producer.Start(); err != nil {
  23. log.Fatalf("启动生产者失败: %v", err)
  24. }
  25. defer producer.Shutdown()
  26. // 示例1:同步发送消息
  27. fmt.Println("=== 同步发送消息 ===")
  28. result, err := producer.SendSyncMessage("TestTopic", "Hello RocketMQ!", "TestTag")
  29. if err != nil {
  30. log.Printf("同步发送失败: %v", err)
  31. } else {
  32. fmt.Printf("同步发送成功: %+v\n", result)
  33. }
  34. // 示例2:异步发送消息
  35. fmt.Println("\n=== 异步发送消息 ===")
  36. err = producer.SendAsyncMessage("TestTopic", "Hello Async RocketMQ!", func(result *primitive.SendResult, err error) {
  37. if err != nil {
  38. fmt.Printf("异步发送失败: %v\n", err)
  39. } else {
  40. fmt.Printf("异步发送成功: %+v\n", result)
  41. }
  42. }, "AsyncTag")
  43. if err != nil {
  44. log.Printf("启动异步发送失败: %v", err)
  45. }
  46. // 示例3:发送延迟消息(延迟10秒)
  47. fmt.Println("\n=== 发送延迟消息 ===")
  48. result, err = producer.SendDelayMessage("DelayTopic", "This is a delayed message", 3, "DelayTag")
  49. if err != nil {
  50. log.Printf("发送延迟消息失败: %v", err)
  51. } else {
  52. fmt.Printf("延迟消息发送成功: %+v\n", result)
  53. }
  54. // 示例4:发送顺序消息
  55. fmt.Println("\n=== 发送顺序消息 ===")
  56. for i := 1; i <= 5; i++ {
  57. body := fmt.Sprintf("Order message %d", i)
  58. result, err = producer.SendOrderMessage("OrderTopic", body, "order-key-1", "OrderTag")
  59. if err != nil {
  60. log.Printf("发送顺序消息失败: %v", err)
  61. } else {
  62. fmt.Printf("顺序消息发送成功 [%d]: %+v\n", i, result)
  63. }
  64. time.Sleep(100 * time.Millisecond)
  65. }
  66. // 等待中断信号
  67. waitForInterruptA()
  68. }
  69. // waitForInterrupt 等待中断信号
  70. func waitForInterruptA() {
  71. sigChan := make(chan os.Signal, 1)
  72. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  73. <-sigChan
  74. fmt.Println("\n接收到中断信号,程序退出...")
  75. }