基于RabbitMQ 的 Web MQTT插件进行前端消息实时推送

   日期:2020-09-02     浏览:116    评论:0    
核心提示:目录RabbitMQWhat is AMQP, MQTT, STOMP ?How to use RabbitMQ with MQTT ?1. Docker 安装RabbitMQ2. MQTT插件启用3. 查看 rabbitmq_mqtt 默认配置4. 采用JS 前端订阅发布mqtt消息5. 采用Java 订阅发布mqtt消息6. 稍作改动可切换阿里云MQTT消息队列RabbitMQ不多说了,它是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。用它解决微服务各种服务的解耦等

目录

  • RabbitMQ
  • What is AMQP, MQTT, STOMP ?
  • How to use RabbitMQ with MQTT ?
    • 1. Docker 安装RabbitMQ
    • 2. MQTT插件启用
    • 3. 查看 rabbitmq_mqtt 默认配置
    • 4. 采用JS 前端订阅发布mqtt消息
    • 5. 采用Java 订阅发布mqtt消息
    • 6. 稍作改动可切换阿里云MQTT消息队列

RabbitMQ

不多说了,它是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。用它解决微服务各种服务的解耦等等。

What is AMQP, MQTT, STOMP ?

先搞懂一些协议层面上的东西 AMQP, MQTT,STOMP

  • AMQP代表高级消息队列协议
  • MQTT(消息队列遥测传输)
  • STOMP(简单/流式文本导向的消息传递协议)是这三种协议中唯——种基于文本的协议

说的直白了,
AMQP 用在后端微服务中比较多,RocketMQ、 Kafka等这些消息软件都实现了这种高级协议
MQTT 能传递文本、语音、图片、视频等二进制数据
STOMP 简单文本传输

MQTT协议 用在终端消息推送比较多,现在的物联网产品里面大都有MQTT技术的身影。

RabbitMQ 支持以上三种协议

选择你的消息协议 AMQP, MQTT,STOMP,请看大神博客:
https://www.yuque.com/noobwo/mq/hpiop0

How to use RabbitMQ with MQTT ?

首先请确保你对 rabbitmq使用已经熟悉了,不熟悉的请自行学习,很简单的使用。

如果不想看英文官方文档,网上不错的rabbitmq基本使用 博客
https://www.cnblogs.com/refuge/category/1395422.html

1. Docker 安装RabbitMQ

docker run -d --hostname myrabbit-test --name mq-rabbit -p 15674:15674 -p 15675:15675 -p 5672:5672 -p 15672:15672 -p 1883:1883 -e  RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=Admin123 rabbitmq:latest

端口说明:

端口 说明
5672 后台SDK TCP方式连接rabbitmq的端口,发送rabbitmq AMQP协议的消息
15672 rabbitmq 管理界面登录访问的端口
15674 rabbitmq STOMP WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息
15675 rabbitmq MQTT WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息
1883 rabbitmq MQTT TCP方式访问的端口 ,比如java订阅发布mqtt消息

RABBITMQ_DEFAULT_VHOST 相当于一个相对独立的RabbitMQ服务器,每个VirtualHost
之间是相互隔离的,里面的exchange、queue、message不能互通, VirtualHost可以看成相当于mysql中的一个Database.

2. MQTT插件启用

镜像 rabbitmq:latest 启动的容器里面,默认没有启用各个插件,请进入容器启用各个插件
STOMP 不用可以不启用该插件。

rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_web_mqtt

3. 查看 rabbitmq_mqtt 默认配置

rabbitmq的配置文件在哪里?
一般在 /etc/rabbitmq/下面,Windows可查看README.txt文,一般在 %APPDATA%\RabbitMQ\rabbitmq.config

具体说明,请查看官方文档:
https://www.rabbitmq.com/configure.html#config-file-location

如果你想使用默认配置文件时,请确保如下配置已经做好:

  • mqtt 默认vhost 为“/” 请登录rabbitmq管理后台创建该 vhost
  • mqtt 默认 exchange 为 “amq.topic”
  • mqtt 默认 用户名和密码 为 guest/guest,当然连接的时候可以指定其他用户,比如docker创建的时候指定的 admin用户,请确保该用户有访问 vhost “/”的权限

4. 采用JS 前端订阅发布mqtt消息

