基于 Kafka 实现分布式事件驱动

事件驱动是一种灵活的系统设计方法,在事件驱动的系统中,当数据发生变化时系统会产生、发布一个对应的事件,其它对这个事件感兴趣的部分会接收到通知,并进行相应的处理。事件驱动设计最大的好处在我看来有两点:一是它为系统提供了很好的扩展能力,好比咱们能够对某类事件增长一个订阅者来对系统进行扩展,最主要的是咱们并不须要修改任何已有的代码,它彻底符合开闭原则;二是它实现了模块间的低偶合,系统间各个部分不是强依赖关系,而是经过事件把整个系统串联起来。
固然,任何事务都有两面性,事件驱动也有其很差的方面。首先,实现一套这样的系统复杂度就很高,对开发人员的要求也很高;再次,对系统的总体把控会很困难,想象一下面对几百个类别的事件,而且没有一个统一的地方可让咱们看到整个业务处理流程,会是什么心情?因此当咱们决定采用事件驱动实现系统中,必定要维护好相关的文档,并保持它们的有效性。
咱们再来看看事件驱动架构的一些其它的优势:
更好的响应性
事件驱动中,事件的响应是异步处理的,因此它具备更好的响应性。
更好的容错性
业务主流程在发布事件以后便结束了,扩展流程的延后处理能够异步不断的失败重试,直到成功为止,系统总体容错性更强。
设计篇
首先,咱们须要定义什么是事件?从业务角度看,事件包括如下属性:
事件的定义属性字段类型说明标识IDstring系统内部每一个事件都须要一个惟一的标识。类型eventTypestring数据发生变化产生事件,不一样类型的数据变化产生不一样类型的事件。好比会员下单、会员注册、用户修改手机号等等。时间eventTimedatetime即数据发生变化的时间。上下文contextstring事件发生时的上下文信息。好比会员修改手机号事件,须要原号码和新号码,会员 ID 等信息。
接下来,咱们看看如何设计一套基于事件驱动的系统,你知道设计模式中的观察者模式吗?
观察者模式 :定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知全部观察者对象,使它们可以自动更新本身。观察者模式 :定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知全部观察者对象,使它们可以自动更新本身。
观察者模式天生就是事件驱动的一个实现,可是直接使用它有不少的弊端。首先,它是基于主题的,有多少类事件就须要多少个主题类,这可能会致使类爆炸;其次,观察者模式是同步实现的,这样咱们可能会牺牲掉响应性和容错性等优点。
因此咱们须要对观察者模式稍做改进,咱们分别从事件发发布和消费两个方面来分析。
事件的发布
本文的标题是《基于 Kafka 实现事件驱动架构》,很明显,咱们使用 kafka 做为消息中间件来传递事件消息。因此,像修改会员手机号码的代码可能实现以下:
@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidchangePhoneNumber(String newNumber){
userDao.updatePhone(this.getUserId(), newNumber);// 本地数据库修改java

// 发布 用户手机号码变动 事件
Event event =newEvent(...);// 建立一个事件对象,表示用户修改手机号码
ProducerRecord record =newProducerRecord(...event);// 根据 event 生成 kakfa recordmysql

Future<RecordMetadata> f = kafkaProducer.send(record);
try{
f.get();
}catch(InterruptedException | ExecutionException e) {
thrownewRuntimeException(e);
}
}算法

这段代码正确吗?从逻辑上看,它彻底正确。但从可靠性角度看它是有问题的。Kafka 和数据库是两个异构系统,咱们不能仅仅经过一个本地事务保证他们之间的数据一致性。例如,推送 Kafka 成功了,可是在提交 DB 事务的时候失败了呢(好比说事务超时滚)?这样 kafka 中就会存在一个脏数据,由于本地数据库事务已经回滚了。
分布式系统数据一致性一直就是复杂的问题,经常使用的方案有两阶段提交、三阶段提交、zookeeper 的 zab 协议、proxs、raft 等算法,这不是本文的重点。咱们采用一个简单易懂的方式来解决上面的问题。咱们引入一张 DB 事件表,在发布事件时将事件信息存入这个事件表,将事件的发布和业务处理包装在同一个本地事务中。
createtableifnotexistsevent_queue(
idbigintnotnullauto_incrementcomment'主键',
event_idchar(32)notnullcomment'事件 ID',
event_typechar(12)notnullcomment'事件类型',
event_timedatetimenotnullcomment'事件发生时间',
contextmediumtextnotnullcomment'事件内容',
primarykey(id),
uniquekey(event_id)
)engine=innodbdefaultcharset=utf8comment='事件队列表';sql

