文章目录
- 1. springcloud stream介绍
- 1.1 mq产品介绍
- 1.2 Spring Cloud Stream
- 2. 简单使用
- 2.1 环境准备
- 2.1.1 mq准备
- 2.1.2 服务准备
- 2.2 spring-cloud-parent 父工程pom文件
- 2.3 spring-cloud-eureka-server Eureka Server
- 2.3.1 pom文件
- 2.3.2 application.xml
- 2.3.3 主启动类
- 2.4 消息生产者
- 2.4.1 pom文件
- 2.4.2 appliction.yml
- 2.4.3 主启动类
- 2.4.4 编写生产消息
- 2.4.5 编写测试类
- 2.5 消息消费者
- 2.5.1 pom文件
- 2.5.2 appliction.yml
- 2.5.3 主启动类
- 2.5.3 编写消费消息
- 2.6 测试
- 3.自定义消息通道
- 3.1 改造消息消费者服务
- 3.1.1自定义类似Source Sink的接口
- 3.1.2 application.yml
- 3.1.3消息消费编码
- 3.2 测试
- 4.消息分组
1. springcloud stream介绍
1.1 mq产品介绍
我们在项目开发或多或少的用过或者听说过mq吧,官方名称叫消息中间件哈,现在主流的mq产品有activemq,rabbitmq,rocketmq,kafka,它们主要被用来应用解耦,流量削峰,消息传递,异步处理,日志处理等等。
我们先来看一下传统消息中间件的模型
由消息生产者,消息中间件产品,消息消费者组成。
消息生产者:顾名思义,就是生产消息的,也是消息的投递者,将消息投递给消息中间件。
消息中间件:这个其实就是咱们上面说的那些具体的产品了,只不过内部实现不同,或者有些自己的特殊玩法。
消息消费者:就是接受消息,然后进行相应的业务逻辑的处理,可以订阅某个主题,然后当有消息的时候,消息中间件会通知你,当然你也可以每隔多长时间去消息中间件拉数据。
1.2 Spring Cloud Stream
Spring Cloud Stream官方的说法是一个构建消息驱动微服务的框架。我们可以这么理解,这个Spring Cloud Stream封装了mq的玩法,统一了模型,然后屏蔽各个mq产品中间件不同,降低了我们的学习成本,不过目前只支持kafka与rabbitmq。
我们可以看下这个Spring Cloud Stream的架构图
我们从上往下看,我们的应用程序,也就是spring core ,通过这个input 与output 这两种channel 与binder 进行交互,binder(绑定器对象)屏蔽了咱们的消息中间件产品的差异。这个input 与output相对于应用程序来说的,这个input对于应用程序就是读,从外面到程序,output就是写,由应用程序写到外面。
2. 简单使用
2.1 环境准备
2.1.1 mq准备
这里我们使用rabbitmq,下载安装可以查看《RabbitMQ安装》
2.1.2 服务准备
如果你看过我的springcloud系列文章,可以在那套环境的基础上进行,如果没有也不要紧,我们来介绍下接下来我们的环境。
spring-cloud-parent (父工程)
------spring-cloud-eureka-server(Eureka Server 集群两个实例 ,端口分别是9090,9091)
------spring-cloud-stream-consumer (消息消费者实例,我们这里分配端口 7086)
------spring-cloud-stream-producer (消息生产者实例,我们这里分配端口 8086)
2.2 spring-cloud-parent 父工程pom文件
<!--spring boot 父启动器依赖-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<dependencyManagement>
<!--引入springcloud依赖的-->
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!--web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<!--这里新添加spring-test-starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.3 spring-cloud-eureka-server Eureka Server
如果你使用的是我springcloud系列文章里面的基础环境的话,就可以跳过本小节,如果不是请接着往下看
2.3.1 pom文件
只需要加个 eurake server 依赖就可以了
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
2.3.2 application.xml
1.需要配置主机host文件
2. application.yml
spring:
application:
name: spring-cloud-eureka-server
---
spring:
profiles: u1
eureka:
instance:
hostname: EurekaServerA
client:
service-url:
defaultZone: http://EurekaServerB:9091/eureka
register-with-eureka: true
fetch-registry: true
server:
port: 9090
---
spring:
profiles: u2
eureka:
instance:
hostname: EurekaServerB
client:
service-url:
defaultZone: http://EurekaServerA:9090/eureka
register-with-eureka: true
fetch-registry: true
server:
port: 9091
我这里使用了spring.profiles的特性,配置了9090 与9091 端口
3. 同时需要配置idea的启动项
2.3.3 主启动类
需要在主启动类上加@EnableEurekaServer注解表示启用这个Eureka Server
2.4 消息生产者
2.4.1 pom文件
这里添加eureka client 与 spring-cloud-stream-rabbit 的依赖
<dependencies>
<!--eureka client 客户端依赖引入-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring cloud stream 依赖(rabbit)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2.4.2 appliction.yml
server:
port: 8086
spring:
application:
name: spring-cloud-stream-producer
cloud:
stream:
# 绑定mq信息,这里我们绑定的是rabbitmq
binders:
# 给这个binder起个名字
spring-clould-stream-binder:
# mq的类型,如果是kafka的话就是kafka
type: rabbit
# 配置mq的信息
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 这里就是将通道与binder进行绑定
bindings:
# 定义output,因为我们是消息生产者,需要将消息写到channel中
output:
# 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
destination: springCloudStreamStudyTopic
# 传输内容的格式,也就是消息的格式,如果是json的话 application/json
content-type: text/plain
# 绑定的binder
binder: spring-clould-stream-binder
eureka:
client:
service-url:
defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
fetch-registry: true
register-with-eureka: true
instance:
prefer-ip-address: true # 使用ip注册
#自定义实例显示格式,添加版本号
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
2.4.3 主启动类
这里没啥好说的,就是添加@EnableDiscoveryClient 开启服务注册与发现
2.4.4 编写生产消息
先写一个发送消息的接口
public interface MessageProducerService {
public void sendMes(String content);
}
然后再写一个实现类:
//因为我们这边是个消息生产这,所以说是个消息的源头,用Source
@EnableBinding(Source.class)
public class MessageProducerServiceImpl implements MessageProducerService {
@Autowired
private Source source;
@Override
public void sendMes(String content) {
// 使用spring cloud stream提供的通道写出消息
source.output().send(MessageBuilder.withPayload(content).build());
}
}
然后这个Source里面定义了一个output的channel
2.4.5 编写测试类
@SpringBootTest
@RunWith(SpringRunner.class)
public class MessageProducerServiceTest {
@Autowired
private MessageProducerService messageProducerService;
@Test
public void testSendMes(){
messageProducerService.sendMes("hello spring cloud stream ");
}
}
2.5 消息消费者
2.5.1 pom文件
这个是与消息生产者一样的
<dependencies>
<!--eureka client 客户端依赖引入-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring cloud stream 依赖(rabbit)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2.5.2 appliction.yml
server:
port: 7086
spring:
application:
name: spring-cloud-stream-consumer
cloud:
stream:
# 绑定mq信息,这里我们绑定的是rabbitmq
binders:
# 给这个binder起个名字
spring-clould-stream-binder:
# mq的类型,如果是kafka的话就是kafka
type: rabbit
# 配置mq的信息
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 这里就是将通道与binder进行绑定
bindings:
# 定义output,因为我们是消息生产者,需要将消息写到channel中
input:
# 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
destination: springCloudStreamStudyTopic
# 传输内容的格式,也就是消息的格式,如果是json的话 application/json
content-type: text/plain
# 绑定的binder
binder: spring-clould-stream-binder
eureka:
client:
service-url:
defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
fetch-registry: true
register-with-eureka: true
instance:
prefer-ip-address: true # 使用ip注册
#自定义实例显示格式,添加版本号
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
2.5.3 主启动类
这里没啥好说的,就是添加@EnableDiscoveryClient 开启服务注册与发现
2.5.3 编写消费消息
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
//这里是消息消费者,所以就就是接收者,所以使用Sink
@EnableBinding(Sink.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void recevieMes(Message<String> message){
System.out.println("-----我接受到的消息是:"+message.getPayload());
}
}
我们可以看下Sink 源码,其实它里面绑定的是input的channel
2.6 测试
1.启动两个Eureka Server 实例
2.启动消息消费者实例,spring-cloud-stream-consumer
3.执行消息生产者test来生产消息
我们可以看到消息消费者控制台打印的信息:
同时我们也可以看下rabbitmq的控制台(本机部署的可以浏览器访问:http://localhost:15672)
3.自定义消息通道
在spring cloud stream 中内置了Source 与Sink 两个接口,然后binding了input 输入流与output输出流通道,我们可以自定义各种消息通道,我们只需要自定义一个类似Source 与Sink的接口就可以,然后里面多定义几个通道,这些都是由你的业务场景决定的
案例:比如说我要将 xxx项目组里应用程序产生的日志,投递另一个项目组的mq中,这种情况一般是一个消息中间件就可以搞定,但是我们这边用了两个,分别是不同项目组的,这里只是举个例子,我们要实现如下图的消息流转,从a mq中获得消息 然后投递到 b mq中。
我们只需要定义一个类似Source或者是Sink的接口,然后放两个channel,分别是 input,output
public interface LogStream {
String INPUT_LOG="inputLogChannel";
String OUTPUT_LOG="outputLogChannel";
@Input(INPUT_LOG)
SubscribableChannel inputLogChannel();
@Output(OUTPUT_LOG)
MessageChannel outputLogChannel();
}
使用:
- 在@EnableBinding()绑定咱们上面那个接口
- 使用 @StreamListener 做监听的时候,需要指定 LogStream.INPUT_LOG,然后写消息的时候使用OUTPUT_LOG。
我们只需要改造下消息消费者服务就可以了
3.1 改造消息消费者服务
3.1.1自定义类似Source Sink的接口
public interface LogStream {
String INPUT_LOG_NAME="inputLogChannel";
String OUTPUT_LOG_NAME="outputLogChannel";
@Input(INPUT_LOG_NAME)
SubscribableChannel inputLogChannel();
@Output(OUTPUT_LOG_NAME)
MessageChannel outputLogChannel();
}
3.1.2 application.yml
server:
port: 7086
spring:
application:
name: spring-cloud-stream-consumer
cloud:
stream:
# 需要指定一个默认的binder,不然会报错
default-binder: spring-clould-stream-binder
binders:
# 给这个binder起个名字
spring-clould-stream-binder:
# mq的类型,如果是kafka的话就是kafka
type: rabbit
# 配置mq的信息
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
spring-clould-stream-xxx-binder:
# mq的类型,如果是kafka的话就是kafka
type: rabbit
# 配置mq的信息
environment:
spring:
rabbitmq:
# 另一个mq的ip
host: xxx.xxx.xxx.xx
port: 5672
username: guest
password: guest
# 这里就是将通道与binder进行绑定
bindings:
# 定义output,因为我们是消息生产者,需要将消息写到channel中
inputLogChannel:
# 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
destination: springCloudStreamStudyTopic
# 传输内容的格式,也就是消息的格式,如果是json的话 application/json
content-type: text/plain
# 绑定的binder
binder: spring-clould-stream-binder
outputLogChannel:
# 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
destination: topicA
# 传输内容的格式,也就是消息的格式,如果是json的话 application/json
content-type: text/plain
# 绑定的binder
binder: spring-clould-stream-xxx-binder
eureka:
client:
service-url:
defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
fetch-registry: true
register-with-eureka: true
instance:
prefer-ip-address: true # 使用ip注册
#自定义实例显示格式,添加版本号
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
我们配置了2个binder ,然后将不同的通道绑定到不同的binder上面。
3.1.3消息消费编码
这里我们将从 a mq拿到的消息又投递到了 b mq中
@EnableBinding(LogStream.class)
public class MultiMessageCousumerService {
@Autowired
private LogStream logStream;
@StreamListener(LogStream.INPUT_LOG_NAME)
public void receiveMes(Message<String> mes){
String receiveMsg = mes.getPayload();
System.out.println("接受到的msg:"+receiveMsg);
logStream.outputLogChannel().send(MessageBuilder.withPayload(receiveMsg+" ~").build());
}
}
3.2 测试
将Eureka Server 集群先提起来,然后将这个消息消费者服务提起来,最后使用消息生产者服务的单元测试生产消息
我们再来看看这两个mq:
4.消息分组
现在,使用spring cloud stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这个多个消费者实例在一个组下面就不会再出现消息重复消费。
我们这时候需要配置一下这个消息消费者的application.yml
spring:
application:
name: spring-cloud-stream-consumer
cloud:
stream:
# 绑定mq信息,这里我们绑定的是rabbitmq
binders:
# 给这个binder起个名字
spring-clould-stream-binder:
# mq的类型,如果是kafka的话就是kafka
type: rabbit
# 配置mq的信息
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 这里就是将通道与binder进行绑定
bindings:
# 定义output,因为我们是消息生产者,需要将消息写到channel中
input:
# 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
destination: springCloudStreamStudyTopic
# 传输内容的格式,也就是消息的格式,如果是json的话 application/json
content-type: text/plain
# 绑定的binder
binder: spring-clould-stream-binder
# 定义分组
group: studyGroup