带有Spring Cloud Stream的简单事件驱动微服务
事件驱动的体系结构很棒。但是如果没有框架,编写与流行的事件消息传递平台一起使用所需的脚手架可能会很麻烦。在本文中,我们将介绍如何使用Spring Cloud Stream简化代码。
问题
您只想为事件驱动的应用程序编写逻辑,但是样例消息代码可能会妨碍您的工作。将您的应用程序连接到消息传递服务非常棘手,如果您是企业开发人员,则可能需要使用多种消息传递技术(本地或云中)。
解决方案
让灵活的消息抽象处理复杂的消息平台集成,以便您可以专注于编写简单的干净业务逻辑。Spring Cloud Stream是一个不错的选择。它通过一个易于使用的API统一了许多流行的消息平台,其中包括RabbitMQ,Apache Kafka,Amazon Kinesis,Google PubSub,Solace PubSub +,Azure Event Hub和Apache RocketMQ。它甚至消除了这些平台之间的方法和功能上的细微差异(例如分区或交换),使您可以自由创建创新的事件驱动解决方案。
在接下来的演示中,您将确切看到Spring Cloud Stream的巧妙抽象如何帮助使事件流代码更整洁,更容易使用。您还将看到使用Spring Cloud Stream的,在两个不同的消息传递平台( RabbitMQ或Kafka )之间切换是多么简单。 binding
库。
在你开始前
这些事件驱动的微服务需要在PC上安装以下最新的应用程序[^ 1]:
运行演示
首先,从GitHub克隆代码存储库。为此(如果已安装Git),请打开一个新的终端窗口,然后发出以下命令。如果您尚未安装Git,请下载并解压缩该zip文件 。
git clone https://github.com/benwilcock/spring-cloud-stream-demo.git
检查代码后,您会注意到该存储库包含两个微服务。
-
的
Loansource
微服务(在/loansource
夹)。此微服务充当事件消息的源。这些事件是Loan
类似于您在银行和金融界看到的应用程序。每笔贷款都有一个“名称”,“金额”和“状态”(设置为PENDING
首先)。 -
的
Loancheck
微服务(在/loancheck
夹)。此微服务充当Loan
处理器。它检查哪些贷款是好的贷款,并将其分类APPROVED
要么DECLINED
状态。
要运行演示,请按照以下说明进行操作。
步骤1:启动Messaging Server
在新的终端窗口中,转到项目的根文件夹,然后发出以下命令。
您需要在系统上安装并运行“ Docker” ,此脚本才能按要求正常运行
docker-compose
。
./start-servers.sh
该脚本将启动Kafka和RabbitMQ并将日志输出从这两者输出到终端窗口(除非您以Ctrl-C
)。按下时服务器不会停止Ctrl-C
-他们将继续在后台运行。一旦启动,这些服务器将全部对计算机上运行的应用程序可用。
第2步:在Kafka或RabbitMQ模式之间选择
在接下来的步骤3和4中,我们必须用-P
以及我们要使用的消息传递平台的名称。
- 对于Kafka ,使用:
-Pkafka
- 对于RabbitMQ ,使用:
-Prabbit
如果您省略-P
完全设置,然后使用Kafka。
注意:此演示不是为了在Kafka和RabbitMQ之间“桥接”消息而设计的,因此在编译和运行它们时,请确保在两个应用程序的每个中选择相同的配置文件名称。如果桥接邮件系统是您的目标, 请参阅此处的文档 。
步骤3:产生一些贷款事件
在新的终端窗口中, /loansource
目录使用当前目录cd
,然后发出以下命令替换
使用您要运行的模式( kafka
要么rabbit
模式,如上述步骤2所述)。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦loansource
应用程序已启动,在终端窗口中,您应该每秒都看到一条消息,告诉您新的Loan事件已经发布到了PENDING
州。保持该微服务运行,然后继续下一步。
步骤4:处理贷款事件
在另一个新的终端窗口中, /loancheck
目录当前目录,然后发出以下命令,再次替换为
使用您要运行的模式。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦loancheck
应用程序已启动,在终端窗口中,您应该每秒都看到一条消息,告诉您新PENDING
已从消息传递平台读取了贷款申请,或者APPROVED
要么DECLINED
。如果您想了解这些应用程序是如何构建的,请跳至“工作原理”。
步骤5:停止演示
完成微服务后,在每个终端窗口中/loansource
和/loancheck
微服务出版社Ctrl-C
。该应用程序将停止,事件处理将停止。
如果要在Kafka和Rabbit之间切换模式,只需返回步骤2并重复该过程即可。
如果您已经完全完成了演示,并且还想停止Kafka和RabbitMQ服务器,请在项目根文件夹的终端窗口中运行
./stop-servers.sh
脚本。如果您只是在模式之间进行切换,则没有必要。
怎么运行的
Maven配置文件(在每个项目的pom.xml
)控制在构建时将哪些Spring Cloud Stream绑定添加为依赖项。当你选择-Pkafka
然后[spring-cloud-stream-binder-kafka][kafka]
依赖项已添加到项目中。当你选择-Prabbit
然后[spring-cloud-stream-binder-rabbit][rabbit]
依赖已添加。
<profiles>
<profile>
<id>kafka</id>
<properties>
<spring.profile.activated>kafka</spring.profile.activated>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</profile>
...
<profiles>
您对Maven个人资料的选择也会影响spring.profiles.active
物业src/main/resources/application.properties
该文件会切换您看到启动时间的标题。
贷款微服务
对于Loansource misroservice,我们正在使用Spring Cloud Stream v2.1中的一项新功能-Spring Cloud Function支持 。有了这项新功能,获得LoansourceApplication
微服务充当Loan
消息是要声明一个@Bean
生成并返回一个Supplier<>
。在这种情况下Supplier
类型的Loan
。函数方法代码看起来像这样……
@Bean
public Supplier<Loan> supplyLoan() {
return () -> {
Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
return loan;
};
}
Supplier<>
是Java函数数据类型。因为只有一个@Bean
返回此类型的方法,Spring Cloud Stream确切知道下一步该做什么。默认情况下,它将每秒触发一次此功能并将结果发送到默认值MessageChannel
命名为“输出”。这种功能方法的好处是它仅包含业务逻辑,因此您可以使用常规单元测试对其进行测试。
我们可以使用
spring.cloud.function.definition
物业application.properties
文件以明确声明我们要绑定到绑定目标的功能Bean-但对于只有一个的情况@Bean
定义,这是没有必要的。如果我们要使用其他轮询间隔,则可以使用
spring.integration.poller.fixed-delay
物业application.properties
文件。
Loancheck微服务
的loancheck
微服务需要更多的代码,但不需要太多。它的工作是对Loan
事件进入单独的渠道。为了做到这一点,它订阅了来自来源output
主题,然后将其发送到approved
要么declined
基于贷款价值的主题,类似于欺诈检查工具。
因为我们使用了3个消息传递通道(一个入站和两个出站),所以LoanProcessor
接口用于阐明输入和输出。当前,它看起来像这样:
@Component
public interface LoanProcessor {
String APPLICATIONS_IN = "output"; // Topic where the new loans appear
String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
String DECLINED_OUT = "declined"; // Topic where the declined loans are sent
@Input(APPLICATIONS_IN)
SubscribableChannel sourceOfLoanApplications();
@Output(APPROVED_OUT)
MessageChannel approved();
@Output(DECLINED_OUT)
MessageChannel declined();
}
这个LoanProcessor
接口首先在@SpringBootApplication
类( LoanCheckApplication.java
)作为参数@EnableBinding()
注释,如下所示。
@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {
public static void main(String[] args) {
SpringApplication.run(LoanCheckApplication.class, args);
}
}
另外, Spring @Component
叫做LoanChecker.java
用这个构造LoanProcessor
在运行时。此外,该组件的checkAndSortLoans(Loan)
每当有新的Loan
事件到来是因为它被注释为@StreamListener()
为了LoanProcessor.APPLICATIONS_IN
渠道。您可以看到以下代码示例中正在使用此注释。
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
if (loan.getAmount() > MAX_AMOUNT) {
loan.setStatus(Statuses.DECLINED.name());
processor.declined().send(message(loan));
} else {
loan.setStatus(Statuses.APPROVED.name());
processor.approved().send(message(loan));
}
}
然后,此方法对Loan
使用简单业务逻辑的对象。根据排序结果,它会将它们继续发送给processor.approved()
频道或processor.declined()
渠道(相应地设置其贷款状态后)。
包起来
如您所见,使用Spring Cloud Streams时分离的关注点确实非常健康。在两种微服务中,特定于Kafka或RabbitMQ的代码绝对为零。这使我们能够专注于业务逻辑,而与消息传递平台无关,而您只需更改项目的“绑定”依赖关系,即可轻松交换消息传递平台。 pom.xml
。
还有更多…
您可以看到事件通过消息传递平台流动,如下所示:
-
对于Kafka ,可以使用KafDrop工具
localhost:9000
可用于观察主题和事件消息。无需登录。 -
对于RabbitMQ ,可以在以下位置找到Rabbit Management Console:
localhost:15672
可用于观察交流和事件消息。要登录的用户名是guest
密码也是guest
。要观察实际的消息内容,您可能需要手动创建一个队列,然后使用以下命令将其绑定到所需的主题#
身为你的routing key
。
要了解有关Spring Cloud Stream的最新信息,请访问Spring网站上的项目专用项目页面 。
要从头开始创建自己的Spring项目,请使用位于start.spring.io的项目配置器 。
如果您想深入了解Spring和纯Kafka,请查看以下出色的博客文章:
-
加里·罗素(Gary Russell):Apache Kafka深入研究的 Spring :错误处理,消息转换和事务支持
-
Soby Chacko:适用于Apache Kafka的Spring深入探讨:Apache Kafka和Spring Cloud Stream
-
脚注
[^ 1]:使用Maven , Spring Boot和Spring Cloud Stream编写和打包此存储库中的微服务代码。在运行时,代码依赖于Kafka , Zookeeper , RabbitMQ和KafDrop (由Obsidian Dynamics提供的Docker映像)。此列表中的所有内容均已为您提供-无需安装它们。