发布事件,就是向这个事件表中增长一条记录,修改会员手机号码的代码如今变成了:
@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidchangePhoneNumber(String newNumber){
userDao.updatePhone(this.getUserId(), newNumber);// 本地数据库修改数据库

// 发布 用户手机号码变动 事件
Event event =newEvent(...);// 建立一个事件对象,表示用户修改手机号码
eventDao.insert(event);// 向事件表建立一条新记录。
}json

因为事件消息如今被暂存进了 DB,咱们还须要将它取出来推到 Kafka,为此咱们须要起一个线程,不断的读取事件表中的记录发送给 Kafka,并在成功发送以后将记录从 DB 中删除。若是删除 DB 的时候失败了,那么消息会被从新推送到 kafka,意为着咱们实现的是 At least once 的递交语义,对于业务上不接受重复的场景,在消费端须要作好幂等处理。
讲到这里,关于事件的分布已经接近尾声,但还有一个问题: 性能 。若是一个系统的负载很高,一秒内产生成千上万个事件,那咱们的事件表就会成为瓶颈,由于只用了一个线程来处理事件表向 Kafka 的推送,集群中只有一个实例能发辉做用,没法实现弹性。为了解决这个问题,咱们能够对事件表进行分表,并使用多线程并发处理,并且这些线程能够分布在不一样的集群实例中。但这样使设计变得更复杂了,如今咱们须要解决一个新的问题: 如何保证一个事件表,最多只被一个线程处理? 咱们须要保证一个事件表同一时刻只能被一个线程处理,同时在实例宕机后,其它实例能够起线程接替它的工做。这句话咱们换一种方式来描述更容易理解:
集群有 M 个实例,须要进行 N 个任务(任务是把事件分表中的事件信息推送到 kafka)
一个任务最多能够分配给 1 个实例,1 个实例能够同时执行多个任务。
若是一个实例宕机了,分配给它的任务须要从新在其它实例上分配。
N 个任务固定不变,实例能够动态增长或减小,须要实现实例之间的均衡负载。
若是你熟悉像 HBase、ES 这类分布式系统的话,不难理解咱们须要在集群中选出一个实例做为 Master,由它来负责任务在集群中的分配工做。咱们借助 Kookeeper,全部实例在启动时建立一个 EPHEMERAL 类型的 master 节点,建立成功的实例成为 Master,其它实例则监听 master 节点,当 Master 实例宕机后从新竞选。设计模式

每一个实例启动后,会在 workers 节点下建立一个临时节点,表示本身做为一个 Worker 加入集群;Worker 同时会监听本身建立的子节点,接收由 Master 分配给本身的任务。Master 会监听 workers 下子节点的变化,当实例下线或有新的实例加入集群中时,Master 会收到通知并从新进行任务的分配。分配的具体信息保存在 Worker 实例建立的子节点中,Master 经过直接修改这些子节点的内容实现分配。
从事件的发布来看,系统的架构是这样的:多线程

基于 Kafka 实现分布式事件驱动

这里有个细节须要说明: 由于 Kafka 只保证 partition 级别的有序性,咱们的事件分表数必须大于或等于 partition 的数量,不然事件的顺序得不到保证 。
事件的消费
由于咱们使用了 Kafka 做为事件消息中间件,事件的消费简单不少。每一个实例在启动时启一个 Kafka Consumer 便可,像实例间的负载、可用性、故障转移等等问题,Kafka 已经帮咱们解决了,咱们只须要从 Kafka 中获取事件消息,并通知相应的订阅者便可。架构

基于 Kafka 实现分布式事件驱动

订阅者须要实现 BaseSubscriber 接口,另外在启动时,须要把事件与订阅者的关系维护在 SubscriberConfig 类中:
BaseSubscriber sub = ...// your implementation
SubscriberConfig.instance().addSubscriber("event_type", sub);并发

