Spring Cloud Bus 使用笔记

/ java / 没有评论 / 200浏览

消息代理

消息代理(Message Broker) 是一种消息验证、传输、路由的架构模式. 
它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效的解耦通信过程.

消息代理是一个中间件产品, 它的核心是一个消息的路由程序,用来实现接受和分发消息, 
并根据设定好的消息处理流来转发给正确的应用.它包括独立的通信和消息传递协议, 
能够实现组织内部和组织间的网络通信.设计代理的目的就是为了能够从应用程序中传入消息,
并执行一些特别的操作

RabbitMQ实现消息总线

Maven依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

简单配置(用户可以在RabbitMQ中创建, 默认用户名密码为guest)

    spring.application.name=rabbitmq-hello

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=springcloud
    spring.rabbitmq.password=123456

创建消息生产者Sender, 通过注入AmqpTemplate接口的实例来实现消息的发送, AmqpTemplate接口定义了一套针对AMQP协议的基础操作. 在Spring Boot中会根据配置来注入其具体实现.

    @Component
    public class Sender {

        @Resource
        private AmqpTemplate amqpTemplate;

        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender: " + context);
            amqpTemplate.convertAndSend(context);
        }

    }

创建消息消费者Receiver. 通过@RabbitListener注解定义该类对hello队列的监听, 并用@RabbitHandler 注解指定对消息的处理方法.

    @Component
    @RabbitListener(queues = "hello")
    public class Receiver {

        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver: " + hello);
        }

    }

创建RabbitConfig用来配置队列、交换器、路由等高级信息. 此处使用最小化配置RabbitMQ配置完成一个基本的生产和消费.

    @Configuration
    public class RabbitConfig {

        @Bean
        public Queue helloQueue() {
            return new Queue("hello");
        }

    }

到此就可以启动测试生产消费过程.

整合Spring Cloud Bus

在Config Client中 引入Maven依赖, actuator模块用来提供刷新端点

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>

在配置文件中增加关于RabbitMQ的连接和用户信息

# /refresh更新配置
management.endpoints.web.exposure.include=*


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

启动程序访问 http://localhost:7002/actuator/bus-refresh 将会刷新配置信息

注: 推荐在Config Server中也引入Spring Cloud Bus, 将配置服务端也加入到消息总线中. 
    `/actuator/bus-refresh` 请求不再发送到具体服务实例上, 而是发送给Config Server, 并通过`destination`参数指定需要刷新配置的服务或实例.
Kafka实现消息总线

要使用kafka实现消息总线, 只需要把spring-cloud-starter-bus-amqp模块替换为spring-cloud-starter-bus-kafka 模块.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>

默认配置将连接localhost服务端, 如需其他配置, 均采用spring.cloud.stream.kafka前缀完成配置.

实现原理

Spring Cloud Bus采用了Spring的事件驱动模型ApplicationEventApplicationListener来刷新配置

不想写, 睡觉....