[Java]SpringBoot2整合mqtt服务器EMQ实现消息订阅发布入库(二)

   日期:2021-04-16     浏览:345    评论:0    
核心提示:大纲一.数据入库1.数据入库解决方案二.开发实时订阅发布展示页面1.及时通讯技术2.技术整合

 

又到了显摆分享技术的时候了

没有看过上一篇文章的可以先看一下,这篇是在上一篇基础上接着添加功能

SpringBoot2整合mqtt服务器EMQ实现消息订阅发布入库(一)

这篇文章的流程为:

1.Springboot将订阅的数据入库

2.开发实时订阅/发布展示页面

这篇文章是整合mqtt服务器的消息订阅发布的最后一篇,在掌握这两篇文章后可以搭建自己的mqtt业务处理服务器,强调一下,做这些的目的就是为了搭建自己的物联网服务器,自己开发数据接入逻辑,自己开发数据处理逻辑,自己开发页面展示,而不是直接依赖无脑的傻瓜式阿里云的物联网整合方案,这篇文章就是为了自己搭建出相同功能的项目, 不要再问我能不能接入阿里云的数据处理展示服务了,如果没看懂我写的文章,我很抱歉,右上角有个叉,欢迎点击

 

接着上一篇文章继续写一下订阅的数据如何入库

目录

一.Springboot将订阅的数据入库

1.使用ApplicationContextAware

2.重写PushCallback类

二.开发实时订阅/发布展示页面

1.页面最终展示效果

2.实现 订阅主题 列出已订阅主题 取消订阅 数据发布 功能

3.实时订阅展示 使用WebSocket实现

三.总结

后记:

预告:

一.Springboot将订阅的数据入库

由于直接使用注解注入方法会抛出异常报空,我的理解就是因为spring的容器加载顺序的原因,用于订阅的PushCallback类实现的MqttCallback接口包括具体方法已经注入到spring的容器中,而@Autowired注解的入库方法是后注入容器的结果导致实现MqttCallback接口的方法时读取不到才抛出空指针

1.使用ApplicationContextAware

使用该类实例化后就可以手动获取Bean的注入对象,首先创建一个类来实现这个接口,具体代码如下

后台SpringUtil 类 用于手动注入其他类,用于解决使用@Autowired注入报空指针问题

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;


@Component
public class SpringUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if(SpringUtil.applicationContext == null){
            SpringUtil.applicationContext  = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }

    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }

    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }


}

这样我们就通过实例化调用getBean()方法就可以在接收到订阅数据后进行入库逻辑

XXXServiceImpl XXX = SpringUtil.getBean(XXXServiceImpl.class);

需要修改PushCallback类,就可以调用无法注入的方法

2.重写PushCallback类

由于解决了将入库方法注入到容器中的问题,接下来就需要改写PushCallback类的messageArrived方法

修改后的代码如下

后台PushCallback类

@Slf4j
@Component
public class PushCallback implements MqttCallback {
 
 
    private MqttPushClient client;
    private MqttConfiguration mqttConfiguration;
 
 
    public PushCallback(MqttPushClient client ,MqttConfiguration mqttConfiguration) {
        this.client = client;
        this.mqttConfiguration = mqttConfiguration;
    }
 
    @Override
    public void connectionLost(Throwable cause) {
        
        if(client != null) {
            while (true) {
                try {
                    log.info("==============》》》[MQTT] 连接断开,5S之后尝试重连...");
                    Thread.sleep(5000);
                    MqttPushClient mqttPushClient = new MqttPushClient();
                    mqttPushClient.connect(mqttConfiguration);
                    if(MqttPushClient.getClient().isConnected()){
                        log.info("=============>>重连成功");
                    }
                    break;
                } catch (Exception e) {
                    log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
                    continue;
                }
            }
        }
        log.info(cause.getMessage());
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //publish后会执行到这里
        log.info("publish后会执行到这里");
        log.info("pushComplete==============>>>" + token.isComplete());
    }
 
 
    
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String Payload = new String(message.getPayload());
 
