MQTT订阅发布主题
前言:
- 因为tcp协议正常会出现丢包、卡死等现象,所以最近需要在项目中添加mqtt协议,mqtt协议的qos机制保证在网络条件比较差的情况下也能保持良好通信,反正它的各种好处网上有很多,所以学习了一下。
- 网上对于mqtt的订阅发布的例子大多将mqtt客户端分为client和server,一个发布主题,一个订阅主题,在我看来并没有什么client和server之分,一个客户端既可以发布主题也可以订阅主题,而服务端应该是类似于emqx这种的消息服务器,所以这里实现的是同一个客户端的订阅发布主题。
准备:
-
mqtt服务端: emqx
-
mqtt客户端模拟器: mqtt-spy
-
springBoot配置:
mqtt: # 是否启用 enable: true # mqtt服务地址 broker: tcp://localhost:1883 # mqtt服务用户名 username: admin # mqtt服务密码 password: public
-
maven依赖:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency>
对于软件的安装使用及其他依赖(lombok、springBoot等)网上有很多,这里就不做说明了。
实现:
-
参数配置类:
@Getter @Component public class IoVariable { @Value("${mqtt.enable:true}") private boolean mqttEnable; @Value("${mqtt.broker:tcp://localhost:1883}") private String broker; @Value("${mqtt.host.userName:admin}") private String userName; @Value("${mqtt.host.passWord:public}") private String passWord; }
-
mqtt客户端:
-
对外只提供两个方法。
- 创建客户端并订阅主题
- 发布主题
-
因为创建客户端耗费资源,所以订阅发布主题用同一个客户端(后续进行连接池改造)
-
定义一个
private static MqttClient client;
这种方式不符合面向对象的编程思想,所以弃用
-
这里用实现接口并依赖的方式
-
@FunctionalInterface public interface IMqttPublish { void publish(String topic, String content); }
public class ClientMqtt implements IMqttPublish{ private MqttClient client; private final String broker; private final String clientId; private final String terminalTopic; private final String systemTopic; private final String userName; private final String passWord; public ClientMqtt(String broker, String clientId, String terminalTopic, String systemTopic, String userName, String passWord, TerminalManager manager) { this.broker = broker; this.clientId = clientId; this.terminalTopic = terminalTopic; this.systemTopic = systemTopic; this.userName = userName; this.passWord = passWord; this.manager = manager; } public void connect(){ try { client = new MqttClient(broker, clientId, new MemoryPersistence()); client.connect(getOptions()); //将自身注入回调就可以调用发布主题了 client.setCallback(new PushCallback(this)); //qos级别 int[] qos = {1, 1}; String[] topics = {terminalTopic, systemTopic}; client.subscribe(topics, qos); } catch (MqttException e) { e.printStackTrace(); } } private MqttConnectOptions getOptions(){ MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 设置超时时间 options.setConnectionTimeout(10); // 设置会话心跳时间 options.setKeepAliveInterval(5); //自动重连 options.setAutomaticReconnect(true); return options; } @Override public void publish(String topic, String content){ try { //mqttMessage配置 MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(1); mqttMessage.setRetained(true); mqttMessage.setPayload(content.getBytes()); //发布主题 MqttDeliveryToken token = client.getTopic(topic).publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }
-
-
回调函数:接受到消息在这里处理
- 这里就可以用同一个客户端发布主题了
- 其他对象需要使用发布主题用相同的方式即可。这里在处理其他客户端发布的主题时创建发布主题的对象
@Slf4j public class PushCallback implements MqttCallbackExtended { private final IMqttPublish mqttPublish; public PushCallback(IMqttPublish mqttPublish) { this.mqttPublish = mqttPublish; } @Override public void connectionLost(Throwable cause) { log.error("mqtt连接断开,正在重新连接:" + cause); } @Override public void deliveryComplete(IMqttDeliveryToken token) { log.debug("通信完成"); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { String systemTopicFlag = "$SYS"; // 处理系统主题(客户端上下线) if (topic.startsWith(systemTopicFlag)) { systemTopicHandler(topic); return; } // 处理其他客户端发布的主题 terminalTopicHandler(mqttMessage); } @Override public void connectComplete(boolean reconnect, String serverUrl) { log.debug("已重新连接"); } private void systemTopicHandler(String topic) { String[] topicSplit = topic.split("/"); // 系统主题拆分后长度:$SYS/brokers/emqx@127.0.0.1/clients/{clientId}/{clientState} int systemTopicLength = 6; if (topicSplit.length < systemTopicLength) { return; } // 客户端id String clientIdInTopic = topicSplit[4]; // 主题上下线类型 String clientState = topicSplit[5]; // 处理上下线消息 switch (ClientStateEnum.valueOf(clientState)) { // 客户端上线 case connected: log.debug("客户端上线:" + clientIdInTopic); break; // 客户端下线 case disconnected: log.debug("客户端下线:" + clientIdInTopic); break; default: break; } } private void terminalTopicHandler(MqttMessage mqttMessage) { PublishTest publishTest = new PublishTest(mqttPublish); } private enum ClientStateEnum { connected, disconnected } }
-
启动项目时创建mqtt客户端并订阅主题
@Slf4j @Component @Order(value = 1) public class IoAppReadyListener implements ApplicationListener<ApplicationReadyEvent> { @Autowired private IoVariable variable; private static final String TERMINAL_TOPIC = "自定义主题"; private static final String SYSTEM_TOPIC = "$SYS/brokers/+/clients/#"; @Override public void onApplicationEvent(ApplicationReadyEvent event) { if (variable.isMqttEnable()){ startMqtt(); } } private void startMqtt(){ String clientId = IdUtil.fastSimpleUUID(); MqttClientSingle mqttClientSingle = new MqttClientSingle(variable.getBroker(), clientId, TERMINAL_TOPIC, SYSTEM_TOPIC, variable.getUserName(), variable.getPassWord(), manager); mqttClientSingle.connect(); } }
发布主题:
public class PublishTest{
private final IMqttPublish mqttPublish;
public MqttTerminal(IMqttPublish mqttPublish) {
this.mqttPublish = mqttPublish;
}
public void publishTest(){
String content = "主题内容";
String topic = "自定义主题";
mqttPublish.publish(topic, content);
}
}
另:
- 代码业务部分有删改,可能会有部分错误。
- 由于作者水平有限,若有不足之处,欢迎指正。
参考资料
emqx官方文档