又到了显摆分享技术的时候了
没有看过上一篇文章的可以先看一下,这篇是在上一篇基础上接着添加功能
SpringBoot2整合mqtt服务器EMQ实现消息订阅发布入库(一)
这篇文章的流程为:
1.Springboot将订阅的数据入库
2.开发实时订阅/发布展示页面
这篇文章是整合mqtt服务器的消息订阅发布的最后一篇,在掌握这两篇文章后可以搭建自己的mqtt业务处理服务器,强调一下,做这些的目的就是为了搭建自己的物联网服务器,自己开发数据接入逻辑,自己开发数据处理逻辑,自己开发页面展示,而不是直接依赖无脑的傻瓜式阿里云的物联网整合方案,这篇文章就是为了自己搭建出相同功能的项目, 不要再问我能不能接入阿里云的数据处理展示服务了,如果没看懂我写的文章,我很抱歉,右上角有个叉,欢迎点击
接着上一篇文章继续写一下订阅的数据如何入库
目录
一.Springboot将订阅的数据入库
1.使用ApplicationContextAware
2.重写PushCallback类
二.开发实时订阅/发布展示页面
1.页面最终展示效果
2.实现 订阅主题 列出已订阅主题 取消订阅 数据发布 功能
3.实时订阅展示 使用WebSocket实现
三.总结
后记:
预告:
一.Springboot将订阅的数据入库
由于直接使用注解注入方法会抛出异常报空,我的理解就是因为spring的容器加载顺序的原因,用于订阅的PushCallback类实现的MqttCallback接口包括具体方法已经注入到spring的容器中,而@Autowired注解的入库方法是后注入容器的结果导致实现MqttCallback接口的方法时读取不到才抛出空指针
1.使用ApplicationContextAware
使用该类实例化后就可以手动获取Bean的注入对象,首先创建一个类来实现这个接口,具体代码如下
后台SpringUtil 类 用于手动注入其他类,用于解决使用@Autowired注入报空指针问题
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtil.applicationContext == null){
SpringUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}
这样我们就通过实例化调用getBean()方法就可以在接收到订阅数据后进行入库逻辑
XXXServiceImpl XXX = SpringUtil.getBean(XXXServiceImpl.class);
需要修改PushCallback类,就可以调用无法注入的方法
2.重写PushCallback类
由于解决了将入库方法注入到容器中的问题,接下来就需要改写PushCallback类的messageArrived方法
修改后的代码如下
后台PushCallback类
@Slf4j
@Component
public class PushCallback implements MqttCallback {
private MqttPushClient client;
private MqttConfiguration mqttConfiguration;
public PushCallback(MqttPushClient client ,MqttConfiguration mqttConfiguration) {
this.client = client;
this.mqttConfiguration = mqttConfiguration;
}
@Override
public void connectionLost(Throwable cause) {
if(client != null) {
while (true) {
try {
log.info("==============》》》[MQTT] 连接断开,5S之后尝试重连...");
Thread.sleep(5000);
MqttPushClient mqttPushClient = new MqttPushClient();
mqttPushClient.connect(mqttConfiguration);
if(MqttPushClient.getClient().isConnected()){
log.info("=============>>重连成功");
}
break;
} catch (Exception e) {
log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
continue;
}
}
}
log.info(cause.getMessage());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
log.info("publish后会执行到这里");
log.info("pushComplete==============>>>" + token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String Payload = new String(message.getPayload());
log.info("============》》接收消息主题 : " + topic);
log.info("============》》接收消息Qos : " + message.getQos());
log.info("============》》接收消息内容 : " + Payload);
log.info("============》》接收ID : " + message.getId());
log.info("接收数据结束 下面可以执行数据处理操作");
//将json转map,方便读取数据
JSONObject json = JSONObject.parseObject(Payload);
Map<String,Object> MapJson = json.getInnerMap();
//实例化入库方法 这里就用到SpringUtil类 来手动的注入
WeatherServiceImpl mqttDataService = SpringUtil.getBean(WeatherServiceImpl.class);
//调用入库方法
mqttDataService.WeatherStorage(MapJson);
//与页面实时通信使用,下面会讲
WebSocketServiceImpl socketService = SpringUtil.getBean(WebSocketServiceImpl.class);
socketService.SendRealTimeData(topic , json.toString() );
}
}
其实就是在引入SpringUtil 类后直接调用入库方法即可
接下来的入库方法想必大家都会编写,这里就不在赘述了
到目前为止已经可以满足一个中小型的物联网服务器数据交互使用,数据入库等操作已经可以满足,不要在跟我讲有没有想过接入阿里云的物联网了,你都阿里云的物联网还看我文章干什么
二.开发实时订阅/发布展示页面
到现在为止一个拥有完整的mqtt数据发布订阅入库功能的物联网服务器已经开发的差不多了,目前已经可以实现物联网服务器的基础功能,可能有的人会认为实现的功能太过简单,对没错是很简单,我说的是实现的技术很简单,一个简单的技术还需要依赖别人给你提供解决方案吗,作为开发人员当有了必要的基础功能时,就可以根据自己的需求构思开发,开发自己的程序
那么接下来就是开发一个方便调试的页面将mqtt的发布订阅功能整合到页面中
功能清单:
1.订阅主题
2.列出已订阅主题
3.取消订阅
4.数据发布
5.实时订阅展示
1.页面最终展示效果
2.实现 订阅主题 列出已订阅主题 取消订阅 数据发布 功能
前四点都是上一篇文章里写好了调用方法的,这里直接写Controller方法,来调用这些方法
有一点需要注意,也是我当时偷懒和学习不足导致的,原本我的订阅与取消订阅用的是get方法接收通过url传递的参数,结果没想到mqtt的主题不单单是一个名称那么简单
mqtt有分隔符与通配符
主题层级分隔符 "/"
单层通配符 "+"
多层通配符 "#"
常见的主题写法为 : test1
但也有部分使用这些分隔符通配符就会出现 如 test1/names
结果导致get的url传参收到影响,所以已经将方法改为post+json方式传递主题
后台MqttController类
import com.alibaba.fastjson.JSONObject;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttPushClient;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttSender;
import com.zdr.ahairteeter.mod_Tools.Tool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpSession;
import java.util.Map;
@Controller
@Slf4j
@ResponseBody
@RequestMapping("/mqtt")
public class MqttController {
//发送逻辑
@Autowired
private MqttSender mqttSender;
//订阅逻辑
@Autowired
private MqttPushClient mqttPushClient;
@PostMapping("/sendmqtt")
public String sendmqttTexts(@RequestBody Map<String,String> typelist , HttpSession session){
if (whatJson(typelist.get("json"))){
log.info(" 本机主题:"+typelist.get("topic")+" 发送数据为:"+JSONObject.toJSONString(typelist.get("json")));
mqttSender.send(typelist.get("topic"), typelist.get("json"));
} else {
log.info("发送的数据非JSON格式");
return "非JSON格式发送失败";
}
return "发送结束";
}
@RequestMapping("/getsubscribetopic")
public Object getsubscribetopic( @RequestBody Map<String,String> typelist , HttpSession session ){
int Qos=1;
String key = "";
if(typelist!=null && typelist.size() > 0){
key = typelist.get("topic");
log.info("订阅主题为:"+key);
String[] topics={key};
int[] qos={Qos};
mqttPushClient.subscribe(topics,qos);
return key;
}else{
return "主题不能为空";
}
}
@RequestMapping("/getcallsubscribe")
public Object getcallsubscribe( @RequestBody Map<String,String> typelist , HttpSession session ){
String key = "";
if(typelist!=null && typelist.size() > 0){
key = typelist.get("topic");
mqttPushClient.cleanTopic(key);
}
return "";
}
public boolean whatJson(String StrJson) {
if(StringUtils.isEmpty(StrJson)){
return false;
}
boolean isJsonObject = true;
boolean isJsonArray = true;
try {
com.alibaba.fastjson.JSONObject.parseObject(StrJson);
} catch (Exception e) {
isJsonObject = false;
}
try {
com.alibaba.fastjson.JSONObject.parseArray(StrJson);
} catch (Exception e) {
isJsonArray = false;
}
if(!isJsonObject && !isJsonArray){ //不是json格式
return false;
}
return true;
}
}
3.实时订阅展示 使用WebSocket实现
但想要做到在前端页面实时展示订阅的数据就不太好实现
简单的办法是前端页面定时查询订阅到的数据并展示在前端,能做到页面加载数据,但做不到实时展示
有些数据的发送间隔会很短,就会出现定时的时间比发送间隔要长,很不推荐
这里使用WebSocket方法实现数据的实时加载
首先在pom.xml里添加依赖
配置文件pom.xml
<!--webSocket实时通信 socket依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
开发实时通信的程序
后台WebSocketServiceImpl
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
@Service
public class WebSocketServiceImpl {
@Autowired
private SimpMessagingTemplate messagingTemplate;
public void SendRealTimeData(String topic , String texts){
SimpleDateFormat DT7 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
String DateTime = "";
Date date = new Date();
DateTime = DT7.format(date);
messagingTemplate.convertAndSend("/topic/mqttreal", DateTime+" 主题 :"+topic+" 参数 : "+texts );
}
}
后台WebSocketConfig类
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// 设置socket连接
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint("/simple").withSockJS();
//.setAllowedOrigins("*") //解决跨域问题
}
// 设置发布订阅的主题
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/top");
}
}
后台的代码就是这些,一切准别就绪就可以写前端的页面了,由于本人前端很垃圾,只用了简单的html css与js,不会高大上的vue
HTML代码
注意的是导入css与js的地方要写自己放文件的地方,不要直接复制启动一看没有加载来问我为什么,我只知道我的没有问题
html注意这里从外部引入了两个js,是前端的WebSocket需要引入的
前端HTML文件
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script th:src="@{/static/js/jquery-3.4.1.min.js}"></script>
<script th:src="@{/static/mod_MQTT/js/browsesubscribe.js}"></script>
<link rel="stylesheet" th:href="@{/static/mod_MQTT/css/browsesubscribe.css}"/>
</head>
<body>
<h2>MQTT订阅浏览与取消订阅</h2>
<div id='left_side_div'>
<input type="text" id="topicname" />
<button id="subscribebutt" onclick="subscribe();">订阅主题</button>
<p id="subscribelist"></p>
<input type="text" id="calltopicname" />
<button id="callsubscribebutt" onclick="callsubscribe();">取消订阅</button>
<br>
<br>
<span>主题</span><input type="text" id="topic" /><br>
<textarea rows="15" cols="40" type="text" id="testjson">编辑JSON数据</textarea><br>
<button id="sendmqtt" onclick="sendmqtt();">模拟mqtt协议发送数据</button>
</div>
<div id='right_side_div'>
<button id="eliminate" onclick="eliminate();">清屏</button>
<p id="callback"></p>
</div>
</body>
</html>
前端JS文件
var stompClient = null;
$(function(){
connect();
$("#subscribelist").html("");
});
function eliminate(message) {
$("#callback").html("");
}
//实时通信 这里的/topic/mqttreal 就是后台定义的实时通信地址
function connect() {
var socket = new SockJS("/simple");
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/mqttreal', function(frame){
showcallback(frame.body);
});
});
}
function showcallback(message) {
$("#callback").append("<tr><td>" + message + "</td></tr>");
}
function sendmqtt() {
var topic = $("#topic").val();
var testjson = $("#testjson").val();
var json = {
'topic':topic,
'json':testjson
}
$.ajax({
type:"POST",
url:"/mqtt/sendmqtt/",
contentType:"application/json", //发送信息至服务器时内容编码类型。
dataType:"json", // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
data:JSON.stringify(json),
success:function(retdata){
}
});
}
function callsubscribe() {
$("#subscribelist").html("");
var topic = $("#calltopicname").val();
var json = {
'topic':topic
}
$.ajax({
type:"POST",
url:"/mqtt/getcallsubscribe",
contentType:"application/json", //发送信息至服务器时内容编码类型。
dataType:"json", // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
data:JSON.stringify(json),
error: function(data){
appsubscribelist(data.responseText);
}
});
}
function subscribe() {
var topic = $("#topicname").val();
var json = {
'topic':topic
}
$.ajax({
type:"POST",
url:"/mqtt/getsubscribetopic",
contentType:"application/json", //发送信息至服务器时内容编码类型。
dataType:"json", // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
data:JSON.stringify(json),
error: function(data){
appsubscribelist(data.responseText);
}
});
}
function appsubscribelist(message) {
$("#subscribelist").append("<div>" + message + "</div>");
}
css
#testjson{
font-size: 20px;
}
#left_side_div{
float:left;
display: inline-block;
width: 30%;
height: 500px;
}
#right_side_div{
float:left;
display: inline-block;
width: 50%;
height: 500px;
}
前端就是这样了,所有的准备功能做已经做好了,可以启动项目测试一下,首先要将搭建的开源mqtt服务器启动,其次启动Springboot项目,在浏览器输入项目地址以及页面的路由地址
最后页面就像
可以看右侧实时订阅展示,因为使用的是实时通信的方式接受服务器数据,时间都在毫秒级别,如果使用定时刷新肯定不能及时反馈
三.总结
mqtt物联网服务器的搭建已经到这里就结束了,只要满足数据入库,接下来的功能就可以自己编写了,测试的页面也有了方便测试物联网设备调试数据发送
我也休息休息,短时间内不会有新的单片机的开发想法,主要是没什么好玩的点子,如果有有趣的点子可以分享一下,好玩的话我也会做一做玩玩
后记:
写不下去了,好累啊,大好的周末时间竟然还在值班,我去,受不了了
给你看们看看我写这段话的时间
跟人事提提,点给他,问问单位发不发对象,不发我就不干了跳槽,****的****的我要跳槽,真受不了**********,啊啊啊啊啊*******
************
就是这样,文章写得不错,源码什么的如果有需要就先和我聊聊天,聊好了我会给原码的,就不上传浪费大家的积分了
还有想做伸手党的注意了,以后再有上来就理直气壮只要原码不聊天的一律装死不管
预告:
没想到三月就写差不多的文章拖到四月中旬才发布,有点懒了,下一篇文章我要显摆一下我的爬虫技术了
预告 : 使用爬虫做到需要登录的网站每日自动签到