MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主要特点包括:
本文的示例代码实现了一个基于Python的MQTT客户端。以下功能涵盖在代码中:
以下是代码中的主要功能与模块解析:
1 2 3 4 5 6 7 |
class MQTTClient: def __init__(self, broker, port, username, password, ca_cert, topics): self.client = mqtt.Client() self.client.username_pw_set(self.username, self.password) self.client.tls_set(ca_certs=self.ca_cert) self.client.on_connect = self.on_connect self.client.on_message = self.on_message |
tls_set:启用SSL/TLS以确保通信安全。
主题订阅:在连接成功时,自动订阅指定的主题。
1 2 |
def set_message_handler(self, handler): self.custom_message_handler = handler |
用户可通过该方法传入自定义的回调函数,从而根据业务逻辑处理消息。
1 2 3 |
async def start_async(self): self.connect() await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever) |
通过异步事件循环确保消息的高效处理,同时避免阻塞主线程。
在主文件main.py中,定义了如下流程:
1 2 3 4 5 |
async def on_mqtt_message(topic, payload): print(f"Custom handler: {topic} -> {payload}")
mqtt_client.set_message_handler(on_mqtt_message) await mqtt_client.start_async() |
确保安装了paho-mqtt库:
1 |
pip install paho-mqtt |
更新代码中的代理地址、端口、用户名、密码和证书路径。
使用以下命令运行程序:
1 |
python main.py |
快速搭建一个基于MQTT协议的实时通信系统。这种架构不仅适用于物联网场景,也可以在各种需要实时数据推送的应用中发挥作用,例如聊天应用和实时监控系统。
mqtt.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import paho.mqtt.client as mqtt from datetime import datetime import asyncio
class MQTTClient: def __init__(self, broker, port, username, password, ca_cert, topics): """ 初始化 MQTT 客户端 """ self.broker = broker self.port = port self.username = username self.password = password self.ca_cert = ca_cert self.topics = topics self.client = mqtt.Client()
# 配置 MQTT 客户端 self.client.username_pw_set(self.username, self.password) self.client.tls_set(ca_certs=self.ca_cert) self.client.on_connect = self.on_connect self.client.on_message = self.on_message
self.custom_message_handler = None # 自定义消息处理器
def set_message_handler(self, handler): """ 设置自定义消息处理回调函数 """ self.custom_message_handler = handler
def on_connect(self, client, userdata, flags, rc): """ 连接成功时的回调 """ if rc == 0: print("SSL连接成功") for topic in self.topics: client.subscribe(topic) print(f"已订阅主题: {topic}") else: print(f"连接失败,返回码: {rc}")
def on_message(self, client, userdata, msg): """ 收到消息时的回调 """ current_time = datetime.now() payload = msg.payload.decode() print(f"收到消息: {msg.topic} -> {payload} 时间: {current_time}")
if self.custom_message_handler and self.event_loop: asyncio.run_coroutine_threadsafe( self.custom_message_handler(msg.topic, payload), self.event_loop )
def connect(self): """ 连接到 MQTT 服务器 """ self.client.connect(self.broker, self.port, keepalive=60)
async def start_async(self): """ 异步运行 MQTT 客户端 """ self.connect() # 确保连接到 MQTT 服务器 print("Starting MQTT client loop...")
# 异步运行 MQTT 客户端的事件循环 loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.client.loop_forever) |
main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import asyncio from mqtt import MQTTClient
# MQTT 配置 MQTT_BROKER = "你的服务器地址" MQTT_PORT = 8883 # 使用 SSL 的端口 MQTT_USERNAME = "用户名" MQTT_PASSWORD = "密码" CA_CERT = "./emqxsl-ca.crt" # CA 证书路径 TOPICS = ["clients/disconnect", "uhome/esp32"] # 订阅的主题列表
async def main(): loop = asyncio.get_running_loop()
mqtt_client = MQTTClient( broker=MQTT_BROKER, port=MQTT_PORT, username=MQTT_USERNAME, password=MQTT_PASSWORD, ca_cert=CA_CERT, topics=TOPICS )
async def on_mqtt_message(topic, payload): print(f"Custom handler: {topic} -> {payload}")
mqtt_client.set_message_handler(on_mqtt_message) mqtt_client.event_loop = loop # 将事件循环传递给 MQTT 客户端 await mqtt_client.start_async()
await asyncio.gather(websocket_task, periodic_task)
if __name__ == "__main__": asyncio.run(main()) |