不多说,直接上代码,html代码如下:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="zh-CN">
<head>
    <title>Mqtt Websockets</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>

    <style type="text/css"> .button { background-color: #4CAF50; border: none; color: white; padding: 8px 20px; text-align: center; text-decoration: none; display: inline-block; font-size: 16px; margin: 4px 2px; cursor: pointer; } </style>
    <script type="text/javascript"> // 工具类 日期函数 function getFormatDate() { var date = new Date(); var month = date.getMonth() + 1; var strDate = date.getDate(); var strHours = date.getHours(); var strMinutes = date.getMinutes(); var strSeconds = date.getSeconds(); if (month >= 1 && month <= 9) { month = "0" + month; } if (strDate >= 0 && strDate <= 9) { strDate = "0" + strDate; } if (strHours >= 0 && strHours <= 9) { strHours = "0" + strHours; } if (strMinutes >= 0 && strMinutes <= 9) { strMinutes = "0" + strMinutes; } if (strSeconds >= 0 && strSeconds <= 9) { strSeconds = "0" + strSeconds; } return date.getFullYear() + "-" + month + "-" + strDate + " " + strHours + ":" + strMinutes + ":" + strSeconds; } </script>

    <script type="text/javascript"> // RabbitMq 服务器IP地址或域名地址 const host = '127.0.0.1'; //WebSocket 协议服务端口,如果是走 HTTPS,设置443端口 const port = 15675; //需要操作的 Topic,第一级父级 topic const topic = 'MIDDOL-TEST'; //设备唯一id const clientId = "myclientid_" + parseInt(Math.random() * 10000, 10); //服务连接失败后重新尝试连接的时间间隔 const reconnectTimeout = 10000; // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限 const cleansession = true; // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限 const username = 'admin'; const password = 'Admin123'; //是否走加密 HTTPS,如果走 HTTPS,设置为 true const useTLS = false; // MQTT 客户端引用对象 let mqtt; function MQTTconnect() { mqtt = new Paho.MQTT.Client(host, port, "/ws", clientId); mqtt.onConnectionLost = onConnectionLost; mqtt.onMessageArrived = onMessageArrived; let options = { timeout: 3, keepAliveInterval: 60, mqttVersion: 4, cleanSession: cleansession, onSuccess: onConnect, onFailure: onFailure, useSSL: useTLS, userName: username, password: password }; mqtt.connect(options); } // 连接服务器失败 function onFailure(message) { console.log("onFailure : " + message); $("#arrivedDiv").append("<div style='color: red'> " + getFormatDate() + " onFailure : " + message.errorMessage + " </div>"); setTimeout(MQTTconnect, reconnectTimeout); } // 连接上服务器 function onConnect() { // 订阅某个主题 mqtt.subscribe(topic, {qos: 0}); // 订阅P2P主题 mqtt.subscribe(topic + "/p2p/" + clientId, {qos: 0}); // 发布主题消息 let message = new Paho.MQTT.Message("Connect success, Hello mqtt!"); // set topic message.destinationName = topic; mqtt.send(message); //发送 P2P 消息 message = new Paho.MQTT.Message("Connect success, Hello mqtt P2P Msg!"); // set topic message.destinationName = topic + "/p2p/" + clientId; mqtt.send(message); } // 未连接服务器 function onConnectionLost(response) { console.log("onConnectionLost : " + response); setTimeout(MQTTconnect, reconnectTimeout); } // 接收到mqtt消息 function onMessageArrived(message) { let topic = message.destinationName; let payload = message.payloadString; // console.log("recv msg : " + topic + " " + payload); $("#arrivedDiv").append("<div> " + getFormatDate() + " recv msg : topic=" + topic + " ,payload=" + payload + " </div>") } // MQTT 连接初始化 MQTTconnect(); // 发送测试 function testMessageSend() { let msg = $("#testMessage").val(); if (msg == null || msg === '') { $("#arrivedDiv").append("<div style='color: red'>请在文本框输入消息后点击发送按钮</div>") return; } // 发布主题消息 //set body let message = new Paho.MQTT.Message(msg); // set topic message.destinationName = topic; mqtt.send(message); } function testMessageClean() { $("#arrivedDiv").html("<br/>"); } $(function () { $("#myClientIdSpan").html("" + clientId); }) </script>
</head>

<div style="margin-top: 30px;">
    <span>MQTT设备ID(clientId):</span><span id="myClientIdSpan" style="font-weight: bold;"></span>
    <br/><br/>
    <label style="font-weight: bold" for="testMessage">MQTT消息:</label>
    <input style="height: 25px; width: 180px;" maxlength="60" value="这是一条测试消息" id="testMessage"/>
    <button class="button" id="mySendBtn" onclick="testMessageSend()"> 点击发送</button>

    <button class="button" id="cleanSendBtn" onclick="testMessageClean()"> 清空日志</button>
</div>

<div style="margin-top: 30px">

    <div style="font-size: 20px;color: darkcyan"> 接收到的mqtt消息日志</div>
    <hr/>
    <div id="arrivedDiv" style="height:600px; width:1000px; overflow:scroll; background:#EEEEEE;">
        <br/>
    </div>

</div>
</html>

主要查看上面js里面 RabbitMq 服务器IP地址或域名地址和端口号主要信息,运行如下界面:

5. 采用Java 订阅发布mqtt消息

mqtt作为行业通用协议,支持很多种语言开发,具体SDK请查看如下:

https://www.alibabacloud.com/help/zh/doc-detail/44866.htm?spm=a2c63.p38356.b99.49.18af6586hItGUj

本次采用 org.eclipse.paho.client.mqttv3 SDK进行订阅发布MQTT消息。

POM.xml引入Maven依赖:

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

JAVA 主要代码如下:

package com.test.rabbitmq.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class MyMqttService {

    private static org.slf4j.Logger logger = LoggerFactory.getLogger(MyMqttService.class);

    private MqttClient client;
    private String defaultTopic;

    
    public static class Builder {
        private String host;
        private String userName;
        private String passWord;
        private String clientId;
        private String defaultTopic = "MyMqttTopic";
        private MqttCallback callback;
        private boolean cleanSession;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder userName(String userName) {
            this.userName = userName;
            return this;
        }

        public Builder passWord(String passWord) {
            this.passWord = passWord;
            return this;
        }

        public Builder clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        public Builder defaultTopic(String defaultTopic) {
            this.defaultTopic = defaultTopic;
            return this;
        }

        public Builder callback(MqttCallback callback) {
            this.callback = callback;
            return this;
        }

        public Builder cleanSession(boolean cleanSession) {
            this.cleanSession = cleanSession;
            return this;
        }

        public MyMqttService build() {
            return new MyMqttService(this);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    private MyMqttService(Builder builder) {
        defaultTopic = builder.defaultTopic;
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), r -> {
            Thread t = new Thread(r);
            t.setName("MyMQTT线程");
            return t;
        });
        try {
            //id应该保持唯一性
            client = new MqttClient(builder.host, builder.clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(builder.cleanSession);
            options.setUserName(builder.userName);
            options.setPassword(builder.passWord.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);
            if (builder.callback == null) {
                client.setCallback(new MqttCallbackExtended() {

                    @Override
                    public void connectComplete(boolean reconnect, String serveruri) {
                        // 客户端连接成功后就需要尽快订阅需要的 topic
                        logger.debug(builder.clientId + " connectComplete reconnect=" + reconnect + ", serveruri=" + serveruri);

                        // 参考阿里云mqtt文档 https://www.alibabacloud.com/help/zh/doc-detail/42420.htm?spm=a2c63.p38356.b99.12.87851d06uHImcQ
                        
                        final String[] topicFilter = {builder.defaultTopic, builder.defaultTopic.concat("/p2p/").concat(builder.clientId)};
                        final int[] qos = {0, 0};
                        executorService.submit(() -> subscribe(topicFilter, qos));
                    }

                    @Override
                    public void connectionLost(Throwable arg0) {
                        logger.debug(builder.clientId + " connectionLost " + arg0);
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken arg0) {

                        logger.debug(builder.clientId + " deliveryComplete " + arg0);
                    }

                    @Override
                    public void messageArrived(String arg0, MqttMessage arg1) {

                        logger.debug(builder.clientId + " messageArrived: " + arg1.toString());
                    }
                });
            } else {
                client.setCallback(builder.callback);
            }
            client.connect(options);
        } catch (MqttException e) {
            logger.error("MyMqttService 初始化异常 ", e);
        }
    }

    
    public void sendMessage(String msg) {
        sendMessage(defaultTopic, msg);
    }

    
    public void sendMessage(String topic, String msg) {
        try {
            MqttMessage message = new MqttMessage(msg.getBytes());
            message.setQos(0);
            message.setRetained(false);
            client.publish(topic, message);
            logger.info("发送消息成功 topic={},msg={}", topic, msg);
        } catch (MqttException e) {
            logger.error("发送主题消息异常 topic={} ,msg={}", topic, msg, e);
        }
    }

    
    public void subscribe(String[] topicFilters, int[] qos) {
        try {
            client.subscribe(topicFilters, qos);
            for (int i = 0; i < topicFilters.length; i++) {
                logger.info("subscribe success topicFilters={}, qos={}", topicFilters[i], qos[i]);
            }

        } catch (MqttException e) {
            logger.error("订阅主题", e);
        }
    }

    
    public void unsubscribe(String[] topicFilters) {
        try {
            client.unsubscribe(topicFilters);
        } catch (MqttException e) {
            logger.error("取消订阅某个主题", e);
        }
    }

    public void closeClient(boolean force) {
        try {
            client.close(force);
        } catch (MqttException e) {
            logger.error("closeClient异常", e);
        }
    }
}

写个main方法测试一下哦:

package com.test.rabbitmq.mqtt;

public class MyMqttTestService {

    public static void main(String[] args) throws InterruptedException {

        MyMqttService service = MyMqttService.builder()
                .host("tcp://127.0.0.1:1883")
                .userName("admin")
                .passWord("Admin123")
                .clientId("myclientid_10001")
                .defaultTopic("MIDDOL-TEST")
                .cleanSession(true).build();

        Thread.sleep(3000L);
        service.sendMessage("这是java后端发送的消息");

        // service.closeClient(false);
    }
}

运行一下看发生了什么?
控制台信息如下:

刚刚运行的html页面也同时受到了java后端发送的消息。

查看rabbitmq 管理后台的channel 出现两个链接

前端发送一个消息看看:

后端同样可以受到消息:

6. 稍作改动可切换阿里云MQTT消息队列

阿里云的MQTT消息队列的SDK代码几乎和上面一致,多了 accessKey权限校验等。
https://www.alibabacloud.com/help/zh/doc-detail/59721.htm?spm=a2c63.p38356.b99.48.28d04a9bVXkQM4

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服