|
package main
import (
"context"
"flag"
"fmt"
"net"
"os"
"strings"
"sync"
"time"
rocketmq "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// main parses the command-line flags and runs the selected health check
// against a RocketMQ NameServer.
//
// The TCP probe always runs first. Depending on -mode the program then stops
// ("tcp"), sends a single message ("send"), or sends a message and waits to
// consume it again ("roundtrip").
//
// Exit codes: 1 unknown mode, 2 TCP probe failed, 3 send failed,
// 4 roundtrip failed.
func main() {
	var (
		namesrv       string
		topic         string
		tag           string
		mode          string
		producerGroup string
		consumerGroup string
		tcpTimeout    time.Duration
		waitTimeout   time.Duration
		warmup        time.Duration
	)

	// NameServer address; the default port is 9876.
	flag.StringVar(&namesrv, "namesrv", "10.x.x.x:9876", "RocketMQ NameServer address")
	// Topic used for the test; replace it with one that exists in the target
	// environment.
	flag.StringVar(&topic, "topic", "TopicTest", "topic used for send/roundtrip")
	// Tag put on the test message; the consumer subscribes with the same tag.
	flag.StringVar(&tag, "tag", "healthcheck", "message tag")
	// Three modes: tcp only probes the port, send only publishes a message,
	// roundtrip publishes and consumes.
	flag.StringVar(&mode, "mode", "roundtrip", "tcp, send, or roundtrip")
	flag.StringVar(&producerGroup, "producer-group", "PID_HEALTHCHECK_GO", "producer group")
	// The default consumer group carries a timestamp so that repeated runs do
	// not interfere with each other or with existing consumers.
	flag.StringVar(&consumerGroup, "consumer-group", fmt.Sprintf("CID_HEALTHCHECK_GO_%d", time.Now().UnixNano()), "consumer group")
	flag.DurationVar(&tcpTimeout, "tcp-timeout", 3*time.Second, "tcp connection timeout")
	flag.DurationVar(&waitTimeout, "wait", 20*time.Second, "roundtrip consume wait timeout")
	// Give the consumer a moment after start-up before sending, lowering the
	// chance of a false negative caused by an unfinished subscription.
	flag.DurationVar(&warmup, "warmup", 2*time.Second, "consumer warmup time before send")
	flag.Parse()

	// fail prints a formatted line to stdout and terminates with code.
	fail := func(code int, format string, args ...interface{}) {
		fmt.Printf(format, args...)
		os.Exit(code)
	}

	fmt.Printf("[INFO] NameServer: %s\n", namesrv)
	fmt.Printf("[INFO] Mode: %s\n", mode)

	// Layer 1: plain TCP reachability. If this fails, look at the network,
	// firewall, security groups and port first.
	// NOTE(review): checkTCP appends ":9876" when the address has no port, but
	// the raw flag value is what the send/roundtrip paths receive — confirm
	// the RocketMQ client accepts a bare host.
	err := checkTCP(namesrv, tcpTimeout)
	if err != nil {
		fail(2, "[FAIL] TCP connection failed: %v\n", err)
	}
	fmt.Println("[OK] TCP connected")

	switch mode {
	case "roundtrip":
		// Layer 3: send and consume the same message, the closest thing to a
		// real availability check.
		err = roundtrip(namesrv, topic, tag, producerGroup, consumerGroup, waitTimeout, warmup)
		if err != nil {
			fail(4, "[FAIL] Roundtrip failed: %v\n", err)
		}
	case "send":
		// Layer 2: send a single message to verify the producer-to-broker
		// path.
		err = sendMessage(namesrv, topic, tag, producerGroup, marker())
		if err != nil {
			fail(3, "[FAIL] Send failed: %v\n", err)
		}
	case "tcp":
		// The TCP probe above is all this mode asks for.
	default:
		fail(1, "[FAIL] Unknown mode %q, use tcp, send, or roundtrip\n", mode)
	}
}
// checkTCP verifies that a TCP connection to address can be opened within
// timeout and prints the connect latency on success.
//
// When address carries no port, the default NameServer port 9876 is appended.
// The decision is made with net.SplitHostPort rather than by searching for a
// colon, so a bare IPv6 literal such as "::1" or "[::1]" also receives the
// default port instead of being dialed as-is and rejected.
//
// The dial error is returned unchanged when the connection cannot be made.
func checkTCP(address string, timeout time.Duration) error {
	if _, _, err := net.SplitHostPort(address); err != nil {
		// Not a host:port pair, so treat the whole value as the host.
		// Surrounding brackets are removed because JoinHostPort adds them
		// back for hosts that contain a colon.
		host := strings.TrimSuffix(strings.TrimPrefix(address, "["), "]")
		address = net.JoinHostPort(host, "9876")
	}

	start := time.Now()
	conn, err := net.DialTimeout("tcp", address, timeout)
	if err != nil {
		return err
	}
	// The connection only proves reachability; a failure while closing it
	// does not change the outcome of the probe.
	_ = conn.Close()

	fmt.Printf("[OK] TCP connected to %s in %d ms\n", address, time.Since(start).Milliseconds())
	return nil
}
// sendMessage starts a producer in group against the NameServer namesrv,
// synchronously sends one message to topic and shuts the producer down.
//
// The message carries tag and key, and its body embeds key as well, so a
// consumer can recognise exactly this message. The send is bounded by a
// 10-second timeout.
//
// Every returned error is wrapped with the stage that failed (creating the
// producer, starting it, or sending), so callers that only print the error
// still show where the check broke. On success the send result is printed.
func sendMessage(namesrv, topic, tag, group, key string) error {
	const sendTimeout = 10 * time.Second

	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{namesrv}),
		producer.WithGroupName(group),
	)
	if err != nil {
		return fmt.Errorf("create producer (group %q): %w", group, err)
	}
	if err := p.Start(); err != nil {
		return fmt.Errorf("start producer (group %q): %w", group, err)
	}
	defer func() {
		// A shutdown failure does not invalidate the send result, so it is
		// reported rather than returned.
		if err := p.Shutdown(); err != nil {
			fmt.Printf("[WARN] Producer shutdown failed: %v\n", err)
		}
	}()

	// Each test message has a unique key so roundtrip mode can identify the
	// message sent by this run.
	body := fmt.Sprintf("rocketmq-go-healthcheck %s", key)
	msg := primitive.NewMessage(topic, []byte(body))
	msg.WithTag(tag)
	msg.WithKeys([]string{key})

	ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
	defer cancel()

	result, err := p.SendSync(ctx, msg)
	if err != nil {
		return fmt.Errorf("send to topic %q: %w", topic, err)
	}
	fmt.Printf("[OK] Sent message: status=%v msg_id=%s queue=%s offset=%d\n",
		result.Status, result.MsgID, result.MessageQueue.String(), result.QueueOffset)
	return nil
}
// roundtrip checks the full publish/consume path: it starts a push consumer
// in consumerGroup subscribed to topic and tag, waits warmup, sends one
// uniquely marked message through sendMessage, and then waits up to
// waitTimeout for that message to be delivered back.
//
// It returns nil once the marked message has been consumed. Errors from the
// consumer set-up and from the send phase are wrapped with the stage that
// failed; if the message does not arrive in time, the error says that it was
// sent but not consumed.
func roundtrip(namesrv, topic, tag, producerGroup, consumerGroup string, waitTimeout, warmup time.Duration) error {
	// Unique identifier of this run, so that messages already sitting in the
	// topic are not mistaken for the one sent here.
	key := marker()

	// Buffered by one and guarded by once: the handler hands over the first
	// match without blocking, even if nobody is receiving any more.
	found := make(chan *primitive.MessageExt, 1)
	var once sync.Once

	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{namesrv}),
		consumer.WithGroupName(consumerGroup),
		consumer.WithConsumerModel(consumer.Clustering),
	)
	if err != nil {
		return fmt.Errorf("create consumer (group %q): %w", consumerGroup, err)
	}

	// Subscribe to the given tag only, to cut down on unrelated messages.
	selector := consumer.MessageSelector{
		Type:       consumer.TAG,
		Expression: tag,
	}
	handler := func(_ context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			body := string(msg.Body)
			if body == "" {
				continue
			}
			// Accept only the test message sent by this run.
			if containsKey(msg, key) || strings.Contains(body, key) {
				once.Do(func() {
					found <- msg
				})
			}
		}
		return consumer.ConsumeSuccess, nil
	}
	if err := c.Subscribe(topic, selector, handler); err != nil {
		return fmt.Errorf("subscribe to topic %q (tag %q): %w", topic, tag, err)
	}
	if err := c.Start(); err != nil {
		return fmt.Errorf("start consumer (group %q): %w", consumerGroup, err)
	}
	defer func() {
		// A shutdown failure does not change the outcome of the check, so it
		// is reported rather than returned.
		if err := c.Shutdown(); err != nil {
			fmt.Printf("[WARN] Consumer shutdown failed: %v\n", err)
		}
	}()
	fmt.Printf("[INFO] Consumer started: group=%s topic=%s tag=%s\n", consumerGroup, topic, tag)

	// Send the test message only after the consumer has started and had a
	// short warm-up.
	time.Sleep(warmup)
	if err := sendMessage(namesrv, topic, tag, producerGroup, key); err != nil {
		return fmt.Errorf("send healthcheck message: %w", err)
	}

	select {
	case msg := <-found:
		fmt.Printf("[OK] Consumed healthcheck message: msg_id=%s queue=%s offset=%d body=%s\n",
			msg.MsgId, msg.Queue.String(), msg.QueueOffset, string(msg.Body))
		return nil
	case <-time.After(waitTimeout):
		return fmt.Errorf("message was sent but not consumed within %s", waitTimeout)
	}
}
// marker returns an identifier for one health-check run: the fixed prefix
// "healthcheck-" followed by the current Unix time in nanoseconds.
func marker() string {
	nanos := time.Now().UnixNano()
	return "healthcheck-" + fmt.Sprint(nanos)
}
// containsKey reports whether key is one of the keys attached to msg.
//
// The keys string returned by msg.GetKeys is split on whitespace and each
// token is compared for equality, so a key that merely occurs inside a longer
// key does not match. An empty key never matches.
//
// NOTE(review): this assumes the client stores multiple keys joined by a
// space — confirm against primitive.Message.WithKeys. A message carrying a
// single key matches regardless of the separator.
func containsKey(msg *primitive.MessageExt, key string) bool {
	if key == "" {
		return false
	}
	for _, k := range strings.Fields(msg.GetKeys()) {
		if k == key {
			return true
		}
	}
	return false
}
|