架构设计:
如使用EMQ企业版,企业版支持数据转发Kafka的插件,但企业版收费。
现需要使用代码的方式将EMQ接收的数据转发到Kafka。
EMQ准备:
EMQ安装部署,部署好之后大致就是这个样子的:
这里EMQ不需要做任何操作,也不需要提前创建topic等。
设备模拟:
使用MQTTX模拟设备采集装置向EMQ发送数据:
代码实现EMQ数据转发Kafka
首先导入以下maven依赖:
<!--mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.22</version>
</dependency>
主程序类实现连接 EMQ Broker,并进行消息接收的代码:
package com.zhbr.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String broker = "tcp://192.168.72.141:1883";
String clientId = "mqttjs_efadb873";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
//connOpts.setUserName("emqx_test");
//connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
//System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
// MqttMessage message = new MqttMessage(content.getBytes());
// message.setQos(qos);
// client.publish(pubTopic, message);
// System.out.println("Message published");
//
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
回调消息处理类OnMessageCallback:
package com.zhbr.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
//接收到的消息发送到Kafka
MqttKafkaProducer.pushData(new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
Kafka消息发送类MqttKafkaProducer:
package com.zhbr.mqtt;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MqttKafkaProducer {
public static void pushData(String msgData) {
Properties props = new Properties();
//集群地址,多个服务器用","分隔
props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
//重新发送消息次数,到达次数返回错误
props.put("retries", 0);
//Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
props.put("batch.size", 163840);
//Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
props.put("linger.ms", 1);
//在Producer端用来存放尚未发送出去的Message的缓冲区大小
props.put("buffer.memory", 33554432);
//key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("partitioner.class", "com.kafka.demo.Partitioner");//分区操作,此处未写
props.put("acks", "1");
props.put("request.timeout.ms", "60000");
props.put("compression.type","lz4");
//创建生产者
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//通过时间做轮循,均匀分布设置的partition,提高效率。
int partition = (int) (System.currentTimeMillis() % 3);
//写入名为"test-partition-1"的topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("emqtopic",partition, UUID.randomUUID().toString(), msgData);
try {
producer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("写入emqtopic:" + msgData);
//producer.close();
}
}
实现的效果:
1、模拟设备消息采集:
2、Java代码中也从EMQ broker中接收到了数据,并将数据写入到了Kafka的emqtopic中:
3、Kafka的emqtopic中也已经有个这条数据: