广告位联系
返回顶部
分享到

使用Go快速验证RocketMQ是否正常的完整过程

python 来源:互联网 作者:佚名 发布时间:2026-06-28 19:00:13 人浏览
摘要

最近需要验证一套 RocketMQ 环境是否可用。最开始的诉求很简单:我有一个 RocketMQ NameServer 地址,端口是默认端口9876,想确认它到底是不是正常。 很多时候我们会先用 telnet、nc 或者简单的 TC

最近需要验证一套 RocketMQ 环境是否可用。最开始的诉求很简单:我有一个 RocketMQ NameServer 地址,端口是默认端口 9876,想确认它到底是不是正常。

很多时候我们会先用 telnet、nc 或者简单的 TCP 脚本测一下端口。这个动作当然有价值,但它只能说明:

当前机器到 RocketMQ NameServer 的网络是通的。

它不能证明 broker 正常、topic 可用,也不能证明消息真的可以成功生产和消费。

所以这次我做了一个小工具,分三层验证 RocketMQ:

  1. TCP 连通性测试
  2. 发送一条消息
  3. 发送并消费同一条消息,完成生产消费闭环

下面记录一下完整过程。

环境信息

本文中的内网地址做了脱敏处理,实际使用时替换成自己的 RocketMQ 地址即可。

1

2

3

NameServer: 10.x.x.x:9876

Go: go1.25.0 linux/amd64

RocketMQ Go Client: github.com/apache/rocketmq-client-go/v2

测试机器上 Go 环境正常:

1

go version

输出类似:

1

go version go1.25.0 linux/amd64

为什么没有继续用 Python

一开始我尝试过 Python 版本,先做 TCP 测试,再用 rocketmq-client-python 做生产消费测试。

TCP 测试是正常的:

1

[OK] TCP connected to 10.x.x.x:9876

但是 Python 客户端遇到了两个问题。

在 Linux 上:
rocketmq-client-python 依赖 native 动态库,如果机器上缺少 librocketmq.so,会报:

1

rocketmq dynamic library not found

Windows 上则更直接:

1

rocketmq-python does not support Windows

所以最后我换成了 Go。Go 版本更适合这种轻量健康检查脚本,不需要额外处理 Python native 动态库的问题。

项目结构

新建一个目录:

1

2

mkdir rocketmq-go-healthcheck

cd rocketmq-go-healthcheck

目录结构如下:

1

2

3

4

rocketmq-go-healthcheck/

├── go.mod

├── go.sum

└── main.go

go.mod

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

module rocketmq-go-healthcheck

 

go 1.20

 

require github.com/apache/rocketmq-client-go/v2 v2.1.2

 

require (

    github.com/emirpasic/gods v1.12.0 // indirect

    github.com/golang/mock v1.3.1 // indirect

    github.com/google/uuid v1.3.0 // indirect

    github.com/json-iterator/go v1.1.12 // indirect

    github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect

    github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect

    github.com/modern-go/reflect2 v1.0.2 // indirect

    github.com/patrickmn/go-cache v2.1.0+incompatible // indirect

    github.com/pkg/errors v0.8.1 // indirect

    github.com/sirupsen/logrus v1.4.0 // indirect

    github.com/tidwall/gjson v1.13.0 // indirect

    github.com/tidwall/match v1.1.1 // indirect

    github.com/tidwall/pretty v1.2.0 // indirect

    go.uber.org/atomic v1.5.1 // indirect

    golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect

    golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect

    golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect

    golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect

    gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect

    stathat.com/c/consistent v1.0.0 // indirect

)

然后执行:

1

go mod tidy # 只需要go.mod 和 main.go 然后执行这个会自动生成go.sum的

核心代码

下面是完整的 main.go。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

package main

 