系统总体的设计是面向扩展的,咱们能够经过调整集群应用实例数、事件表分表数量和 kafka partitions 数量来提升系统总体的吞吐量。事件表分表越多,事件消息从 DB 到 kafka 的延迟就更低;应用实例越多,系统单位时间内能承受的事件上限也越多,另外也能更好的负载 kafka 消息的消费。
每个应用,做为事件发布者,其产生的事件最终都被推送到一个 Kafka Topic;但做为消费者,能够订阅不一样的 Topic,这些 Topic 能够是本身的推送的,也能够是其它应用推送的事件。
实现篇
这里咱们只对部分核心代码做一个简单的介绍:
SimpleLock 是一个基于 Zookeeper 的简单分布式锁实现,咱们使用 SimpleLock 来实现 Master 的竞选。
EventSubmitter 是一个线程,负责把事件表中的事件信息推送到 Kafka broker。初始化时须要传入一个 int 参数,表示处理哪个事件分表。它被实现成一个响应中断的线程,由于当 Master 从新分配任务后,Worker 须要先停掉当前进行中的任务。
Master 类是 Master 实例的主要实现。实例在启动时会调 Master 类的 start 方法,Master 实例监听 workers 节点,当有新实例加入或实例下线时,Master 实例会调用 onWorkerChange 方法进行从新分配, onWorkerChange 方法实现了一个简单的分配算法,只有任务变动的 Worker 实例会收到分配通知。
Worker 类是 Worker 实例的主要实现,实例在启动时会调 Worker 类的 start 方法。集群中的每个实例都是 Worker,会在 workers 节点下建立一个临时的节点表示本身,同时监听该节点,接受 Master 分配给本身的任务。当 Worker 接收到分配通知时,会先中止当前在运行的全部任务,再根据 worker 节点的内容开始执行新分配的任务。
示例
来看一个具体的事例,假设咱们要以天为维度,统计天天的下单量和下单金额。如今,咱们已经有了订单表:
createtableifnotexistsorder(
order_idbigintnotnullauto_incrementcomment'主键',
user_idbigintnotnullcomment'客户 id',
order_timedatetimenotnullcomment'订单时间',
order_amountintnotnullcomment'订单金额,单位:分',
primarykey(order_id)
)engine=innodbdefaultcharset=utf8comment='订单表';

这个需求咱们能够简单的使用 sql 来作,好比:
selectdate(order_time)as day,count(*)as total_num,sum(order_amount)as total_amount from order
group bydate(order_time)
{1}

可是在生产环境中这么作每每不现实,好比性能问题、或者咱们对订单表作了分表、或者几个月前的数据库了备份,而你正好须要查询这些数据,等等。实现这个需求更好的方式是采用事件驱动,在下单的时候发布一个事件,而后异步的维护一个查询表,这样之间的种种问题都将不复存在。先建立一个查询表,以下:
createtableifnotexistsdaily_order_report(
idbigintnotnullauto_incrementcomment'主键',
daydatenotnullcomment'统计日',
order_numbigintnotnullcomment'订单数量',
order_totalbigintnotnullcomment'订单总金额,单位:分',
primarykey(id),
uniquekey(day)
)engine=innodbdefaultcharset=utf8comment='订单日报表';

在下单的时候,咱们须要发布一个 下单事件 :
@Transactional(readOnly =false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidcreateOrder(Order order){
orderDao.insert(order);

// 发布下单事件
publisher.publish("order_created",newDate(), order.json(), order.getUserId().intValue() % Configuration.instance().getNumOfEventTables());
}

以后,咱们须要实现一个订阅者,在接收到 下单事件 后,根据订单的日期作相应的统计:br/>@Component
publicclassDailyOrderReportSubscriberimplementsBaseSubscriber{

@Autowired
privateOrderRepos repos;

@Override
publicvoidonEvent(Event e){
Order order = Order.fromJson(e.getContext());
DailyOrderReport report = repos.selectDailyOrderReportByKey(newjava.sql.Date(order.getOrderTime().getTime()));

if(null== report) {
report =newDailyOrderReport();
report.setDay(newjava.sql.Date(order.getOrderTime().getTime()));
report.setOrderNum(1l);
report.setOrderTotal(newLong(order.getOrderAmount()));

repos.createDailyOrderReport(report);
}else{
report.setOrderNum(report.getOrderNum() +1);
report.setOrderTotal(report.getOrderTotal() + order.getOrderAmount());

repos.updateDailyOrderReport(report);
}
}
}

随机建立 10 个订单后,咱们的报表状况以下:
mysql>select*fromorder;
+----------+---------+---------------------+--------------+
| order_id | user_id | order_time | order_amount |
+----------+---------+---------------------+--------------+
| 21 | 3 | 2018-09-24 01:06:43 | 251 |
| 22 | 2 | 2018-09-24 01:06:43 | 371 |
| 23 | 5 | 2018-09-24 01:06:43 | 171 |
| 24 | 0 | 2018-09-24 01:06:43 | 904 |
| 25 | 3 | 2018-09-24 01:06:43 | 55 |
| 26 | 5 | 2018-09-24 01:06:44 | 315 |
| 27 | 8 | 2018-09-24 01:06:44 | 543 |
| 28 | 8 | 2018-09-24 01:06:44 | 537 |
| 29 | 2 | 2018-09-24 01:06:44 | 123 |
| 30 | 3 | 2018-09-24 01:06:45 | 938 |
+----------+---------+---------------------+--------------+
10 rows inset(0.00sec)

mysql>select*fromdaily_order_report;
+----+------------+-----------+-------------+
| id | day | order_num | order_total |
+----+------------+-----------+-------------+
| 2 | 2018-09-24 | 10 | 4208 |
+----+------------+-----------+-------------+
1 row inset(0.00sec)

mysql>

相关文章
相关标签/搜索