最近需要验证一套 RocketMQ 环境是否可用。最开始的诉求很简单:我有一个 RocketMQ NameServer 地址,端口是默认端口 9876,想确认它到底是不是正常。
很多时候我们会先用 telnet、nc 或者简单的 TCP 脚本测一下端口。这个动作当然有价值,但它只能说明:
当前机器到 RocketMQ NameServer 的网络是通的。
它不能证明 broker 正常、topic 可用,也不能证明消息真的可以成功生产和消费。
所以这次我做了一个小工具,分三层验证 RocketMQ:
下面记录一下完整过程。
本文中的内网地址做了脱敏处理,实际使用时替换成自己的 RocketMQ 地址即可。
|
1 2 3 |
NameServer: 10.x.x.x:9876 Go: go1.25.0 linux/amd64 RocketMQ Go Client: github.com/apache/rocketmq-client-go/v2 |
测试机器上 Go 环境正常:
|
1 |
go version |
输出类似:
|
1 |
go version go1.25.0 linux/amd64 |
一开始我尝试过 Python 版本,先做 TCP 测试,再用 rocketmq-client-python 做生产消费测试。
TCP 测试是正常的:
|
1 |
[OK] TCP connected to 10.x.x.x:9876 |
但是 Python 客户端遇到了两个问题。
在 Linux 上:
rocketmq-client-python 依赖 native 动态库,如果机器上缺少 librocketmq.so,会报:
|
1 |
rocketmq dynamic library not found |
Windows 上则更直接:
|
1 |
rocketmq-python does not support Windows |
所以最后我换成了 Go。Go 版本更适合这种轻量健康检查脚本,不需要额外处理 Python native 动态库的问题。
新建一个目录:
|
1 2 |
mkdir rocketmq-go-healthcheck cd rocketmq-go-healthcheck |
目录结构如下:
|
1 2 3 4 |
rocketmq-go-healthcheck/ ├── go.mod ├── go.sum └── main.go |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
module rocketmq-go-healthcheck
go 1.20
require github.com/apache/rocketmq-client-go/v2 v2.1.2
require ( github.com/emirpasic/gods v1.12.0 // indirect github.com/golang/mock v1.3.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pkg/errors v0.8.1 // indirect github.com/sirupsen/logrus v1.4.0 // indirect github.com/tidwall/gjson v1.13.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect go.uber.org/atomic v1.5.1 // indirect golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect stathat.com/c/consistent v1.0.0 // indirect ) |
然后执行:
|
1 |
go mod tidy # 只需要go.mod 和 main.go 然后执行这个会自动生成go.sum的 |
下面是完整的 main.go。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
package main
import ( "context" "flag" "fmt" "net" "os" "strings" "sync" "time"
rocketmq "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" )
func main() { var ( // NameServer 地址,默认端口是 9876。 namesrv = flag.String("namesrv", "10.x.x.x:9876", "RocketMQ NameServer address")
// 用于测试的 topic,需要替换成实际环境中存在的 topic。 topic = flag.String("topic", "TopicTest", "topic used for send/roundtrip")
// 测试消息使用的 tag,消费者也会按这个 tag 订阅。 tag = flag.String("tag", "healthcheck", "message tag")
// 支持三种模式:tcp 只测端口,send 只发消息,roundtrip 发送并消费。 mode = flag.String("mode", "roundtrip", "tcp, send, or roundtrip")
producerGroup = flag.String("producer-group", "PID_HEALTHCHECK_GO", "producer group")
// consumer group 默认带时间戳,避免多次测试时和已有消费者互相影响。 consumerGroup = flag.String("consumer-group", fmt.Sprintf("CID_HEALTHCHECK_GO_%d", time.Now().UnixNano()), "consumer group")
tcpTimeout = flag.Duration("tcp-timeout", 3*time.Second, "tcp connection timeout") waitTimeout = flag.Duration("wait", 20*time.Second, "roundtrip consume wait timeout")
// consumer 启动后稍等一下再发送消息,降低订阅尚未完成导致误判的概率。 warmup = flag.Duration("warmup", 2*time.Second, "consumer warmup time before send") ) flag.Parse()
fmt.Printf("[INFO] NameServer: %s\n", *namesrv) fmt.Printf("[INFO] Mode: %s\n", *mode)
// 第一层检查:先测 TCP。 // 如果这里不通,优先排查网络、防火墙、安全组、端口等问题。 if err := checkTCP(*namesrv, *tcpTimeout); err != nil { fmt.Printf("[FAIL] TCP connection failed: %v\n", err) os.Exit(2) } fmt.Println("[OK] TCP connected")
switch *mode { case "tcp": return case "send": // 第二层检查:只发送一条消息,用于验证 producer 到 broker 的链路。 if err := sendMessage(*namesrv, *topic, *tag, *producerGroup, marker()); err != nil { fmt.Printf("[FAIL] Send failed: %v\n", err) os.Exit(3) } case "roundtrip": // 第三层检查:发送并消费同一条消息,更接近真实可用性验证。 if err := roundtrip(*namesrv, *topic, *tag, *producerGroup, *consumerGroup, *waitTimeout, *warmup); err != nil { fmt.Printf("[FAIL] Roundtrip failed: %v\n", err) os.Exit(4) } default: fmt.Printf("[FAIL] Unknown mode %q, use tcp, send, or roundtrip\n", *mode) os.Exit(1) } }
func checkTCP(address string, timeout time.Duration) error { if !strings.Contains(address, ":") { address += ":9876" }
start := time.Now() conn, err := net.DialTimeout("tcp", address, timeout) if err != nil { return err } _ = conn.Close()
fmt.Printf("[OK] TCP connected to %s in %d ms\n", address, time.Since(start).Milliseconds()) return nil }
func sendMessage(namesrv, topic, tag, group, key string) error { p, err := rocketmq.NewProducer( producer.WithNameServer([]string{namesrv}), producer.WithGroupName(group), ) if err != nil { return err } if err := p.Start(); err != nil { return err } defer p.Shutdown()
// 每条测试消息都带唯一 key,方便 roundtrip 模式精确识别本次发送的消息。 body := fmt.Sprintf("rocketmq-go-healthcheck %s", key) msg := primitive.NewMessage(topic, []byte(body)) msg.WithTag(tag) msg.WithKeys([]string{key})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()
result, err := p.SendSync(ctx, msg) if err != nil { return err }
fmt.Printf("[OK] Sent message: status=%v msg_id=%s queue=%s offset=%d\n", result.Status, result.MsgID, result.MessageQueue.String(), result.QueueOffset) return nil }
func roundtrip(namesrv, topic, tag, producerGroup, consumerGroup string, waitTimeout, warmup time.Duration) error { // 本次健康检查的唯一标识,避免误消费 topic 中已有的历史消息。 key := marker() found := make(chan *primitive.MessageExt, 1) var once sync.Once
c, err := rocketmq.NewPushConsumer( consumer.WithNameServer([]string{namesrv}), consumer.WithGroupName(consumerGroup), consumer.WithConsumerModel(consumer.Clustering), ) if err != nil { return err }
// 只订阅指定 tag 的消息,减少无关消息干扰。 err = c.Subscribe(topic, consumer.MessageSelector{ Type: consumer.TAG, Expression: tag, }, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, msg := range msgs { body := string(msg.Body) if body == "" { continue }
// 只认本次发送的测试消息。 if containsKey(msg, key) || strings.Contains(body, key) { once.Do(func() { found <- msg }) } } return consumer.ConsumeSuccess, nil }) if err != nil { return err }
if err := c.Start(); err != nil { return err } defer c.Shutdown()
fmt.Printf("[INFO] Consumer started: group=%s topic=%s tag=%s\n", consumerGroup, topic, tag) time.Sleep(warmup)
// consumer 启动并完成短暂预热后,再发送测试消息。 if err := sendMessage(namesrv, topic, tag, producerGroup, key); err != nil { return err }
select { case msg := <-found: fmt.Printf("[OK] Consumed healthcheck message: msg_id=%s queue=%s offset=%d body=%s\n", msg.MsgId, msg.Queue.String(), msg.QueueOffset, string(msg.Body)) return nil case <-time.After(waitTimeout): return fmt.Errorf("message was sent but not consumed within %s", waitTimeout) } }
func marker() string { return fmt.Sprintf("healthcheck-%d", time.Now().UnixNano()) }
func containsKey(msg *primitive.MessageExt, key string) bool { keys := msg.GetKeys() return keys == key || strings.Contains(keys, key) } |
这个工具支持三种模式。
|
1 |
go run . -mode tcp -namesrv 10.x.x.x:9876 |
如果成功,会看到类似输出:
|
1 2 3 4 |
[INFO] NameServer: 10.x.x.x:9876 [INFO] Mode: tcp [OK] TCP connected to 10.x.x.x:9876 in 40 ms [OK] TCP connected |
这个结果只能说明网络和端口没问题,不能说明 RocketMQ 生产消费一定正常。
|
1 |
go run . -mode send -namesrv 10.x.x.x:9876 -topic TopicTest |
成功输出类似:
|
1 |
[OK] Sent message: status=SendOK msg_id=xxx queue=xxx offset=123 |
如果这一步失败,常见原因包括:
|
1 |
go run . -namesrv 10.x.x.x:9876 -topic TopicTest |
默认模式就是 roundtrip,等价于:
|
1 |
go run . -mode roundtrip -namesrv 10.x.x.x:9876 -topic TopicTest |
成功输出类似:
|
1 2 3 |
[INFO] Consumer started: group=CID_HEALTHCHECK_GO_xxx topic=TopicTest tag=healthcheck [OK] Sent message: status=SendOK msg_id=xxx queue=xxx offset=123 [OK] Consumed healthcheck message: msg_id=xxx queue=xxx offset=123 body=rocketmq-go-healthcheck healthcheck-xxx |
看到这类输出,基本就可以说明:
这比单纯测端口更有说服力。
代码一开始先调用 checkTCP。
这样做的好处是可以快速区分问题类型:
|
1 2 3 |
TCP 不通:优先查网络、防火墙、安全组、端口 TCP 通但发送失败:优先查 RocketMQ 路由、topic、broker、ACL 发送成功但消费失败:优先查 consumer group、tag、订阅、broker 到客户端网络 |
排障时分层非常重要,不然很容易一上来就陷入客户端报错细节里。
代码里是先启动 consumer,再发送消息:
|
1 2 3 4 5 6 7 8 9 |
if err := c.Start(); err != nil { return err }
time.Sleep(warmup)
if err := sendMessage(namesrv, topic, tag, producerGroup, key); err != nil { return err } |
这是为了避免消费者还没完成订阅初始化,消息就已经发出去了。虽然 RocketMQ 本身是消息队列,但对于这种一次性的健康检查脚本来说,先启动 consumer,等一小会儿,再发送消息,结果更稳定。
每次运行会生成一个类似这样的 key:
|
1 |
healthcheck-171xxxxxxxxxxxxx |
消费回调里只接受包含这个 marker 的消息:
|
1 2 3 |
if containsKey(msg, key) || strings.Contains(body, key) { found <- msg } |
这样可以避免误把 topic 里其他历史消息当成本次健康检查结果。
这种情况说明 NameServer 端口能访问,但 RocketMQ 链路还不完整。
可以重点检查:
|
1 2 3 4 5 |
topic 是否存在 broker 是否启动 broker 是否注册到了 NameServer 客户端机器是否能访问 broker 地址 是否开启了 ACL |
尤其要注意:客户端连接的是 NameServer,但真正发送消息时,还会根据路由连接 broker。所以 NameServer 通,不代表 broker 一定通。
可以检查:
|
1 2 3 4 5 |
topic 是否正确 tag 是否匹配 consumer group 是否被其他程序占用 等待时间是否太短 broker 到客户端网络是否正常 |
可以适当把等待时间调大:
|
1 |
go run . -topic TopicTest -wait 60s |
直接用 -topic 参数指定真实 topic:
|
1 |
go run . -namesrv 10.x.x.x:9876 -topic YourRealTopic |
这次验证 RocketMQ 的过程可以总结成一句话:
TCP 通只能说明端口可达,生产消费闭环成功,才能更接近真实可用。
最终我用 Go 写了一个小工具,支持:
实际排障时建议按这个顺序来:
|
1 2 3 |
go run . -mode tcp -namesrv 10.x.x.x:9876 go run . -mode send -namesrv 10.x.x.x:9876 -topic TopicTest go run . -mode roundtrip -namesrv 10.x.x.x:9876 -topic TopicTest |
这样能快速判断问题到底在网络层、RocketMQ 路由层,还是生产消费链路上。