import (

    "context"

    "flag"

    "fmt"

    "net"

    "os"

    "strings"

    "sync"

    "time"

 

    rocketmq "github.com/apache/rocketmq-client-go/v2"

    "github.com/apache/rocketmq-client-go/v2/consumer"

    "github.com/apache/rocketmq-client-go/v2/primitive"

    "github.com/apache/rocketmq-client-go/v2/producer"

)

 

func main() {

    var (

        // NameServer 地址,默认端口是 9876。

        namesrv = flag.String("namesrv", "10.x.x.x:9876", "RocketMQ NameServer address")

 

        // 用于测试的 topic,需要替换成实际环境中存在的 topic。

        topic = flag.String("topic", "TopicTest", "topic used for send/roundtrip")

 

        // 测试消息使用的 tag,消费者也会按这个 tag 订阅。

        tag = flag.String("tag", "healthcheck", "message tag")

 

        // 支持三种模式:tcp 只测端口,send 只发消息,roundtrip 发送并消费。

        mode = flag.String("mode", "roundtrip", "tcp, send, or roundtrip")

 

        producerGroup = flag.String("producer-group", "PID_HEALTHCHECK_GO", "producer group")

 

        // consumer group 默认带时间戳,避免多次测试时和已有消费者互相影响。

        consumerGroup = flag.String("consumer-group", fmt.Sprintf("CID_HEALTHCHECK_GO_%d", time.Now().UnixNano()), "consumer group")

 

        tcpTimeout  = flag.Duration("tcp-timeout", 3*time.Second, "tcp connection timeout")

        waitTimeout = flag.Duration("wait", 20*time.Second, "roundtrip consume wait timeout")

 

        // consumer 启动后稍等一下再发送消息,降低订阅尚未完成导致误判的概率。

        warmup = flag.Duration("warmup", 2*time.Second, "consumer warmup time before send")

    )

    flag.Parse()

 

    fmt.Printf("[INFO] NameServer: %s\n", *namesrv)

    fmt.Printf("[INFO] Mode: %s\n", *mode)

 

    // 第一层检查:先测 TCP。

    // 如果这里不通,优先排查网络、防火墙、安全组、端口等问题。

    if err := checkTCP(*namesrv, *tcpTimeout); err != nil {

        fmt.Printf("[FAIL] TCP connection failed: %v\n", err)

        os.Exit(2)

    }

    fmt.Println("[OK] TCP connected")

 

    switch *mode {

    case "tcp":

        return

    case "send":

        // 第二层检查:只发送一条消息,用于验证 producer 到 broker 的链路。

        if err := sendMessage(*namesrv, *topic, *tag, *producerGroup, marker()); err != nil {

            fmt.Printf("[FAIL] Send failed: %v\n", err)

            os.Exit(3)

        }

    case "roundtrip":

        // 第三层检查:发送并消费同一条消息,更接近真实可用性验证。

        if err := roundtrip(*namesrv, *topic, *tag, *producerGroup, *consumerGroup, *waitTimeout, *warmup); err != nil {

            fmt.Printf("[FAIL] Roundtrip failed: %v\n", err)

            os.Exit(4)

        }

    default:

        fmt.Printf("[FAIL] Unknown mode %q, use tcp, send, or roundtrip\n", *mode)

        os.Exit(1)

    }

}

 

func checkTCP(address string, timeout time.Duration) error {

    if !strings.Contains(address, ":") {

        address += ":9876"

    }

 

    start := time.Now()

    conn, err := net.DialTimeout("tcp", address, timeout)

    if err != nil {

        return err

    }

    _ = conn.Close()

 

    fmt.Printf("[OK] TCP connected to %s in %d ms\n", address, time.Since(start).Milliseconds())

    return nil

}

 

func sendMessage(namesrv, topic, tag, group, key string) error {

    p, err := rocketmq.NewProducer(

        producer.WithNameServer([]string{namesrv}),

        producer.WithGroupName(group),

    )

    if err != nil {

        return err

    }

    if err := p.Start(); err != nil {

        return err

    }

    defer p.Shutdown()

 

    // 每条测试消息都带唯一 key,方便 roundtrip 模式精确识别本次发送的消息。

    body := fmt.Sprintf("rocketmq-go-healthcheck %s", key)

    msg := primitive.NewMessage(topic, []byte(body))

    msg.WithTag(tag)

    msg.WithKeys([]string{key})

 

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

 

    result, err := p.SendSync(ctx, msg)

    if err != nil {

        return err

    }

 

    fmt.Printf("[OK] Sent message: status=%v msg_id=%s queue=%s offset=%d\n",

        result.Status, result.MsgID, result.MessageQueue.String(), result.QueueOffset)

    return nil

}

 

func roundtrip(namesrv, topic, tag, producerGroup, consumerGroup string, waitTimeout, warmup time.Duration) error {

    // 本次健康检查的唯一标识,避免误消费 topic 中已有的历史消息。

    key := marker()

    found := make(chan *primitive.MessageExt, 1)

    var once sync.Once

 

    c, err := rocketmq.NewPushConsumer(

        consumer.WithNameServer([]string{namesrv}),

        consumer.WithGroupName(consumerGroup),

        consumer.WithConsumerModel(consumer.Clustering),

    )

    if err != nil {

        return err

    }

 

    // 只订阅指定 tag 的消息,减少无关消息干扰。

    err = c.Subscribe(topic, consumer.MessageSelector{

        Type:       consumer.TAG,

        Expression: tag,

    }, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {

            body := string(msg.Body)

            if body == "" {

                continue

            }

 

            // 只认本次发送的测试消息。

            if containsKey(msg, key) || strings.Contains(body, key) {

                once.Do(func() {

                    found <- msg

                })

            }

        }

        return consumer.ConsumeSuccess, nil

    })

    if err != nil {

        return err

    }

 

    if err := c.Start(); err != nil {

        return err

    }

    defer c.Shutdown()

 

    fmt.Printf("[INFO] Consumer started: group=%s topic=%s tag=%s\n", consumerGroup, topic, tag)

    time.Sleep(warmup)

 

    // consumer 启动并完成短暂预热后,再发送测试消息。

    if err := sendMessage(namesrv, topic, tag, producerGroup, key); err != nil {

        return err

    }

 

    select {

    case msg := <-found:

        fmt.Printf("[OK] Consumed healthcheck message: msg_id=%s queue=%s offset=%d body=%s\n",

            msg.MsgId, msg.Queue.String(), msg.QueueOffset, string(msg.Body))

        return nil

    case <-time.After(waitTimeout):

        return fmt.Errorf("message was sent but not consumed within %s", waitTimeout)

    }

}

 

func marker() string {

    return fmt.Sprintf("healthcheck-%d", time.Now().UnixNano())

}

 

func containsKey(msg *primitive.MessageExt, key string) bool {

    keys := msg.GetKeys()

    return keys == key || strings.Contains(keys, key)

}

支持的测试模式

这个工具支持三种模式。

只测试 TCP 连通性

1

go run . -mode tcp -namesrv 10.x.x.x:9876

如果成功,会看到类似输出:

1

2

3

4

[INFO] NameServer: 10.x.x.x:9876

[INFO] Mode: tcp

[OK] TCP connected to 10.x.x.x:9876 in 40 ms

[OK] TCP connected

这个结果只能说明网络和端口没问题,不能说明 RocketMQ 生产消费一定正常。

只发送一条消息

1

go run . -mode send -namesrv 10.x.x.x:9876 -topic TopicTest

成功输出类似:

1

[OK] Sent message: status=SendOK msg_id=xxx queue=xxx offset=123

如果这一步失败,常见原因包括:

  1. topic 不存在
  2. broker 没有正常注册到 NameServer
  3. 客户端到 broker 网络不通
  4. ACL 权限不足

发送并消费闭环测试

1

go run . -namesrv 10.x.x.x:9876 -topic TopicTest

