目录
- RabbitMQ
- What is AMQP, MQTT, STOMP ?
- How to use RabbitMQ with MQTT ?
- 1. Docker 安装RabbitMQ
- 2. MQTT插件启用
- 3. 查看 rabbitmq_mqtt 默认配置
- 4. 采用JS 前端订阅发布mqtt消息
- 5. 采用Java 订阅发布mqtt消息
- 6. 稍作改动可切换阿里云MQTT消息队列
RabbitMQ
不多说了,它是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。用它解决微服务各种服务的解耦等等。
What is AMQP, MQTT, STOMP ?
先搞懂一些协议层面上的东西 AMQP, MQTT,STOMP
- AMQP代表高级消息队列协议
- MQTT(消息队列遥测传输)
- STOMP(简单/流式文本导向的消息传递协议)是这三种协议中唯——种基于文本的协议
说的直白了,
AMQP 用在后端微服务中比较多,RocketMQ、 Kafka等这些消息软件都实现了这种高级协议
MQTT 能传递文本、语音、图片、视频等二进制数据
STOMP 简单文本传输
MQTT协议 用在终端消息推送比较多,现在的物联网产品里面大都有MQTT技术的身影。
RabbitMQ 支持以上三种协议
选择你的消息协议 AMQP, MQTT,STOMP,请看大神博客:
https://www.yuque.com/noobwo/mq/hpiop0
How to use RabbitMQ with MQTT ?
首先请确保你对 rabbitmq使用已经熟悉了,不熟悉的请自行学习,很简单的使用。
如果不想看英文官方文档,网上不错的rabbitmq基本使用 博客
https://www.cnblogs.com/refuge/category/1395422.html
1. Docker 安装RabbitMQ
docker run -d --hostname myrabbit-test --name mq-rabbit -p 15674:15674 -p 15675:15675 -p 5672:5672 -p 15672:15672 -p 1883:1883 -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=Admin123 rabbitmq:latest
端口说明:
端口 | 说明 |
---|---|
5672 | 后台SDK TCP方式连接rabbitmq的端口,发送rabbitmq AMQP协议的消息 |
15672 | rabbitmq 管理界面登录访问的端口 |
15674 | rabbitmq STOMP WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息 |
15675 | rabbitmq MQTT WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息 |
1883 | rabbitmq MQTT TCP方式访问的端口 ,比如java订阅发布mqtt消息 |
RABBITMQ_DEFAULT_VHOST 相当于一个相对独立的RabbitMQ服务器,每个VirtualHost
之间是相互隔离的,里面的exchange、queue、message不能互通, VirtualHost可以看成相当于mysql中的一个Database.
2. MQTT插件启用
镜像 rabbitmq:latest 启动的容器里面,默认没有启用各个插件,请进入容器启用各个插件。
STOMP 不用可以不启用该插件。
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_web_mqtt
3. 查看 rabbitmq_mqtt 默认配置
rabbitmq的配置文件在哪里?
一般在 /etc/rabbitmq/下面,Windows可查看README.txt文,一般在 %APPDATA%\RabbitMQ\rabbitmq.config
具体说明,请查看官方文档:
https://www.rabbitmq.com/configure.html#config-file-location
如果你想使用默认配置文件时,请确保如下配置已经做好:
- mqtt 默认vhost 为“/” 请登录rabbitmq管理后台创建该 vhost
- mqtt 默认 exchange 为 “amq.topic”
- mqtt 默认 用户名和密码 为 guest/guest,当然连接的时候可以指定其他用户,比如docker创建的时候指定的 admin用户,请确保该用户有访问 vhost “/”的权限
4. 采用JS 前端订阅发布mqtt消息
不多说,直接上代码,html代码如下:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="zh-CN">
<head>
<title>Mqtt Websockets</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
<style type="text/css"> .button { background-color: #4CAF50; border: none; color: white; padding: 8px 20px; text-align: center; text-decoration: none; display: inline-block; font-size: 16px; margin: 4px 2px; cursor: pointer; } </style>
<script type="text/javascript"> // 工具类 日期函数 function getFormatDate() { var date = new Date(); var month = date.getMonth() + 1; var strDate = date.getDate(); var strHours = date.getHours(); var strMinutes = date.getMinutes(); var strSeconds = date.getSeconds(); if (month >= 1 && month <= 9) { month = "0" + month; } if (strDate >= 0 && strDate <= 9) { strDate = "0" + strDate; } if (strHours >= 0 && strHours <= 9) { strHours = "0" + strHours; } if (strMinutes >= 0 && strMinutes <= 9) { strMinutes = "0" + strMinutes; } if (strSeconds >= 0 && strSeconds <= 9) { strSeconds = "0" + strSeconds; } return date.getFullYear() + "-" + month + "-" + strDate + " " + strHours + ":" + strMinutes + ":" + strSeconds; } </script>
<script type="text/javascript"> // RabbitMq 服务器IP地址或域名地址 const host = '127.0.0.1'; //WebSocket 协议服务端口,如果是走 HTTPS,设置443端口 const port = 15675; //需要操作的 Topic,第一级父级 topic const topic = 'MIDDOL-TEST'; //设备唯一id const clientId = "myclientid_" + parseInt(Math.random() * 10000, 10); //服务连接失败后重新尝试连接的时间间隔 const reconnectTimeout = 10000; // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限 const cleansession = true; // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限 const username = 'admin'; const password = 'Admin123'; //是否走加密 HTTPS,如果走 HTTPS,设置为 true const useTLS = false; // MQTT 客户端引用对象 let mqtt; function MQTTconnect() { mqtt = new Paho.MQTT.Client(host, port, "/ws", clientId); mqtt.onConnectionLost = onConnectionLost; mqtt.onMessageArrived = onMessageArrived; let options = { timeout: 3, keepAliveInterval: 60, mqttVersion: 4, cleanSession: cleansession, onSuccess: onConnect, onFailure: onFailure, useSSL: useTLS, userName: username, password: password }; mqtt.connect(options); } // 连接服务器失败 function onFailure(message) { console.log("onFailure : " + message); $("#arrivedDiv").append("<div style='color: red'> " + getFormatDate() + " onFailure : " + message.errorMessage + " </div>"); setTimeout(MQTTconnect, reconnectTimeout); } // 连接上服务器 function onConnect() { // 订阅某个主题 mqtt.subscribe(topic, {qos: 0}); // 订阅P2P主题 mqtt.subscribe(topic + "/p2p/" + clientId, {qos: 0}); // 发布主题消息 let message = new Paho.MQTT.Message("Connect success, Hello mqtt!"); // set topic message.destinationName = topic; mqtt.send(message); //发送 P2P 消息 message = new Paho.MQTT.Message("Connect success, Hello mqtt P2P Msg!"); // set topic message.destinationName = topic + "/p2p/" + clientId; mqtt.send(message); } // 未连接服务器 function onConnectionLost(response) { console.log("onConnectionLost : " + response); setTimeout(MQTTconnect, reconnectTimeout); } // 接收到mqtt消息 function onMessageArrived(message) { let topic = message.destinationName; let payload = message.payloadString; // console.log("recv msg : " + topic + " " + payload); $("#arrivedDiv").append("<div> " + getFormatDate() + " recv msg : topic=" + topic + " ,payload=" + payload + " </div>") } // MQTT 连接初始化 MQTTconnect(); // 发送测试 function testMessageSend() { let msg = $("#testMessage").val(); if (msg == null || msg === '') { $("#arrivedDiv").append("<div style='color: red'>请在文本框输入消息后点击发送按钮</div>") return; } // 发布主题消息 //set body let message = new Paho.MQTT.Message(msg); // set topic message.destinationName = topic; mqtt.send(message); } function testMessageClean() { $("#arrivedDiv").html("<br/>"); } $(function () { $("#myClientIdSpan").html("" + clientId); }) </script>
</head>
<div style="margin-top: 30px;">
<span>MQTT设备ID(clientId):</span><span id="myClientIdSpan" style="font-weight: bold;"></span>
<br/><br/>
<label style="font-weight: bold" for="testMessage">MQTT消息:</label>
<input style="height: 25px; width: 180px;" maxlength="60" value="这是一条测试消息" id="testMessage"/>
<button class="button" id="mySendBtn" onclick="testMessageSend()"> 点击发送</button>
<button class="button" id="cleanSendBtn" onclick="testMessageClean()"> 清空日志</button>
</div>
<div style="margin-top: 30px">
<div style="font-size: 20px;color: darkcyan"> 接收到的mqtt消息日志</div>
<hr/>
<div id="arrivedDiv" style="height:600px; width:1000px; overflow:scroll; background:#EEEEEE;">
<br/>
</div>
</div>
</html>
主要查看上面js里面 RabbitMq 服务器IP地址或域名地址和端口号主要信息,运行如下界面:
5. 采用Java 订阅发布mqtt消息
mqtt作为行业通用协议,支持很多种语言开发,具体SDK请查看如下:
https://www.alibabacloud.com/help/zh/doc-detail/44866.htm?spm=a2c63.p38356.b99.49.18af6586hItGUj
本次采用 org.eclipse.paho.client.mqttv3 SDK进行订阅发布MQTT消息。
POM.xml引入Maven依赖:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
JAVA 主要代码如下:
package com.test.rabbitmq.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyMqttService {
private static org.slf4j.Logger logger = LoggerFactory.getLogger(MyMqttService.class);
private MqttClient client;
private String defaultTopic;
public static class Builder {
private String host;
private String userName;
private String passWord;
private String clientId;
private String defaultTopic = "MyMqttTopic";
private MqttCallback callback;
private boolean cleanSession;
public Builder host(String host) {
this.host = host;
return this;
}
public Builder userName(String userName) {
this.userName = userName;
return this;
}
public Builder passWord(String passWord) {
this.passWord = passWord;
return this;
}
public Builder clientId(String clientId) {
this.clientId = clientId;
return this;
}
public Builder defaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
return this;
}
public Builder callback(MqttCallback callback) {
this.callback = callback;
return this;
}
public Builder cleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
return this;
}
public MyMqttService build() {
return new MyMqttService(this);
}
}
public static Builder builder() {
return new Builder();
}
private MyMqttService(Builder builder) {
defaultTopic = builder.defaultTopic;
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), r -> {
Thread t = new Thread(r);
t.setName("MyMQTT线程");
return t;
});
try {
//id应该保持唯一性
client = new MqttClient(builder.host, builder.clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(builder.cleanSession);
options.setUserName(builder.userName);
options.setPassword(builder.passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
if (builder.callback == null) {
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serveruri) {
// 客户端连接成功后就需要尽快订阅需要的 topic
logger.debug(builder.clientId + " connectComplete reconnect=" + reconnect + ", serveruri=" + serveruri);
// 参考阿里云mqtt文档 https://www.alibabacloud.com/help/zh/doc-detail/42420.htm?spm=a2c63.p38356.b99.12.87851d06uHImcQ
final String[] topicFilter = {builder.defaultTopic, builder.defaultTopic.concat("/p2p/").concat(builder.clientId)};
final int[] qos = {0, 0};
executorService.submit(() -> subscribe(topicFilter, qos));
}
@Override
public void connectionLost(Throwable arg0) {
logger.debug(builder.clientId + " connectionLost " + arg0);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
logger.debug(builder.clientId + " deliveryComplete " + arg0);
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) {
logger.debug(builder.clientId + " messageArrived: " + arg1.toString());
}
});
} else {
client.setCallback(builder.callback);
}
client.connect(options);
} catch (MqttException e) {
logger.error("MyMqttService 初始化异常 ", e);
}
}
public void sendMessage(String msg) {
sendMessage(defaultTopic, msg);
}
public void sendMessage(String topic, String msg) {
try {
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(0);
message.setRetained(false);
client.publish(topic, message);
logger.info("发送消息成功 topic={},msg={}", topic, msg);
} catch (MqttException e) {
logger.error("发送主题消息异常 topic={} ,msg={}", topic, msg, e);
}
}
public void subscribe(String[] topicFilters, int[] qos) {
try {
client.subscribe(topicFilters, qos);
for (int i = 0; i < topicFilters.length; i++) {
logger.info("subscribe success topicFilters={}, qos={}", topicFilters[i], qos[i]);
}
} catch (MqttException e) {
logger.error("订阅主题", e);
}
}
public void unsubscribe(String[] topicFilters) {
try {
client.unsubscribe(topicFilters);
} catch (MqttException e) {
logger.error("取消订阅某个主题", e);
}
}
public void closeClient(boolean force) {
try {
client.close(force);
} catch (MqttException e) {
logger.error("closeClient异常", e);
}
}
}
写个main方法测试一下哦:
package com.test.rabbitmq.mqtt;
public class MyMqttTestService {
public static void main(String[] args) throws InterruptedException {
MyMqttService service = MyMqttService.builder()
.host("tcp://127.0.0.1:1883")
.userName("admin")
.passWord("Admin123")
.clientId("myclientid_10001")
.defaultTopic("MIDDOL-TEST")
.cleanSession(true).build();
Thread.sleep(3000L);
service.sendMessage("这是java后端发送的消息");
// service.closeClient(false);
}
}
运行一下看发生了什么?
控制台信息如下:
刚刚运行的html页面也同时受到了java后端发送的消息。
查看rabbitmq 管理后台的channel 出现两个链接
前端发送一个消息看看:
后端同样可以受到消息:
6. 稍作改动可切换阿里云MQTT消息队列
阿里云的MQTT消息队列的SDK代码几乎和上面一致,多了 accessKey权限校验等。
https://www.alibabacloud.com/help/zh/doc-detail/59721.htm?spm=a2c63.p38356.b99.48.28d04a9bVXkQM4