领域事件及事件总线EventBus使用实践
在过去的 30 多年,就已经有领域建模和设计的思潮;Eric Evans 将其定义为领域驱动设计(Domain-Driven Design,简称DDD)。领域模型是领域驱动的核心,而领域事件又作为领域模型中的重要模块,解决了开发者日常开发中的很多痛点,比如,代码耦合降低,拓展性增强。
@大鹏开源:别看我有点萌,我可以秒变大鹏😄
了解 DDD 与领域事件
领域模型不是高大上的东西,所有的领域模型抽象都来自于具体的业务及业务的需求,而脱离业务需求的应用设计是没有任何价值的!
比如在Today的新零售电商架构中:门店、采购、订单、供应商、物流、商品、台账等等都是应用设计中的不同领域模型,必然还存在或多或少的子域模型。而对于技术人员来说,这些抽象出来的领域,就代表应用架构存在若干子系统。
系统与系统间,势必会存在某些关联。比如说A领域“发生某件事情”、“当什么产生变化的时候”、“如果什么状态变更”…,都将可能成为B领域所要关心的事件。
事件总线
我们将发出事件通知的一方称为发送者 (Publisher) ,关心事件的一方称为订阅者 (Subscriber)。
关心一件事,便会收集这件事情相关的信息。而这些都将会转换为消息流,在订阅这件事情的领域间传播,一旦命中所要关心的事情,就由订阅者自行去处理接下来的事情。
以上eventbus示意图大致流程是这样的:
- 服务接口触发事件
- eventbus 分发事件,如果存在领域内订阅者,直接分发到指定订阅者,再将事件消息存库定时发送至 kafka
- 如果不存在领域内订阅者,事件消息直接存库并定时发送 kafka
- 消息在发送成功以后会被清除,为了保证事务的一致性。建议事件db共享业务数据源
- 订阅者只需要订阅事件双方规约好的 topic 和事件类型就可以命中需要的事件消息
引入事件的依据
很多业务场景下,我们可能需要在某件事情完成后,根据业务完成状态来做业务路由。
比如说商品领域的的变价审核业务,在商品变价审核通过之后,对应的商品价格也随之生效;价格的变动可能会引起采购,供应商,抑或者门店等领域作出响应的调整。
而我们在代码中通常这样去描述与以上类似的业务:
1 | val code = updateSkuPriceApproveStatus(skuId,status) |
从上面可以看出,当主线业务遭遇某个状态时,需要第三方系统作出应对,我们在主线任务中加入了冗长的代码甚至引入别人的 api ,这使得我的单个业务变的臃肿、过度耦合、不易阅读。恰好领域事件帮助我们更加优雅的解决了这个问题!
当引入事件后,do A 将变成了 send eventA
实践事件总线
在 today 中台服务团队的各领域实践中,已经开始投产 eventbus ,并且效果可观,三方系统的订阅对接相当便捷.那这样的事件机制该如何去使用?
为了给第三方系统和本部门的业务开发人员提供一致性的开发体验,我们将事件总线从dapeng的框架中剥离出来, 单独提供了一套类库用于实现事件的发布以及订阅。
- 事件总线eventBus的核心库
1 | "com.today" % "event-bus_2.12" % "0.1-SNAPSHOT" |
- 事件内容及状态暂存支持
- 需要在业务数据库加入一张如下结构数据表,这将作为事件消息的暂存队列和事件发送状态存储表
1 | CREATE TABLE `common_event` ( |
这里以商品变价审核的状态变更为例,了解在开发中如何做事件发送与订阅.
具体来说,就是四步:
- 定义事件结构体
- 在服务接口方法中声明待发布的事件
- 通过EventBus发布事件
- 通过EventBus接收事件
下面具体说明一下每个步骤:
定义事件结构体
1.事件收发双方共同协定定义事件消息的内容, 一个领域的所有消息定义都在同一个独立的idl文件中. 这个idl文件应该放在发布者的API包中.
2.我们的事件对象需要定义一个事件 id (建议通过分布式取号服务来获取), 订阅者可以自己决定是否需要用这个事件 id 来做消息的幂等处理
==> goods_events.thrift
1 | namespace java com.today.api.goods.events |
声明待发布事件
- 秉承代码及文档一致的理念,所有的服务都会在统一的文档站点进行开放展示,每个服务/每个接口的描述,包括出入参都一目了然.
- 我们在服务接口方法里面声明需要发布的事件,这些事件清单将会在文档站点对应的服务方法中得到展示,减少服务开发人员的沟通成本,一看便知。
== >goods_service.thrift
1 | namespace java com.today.api.goods.service |
- 在文档站点方法上效果如下:
- 显示独立的事件清单
注:如果想要了解更多有关文档站点的内容,请留意后期的 dapeng 文档站点专题
- 定义事件发布任务idl
==> goods_event_task.thrift
1 | namespace java com.today.api.goods.service |
- 为发布任务服务提供以下实现模版
==> task/GoodsScheduledServiceImpl.scala
1 |
|
关键性的bean配置
所有的事件消息,最终都会发送到 kafka 的队列中,等待订阅者消费;所以每一个配置都将必不可少。
==> spring/services.xml
1
2
3
4
5
6
7<!--messageTask 事件发布任务bean-->
<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask">
<constructor-arg name="topic" value="${KAFKA_TOPIC}"/>
<constructor-arg name="kafkaHost" value="${KAFKA_PRODUCER_HOST}"/>
<constructor-arg name="tidPrefix" value="${KAFKA_TID_PREFIX}"/>
<constructor-arg name="dataSource" ref="tx_goods_dataSource"/>
</bean>topic
kafka 消息 topic,领域区分(建议:domain_version_event
)kafkaHost
kafka 集群地址(如:127.0.0.1:9091,127.0.0.1:9092)tidPrefix
kafka 事务 id 前缀,领域区分dataSource
使用业务的 dataSource服务配置
==> config_user_service.properties
1
2
3
4# event config
KAFKA_TOPIC=goods_1.0.0_event
KAFKA_PRODUCER_HOST=127.0.0.1:9092
KAFKA_TID_PREFIX=goods_0.0.1
通过EventBus发布事件
- 在做事件触发前,你需要实现
AbstractEventBus
,来做好自定义的本地监听分发
==> commons/EventBus.scala
1 | object EventBus extends AbstractEventBus { |
- 交由 spring 托管
==> spring/services.xml
1
2
3<bean id="eventBus" class="com.today.service.commons.EventBus" factory-method="getInstance">
<property name="dataSource" ref="tx_goods_dataSource"/>
</bean> - 事件发布
1
EventBus.fireEvent(RegisteredEvent(event_id,user.id))
通过EventBus接收事件
- 对于领域内事件订阅者
EventBus
的 dispatchEvent
方法提供领域内订阅者的事件分发,以便本地订阅者可以订阅到关注的事件消息。这些领域内的订阅者,只需要在 dispatchEvent
中模式匹配进行分发。是不是已经是相当的简洁呢?
1 | ... |
对于跨领域事件订阅者
依赖
1 | <!--if => maven project--> |
- 注解扫描支持配置:
1
<bean id="postProcessor" class="com.today.eventbus.MsgAnnotationBeanPostProcessor"/>
- 订阅事件消息
同一个领域的事件在同一个消费者类中处理1
2
3
4
5
6
7
8
9
10
11
12
13// java
)
public class GoodsEventsConsumer {
public void subscribeSkuAttributeUpdateApprovedEvent(SkuAttributeUpdateApprovedEvent event) {
System.out.println(event.skuId);
// do somthing
}
...
}
//scala
serializer = classOf[RegisteredEventSerializer]
... GoodsEventsConsumer
也需要在 spring 上下文中托管。
@KafkaConsumer
groupId
:kafka Consumer groupId,领域区分topic
:kafka 消息 topickafkaHostKey
:- 可自行配置的 kafka 地址,默认值为
dapeng.kafka.consumer.host
,可以自定义覆盖默认值 - 用户只要负责把这些配置放到 env 或者 properties 里面
- 如:
System.setProperty("kafka.consumer.host","127.0.0.1:9092");
- 可自行配置的 kafka 地址,默认值为
@KafkaListener
- serializer 事件消息解码器,由事件发送方提供.
领域内消费事件与跨领域消费事件的不同
通过以上已经知道在事件中,存在领域内的订阅者消费事件消息,也可能存在跨领域的事件订阅者消费事件消息。下面将分析这两者的不同:
- 领域内的事件订阅者,通常是不能脱离领域的存在,存在领域内强关系的,但又需要解耦!
- 而跨领域的事件消息订阅,通常只需保证最终一致性,他们相对事件发送方没有强依赖关系。
需要注意的是:在 eventbus 中,领域内消费事件之后还是会将事件消息广播出去。因为不能保证不会有其他领域对发生的事件感兴趣!
如商品领域的商品变价审核通过后,触发了审核通过事件:
- 事件触发后将使价格生效,这部分生效操作可以通过领域内的事件订阅进行解耦。
- 因为更新了商品价格,可能存在库存系统或者其他业务系统对商品数据敏感,可以通过跨领域事件发送-订阅,做商品的数据推送
附:binlog-kafka动态缓存更新支持
@BinlogListener
- eventbus将订阅者 api 进行了有趣的拓展,加入binlog-kafka动态缓存更新支持
- 使用方法与事件的订阅者方法类似,唯一的不同就是你不再需要消息解码器!
1 |
|
总结
总体来说,EventBus 的引入对于开发者而言,不论是事件的发送还是订阅,都是易用的,没有多余的配置。对于第三方系统的支持也做的非常优秀,希望在日常开发中能够更加灵活的运用。尽量减少不必要的耦合!并能经受实践考验!
有关eventBus的具体实现细节,将由小伙伴 hz.lei 来进行剖析!
- hz.lei: DDD-事件总线实现架构原理分析
本文标题:领域事件及事件总线EventBus使用实践
文章作者:AwesomeYang
发布时间:2018-03-10
最后更新:2024-05-25
原始链接:https://struy.cn/2018/03/10/DDD-event-bus/
版权声明:未经允许禁止转载,请关注公众号联系作者