如何在 Python3 中使用 MQTT 客户端库 Paho Client

目录
引言
MQTT 是一种轻量级的消息传输协议,采用发布/订阅模式,使用最少的代码和带宽提供可靠的实时通信。它特别适用于资源有限和低带宽网络的设备,因此在物联网、移动互联网、车联网和电力行业得到了广泛应用。
Python 因其灵活性、易用性和丰富的库而在物联网中被广泛使用。由于能够处理大量数据,Python 非常适合智能家居自动化、环境监测和工业控制。它与微控制器兼容,使其成为开发物联网设备的重要工具。
本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端,实施与 MQTT 代理之间的连接、订阅、消息传递等功能。
为什么选择 Paho MQTT Python 客户端?
Paho Python 客户端提供了一个支持 MQTT 5.0、3.1.1 和 3.1 的客户端,适用于 Python 2.7 或 3.x。它还提供了一些辅助函数,使得向 MQTT 服务器发布单条消息变得非常简单。
作为 Python 社区中最受欢迎的 MQTT 客户端库,Paho MQTT Python 客户端具有以下优点:
- 开源且得到社区支持;
- 简洁易用的 API,便于连接 MQTT 服务器并进行消息的发布/订阅;
- 支持多种安全机制;
- 积极开发和维护,以适应快速发展的物联网环境。
Python MQTT 项目准备
Python 版本
该项目在 Python 3.11 中开发和测试。请确认您安装了正确的 Python 版本,可以使用以下命令:
$ python3 --version
Python 3.11.8
安装 Paho MQTT 客户端
paho-mqtt 在 2024 年 2 月发布了 2.0.0 版本,相比 1.X 版本有一些重要更新。本文主要演示 1.X 版本的代码,同时也会提供 2.0.0 版本的相应代码,供读者选择合适的 paho-mqtt 版本。
有关 2.0.0 版本的详细变更,请参阅文档:Migrations — Eclipse paho-mqtt documentation
使用 Pip 安装 paho-mqtt 1.X
pip3 install "paho-mqtt<2.0.0"
使用 Pip 安装 paho-mqtt 2.X
pip3 install paho-mqtt
如果您需要安装 Pip 的帮助,请参考官方文档:Installation - pip documentation v24.2 。该资源提供了在不同操作系统和环境中安装 Pip 的详细说明。
准备 MQTT Broker
在开始之前,请确保您有一个 MQTT Broker 用于通信和测试。我们建议使用 EMQX Platform 的 Serverless 版本。
EMQX Platform 是一个全托管的 MQTT 消息云服务,可以无缝连接您的物联网设备到任何云端,无需维护基础设施。EMQX Serverless 在安全、可扩展的集群上提供 MQTT 服务,并采用按量计费的定价模式,是适合快速开启 MQTT 项目的灵活经济的解决方案。
为简化流程,本文将使用免费的公共 MQTT 服务器:
- 服务器:
http://broker.emqx.io - TCP 端口:
1883 - WebSocket 端口:
8083 - SSL/TLS 端口:
8883 - 安全 WebSocket 端口:
8084
Paho MQTT Python 客户端使用
导入 Paho MQTT 客户端:
from paho.mqtt import client as mqtt_client
创建 MQTT 连接
TCP 连接
我们需要指定 MQTT 连接的代理地址、端口和主题。此外,我们可以使用 Python 的 random.randint 函数生成随机的客户端 ID。
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
了解更多请查看博客: 创建 MQTT 连接时如何设置参数
接下来,我们需要编写 on_connect 回调函数,以便连接代理。此函数在客户端成功连接后被调用,我们可以使用 rc 参数检查连接状态。通常,我们还会创建一个同时连接到 broker.emqx.io 的客户端对象。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
# For paho-mqtt 2.0.0, you need to add the properties parameter.
# def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
# For paho-mqtt 2.0.0, you need to set callback_api_version.
# client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
自动重连
在 MQTT 客户端库中,自动重连功能确保在不稳定的网络条件下,设备与代理之间可靠的通信,无需人工干预。当网络连接中断或代理暂时不可用时,客户端可以恢复发布或订阅主题,这对于汽车系统和医疗设备等高可靠性应用至关重要。
Paho MQTT 客户端的自动重连代码如下:
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
然后,将其设置为客户端对象的 on_disconnect。
client.on_disconnect = on_disconnect
客户端自动重连的完整代码请见:GitHub.
TLS/SSL
在 MQTT 中使用 TLS 可以确保信息的机密性和完整性,防止信息泄露和篡改。TLS 认证可以分为单向认证和双向认证。
单向认证
Paho MQTT 客户端的单向认证代码如下:
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
双向认证
Paho MQTT 客户端的双向认证代码如下:
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(
ca_certs='./server-ca.crt',
certfile='./client.crt',
keyfile='./client.key'
)
发布消息
创建一个 while 循环,每秒向主题 /python/mqtt 发送一条消息,并在发送 5 条消息后退出循环。
def publish(client):
msg_count = 1
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
break
订阅
创建消息回调函数 on_message,当客户端收到来自 MQTT Broker 的消息时触发。我们将在此函数中打印订阅主题的名称和收到的消息。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
完整代码
MQTT 消息发布代码
# python 3.11
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# Generate a Client ID with the publish prefix.
client_id = f'publish-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 1
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
break
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
client.loop_stop()
if __name__ == '__main__':
run()
MQTT 订阅代码
# python 3.11
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
测试
订阅
运行 MQTT 订阅脚本 sub.py,我们将看到客户端成功连接并开始等待发布者发布消息。
python3 sub.py

