Eclipse Paho MQTT Python Client 使用手册
原文地址:https://www.cooooder.com/archives/20210303
目录
- 介绍
- 环境
- 准备
- 快速开始
- 常用API
- Client
- 回调函数
- 示例
介绍
paho.mqtt.python 是一个MQTT客户端python库,能够让应用程序简单方便的连接到MQTT代理进行消息发布、订阅主题和消息接收。
目前 paho.mqtt.python-1.5.1 版本支持5.0、3.1.1和3.1 MQTT协议,同时支持Python 2.7.9+或3.5+。
环境
- MQTT代理:EMQ X Broker 4.2.6
- Python 3.9.0
- paho-mqtt 1.5.1
准备
-
参照 EMQ X Broker安装启动教程
成功启动EMQ后,可通过浏览器访问 http://localhost:18083 admin/public 进入EMQ控制台,在【工具 > Websocket】模块可方便进行客户端连接、订阅、消息接收、发布等测试和调试工作 -
Python 安装省略
-
paho-mqtt 安装
pip install paho-mqtt
快速开始
Python快速实现MQTT主题订阅和消息接收
import paho.mqtt.client as mq_tt
def on_connect(client, userdata, flags, rc):
"""
回调函数:当MQTT代理响应客户端连接请求时触发
:param client: 回调返回的客户端实例
:param userdata: Client()或user_data_set()中设置的私有用户数据
:param flags: MQTT代理发送的响应标识
:param rc: 连接结果
0:连接成功
1:连接被拒绝 - 协议版本
2: 连接被拒绝 - 客户端标识符无效
3:连接被拒绝 - 服务器不可用
4:连接被拒绝 - 用户名或密码错误
5:连接被拒绝 - 未授权6-255:当前未使用
:return:
"""
print("Connected with result code "+str(rc))
# 在on_connect()中进行消息订阅,是因为如果丢失连接进行重连,主题也会重新被订阅
client.subscribe("testTopic/#")
def on_message(client, userdata, message):
"""
回调函数:当接收到MQTT代理发布的消息时触发
:param client: 回调返回的客户端实例
:param userdata: Client()或user_data_set()中设置的私有用户数据
:param message: MQTTMessage的一个实例,这是一个包含主题,有效负载,qos,retain的类
:return:
"""
print(message.topic+" "+str(message.payload))
mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.on_connect = on_connect
mq_client.on_message = on_message
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)
# 阻塞式自动处理收发数据、自动处理重新连接,所有的数据处理逻辑都在预先设定好的回调函数中进行的
mq_client.loop_forever()
在 EMQ X Broker - 【客户端】可以看到客户端已连接
查看原图
在 EMQ X Broker - 【Websocket】发布testTopic主题消息
查看原图
Python程序打印出接收到的消息
Connected with result code 0
testTopic b'{ "msg": "Hello, World!" }'
testTopic b'{ "msg": "Hello, World2!" }'
常用API
Client
Client类实例常规用法流程如下:
- 创建一个客户端实例
- 使用任一 connect*() 方法连接到MQTT代理
- 调用任一 loop*() 方法保持与MQTT代理通讯
- 使用 subscribe() 方法订阅一个主题并接收消息
- 使用 publish() 方法向MQTT代理发布消息
- 使用 disconnect() 中断与MQTT代理的连接
client()
# 构造方法
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
- client_id
- 连接到MQTT代理时使用的唯一客户端ID字符串。如果为0或者为None,将随机生成分配一个,这种情况下clean_session参数必须为True
- clean_session
- 布尔值类型,用来确定客户端类型。如果为True,当断开连接时,MQTT代理将移除该客户端的所有信息;如果为False,客户端则为持久客户端,当断开连接时,订阅信息和消息队列将被MQTT保存
- 当断开连接时,客户端不会丢弃自己发送的消息。调用 connect() 或者 reconnect() 将导致重新发送消息,只有使用 reinitialise() 可以将客户端重置为初始状态
- userdata
- 用户定义的任意类型数据作为 userdata 参数传递给回调函数,可以通过调用user_data_set() 方法进行更新,不过会有点延迟
- protocol
- 客户端使用的MQTT协议版本,可以是 MQTTv31 或 MQTTv311
- transport
- 传输形式,设置为websockets,则会通过websockets发送给MQTT,默认tcp
- 示例
import paho.mqtt.client as mqtt
mqttc = mqtt.Client()
connect()
connect(host, port=1883, keepalive=60, bind_address="")
客户端连接MQTT代理,这是一个阻塞函数
- host
- 代理的主机名或者IP地址
- port
- 连接服务的端口,默认1883
- keepalive
- 心跳检测时长
- bind_address
- 绑定此客户端本地网络的IP地址
- 回调函数
- on_connect()
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")
与 loop_start() 结合使用以非阻塞的形式进行连接,在调用 loop_start() 之前,连接不会完成
disconnect()
disconnect()
彻底与MQTT代理断开,使用该方法断开连接不会让代理发送遗嘱消息
- 回调函数
- on_disconnect()
enable_logger()
enable_logger(logger=None)
使用标准的Python日志包启用日志记录,可以与on_log回调方法同时使用
reconnect()
reconnect()
使用之前的信息配置重新连接代理,在调用之前必须先调用 connect*() 方法
reinitialise()
重置客户端为初始化状态,参数与 client() 一致
- 示例
mqttc.reinitialise()
loop()
loop(timeout=1.0, max_packets=1)
定期调用处理事件
- timeout
- 最大阻塞的秒数
- max_packets
- 已过期,不设置
- 示例
while True:
mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)
这些函数实现了网络循环的线程接口,在执行connect*()之前或者之后调用一次 loop_start() ,后台会自动运行一个线程调用 loop() ,这样就释放了主线程去执行其它工作,避免发生阻塞,这个调用也处理重新连接到代理。调用 loop_stop() 停止后台线程
mqttc.connect("127.0.0.1")
mqttc.loop_start()
while True:
mqttc.publish("topicTest", 'test')
loop_forever()
阻塞式网络循环处理事件,直到客户端调用 disconnect() 才会返回,它会自动重连
publish()
publish(topic, payload=None, qos=0, retain=False)
客户端向MQTT代理发送一条消息
- topic
- 消息发布的主题,不能为None或者空字符
- payload
- 发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
- qos
- 消息的服务质量等级,必须为0 or 1 or 2
- retain
- 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
- 回调函数
- on_publish()
Return MQTTMessageInfo对象
reconnect_delay_set()
reconnect_delay_set(min_delay=1, max_delay=120)
断开连接后,客户端将自动尝试连接,每次尝试间隔 [min_delay, max_delay] 秒,从min_delay开始逐渐加倍至max_delay,连接成功后,延迟重置为min_delay
subscribe()
subscribe(topic, qos=0)
订阅一个或多个主题,该方法有三种不同的调用方式:
# 1. 字符串和整数
subscribe("my/topic", 2)
# 2. 字符串和整数元组
subscribe(("my/topic", 1))
# 3. 字符串和整数元组的列表
# 单次调用多个主题,比多次调用subscribe更有效
subscribe([("my/topic", 0), ("another/topic", 2)])
Return 一个元组 (result, mid)
- result
- 成功:MQTT_ERR_SUCCESS
- 失败:(MQTT_ERR_NO_CONN, None)
- mid
- 消息ID
- 回调函数
- on_subscribe()
unsubscribe()
unsubscribe(topic)
取消一个或多个主题
- topic
- 主题字符串或者字符串列表
Return 一个元组 (result, mid)
- 主题字符串或者字符串列表
- result
- 成功:MQTT_ERR_SUCCESS
- 失败:(MQTT_ERR_NO_CONN, None)
- mid
- 消息ID
- 回调函数
- on_unsubscribe()
user_data_set()
user_data_set(userdata)
设置传递给回调函数的用户私有数据
username_pw_set()
username_pw_set(username,password = None)
设置用户名和密码(可选)供MQTT代理验证,必须在 connect*() 之前调用
will_set()
will_set(topic, payload=None, qos=0, retain=False)
设置遗嘱发送给MQTT代理,如果客户端在没有调用 disconnect() 的情况下断开连接,则MQTT代理将会代表它发送该消息
- topic
- 遗嘱消息发布的主题,不能为None或者空字符
- payload
- 遗嘱发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息作为遗嘱。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
- qos
- 遗嘱消息的服务质量等级,必须为0 or 1 or 2
- retain
- 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
回调函数
on_connect()
on_connect(client, userdata, flags, rc)
MQTT代理响应客户端连接请求时( connect*() )调用
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- flags
- MQTT代理发送的响应标识
- rc
- 连接结果
- 0:连接成功
- 1:连接被拒绝 - 协议版本
- 2:连接被拒绝 - 客户端标识符无效
- 3:连接被拒绝 - 服务器不可用
- 4:连接被拒绝 - 用户名或密码错误
- 5:连接被拒绝 - 未授权6-255:当前未使用
- 连接结果
- 示例
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
mqttc.on_connect = on_connect
on_disconnect()
当客户端与MQTT代理断开连接时 (disconnect()) 调用
on_disconnect(client, userdata, rc)
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- rc
- 断开结果,如果是 MQTT_ERR_SUCCESS(0),则是响应disconnect()调用
- 如果是其它值,则是意外关闭
- 示例
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
mqttc.on_disconnect = on_disconnect
on_message()
on_message(client, userdata, message)
在客户端收到已订阅主题的消息,并且该消息没有被主题过滤器 message_callback_add() 匹配时调用
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- message
- MQTTMessage实例,包含 topic、payload、qos、retain
- 示例
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
mqttc.on_message = on_message
message_callback_add()
message_callback_add(sub, callback)
定义特定订阅主题传入的消息回调,包括通配符,比如:客户端订阅了 sensor/#主题,一个回调处理 sensor/temperature,另一个回调处理 sensor/humidity
- sub
- 待过滤的主题,只能定义一个回调
- callback
- 回调函数,与 on_message() 相同形式
- 示例
# 处理温度消息回调
def temperature_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
# 处理湿度消息回调
def humidity_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
mqttc.subscribe('sensor/#')
mqttc.message_callback_add('sensor/temperature', temperature_callback)
mqttc.message_callback_add('sensor/humidity', humidity_callback)
message_callback_remove()
message_callback_remove(sub)
删除先前注册的主题/订阅特定回调
on_publish()
on_publish(client, userdata, mid)
当客户端调用 publish() 发布一条消息至MQTT代理后调用。Qos=1或2时,意味着客户端和代理完成握手,Qos=0时,仅表示消息离开客户端。
- mid
- mid变量与从相应的 publish() 返回的mid变量匹配,以允许跟踪传出的消息。
即使 publish() 调用返回,也不总意味着消息已发送
on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)
当MQTT代理响应订阅请求时被调用
- mid
- mid变量匹配从相应的 subscribe() 返回的mid变量
- granted_qos
- 整数列表,它提供了代理为每个不同的订阅请求授予的QoS级别
on_unsubscribe()
on_unsubscribe(client, userdata, mid)
当代理响应取消订阅请求时调用
- mid
- mid匹配从相应的 unsubscribe() 返回的mid变量
on_log()
on_log(client, userdata, level, buf)
当客户端有日志信息时调用
- level
- 消息严重性
- MQTT_LOG_INFO
- MQTT_LOG_NOTICE
- MQTT_LOG_WARNING
- MQTT_LOG_ERR
- MQTT_LOG_DEBUG
- 消息严重性
- buf
- 该消息本身就在buf里
可以与标准的Python logging同时使用,通过enable_logger()方法启用
- 该消息本身就在buf里
示例
import paho.mqtt.client as mq_tt
import logging
logging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("topicTest/#")
def topic_one_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
def topic_two_callback(client, userdata, message):
print(message.topic+" "+str(message.payload))
mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.enable_logger()
mq_client.on_connect = on_connect
mq_client.message_callback_add("topicTest/one", topic_one_callback)
mq_client.message_callback_add("topicTest/two", topic_two_callback)
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)
mq_client.loop_start()