        log.info("============》》接收消息主题 : " + topic);
        log.info("============》》接收消息Qos : " + message.getQos());
        log.info("============》》接收消息内容 : " + Payload);
        log.info("============》》接收ID : " + message.getId());
        log.info("接收数据结束 下面可以执行数据处理操作");
        //将json转map,方便读取数据
        JSONObject json  =  JSONObject.parseObject(Payload);
        Map<String,Object> MapJson = json.getInnerMap();
        //实例化入库方法 这里就用到SpringUtil类 来手动的注入
        WeatherServiceImpl mqttDataService = SpringUtil.getBean(WeatherServiceImpl.class);
        //调用入库方法
        mqttDataService.WeatherStorage(MapJson);
        
        //与页面实时通信使用,下面会讲
        WebSocketServiceImpl socketService = SpringUtil.getBean(WebSocketServiceImpl.class);
        socketService.SendRealTimeData(topic , json.toString() );
    }
}

其实就是在引入SpringUtil 类后直接调用入库方法即可

接下来的入库方法想必大家都会编写,这里就不在赘述了
 

到目前为止已经可以满足一个中小型的物联网服务器数据交互使用,数据入库等操作已经可以满足,不要在跟我讲有没有想过接入阿里云的物联网了,你都阿里云的物联网还看我文章干什么

 

二.开发实时订阅/发布展示页面

到现在为止一个拥有完整的mqtt数据发布订阅入库功能的物联网服务器已经开发的差不多了,目前已经可以实现物联网服务器的基础功能,可能有的人会认为实现的功能太过简单,对没错是很简单,我说的是实现的技术很简单,一个简单的技术还需要依赖别人给你提供解决方案吗,作为开发人员当有了必要的基础功能时,就可以根据自己的需求构思开发,开发自己的程序

那么接下来就是开发一个方便调试的页面将mqtt的发布订阅功能整合到页面中

功能清单:

1.订阅主题

2.列出已订阅主题

3.取消订阅

4.数据发布

5.实时订阅展示

1.页面最终展示效果

 

 

2.实现 订阅主题 列出已订阅主题 取消订阅 数据发布 功能

前四点都是上一篇文章里写好了调用方法的,这里直接写Controller方法,来调用这些方法

有一点需要注意,也是我当时偷懒和学习不足导致的,原本我的订阅与取消订阅用的是get方法接收通过url传递的参数,结果没想到mqtt的主题不单单是一个名称那么简单

mqtt有分隔符与通配符

主题层级分隔符  "/"

单层通配符  "+"

多层通配符  "#"

常见的主题写法为 : test1

但也有部分使用这些分隔符通配符就会出现 如 test1/names

结果导致get的url传参收到影响,所以已经将方法改为post+json方式传递主题

 后台MqttController类

import com.alibaba.fastjson.JSONObject;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttPushClient;
import com.zdr.ahairteeter.mod_MQTT.MQTTTOOL.MqttSender;
import com.zdr.ahairteeter.mod_Tools.Tool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;

import javax.servlet.http.HttpSession;
import java.util.Map;


@Controller
@Slf4j

@ResponseBody
@RequestMapping("/mqtt")
public class MqttController {

    //发送逻辑
    @Autowired
    private MqttSender mqttSender;

    //订阅逻辑
    @Autowired
    private MqttPushClient mqttPushClient;