发布消息
运行 MQTT 消息发布脚本 pub.py,我们会看到客户端成功连接并发布了 5 条消息。同时 sub.py 也会成功收到 5 条消息。
python3 pub.py

Paho MQTT Python 客户端的常见问题
如果不执行 loop_stop() 会发生什么?
Loop_stop() 用于停止 MQTT 客户端的消息循环并将其标记为已停止。此过程可确保客户端正常关闭,从而降低消息丢失、连接泄漏和异常程序行为等问题的风险。
例如,在本文提供的 pub.py 示例中,删除 client.loop_stop() 方法可能会导致 sub.py 脚本接收到的消息少于 5 条。
因此,正确使用 loop_stop() 方法来确保 MQTT 客户端正常关闭并防止由于未关闭连接而可能出现的任何潜在问题至关重要。
connect_async() 是用来做什么的?
connect_async() 在 MQTT 客户端应用程序需要长期 MQTT 连接或需要在后台保持 MQTT 连接处于活动状态而不阻塞主线程的情况下很有用。其主要使用场景有:
- 长期 MQTT 连接:
connect_async()有助于防止需要长期 MQTT 连接的 MQTT 客户端应用程序停滞或无响应,例如在工业应用程序中。 - 网络连接不稳定:在网络连接不确定或不稳定的环境中,可以使用
connect_async()通过重试和延迟建立连接来提高应用程序的可靠性。 - 频繁的连接和参数更改:当连接参数或其他设置频繁更改时,
connect_async()有助于提高应用程序响应能力并防止卡顿。 - 后台 MQTT 连接:
connect_async()允许在应用程序运行其他进程时在后台建立 MQTT 连接,从而增强用户体验。
Python MQTT 应用的最佳实践
要构建稳健、高效且安全的 Python MQTT 应用程序,除了基本功能之外,还必须遵循一些最佳实践。这些建议将帮助您避免常见的陷阱,并确保您的物联网解决方案保持稳定可靠。
1. 唯一客户端 ID 至关重要
在 MQTT 协议中,客户端 ID ( client_id) 是连接到 MQTT Broker 的每个设备的唯一标识符。它的唯一性对于正确的会话管理和消息传递至关重要。
为什么需要唯一性?如果两个客户端通过同一个
client_id连接到 Broker,新连接的客户端将迫使旧客户端断开连接。这可能会导致您的设备频繁断开连接或关键消息无法正确传递。How to Generate Unique IDs: Avoid using hardcoded fixed IDs or simple random numbers. The best practice involves combining elements like device serial numbers, MAC addresses, or UUIDs (Universally Unique Identifiers). Python's
uuidmodule is excellent for generating globally unique IDs.如何生成唯一 ID:避免使用硬编码的固定 ID 或简单的随机数。最佳做法是组合设备序列号、MAC 地址或 UUID(通用唯一标识符)等元素。Python 的
uuid模块非常适合生成全局唯一 ID。import uuid # ... other imports and setup # Generate a unique client ID based on the MAC address client_id = f'python-mqtt-client-{uuid.getnode()}' # Or generate a random UUID # client_id = f'python-mqtt-client-{uuid.uuid4()}'
2. 资源管理和优雅关闭
当你的 Python MQTT 客户端不再需要活动连接时,必须优雅地关闭连接并释放资源。这可以防止资源泄漏,并确保 Broker 正确管理客户端状态。
使用
client.disconnect():当您的应用程序退出或不再需要 MQTT 连接时,调用此方法主动断开连接。这会向 Broker 发送 DISCONNECT 数据包,表示客户端正常退出。
使用
client.loop_stop():如果您在后台线程中运行消息循环(例如,使用
client.loop_start()),请确保在程序退出之前调用client.loop_stop()。这会停止线程,防止程序挂起或资源被释放。def run(): client = connect_mqtt() client.loop_start() publish(client) # When the program ends or the connection is no longer needed client.loop_stop() # Stop the message loop thread client.disconnect() # Disconnect from the Broker print("MQTT Client disconnected gracefully.") if __name__ == '__main__': run()
3. 强大的错误处理和日志记录
在任何生产就绪的应用程序中,错误处理与日志记录都是不可或缺的。它们有助于追踪应用程序行为、诊断问题以及监控系统健康状况。
处理
on_connect返回代码:on_connect回调中的rc参数表示连接结果。请检查此值,以了解导致连接失败的原因。def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") # Potentially subscribe to topics here after successful connection else: print(f"Failed to connect, return code {rc}. Please check connection parameters and network.") # Implement retry logic or exit if critical异常处理:网络问题、身份验证失败或格式错误的消息都可能引发异常。建议在网络操作,例如:
client.connect()、client.publish()周围使用try-except代码块,以优雅地处理这些异常。try: client.connect(broker, port) except Exception as e: print(f"Connection attempt failed: {e}") # Log the error, perhaps retry after a delay利用 Python
logging模块:配置logging模块以记录 Paho MQTT 客户端的活动,包括连接状态、发布/订阅事件以及任何错误或警告。这对于在生产环境中进行调试和监控至关重要。import logging # ... other imports # Configure logging (e.g., to console and/or file) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # In your on_connect function: def on_connect(client, userdata, flags, rc): if rc == 0: logging.info("Connected to MQTT Broker!") else: logging.error(f"Failed to connect, return code {rc}") # In your publish function: def publish(client): # ... status = result[0] if status == 0: logging.info(f"Sent `{msg}` to topic `{topic}`") else: logging.warning(f"Failed to send message to topic {topic}. Status: {status}") # ...
4. 理解并使用服务质量 (QoS) 级别
QoS(服务质量)是 MQTT 中的一个基本概念,用于保证消息的传递。理解并正确设置 QoS 级别对于消息的可靠性至关重要。
- QoS 0(最多一次):消息投递时不保证到达。该模式快速轻量,但消息可能会丢失。非常适合偶尔丢失数据的传感器读数(例如,每隔几秒测量一次温度)。
client.publish(topic, msg, qos=0)
- QoS 1(至少一次):消息保证到达,但可能出现重复。发送方会重新发送,直到收到确认。适用于可以处理重复数据的重要数据(例如控制命令)。
client.publish(topic, msg, qos=1)
- QoS 2(恰好一次):保证消息恰好到达一次。这是最慢但最可靠的级别。适用于不允许丢失或重复的关键操作(例如金融交易)。
client.publish(topic, msg, qos=2)
最佳实践:选择满足应用程序可靠性要求的最低 QoS 级别,以优化带宽和延迟。
5. 利用保留消息与遗嘱消息(LWT)功能
这两项 MQTT 功能显著增强了应用程序的可靠性和用户体验。
保留消息:
用途:保留消息是 Broker 针对特定主题存储的常规 MQTT 消息。当新订阅者订阅该主题时,他们会立即收到最后一条保留消息。这非常适合广播当前状态(例如,“门开/关”、“灯亮/关”)。
用途:发布带有
retain=True标志的消息 。client.publish("home/door/status", "open", qos=1, retain=True) # Any new subscriber to "home/door/status" will immediately get "open"
遗嘱消息(LWT):
用途:LWT,也称为“遗嘱消息”,是指当客户端意外断开连接(例如断电、网络故障)且未发送 DISCONNECT 数据包时,Broker 会自动在预定义主题上发布一条消息。它就像是您设备状态的一份数字“遗嘱”。
用途:设置连接时的 LWT 消息。
# Set the Last Will message client.will_set("device/status", "offline", qos=1, retain=True) # Then connect as usual client.connect(broker, port)如果该客户端意外断开连接,“离线”将被发布到“设备/状态”。
6. 数据序列化和反序列化
通过 MQTT 发送数据时,通常以字节数组的形式发送。对于复杂数据,您需要在发布前将其序列化,并在接收时将其反序列化。
JSON 为王: JSON(JavaScript 对象表示法)是一种广受欢迎的格式,因为它易于人类阅读并且易于在 Python 中解析。
例如:
import json # --- Publisher Side --- data_to_send = {"sensor_id": "temp_001", "temperature": 25.5, "unit": "C"} json_payload = json.dumps(data_to_send) # Serialize Python dict to JSON string client.publish(topic, json_payload, qos=1) # --- Subscriber Side --- def on_message(client, userdata, msg): try: received_data = json.loads(msg.payload.decode('utf-8')) # Decode bytes, then deserialize JSON print(f"Received JSON data from `{msg.topic}`: {received_data}") print(f"Temperature: {received_data['temperature']} {received_data['unit']}") except json.JSONDecodeError: print(f"Received non-JSON message from `{msg.topic}`: {msg.payload.decode()}") except KeyError as e: print(f"Missing key in JSON payload: {e}")
7. 安全性(TLS/SSL和身份验证)
虽然文章已经涉及了 TLS/SSL,但值得再次强调它与身份验证的重要性。
始终使用 TLS/SSL:对于任何敏感数据或生产环境,请始终使用 TLS/SSL (
client.tls_set()) 加密客户端与 Broker 之间的通信。未加密的 MQTT 容易受到窃听。实施身份验证:大多数公共服务器(以及所有生产服务器)都需要用户名和密码身份验证(
client.username_pw_set())。切勿使用默认或空的凭证。# Ensure you uncomment and set these for real applications username = 'your_mqtt_username' password = 'your_mqtt_password' client.username_pw_set(username, password)
通过将这些最佳实践集成到您的 Python MQTT 项目中,您将构建更强大、更易于维护、更安全的物联网应用程序。
结语
本文介绍了如何使用 paho-mqtt 客户端连接到免费的公共 MQTT Broker。我们成功实现了连接过程,使用 publish() 方法将消息从测试客户端发送到 Broker,并使用 subscribe() 方法从 Broker 订阅消息。
接下来,您可以查看由 EMQ 提供的《MQTT 教程》系列,了解 MQTT 协议的特性,探索更多 MQTT 的高级应用,进行 MQTT 应用与服务开发。
