又到了显摆分享技术的时候了
这篇文章基于前两篇文章之上,使用物联网行业开源的MQTT服务器接收数据,使Web行业热门的Springboot框架项目订阅与发布与数据入库与展示
如果对数据上传不是很了解的可以看我之前的文章
Stm32f103c8t6+ESP8266-01s+DHT11 实现向服务器上传温湿度数据
Springboot+STM32+ESP8266 使用HTTP的GET与POST发送请求向Springboot项目上传数据并展示
这篇文章的流程为:
1.搭建开源的mqtt服务器
2.创建Springboot项目
3.Springboot中整合MQTT
4.使用Springboot订阅与发布数据
5.Springboot将订阅的数据入库
6.开发实时订阅/发布展示页面
我会分为两篇文章分享,当然如果两篇放不下就再来一篇
目录
一.开源物联网服务器EMQ
为什么用MQTT
搭建mqtt服务
EMQ管理页面
小结
二.搭建Springboot2+mqtt服务工程
搭建mqtt环境
实现mqtt功能代码
小结
三.总结
一.开源物联网服务器EMQ
为什么用MQTT
提到物联网能想到的就是万物互联上云,而所谓的上云就是数据上传到网络服务器,上传多使用的是MQTT协议
刚刚接触ESP8266模块的时候首先知道的数据发送协议为TCP UDP,而这两个协议都是基于HTTP协议基础上使用的
而MQTT也是基于HTTP协议的TCP之上诞生的
那为什么物联网行业都推荐使用Mqtt来发送数据呢
我的理解两个方面一个是省流量省带宽,一个是心跳包
省流量省带宽:因为数据报文头字节数小,协议的字节小,由于一条数据的字节数非常少,发送所需的流量少,有些移动设备只需要2G网络就可以上传数据
心跳包: 物联网设备需要时刻知道设备是否在线,如果设备离线没有及时反馈会影响小则影响数据上传,大则影响设备生产,通过发送小巧的心跳包让服务器知道设备是否在线
其实上面都是都是废话用来占文章长度的,真正的干货在下面
搭建mqtt服务
直接使用开源的Mqtt搭建方案 官网地址: EMQ 开源mqtt服务器搭建方案官网
这页面,这样式,这感觉,我就做不出来
直接点击免费试用 会看到有三个版本,作为勤俭节约的好青年,我选择了 EMQ X Broker
有财力的或者为解决企业级问题的可以选择试用EMQ X Enterprise 并且直接关掉这篇文章了,我就是为了用免费也能将数据入库才写的文章
我这里对各版本的差别描述的比价笼统具体的差异可以看官方文档 : 具体不同版本差异可见文档
没问题了就下一步
我们选择第一个 EMQ X Broker
下面就会让你选择要下载的版本,使用的什么平台以及平台版本什么的,下载也是直接在服务器上直接执行写好的指令,等待下载完成,按照安装运行的步骤走完,mqtt就搭建好并启动了
EMQ管理页面
启动后在浏览器输入你的IP:18083
如172.0.0.1:18038,就会跳转到mqtt的管理页面,初始账号admin 密码public
登录成功后会看到
非常酷炫是吧,老酷炫了,我都看着累眼睛,而且都是英文这是啥啊(地铁老头手机),不是我英语不好我是为了大众考虑,我们需要修改一下
小结
Mqtt已经搭建起来了,如有物联网设备都可用通过ip:1883来访问平台
如172.0.0.1:1883向平台发送指定主题的数据,也可以通过平台订阅指定的主题
具体EMQ平台上左侧菜单都有什么功能可以自行百度,如我有时间(除非我真闲得慌)也会整理一下
因为这个版本功能很少,多数功能都是通过Springboot来实现
二.搭建Springboot2+mqtt服务工程
由于考虑有的开发人员是用eclipse有的使用idea所以这里
是不是以为我要把两种编译器创建Springboot的方法写出来,是不是让我猜中了,恭喜你这段我不写
搭建mqtt环境
在pom.xml里引入mqtt包
<!--MQTT使用包-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
在application.yml里添加mqtt配置参数
spring:
mqtt:
broker: tcp://{这里写EMQ部署的地址}:1883
clientId: 123456 #客户端的id
username: 登录账号
password: 登录密码
timeout: 2000
KeepAlive: 20
topics: ServerId_02 #主题
qos: 1 #心跳包级别
实现mqtt功能代码
创建一个类读取application.yml中的配置
这里说一下我这里用了@Getter与@Setter注解会自动创建getset方法,用不了就用idea自带的方法创建getset方法
@Getter
@Setter
@Component
@Configuration
public class MqttConfiguration {
@Value("${spring.mqtt.broker}")
private String host;
@Value("${spring.mqtt.clientId}")
private String clientId;
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.timeout}")
private int timeout;
@Value("${spring.mqtt.KeepAlive}")
private int KeepAlive;
@Value("${spring.mqtt.topics}")
private String topics;
@Value("${spring.mqtt.qos}")
private int qos;
}
在启动类Application添加mqtt初始化方法
@SpringBootApplication
public class Application implements ApplicationRunner{
//读取mqtt配置
@Autowired
private MqttConfiguration mqttConfiguration;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
if(true){
if (log.isInfoEnabled()){
log.info("===============>>>Mqtt is run starting:<<==================");
}
MqttPushClient mqttPushClient = new MqttPushClient();
mqttPushClient.connect(mqttConfiguration);
}
}
}
Mqttbeanbak类
@Component
public class Mqttbeanbak {
@Autowired
private MqttConfiguration mqttConfiguration;
@Bean("mqttPushClient")
public MqttPushClient getMqttPushClient() {
MqttPushClient mqttPushClient = new MqttPushClient();
return mqttPushClient;
}
}
MqttPushClient类
@Slf4j
public class MqttPushClient {
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
//MQTT连接设置
MqttConnectOptions option = new MqttConnectOptions();
//设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
option.setCleanSession(false);
//设置连接的用户名
option.setUserName(userName);
//设置连接的密码
option.setPassword(password.toCharArray());
//设置超时时间 单位为秒
option.setConnectionTimeout(outTime);
//设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
option.setKeepAliveInterval(KeepAlive);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//option.setWill(topic, "close".getBytes(StandardCharsets.UTF_8), 2, true);
option.setMaxInflight(1000);
return option;
}
public void connect(MqttConfiguration mqttConfiguration) {
MqttClient client;
try {
client = new MqttClient(mqttConfiguration.getHost(), mqttConfiguration.getClientId(), new MemoryPersistence());
MqttConnectOptions options = getOption(mqttConfiguration.getUsername(), mqttConfiguration.getPassword(),
mqttConfiguration.getTimeout(), mqttConfiguration.getKeepAlive());
MqttPushClient.setClient(client);
try {
client.setCallback(new PushCallback(this, mqttConfiguration));
if (!client.isConnected()) {
client.connect(options);
log.info("================>>>MQTT连接成功<<======================");
} else {//这里的逻辑是如果连接不成功就重新连接
client.disconnect();
client.connect(options);
log.info("===================>>>MQTT断连成功<<<======================");
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public Boolean reConnect() throws Exception {
Boolean isConnected = false;
if (null != client) {
client.connect();
if (client.isConnected()) {
isConnected = true;
}
}
return isConnected;
}
public void publish(String topic, String pushMessage) {
publish(0, false, topic, pushMessage);
}
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
try {
message.setPayload(pushMessage.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if (null == mTopic) {
log.error("===============>>>MQTT topic 不存在<<=======================");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(int qos, String topic, String pushMessage) {
publish(qos, false, topic, pushMessage);
}
public void subscribe(String[] topic, int[] qos) {
try {
MqttPushClient.getClient().unsubscribe(topic);
MqttPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void cleanTopic(String topic) {
try {
MqttPushClient.getClient().unsubscribe(topic);
} catch (MqttException e) {
log.error(e.getMessage());
e.printStackTrace();
}
}
}
MqttSender类
@Component(value = "mqttSender")
@Slf4j
public class MqttSender
{
@Async
public void send(String queueName, String msg) {
log.info("=====================>>>>发送主题"+queueName);
publish(2,queueName, msg);
}
public void publish(String topic,String pushMessage){
publish(1, false, topic, pushMessage);
}
public void publish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
try {
message.setPayload(pushMessage.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if(null == mTopic){
log.error("===================>>>MQTT topic 不存在<<=================");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
log.error("============>>>publish fail",e);
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(int qos, String topic, String pushMessage){
publish(qos, false, topic, pushMessage);
}
}
PushCallback类
这个类要多留意,因为订阅的数据会进入messageArrived方法,如果想将数据处理并保存需要在这个类里操作
由于该类不能使用@Autowired调用数据处理数据入库方法,所以需要特殊处理
下一篇文章会特别详细的讲解
@Slf4j
@Component
public class PushCallback implements MqttCallback {
private MqttPushClient client;
private MqttConfiguration mqttConfiguration;
public PushCallback(MqttPushClient client ,MqttConfiguration mqttConfiguration) {
this.client = client;
this.mqttConfiguration = mqttConfiguration;
}
@Override
public void connectionLost(Throwable cause) {
if(client != null) {
while (true) {
try {
log.info("==============》》》[MQTT] 连接断开,5S之后尝试重连...");
Thread.sleep(5000);
MqttPushClient mqttPushClient = new MqttPushClient();
mqttPushClient.connect(mqttConfiguration);
if(MqttPushClient.getClient().isConnected()){
log.info("=============>>重连成功");
}
break;
} catch (Exception e) {
log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
continue;
}
}
}
log.info(cause.getMessage());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
log.info("publish后会执行到这里");
log.info("pushComplete==============>>>" + token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String Payload = new String(message.getPayload());
log.info("============》》接收消息主题 : " + topic);
log.info("============》》接收消息Qos : " + message.getQos());
log.info("============》》接收消息内容 : " + Payload);
log.info("============》》接收ID : " + message.getId());
log.info("接收数据结束 下面可以执行数据处理操作");
}
}
MqttController类
@Controller
@Slf4j
@ResponseBody
@RequestMapping("/mqtt")
public class MqttController {
//发送逻辑
@Autowired
private MqttSender mqttSender;
//订阅逻辑
@Autowired
private MqttPushClient mqttPushClient;
@RequestMapping("/sendmqtt")
public String sendmqtt(){
String TOPIC1="testtest1";
String JSON = "{'gender':'man','hobby':'girl'}";
log.info(" 本机主题:"+TOPIC1+" 发送数据为:"+JSONObject.toJSONString(JSON));
mqttSender.send(TOPIC1, JSON);
log.info(" 发送结束");
return "发送结束";
}
@RequestMapping("/subscriptionmqtt")
public String subscriptionmqtt(){
int Qos=1;
String TOPIC1="testtest1";
String[] topics={TOPIC1};
int[] qos={Qos};
mqttPushClient.subscribe(topics,qos);
return "订阅主题";
}
}
整合的mqtt的代码就是这些,下面是测试是否可以与mqtt服务器通信
在没有订阅时直接发送数据
发送数据访问 http://127.0.0.1:8080/mqtt/sendmqtt
现在订阅该主题并在次访问 http://127.0.0.1:8085/mqtt/subscriptionmqtt
再次发送数据就会看到由于订阅了该主题,我们向该主题发送的数据就会被订阅,就会接收到发送的
小结
可以看到订阅方法已经可以读到发送的数据了,
有人可能想到在接收方法中使用@Autowired注解注入数据入库方法,这样数据就可以存入数据库了,
那么恭喜你你会看到你的mqtt连接会不断的短线重连并且报空指针
你会发现使用@Autowired注入的方法是空的,这就是下一篇文章要讲的内容了
三.总结
到目前为止简单的EMQ服务部署以及Springboot的mqtt环境已经搭建好了,可以实现简单的数据发送主题订阅
也算是回归我的本行进行开发,从硬件单片机到软件程序开发,有趣非常有趣
我也希望能与志同道合的人相互交流分享技术,最好能交个女朋友
哎,可能写博客是我为数不多的可以展示自我的方式了
想要交流技术的可以直接私信我,我脾气很好的(点头)
下一篇 解决注入方法报空指针并将订阅接到的数据入库 , 再开发一个实时的可发送/订阅的工具页面
先把页面发出来显摆展示一下