    @PostMapping("/sendmqtt")
    public String sendmqttTexts(@RequestBody Map<String,String> typelist , HttpSession session){
        if (whatJson(typelist.get("json"))){
            log.info("    本机主题:"+typelist.get("topic")+" 发送数据为:"+JSONObject.toJSONString(typelist.get("json")));
            mqttSender.send(typelist.get("topic"), typelist.get("json"));
        } else {
            log.info("发送的数据非JSON格式");
            return "非JSON格式发送失败";
        }
        return "发送结束";
    }



    
    @RequestMapping("/getsubscribetopic")
    public Object getsubscribetopic( @RequestBody Map<String,String> typelist  , HttpSession session ){
        int Qos=1;
        String key = "";
        if(typelist!=null && typelist.size() > 0){
            key = typelist.get("topic");
            log.info("订阅主题为:"+key);
            String[] topics={key};
            int[] qos={Qos};
            mqttPushClient.subscribe(topics,qos);
            return key;
        }else{
            return "主题不能为空";
        }
    }

    
    @RequestMapping("/getcallsubscribe")
    public Object getcallsubscribe( @RequestBody Map<String,String> typelist  , HttpSession session ){
        String key = "";
        if(typelist!=null && typelist.size() > 0){
            key = typelist.get("topic");
            mqttPushClient.cleanTopic(key);
        }
        return "";
    }


    
    public boolean whatJson(String StrJson) {
        if(StringUtils.isEmpty(StrJson)){
            return false;
        }
        boolean isJsonObject = true;
        boolean isJsonArray = true;
        try {
            com.alibaba.fastjson.JSONObject.parseObject(StrJson);
        } catch (Exception e) {
            isJsonObject = false;
        }
        try {
            com.alibaba.fastjson.JSONObject.parseArray(StrJson);
        } catch (Exception e) {
            isJsonArray = false;
        }
        if(!isJsonObject && !isJsonArray){ //不是json格式
            return false;
        }
        return true;
    }




}

 

3.实时订阅展示 使用WebSocket实现

但想要做到在前端页面实时展示订阅的数据就不太好实现

简单的办法是前端页面定时查询订阅到的数据并展示在前端,能做到页面加载数据,但做不到实时展示

有些数据的发送间隔会很短,就会出现定时的时间比发送间隔要长,很不推荐

这里使用WebSocket方法实现数据的实时加载

首先在pom.xml里添加依赖

 配置文件pom.xml

            <!--webSocket实时通信 socket依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>

 

开发实时通信的程序

后台WebSocketServiceImpl 

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;


@Service
public class WebSocketServiceImpl {

        @Autowired
        private SimpMessagingTemplate messagingTemplate;

        
        public void SendRealTimeData(String topic , String texts){
            SimpleDateFormat DT7 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
            String DateTime = "";
            Date date = new Date();
            DateTime = DT7.format(date);
            messagingTemplate.convertAndSend("/topic/mqttreal", DateTime+" 主题 :"+topic+"  参数 : "+texts );
        }

}

 

后台WebSocketConfig类 

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {


    // 设置socket连接
    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        stompEndpointRegistry.addEndpoint("/simple").withSockJS();
        //.setAllowedOrigins("*") //解决跨域问题
    }


    // 设置发布订阅的主题
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/top");
    }



}

 

后台的代码就是这些,一切准别就绪就可以写前端的页面了,由于本人前端很垃圾,只用了简单的html css与js,不会高大上的vue

 

HTML代码

注意的是导入css与js的地方要写自己放文件的地方,不要直接复制启动一看没有加载来问我为什么,我只知道我的没有问题

html注意这里从外部引入了两个js,是前端的WebSocket需要引入的

前端HTML文件 

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>
    <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
    <script th:src="@{/static/js/jquery-3.4.1.min.js}"></script>

    <script th:src="@{/static/mod_MQTT/js/browsesubscribe.js}"></script>
    <link rel="stylesheet" th:href="@{/static/mod_MQTT/css/browsesubscribe.css}"/>



</head>
<body>
<h2>MQTT订阅浏览与取消订阅</h2>

<div id='left_side_div'>
<input type="text" id="topicname" />
<button id="subscribebutt" onclick="subscribe();">订阅主题</button>
<p id="subscribelist"></p>

<input type="text" id="calltopicname" />
<button id="callsubscribebutt" onclick="callsubscribe();">取消订阅</button>

<br>
<br>

<span>主题</span><input type="text" id="topic" /><br>
<textarea rows="15" cols="40" type="text" id="testjson">编辑JSON数据</textarea><br>
<button id="sendmqtt" onclick="sendmqtt();">模拟mqtt协议发送数据</button>


</div>


