SpringCloud之Stream学习篇

   日期:2020-07-12     浏览:95    评论:0    
核心提示:1. springcloud stream介绍1.1 mq产品介绍我们在项目开发或多或少的用过或者听说过mq吧,官方名称叫消息中间件哈,现在主流的mq产品有activemq,rabbitmq,rocketmq,kafka,它们主要被用来应用解耦,流量削峰,消息传递,异步处理,日志处理等等。我们先来看一下传统消息中间件的模型由消息生产者,消息中间件产品,消息消费者组成。消息生产者:顾名思义,就是生产消息的,也是消息的投递者,将消息投递给消息中间件。消息中间件:这个其实就是咱们上面说的那些具体的产

文章目录

      • 1. springcloud stream介绍
        • 1.1 mq产品介绍
        • 1.2 Spring Cloud Stream
      • 2. 简单使用
        • 2.1 环境准备
          • 2.1.1 mq准备
          • 2.1.2 服务准备
        • 2.2 spring-cloud-parent 父工程pom文件
        • 2.3 spring-cloud-eureka-server Eureka Server
          • 2.3.1 pom文件
          • 2.3.2 application.xml
          • 2.3.3 主启动类
        • 2.4 消息生产者
          • 2.4.1 pom文件
          • 2.4.2 appliction.yml
          • 2.4.3 主启动类
          • 2.4.4 编写生产消息
          • 2.4.5 编写测试类
        • 2.5 消息消费者
          • 2.5.1 pom文件
          • 2.5.2 appliction.yml
          • 2.5.3 主启动类
          • 2.5.3 编写消费消息
        • 2.6 测试
      • 3.自定义消息通道
        • 3.1 改造消息消费者服务
          • 3.1.1自定义类似Source Sink的接口
          • 3.1.2 application.yml
          • 3.1.3消息消费编码
        • 3.2 测试
      • 4.消息分组

1. springcloud stream介绍

1.1 mq产品介绍

我们在项目开发或多或少的用过或者听说过mq吧,官方名称叫消息中间件哈,现在主流的mq产品有activemq,rabbitmq,rocketmq,kafka,它们主要被用来应用解耦,流量削峰,消息传递,异步处理,日志处理等等。
我们先来看一下传统消息中间件的模型

由消息生产者,消息中间件产品,消息消费者组成。
消息生产者:顾名思义,就是生产消息的,也是消息的投递者,将消息投递给消息中间件。
消息中间件:这个其实就是咱们上面说的那些具体的产品了,只不过内部实现不同,或者有些自己的特殊玩法。
消息消费者:就是接受消息,然后进行相应的业务逻辑的处理,可以订阅某个主题,然后当有消息的时候,消息中间件会通知你,当然你也可以每隔多长时间去消息中间件拉数据。

1.2 Spring Cloud Stream

Spring Cloud Stream官方的说法是一个构建消息驱动微服务的框架。我们可以这么理解,这个Spring Cloud Stream封装了mq的玩法,统一了模型,然后屏蔽各个mq产品中间件不同,降低了我们的学习成本,不过目前只支持kafka与rabbitmq。
我们可以看下这个Spring Cloud Stream的架构图

我们从上往下看,我们的应用程序,也就是spring core ,通过这个input 与output 这两种channel 与binder 进行交互,binder(绑定器对象)屏蔽了咱们的消息中间件产品的差异。这个input 与output相对于应用程序来说的,这个input对于应用程序就是读,从外面到程序,output就是写,由应用程序写到外面。

2. 简单使用

2.1 环境准备

2.1.1 mq准备

这里我们使用rabbitmq,下载安装可以查看《RabbitMQ安装》

2.1.2 服务准备

如果你看过我的springcloud系列文章,可以在那套环境的基础上进行,如果没有也不要紧,我们来介绍下接下来我们的环境。

spring-cloud-parent (父工程)

------spring-cloud-eureka-server(Eureka Server 集群两个实例 ,端口分别是9090,9091)

------spring-cloud-stream-consumer (消息消费者实例,我们这里分配端口 7086)

------spring-cloud-stream-producer (消息生产者实例,我们这里分配端口 8086)

2.2 spring-cloud-parent 父工程pom文件


    <!--spring boot 父启动器依赖-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
    <dependencyManagement>
        <!--引入springcloud依赖的-->
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Greenwich.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <!--web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-commons</artifactId>
        </dependency>

		<!--这里新添加spring-test-starter-->
 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

2.3 spring-cloud-eureka-server Eureka Server

如果你使用的是我springcloud系列文章里面的基础环境的话,就可以跳过本小节,如果不是请接着往下看

2.3.1 pom文件

只需要加个 eurake server 依赖就可以了

  <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
    </dependencies>

2.3.2 application.xml

1.需要配置主机host文件

2. application.yml

spring:
  application:
    name: spring-cloud-eureka-server


---
spring:
  profiles: u1
eureka:
  instance:
    hostname: EurekaServerA
  client:
    service-url:
      defaultZone:  http://EurekaServerB:9091/eureka
    register-with-eureka: true
    fetch-registry: true