默认模式就是 roundtrip,等价于:

1

go run . -mode roundtrip -namesrv 10.x.x.x:9876 -topic TopicTest

成功输出类似:

1

2

3

[INFO] Consumer started: group=CID_HEALTHCHECK_GO_xxx topic=TopicTest tag=healthcheck

[OK] Sent message: status=SendOK msg_id=xxx queue=xxx offset=123

[OK] Consumed healthcheck message: msg_id=xxx queue=xxx offset=123 body=rocketmq-go-healthcheck healthcheck-xxx

看到这类输出,基本就可以说明:

  1. NameServer 可连接
  2. broker 路由可获取
  3. topic 可用
  4. producer 可以发送消息
  5. consumer 可以订阅并消费消息

这比单纯测端口更有说服力。

几个实现细节

为什么先做 TCP 检查

代码一开始先调用 checkTCP。

这样做的好处是可以快速区分问题类型:

1

2

3

TCP 不通:优先查网络、防火墙、安全组、端口

TCP 通但发送失败:优先查 RocketMQ 路由、topic、broker、ACL

发送成功但消费失败:优先查 consumer group、tag、订阅、broker 到客户端网络

排障时分层非常重要,不然很容易一上来就陷入客户端报错细节里。

为什么 roundtrip 里先启动 consumer

代码里是先启动 consumer,再发送消息:

1

2

3

4

5

6

7

8

9

if err := c.Start(); err != nil {

    return err

}

 

time.Sleep(warmup)

 

if err := sendMessage(namesrv, topic, tag, producerGroup, key); err != nil {

    return err

}

这是为了避免消费者还没完成订阅初始化,消息就已经发出去了。虽然 RocketMQ 本身是消息队列,但对于这种一次性的健康检查脚本来说,先启动 consumer,等一小会儿,再发送消息,结果更稳定。

为什么每次消息都带唯一 marker

每次运行会生成一个类似这样的 key:

1

healthcheck-171xxxxxxxxxxxxx

消费回调里只接受包含这个 marker 的消息:

1

2

3

if containsKey(msg, key) || strings.Contains(body, key) {

    found <- msg

}

这样可以避免误把 topic 里其他历史消息当成本次健康检查结果。

常见问题

TCP 通,但 send 失败

这种情况说明 NameServer 端口能访问,但 RocketMQ 链路还不完整。

可以重点检查:

1

2

3

4

5

topic 是否存在

broker 是否启动

broker 是否注册到了 NameServer

客户端机器是否能访问 broker 地址

是否开启了 ACL

尤其要注意:客户端连接的是 NameServer,但真正发送消息时,还会根据路由连接 broker。所以 NameServer 通,不代表 broker 一定通。

send 成功,但 roundtrip 消费不到

可以检查:

1

2

3

4

5

topic 是否正确

tag 是否匹配

consumer group 是否被其他程序占用

等待时间是否太短

broker 到客户端网络是否正常

可以适当把等待时间调大:

1

go run . -topic TopicTest -wait 60s

topic 不是 TopicTest 怎么办

直接用 -topic 参数指定真实 topic:

1

go run . -namesrv 10.x.x.x:9876 -topic YourRealTopic

小结

这次验证 RocketMQ 的过程可以总结成一句话:

TCP 通只能说明端口可达,生产消费闭环成功,才能更接近真实可用。

最终我用 Go 写了一个小工具,支持:

  1. tcp:只测 NameServer 端口
  2. send:发送一条测试消息
  3. roundtrip:发送并消费同一条测试消息

实际排障时建议按这个顺序来:

1

2

3

go run . -mode tcp -namesrv 10.x.x.x:9876

go run . -mode send -namesrv 10.x.x.x:9876 -topic TopicTest

go run . -mode roundtrip -namesrv 10.x.x.x:9876 -topic TopicTest

这样能快速判断问题到底在网络层、RocketMQ 路由层,还是生产消费链路上。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计