Spring Cloud Stream 使用

/ java / 没有评论 / 15浏览

Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架. 可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序. 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream为一些供应商的消息中间件产品提供个性化的自动化配置实现, 并引入发布-定于、消费组以及分区三个核心概念.

简单使用
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <scope>compile</scope>
    </dependency>
@EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(StreamHelloApiApplication.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        logger.info("Received: " + payload);
    }

}

@EnableBinding 指定一个或多个定义@Input@Output注解的接口, 以此实现对消息通道Channel的绑定.

@StreamListener 将被修饰的方法注册为消息中间件上数据流的事件监听器.

启动程序

通过RabbitMQ提供的管理界面发送消息, 可以看到程序中接收到消息.

绑定器

Binder绑定器是Spring Cloud Stream中一个重要的概念. 在没有绑定器的时候, Spring Boot要直接与消息中间件交互的时候, 由于各个消息中间件构建初衷不同, 所以在实现细节上会有较大的差异, 当替换中间件或者升级时, 就需要付出非常大的代价.

通过定义绑定器作为中间件, 完美的实现了应用程序与消息中间件细节之间的隔离. 通过向程序暴漏同意的Channel通道, 使程序不需要考虑各种不同的消息中间件实现. 需要升级或者替换其他中间件的时候, 只需要更换对于的Binder绑定器而不需要修改任何Spring Boot应用逻辑.

发布-订阅模式

Spring Cloud Stream 中消息通信方式遵循发布-订阅模式, 当一条消息被投递到消息中间件, 通过共享的Topic主题进行广播, 消息消费者在订阅主题中受到它并触发自身业务逻辑处理. Topic主题时Spring Cloud Stream中一个抽象概念, 用来代表发布共享消息给消费者的地方, 在不同的消息中间件中, Topic可能对于不同的概念. 比如RabbitMQ中, 对于Exchange, 而在Kafka中对应KafkaTopic

消费组

如果同一个主题上的应用需要启动多个实例, 通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名, 这样这个应用的多个实例在接收到消息的时候, 只会有一个成员真正接收到消息并处理. 默认没有为应用指定消费组的时候, Spring Cloud Stream会为其分配一个独立的匿名消费组.

消息分区

用于解决当生产者将消息数据发送给多个消费者实例, 保证拥有共同特征的消息数据始终由同一个消费者实例接收和处理. Spring Cloud Stream为分区提供通用的抽象实现, 用来在消息中间件的上层实现分区处理, 所以对于消息中间件自身是否实现了消息分区并不关心.