前言
mqtt属于上层协议,与http属于同一层。建立在TCP之上的,因此所有的可以建立TCP通信的模组都可以实现mqtt
MQTT / HTTP
————————————
TLS
————————————
TCP
————————————
IP
————————————
数据链路层
————————————
物理层
————————————
硬件准备
MCU:STM32F103RC
GPRS模组:有人GM5
工具:keil_MDK
mqtt源代码:https://github.com/eclipse/paho.mqtt.embedded-c
代码实现
GRRS部分采用:原创 NB-IOT开发|nbiot开发教程《三》AT指令类模组驱动-STM32实现AT指令状态机实现TCP透传连接。
mqtt移植与实现:
mqtt为纯软件的协议,不涉及到任何的硬件,paho.mqtt.embedded-c本质就是协议的解析。通过调用api创建要发送的数据,或者调用相应api解析报文。
mqtt的移植主要难点在transport.c文件的移植与实现
transport.c文件的实现
mqtt状态定义:
- 等待tcp连接完成-> MQTT_STATE_OPEN_LINK
- 创建mqtt连接 -> MQTT_STATE_CONNECT
- mqtt订阅->MQTT_STATE_SUBCRIBLE
- 等待数据接收 ->MQTT_STATE_LOOP
- 断开连接->MQTT_STATE_DISCONNECTED
状态管理
static const mqtt_state_machine_state_op_t mqtt_state_machine_states[] =
{
mqtt_machine_state(MQTT_STATE_OPEN_LINK, MQTT_STATE_CONNECT, 1000, 100, mqtt_open_link ),
mqtt_machine_state(MQTT_STATE_CONNECT, MQTT_STATE_SUBCRIBLE, 20000, 10, mqtt_connect ),
mqtt_machine_state(MQTT_STATE_SUBCRIBLE, MQTT_STATE_LOOP, 20000, 10, mqtt_subscribe ),
mqtt_machine_state(MQTT_STATE_LOOP, MQTT_STATE_LOOP, 0, 10, mqtt_loop ),
mqtt_machine_state(MQTT_STATE_DISCONNECTED, MQTT_STATE_OPEN_LINK, 0, 10, mqtt_disconnected ),
};
transport.h文件的实现
#ifndef _TRANSPORT_H_
#define _TRANSPORT_H_
#include <MQTTPacket.h>
//int transport_sendPacketBuffer(int sock, unsigned char* buf, int buflen);
//int transport_getdata(unsigned char* buf, int count);
int transport_getdatanb(void *sck, unsigned char* buf, int count);
//int transport_open(char* host, int port);
//int transport_close(int sock);
enum
{
QOS0 =0x00,
QOS1,
QOS2
};
typedef void (*MqttCallback)(void *args);
typedef void (*MqttDataCallback)(void *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh);
#define MQTT_MAX_SUBTOPIC 4
typedef struct MQTTSubs_topic
{
int sub_num;
int sub_index;
MQTTString subtopic[MQTT_MAX_SUBTOPIC];
int subtopic_qos[MQTT_MAX_SUBTOPIC];
}MQTTSub_topic;
typedef struct MQTTClient MQTTClient;
struct MQTTClient
{
//const char *uri; //暂时用不到
int sock;
int port;
unsigned int timeout ;//连接超时时间
MQTTPacket_connectData condata; //mqtt连接数据,一定要先初始化再赋值
MQTTSub_topic subtopicdata;//订阅主题
MqttCallback connect; //连接成功回调
MqttCallback disconnect;
MqttDataCallback datacallback;//接收到数据回调
};
void mqtt_task_init(MQTTClient *c);
int MQTT_Publish(MQTTClient *client, const char* topic, const char* data, int data_length, int qos, int retain);
应用与测试
由于受单片机内存等限制,mqtt发布只支持QOS0
#include <string.h>
#include "app_mqtt.h"
#include "transport.h"
static stimer mqtt_timer;
MQTTClient mqtt = { 0};
char usr_topic[] = "substopic";
char usr_topic2[] = "substopic2";
char pub_topic[] = "pubtopic";
char pub_data[] = "0987654321";
static uint8_t link = 0;
static uint32_t link_cnt = 0;
static uint32_t eror = 0;
void mqtt_pub_cb(void *arg,uint32_t event)
{
if(link == 0)
return ;
link_cnt++;
char pubdata[128] = { 0};
sprintf(pubdata,"mqtt publish count:%d,error:%d\r\n",link_cnt,eror);
MQTT_Publish(&mqtt, pub_topic, pubdata, strlen(pubdata), 0, 0);
}
void mqtt_connected(void *arg)
{
MQTT_Publish(&mqtt, pub_topic, pub_data, strlen(pub_data), 0, 0);
os_log("Mqtt connect succeed\r\n");
link = 1;
}
void mqtt_disconnected(void *arg)
{
//MQTT_Publish(&mqtt, pub_topic, pub_data, strlen(pub_data), 0, 0);
os_log("Mqtt disconnect\r\n");
link = 0;
eror++;
link_cnt = 0;
}
void mqtt_recv(void *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh)
{
uint8_t mqtt_topic[32] = { 0};
uint8_t mqtt_data[128] = { 0};
memcpy(mqtt_topic,topic,topic_len);
memcpy(mqtt_data,data,lengh);
os_log("MQTT->%s:%d,%s\r\n",mqtt_topic,lengh,mqtt_data);
}
void mqtt_test_init(void)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
memcpy(&mqtt.condata,&data,sizeof(data));
mqtt.condata.clientID.cstring = "me"; //
mqtt.condata.keepAliveInterval = 60; //
mqtt.condata.cleansession = 0; //
mqtt.condata.username.cstring = "cmvxsubk";//
mqtt.condata.password.cstring = "1KRDBWdpbY3Z";//
mqtt.timeout = 30;
mqtt.connect = mqtt_connected;
mqtt.datacallback = mqtt_recv;
mqtt.disconnect = mqtt_disconnected;
mqtt.subtopicdata.subtopic[0].cstring = usr_topic;
mqtt.subtopicdata.subtopic_qos[0] = 0;
mqtt.subtopicdata.subtopic[1].cstring = usr_topic2;
mqtt.subtopicdata.subtopic_qos[1] = 1;
mqtt_task_init(&mqtt);
cola_timer_create(&mqtt_timer,mqtt_pub_cb);
cola_timer_start(&mqtt_timer,TIMER_ALWAYS,300000);
}
测试结果
代码不涉及任何复杂RTOS,采用cola_os (前后台任务管理).
最后
mqtt中的加密一般采用TLS,嵌入式系统中对应的为Mbed tls,tls加密是建立在tcp之上的,首先tcp建立连接,然后tls认证,认证完成才进行mqtt连接,目前本人并没有实现tls的认证。有兴趣的可以合作实现。