消息队列学习-ActiveMQ(四)
- 8 SpringBoot整合ActiveMQ
- 8.1 队列(Queue)
- 8.1.1 队列生产者
- 8.1.2 队列消费者
- 8.2 主题发布订阅(Topic)
- 8.2.1 Topic生产者
- 8.2.2 Topic消费者
- 9 ActiveMQ的传输协议
- 9.1 面试题
- 9.2 官网
- 9.3 是什么
- 9.4 有哪些
- 9.4.1 Transmission Control Protocol(TCP)默认
- 9.4.2 New I/O API Protocol(NIO)
- 9.4.3 AMQP协议
- 9.4.4 Stomp协议
- 9.4.5 Secure Sockets Layer Protocol(SSL)
- 9.4.6 MQTT协议
- 9.4.7 WS协议(websocket)
- 9.4.8 小总结
- 9.5 NIO案例演示
- 9.5.1 修改配置文件
- 9.5.2 生产和消费两端协议代码修改
- 9.6 nio案例演示增强
8 SpringBoot整合ActiveMQ
8.1 队列(Queue)
8.1.1 队列生产者
- 新建Maven工程并设置包名类名
工程名:boot_mq_produce
包名:com.sky.boot.activemq - 配置POM文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath />
</parent>
<groupId>com.sky.boot.activemq</groupId>
<artifactId>boot_mq_produce</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 配置yml文件
server:
port: 7777
spring:
activemq:
broker-url: tcp://192.168.188.131:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false = Queue(默认) true = Topic
# 自己定义队列名称
myqueue: boot-active-queue
- 配置bean
@Component
@EnableJms
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
- Queue_Producer
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue, "*****:"
+ UUID.randomUUID().toString().substring(0, 6));
}
}
- 主启动类
@SpringBootApplication
public class MainAppProduce {
public static void main(String[] args) {
SpringApplication.run(MainAppProduce.class);
}
}
- 测试单元
@SpringBootTest(classes = MainAppProduce.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Resource
private Queue_Produce queue_produce;
@Test
public void testSend() throws Exception{
queue_produce.produceMsg();
}
}
执行后,结果如下:
新需求:要求每隔3秒钟,往MQ推送消息
- 修改Queue_Produce
@Scheduled(fixedDelay = 3000L)
public void produceMsgScheduled(){
jmsMessagingTemplate.convertAndSend(queue, "**scheduled:"
+ UUID.randomUUID().toString().substring(0, 6));
System.out.println("***** produceMsgScheduled send ok");
}
- 修改主启动类的MainAppProduce
@SpringBootApplication
@EnableScheduling
public class MainAppProduce {
public static void main(String[] args) {
SpringApplication.run(MainAppProduce.class, args);
}
}
直接开启主启动类,间隔发送消息
8.1.2 队列消费者
- 新建Mavaen工程并设置包名类名,同上一节
- POM文件,同上一节
- Yml文件,同上一节,注意:端口改了,eg:8888
- springboot的消息监听注解
@Component
public class Queue_Consumer {
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException{
System.out.println("****消费者收到消息:"+textMessage.getText());
}
}
执行主方法后:
8.2 主题发布订阅(Topic)
8.2.1 Topic生产者
- 新建Maven工程并设置包名类名:略
- POM文件:略
- Yml文件
server:
port: 6666
spring:
activemq:
broker-url: tcp://192.168.188.131:61616
user: admin
password: admin
jms:
pub-sub-domain: true # false = Queue(默认) true = Topic
# 自己定义队列名称
myTopic: boot-active-topic
- 配置bean
@Component
public class ConfigBean {
@Value("${myTopic}")
private String topicName;
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
- Topic_Producer
@Component
public class Topic_Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000L)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic, "主题消息:"
+ UUID.randomUUID().toString().substring(0, 6));
}
}
- 主启动类
@SpringBootApplication
@EnableScheduling
public class BootMqToppicProducerApplication {
public static void main(String[] args) {
SpringApplication.run(BootMqToppicProducerApplication.class, args);
}
}
先启动消费者,后启动生产者
8.2.2 Topic消费者
- 新建Maven工程并设置包名类名:略
- POM文件:略
- Yml文件
server:
port: 5555 # 启动第二个时,改成5566
spring:
activemq:
broker-url: tcp://192.168.188.131:61616
user: admin
password: admin
jms:
pub-sub-domain: true # false = Queue(默认) true = Topic
# 自己定义队列名称
myTopic: boot-active-topic
- Topic_Consumer
@Component
public class Topic_Consumer {
@JmsListener(destination = "${myTopic}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("****消费者收到主题消息:"+textMessage.getText());
}
}
- 主启动类
@SpringBootApplication
public class BootMqTopicConsumerApplication5555 {
public static void main(String[] args) {
SpringApplication.run(BootMqTopicConsumerApplication5555.class,
args);
}
}
9 ActiveMQ的传输协议
9.1 面试题
- 默认的61616端口如何更改
- 你生产上的连接协议如何配置的?使用tcp吗?
9.2 官网
http://activemq.apache.org/configuring-version-5-transports.html
9.3 是什么
ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。
其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。
见下图实际配置:
在上文给出的配置信息中,
URI描述信息的头部都是采用协议名称:例如
描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;
描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;
唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire
9.4 有哪些
注意:前两个较为重要,其余了解即可
9.4.1 Transmission Control Protocol(TCP)默认
- 这是默认的Broker配置,TCP的Client监听端口61616
- 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。
- TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
- TCP传输的的优点:
(4.1)TCP协议传输可靠性高,稳定性强
(4.2)高效率:字节流方式传递,效率很高
(4.3)有效性、可用性:应用广泛,支持任何平台 - 关于Transport协议的可选配置参数可以参考官网: TCP详情
9.4.2 New I/O API Protocol(NIO)
- NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
- 适合使用NIO协议的场景:
(2.1)可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
(2.2)可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。 - NIO连接的URI形式:nio://hostname:port?key=value&key=value
- 关于Transport协议的可选配置参数可以参考官网: NIO详情
9.4.3 AMQP协议
Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。
9.4.4 Stomp协议
STOP,Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。
9.4.5 Secure Sockets Layer Protocol(SSL)
9.4.6 MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
扩展Github:https://github.com/fusesource/mqtt-client
9.4.7 WS协议(websocket)
9.4.8 小总结
9.5 NIO案例演示
9.5.1 修改配置文件
在transportConnectors
里添加
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
记得重启activemq
9.5.2 生产和消费两端协议代码修改
都只改动这两行即可
private static final String ActiveMQ_URL = "nio://192.168.188.131:61618";
private static final String QUEUE_NAME = "Protocol";
运行结果与前面一样,略。
9.6 nio案例演示增强
问题:URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。
但是这样的设置方式,只能使这个端口支持Openwire协议。
我们怎么能够让这个端口既支持NIO网络模型,又让他支持多个协议呢?
解决:使用auto关键字、使用"+"符号来为端口设置多种特性
在transportConnectors
中添加
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>
重启activeMQ后,