<div id='right_side_div'>
    <button id="eliminate" onclick="eliminate();">清屏</button>
    <p id="callback"></p>
</div>

</body>
</html>

 

前端JS文件

var stompClient = null;

$(function(){
    connect();
    $("#subscribelist").html("");

});


function eliminate(message) {
    $("#callback").html("");
}

//实时通信 这里的/topic/mqttreal 就是后台定义的实时通信地址 
function connect() {
    var socket = new SockJS("/simple");
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/mqttreal', function(frame){
            showcallback(frame.body);
        });
    });
}

function showcallback(message) {
    $("#callback").append("<tr><td>" + message + "</td></tr>");
}


function sendmqtt() {

    var topic = $("#topic").val();
    var testjson = $("#testjson").val();

    var json = {
        'topic':topic,
        'json':testjson
    }

    $.ajax({
        type:"POST",
        url:"/mqtt/sendmqtt/",
        contentType:"application/json",  //发送信息至服务器时内容编码类型。
        dataType:"json",  // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
        data:JSON.stringify(json),
        success:function(retdata){

        }
    });
}



function callsubscribe() {
    $("#subscribelist").html("");
    var topic = $("#calltopicname").val();
    var json = {
        'topic':topic
    }
    $.ajax({
        type:"POST",
        url:"/mqtt/getcallsubscribe",
        contentType:"application/json",  //发送信息至服务器时内容编码类型。
        dataType:"json",  // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
        data:JSON.stringify(json),
        error: function(data){
            appsubscribelist(data.responseText);
        }
    });
}



function subscribe() {
    var topic = $("#topicname").val();
    var json = {
        'topic':topic
    }
    $.ajax({
        type:"POST",
        url:"/mqtt/getsubscribetopic",
        contentType:"application/json",  //发送信息至服务器时内容编码类型。
        dataType:"json",  // 预期服务器返回的数据类型。如果不指定,jQuery 将自动根据 HTTP 包 MIME 信息来智能判断,比如XML MIME类型就被识别为XML。
        data:JSON.stringify(json),
        error: function(data){
            appsubscribelist(data.responseText);
        }
    });
}


function appsubscribelist(message) {
    $("#subscribelist").append("<div>" + message + "</div>");
}

 

css

#testjson{
    font-size: 20px;

}

#left_side_div{
    float:left;
    display: inline-block;
    width: 30%;
    height: 500px;
}
#right_side_div{
    float:left;
    display: inline-block;
    width: 50%;
    height: 500px;
}

 

 

 

前端就是这样了,所有的准备功能做已经做好了,可以启动项目测试一下,首先要将搭建的开源mqtt服务器启动,其次启动Springboot项目,在浏览器输入项目地址以及页面的路由地址

最后页面就像

可以看右侧实时订阅展示,因为使用的是实时通信的方式接受服务器数据,时间都在毫秒级别,如果使用定时刷新肯定不能及时反馈

 

三.总结

mqtt物联网服务器的搭建已经到这里就结束了,只要满足数据入库,接下来的功能就可以自己编写了,测试的页面也有了方便测试物联网设备调试数据发送

我也休息休息,短时间内不会有新的单片机的开发想法,主要是没什么好玩的点子,如果有有趣的点子可以分享一下,好玩的话我也会做一做玩玩

 

后记:

写不下去了,好累啊,大好的周末时间竟然还在值班,我去,受不了了

给你看们看看我写这段话的时间

跟人事提提,点给他,问问单位发不发对象,不发我就不干了跳槽,****的****的我要跳槽,真受不了**********,啊啊啊啊啊*******

************

就是这样,文章写得不错,源码什么的如果有需要就先和我聊聊天,聊好了我会给原码的,就不上传浪费大家的积分了

还有想做伸手党的注意了,以后再有上来就理直气壮只要原码不聊天的一律装死不管

 

预告:

没想到三月就写差不多的文章拖到四月中旬才发布,有点懒了,下一篇文章我要显摆一下我的爬虫技术了

预告 : 使用爬虫做到需要登录的网站每日自动签到

 

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
更多>相关资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服