java
主页 > 软件编程 > java >

SpringCloudStream原理和深入使用介绍

2024-06-20 | 佚名 | 点击:

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型。

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

1

2

3

4

5

package org.springframework.messaging;

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

1

2

3

4

5

6

7

8

@FunctionalInterface

public interface MessageChannel {

    long INDEFINITE_TIMEOUT = -1;

    default boolean send(Message<?> message) {

        return send(message, INDEFINITE_TIMEOUT);

    }

    boolean send(Message<?> message, long timeout);

}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

1

2

3

4

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

由MessageHandler真正地消费/处理消息

1

2

3

4

@FunctionalInterface

public interface MessageHandler {

    void handleMessage(Message<?> message) throws MessagingException;

}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

1

2

3

4

5

<!--stream-->

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

</dependency>

增加配置文件

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

spring:

    cloud:

        stream:

            # 定义消息中间件

            binders:

              MyRabbit:

                  type: rabbit

                  environment:

                    spring:

                        rabbitmq:

                            host: localhost

                            port: 5672

                            username: root

                            password: root

                            vhost: /

            bindings:

            # 生产者中定义,定义发布对象

              myInput:

                destination: myStreamExchange

                group: myStreamGroup

                binder: MyRabbit

            # 消费者中定义,定义订阅的对象

              myOutput-in-0:

                destination: myStreamExchange

                group: myStreamGroup

                binder: MyRabbit

        # 消费者中定义,定义输出的函数

        function:

            definition: myOutput

生产者

1

2

3

4

5

@Resource

    private StreamBridge streamBridge;

    public void sendNormal() {

        streamBridge.send("myInput", "hello world");

    }

消费者

1

2

3

4

5

6

7

8

9

@Bean("myOutput")

    public Consumer<Message<String>> myOutput() {

        return (message) -> {

            MessageHeaders headers = message.getHeaders();

            System.out.println("myOutput head is : " + headers);

            String payload = message.getPayload();

            System.out.println("myOutput payload is : " + payload);

        };

    }

如何自定义Binder

添加spring-cloud-stream依赖

1

2

3

4

5

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-stream</artifactId>

    <version>${spring.cloud.stream.version}</version>

</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public class FileProvisioningProvider implements ProvisioningProvider<

    ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {

    public FileProvisioningProvider() {

        super();

    }

    @Override

    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {

        return new FileMessageDestination(name);

    }

    @Override

    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {

        return new FileMessageDestination(name);

    }

    private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {

            this.destination = destination;

        }

        @Override

        public String getName() {

            return destination.trim();

        }

        @Override

        public String getNameForPartition(int partition) {

            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");

        }

    }

}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

super.onInit();

        executorService = Executors.newScheduledThreadPool(1);

    }

    @Override

    public void doStart() {

        executorService.scheduleWithFixedDelay(() -> {

            String payload = getPayload();

            if (payload != null) {

                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();

                sendMessage(receivedMessage);

            }

        }, 0, 50, TimeUnit.MILLISECONDS);

    }

    @Override

    protected void doStop() {

        executorService.shutdownNow();

    }

    private String getPayload() {

        try {

            List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));

            String currentPayload = allLines.get(allLines.size() - 1);

            if (!currentPayload.equals(previousPayload)) {

                previousPayload = currentPayload;

                return currentPayload;

            }

        } catch (IOException e) {

            FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));

        }

        return null;

    }

}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public class FileMessageHandler extends AbstractMessageHandler {

    FileExtendedBindingProperties fileExtendedBindingProperties;

    ProducerDestination destination;

    public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {

        this.destination = destination;

        this.fileExtendedBindingProperties = fileExtendedBindingProperties;

    }

    @Override

    protected void handleMessageInternal(Message<?> message) {

        try {

            if (message.getPayload() instanceof byte[]) {

                Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());

            } else {

                throw new RuntimeException("处理消息失败");

            }

        } catch (IOException e) {

            throw new RuntimeException(e);

        }

    }

}

提供Binder的实现

提供自己的Binder抽象实现:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

public class FileMessageChannelBinder extends AbstractMessageChannelBinder

    <ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>

    implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {

    FileExtendedBindingProperties fileExtendedBindingProperties;

    public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {

        super(headersToEmbed, provisioningProvider);

        this.fileExtendedBindingProperties = fileExtendedBindingProperties;

    }

    @Override

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {

        FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);

        return fileMessageHandler;

    }

    @Override

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {

        FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);

        return fileMessageProducerAdapter;

    }

    @Override

    public FileConsumerProperties getExtendedConsumerProperties(String channelName) {

        return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);

    }

    @Override

    public FileProducerProperties getExtendedProducerProperties(String channelName) {

        return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);

    }

    @Override

    public String getDefaultsPrefix() {

        return fileExtendedBindingProperties.getDefaultsPrefix();

    }

    @Override

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {

        return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();

    }

}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@EnableConfigurationProperties(FileExtendedBindingProperties.class)

@Configuration

public class FileMessageBinderConfiguration {

    @Bean

    @ConditionalOnMissingBean

    public FileProvisioningProvider fileMessageBinderProvisioner() {

        return new FileProvisioningProvider();

    }

    @Bean

    @ConditionalOnMissingBean

    public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {

        return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);

    }

    @Bean

    public FileProducerProperties fileConsumerProperties() {

        return new FileProducerProperties();

    }

}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

原文链接:
相关文章
最新更新