server:
  port: 9090
---
spring:
  profiles: u2
eureka:
  instance:
    hostname: EurekaServerB
  client:
    service-url:
      defaultZone:  http://EurekaServerA:9090/eureka
    register-with-eureka: true
    fetch-registry: true
server:
  port: 9091

我这里使用了spring.profiles的特性,配置了9090 与9091 端口
3. 同时需要配置idea的启动项

2.3.3 主启动类

需要在主启动类上加@EnableEurekaServer注解表示启用这个Eureka Server

2.4 消息生产者

2.4.1 pom文件

这里添加eureka client 与 spring-cloud-stream-rabbit 的依赖

    <dependencies>
        <!--eureka client 客户端依赖引入-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--spring cloud stream 依赖(rabbit)-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

2.4.2 appliction.yml
server:
  port: 8086

spring:
  application:
    name: spring-cloud-stream-producer
  cloud:
    stream:
      # 绑定mq信息,这里我们绑定的是rabbitmq
      binders:
        # 给这个binder起个名字
        spring-clould-stream-binder:
          # mq的类型,如果是kafka的话就是kafka
          type: rabbit
          # 配置mq的信息
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      # 这里就是将通道与binder进行绑定
      bindings:
        # 定义output,因为我们是消息生产者,需要将消息写到channel中
        output:
          # 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
          destination: springCloudStreamStudyTopic
          # 传输内容的格式,也就是消息的格式,如果是json的话 application/json
          content-type: text/plain
          # 绑定的binder
          binder: spring-clould-stream-binder

eureka:
  client:
    service-url:
      defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
    fetch-registry: true
    register-with-eureka: true
  instance:
    prefer-ip-address: true # 使用ip注册
    #自定义实例显示格式,添加版本号
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@


2.4.3 主启动类

这里没啥好说的,就是添加@EnableDiscoveryClient 开启服务注册与发现

2.4.4 编写生产消息

先写一个发送消息的接口

public interface MessageProducerService {
    public void sendMes(String content);
}

然后再写一个实现类:

//因为我们这边是个消息生产这,所以说是个消息的源头,用Source
@EnableBinding(Source.class)
public class MessageProducerServiceImpl  implements MessageProducerService {
    @Autowired
    private Source source;
    @Override
    public void sendMes(String content) {
        // 使用spring cloud stream提供的通道写出消息
        source.output().send(MessageBuilder.withPayload(content).build());
    }
}

然后这个Source里面定义了一个output的channel

2.4.5 编写测试类
@SpringBootTest
@RunWith(SpringRunner.class)
public class MessageProducerServiceTest {
    @Autowired
    private  MessageProducerService messageProducerService;
    @Test
    public void testSendMes(){
        messageProducerService.sendMes("hello spring cloud stream ");
    }
}

2.5 消息消费者

2.5.1 pom文件

这个是与消息生产者一样的


    <dependencies>
        <!--eureka client 客户端依赖引入-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--spring cloud stream 依赖(rabbit)-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

2.5.2 appliction.yml
server:
  port: 7086

spring:
  application:
    name: spring-cloud-stream-consumer
  cloud:
    stream:
      # 绑定mq信息,这里我们绑定的是rabbitmq
      binders:
        # 给这个binder起个名字
        spring-clould-stream-binder:
          # mq的类型,如果是kafka的话就是kafka
          type: rabbit
          # 配置mq的信息
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      # 这里就是将通道与binder进行绑定
      bindings:
        # 定义output,因为我们是消息生产者,需要将消息写到channel中
        input:
          # 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
          destination: springCloudStreamStudyTopic
          # 传输内容的格式,也就是消息的格式,如果是json的话 application/json
          content-type: text/plain
          # 绑定的binder
          binder: spring-clould-stream-binder

eureka:
  client:
    service-url:
      defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
    fetch-registry: true
    register-with-eureka: true
  instance:
    prefer-ip-address: true # 使用ip注册
    #自定义实例显示格式,添加版本号
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
2.5.3 主启动类

这里没啥好说的,就是添加@EnableDiscoveryClient 开启服务注册与发现

2.5.3 编写消费消息
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

//这里是消息消费者,所以就就是接收者,所以使用Sink
@EnableBinding(Sink.class)
public class MessageConsumerService {

    @StreamListener(Sink.INPUT)
    public void recevieMes(Message<String> message){
        System.out.println("-----我接受到的消息是:"+message.getPayload());
    }
}

我们可以看下Sink 源码,其实它里面绑定的是input的channel

2.6 测试

1.启动两个Eureka Server 实例
2.启动消息消费者实例,spring-cloud-stream-consumer
3.执行消息生产者test来生产消息
我们可以看到消息消费者控制台打印的信息:

同时我们也可以看下rabbitmq的控制台(本机部署的可以浏览器访问:http://localhost:15672)

3.自定义消息通道

在spring cloud stream 中内置了Source 与Sink 两个接口,然后binding了input 输入流与output输出流通道,我们可以自定义各种消息通道,我们只需要自定义一个类似Source 与Sink的接口就可以,然后里面多定义几个通道,这些都是由你的业务场景决定的
案例:比如说我要将 xxx项目组里应用程序产生的日志,投递另一个项目组的mq中,这种情况一般是一个消息中间件就可以搞定,但是我们这边用了两个,分别是不同项目组的,这里只是举个例子,我们要实现如下图的消息流转,从a mq中获得消息 然后投递到 b mq中。

我们只需要定义一个类似Source或者是Sink的接口,然后放两个channel,分别是 input,output

public interface LogStream {

    String INPUT_LOG="inputLogChannel";
    String OUTPUT_LOG="outputLogChannel";
    @Input(INPUT_LOG)
    SubscribableChannel inputLogChannel();
    @Output(OUTPUT_LOG)
    MessageChannel outputLogChannel();
}

使用:

  1. 在@EnableBinding()绑定咱们上面那个接口
  2. 使用 @StreamListener 做监听的时候,需要指定 LogStream.INPUT_LOG,然后写消息的时候使用OUTPUT_LOG。

我们只需要改造下消息消费者服务就可以了

3.1 改造消息消费者服务

3.1.1自定义类似Source Sink的接口
public interface LogStream {

    String INPUT_LOG_NAME="inputLogChannel";
    String OUTPUT_LOG_NAME="outputLogChannel";
    @Input(INPUT_LOG_NAME)
    SubscribableChannel inputLogChannel();
    @Output(OUTPUT_LOG_NAME)
    MessageChannel outputLogChannel();
}
3.1.2 application.yml
server:
  port: 7086

spring:
  application:
    name: spring-cloud-stream-consumer
  cloud:
    stream:
      # 需要指定一个默认的binder,不然会报错
      default-binder: spring-clould-stream-binder
      binders:
        # 给这个binder起个名字
        spring-clould-stream-binder:
          # mq的类型,如果是kafka的话就是kafka
          type: rabbit
          # 配置mq的信息
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
        spring-clould-stream-xxx-binder:
          # mq的类型,如果是kafka的话就是kafka
          type: rabbit
          # 配置mq的信息
          environment:
            spring:
              rabbitmq:
              	# 另一个mq的ip
                host: xxx.xxx.xxx.xx
                port: 5672
                username: guest
                password: guest


      # 这里就是将通道与binder进行绑定
      bindings:
        # 定义output,因为我们是消息生产者,需要将消息写到channel中
        inputLogChannel:
          # 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
          destination: springCloudStreamStudyTopic
          # 传输内容的格式,也就是消息的格式,如果是json的话 application/json
          content-type: text/plain
          # 绑定的binder
          binder: spring-clould-stream-binder
        outputLogChannel:
          # 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
          destination: topicA
          # 传输内容的格式,也就是消息的格式,如果是json的话 application/json
          content-type: text/plain
          # 绑定的binder
          binder: spring-clould-stream-xxx-binder

eureka:
  client:
    service-url:
      defaultZone: http://EurekaServerA:9090/eureka,http://EurekaServerB:9091/eureka
    fetch-registry: true
    register-with-eureka: true
  instance:
    prefer-ip-address: true # 使用ip注册
    #自定义实例显示格式,添加版本号
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@


我们配置了2个binder ,然后将不同的通道绑定到不同的binder上面。

3.1.3消息消费编码

这里我们将从 a mq拿到的消息又投递到了 b mq中

@EnableBinding(LogStream.class)
public class MultiMessageCousumerService {
    @Autowired
    private LogStream logStream;
    @StreamListener(LogStream.INPUT_LOG_NAME)
    public void receiveMes(Message<String> mes){
        String receiveMsg = mes.getPayload();

        System.out.println("接受到的msg:"+receiveMsg);

        logStream.outputLogChannel().send(MessageBuilder.withPayload(receiveMsg+" ~").build());
    }
}

3.2 测试

将Eureka Server 集群先提起来,然后将这个消息消费者服务提起来,最后使用消息生产者服务的单元测试生产消息


我们再来看看这两个mq:

4.消息分组

现在,使用spring cloud stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这个多个消费者实例在一个组下面就不会再出现消息重复消费。
我们这时候需要配置一下这个消息消费者的application.yml

spring:
  application:
    name: spring-cloud-stream-consumer
  cloud:
    stream:
      # 绑定mq信息,这里我们绑定的是rabbitmq
      binders:
        # 给这个binder起个名字
        spring-clould-stream-binder:
          # mq的类型,如果是kafka的话就是kafka
          type: rabbit
          # 配置mq的信息
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      # 这里就是将通道与binder进行绑定
      bindings:
        # 定义output,因为我们是消息生产者,需要将消息写到channel中
        input:
          # 使用消息队列名字,在kafka就是topic的名字,然后rabbitmq的话就是Exchange 的名字
          destination: springCloudStreamStudyTopic
          # 传输内容的格式,也就是消息的格式,如果是json的话 application/json
          content-type: text/plain
          # 绑定的binder
          binder: spring-clould-stream-binder
          # 定义分组
          group: studyGroup

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服