消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。华为云DMS率先提供Kafka 1.1.0的专享版服务,支持消息事务特性。
支持事务消息有什么作用?消息事务是实现分布式事务的一种方案,可以确保分布式场景下的数据最终一致性。例如最常用的转账场景,小王 转账到小明,实际操作是小王账户减去相应金额,小明的账户增加相应金额,在分库分表的前提下,2个账户存储在不同的数据库中,这时需要分布式事务才能保证数据库一致性,单个数据库的事务无法保证跨库之间的原子性。如果小王账户先扣钱,再去发送消息到小明所在的数据库去通知增加钱,在没有事务消息的情况下,无论是先扣钱或者先发送通知增加钱,都会有数据不一致的问题,因为无法保证两者的原子性。而有了事务消息,可以保证发送通知与本地事务(扣钱)是一个原子操作,本地事务与发送通知可以同时成功或者同时失败,确保数据一致。
除了数据最终一致性外,还实现了消息Exactly once语义。所谓Exactly once语义是消息传递语义中最难实现的一种,包括At most once:最多一次(不会重复,但是可能丢失数据); At least once:至少投递一次(不会丢失,但是会导致重复)和Exactly once: 刚好一次(不丢不重),也即幂等性。Kafka的幂等性可以保证生产只对一个分区实现Exactl once语义,需要多个分区也实现这个语义,还需要引入消息事务确保原子性。
分布式事务介绍
当前系统架构主流是分布式架构与微服务架构,在这种架构下数据源不是单一的数据库,业务逻辑往往需要在多个数据库中实现原子操作,单个数据库中的强大的本地事务无法保证多节点原子操作。 此时需要分布式事务来确保数据的一致性。目前使用较多的分布式事务解决方案有几种:
1、XA事务:两阶段/三阶段提交
XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。实现XA事务的关键是两阶段和三阶段提交协议。
两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器C和若干事务参与者Si两种角色,这里的事务参与者就是具体的数据库,协调器可以和事务参与者在一台机器上,如下图
二阶段提交协议主要包括由2个阶段:第一个阶段为准备阶段(prepare),第二阶段为提交阶段。准备阶段由事务协调者向事务参与者发送prepare消息,各个参与者处理本地事务但不提交,然后向事务协调者返回事务状态。 提交阶段根据准备阶段各参与者的执行请求,协调者确定事务是提交或者回滚,向各个参与者发送命令。
二阶段提交协议主要的问题是在提交执行过程中,所有的参与者都需要听从协调者的统一调度,期间处于阻塞状态而不能从事其他操作,这样效率及其低下。特别是当协调者发出提交通知到部分参与者后宕机,其他参与者就会阻塞。
2PC的缺点:
性能问题:所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作
单点问题:协调者(TM)在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者(RM)会一直等待状态,无法完成其它操作。
数据一致性问题:协调者如果因为网络问题或者宕机了,向一部分参与者发送了commit,另一部分还没来得及发就宕机了,就会造成数据不一致。
局限性:只适用于数据库层面的分布式事务
针对二阶段提交存在的问题,三阶段提交协议在prepare与commit阶段之间增加一个pre-commit阶段。Prepare阶段只询问参与者而不做事务,而在pre-commit阶段各个参与者才会执行本地事务但不提交。Commit阶段就是直接提交。这样做可以避免二阶段当协调者迟迟没有发出commit或者rollback通知,参与者在超时后可以自行提交或者回滚,避免阻塞事务(这是因为经过了prepare阶段已经确认了各个参与者是可以执行的,最后第三阶段直接执行即可)。 三阶段提交也存在很多问题,也不能完全保证数据一致,完全一致需要用到Paxos算法。
优点:引进了超时机制,变相的优化了参与者同步阻塞的范围,也避免了协调者单点问题,阶段3协调者出现问题参与者也会提交事务
缺点:依旧存在数据不一致问题,阶段三假设协调者发出回滚命令,而某个参与者因网络问题没有收到命令,受机制影响也会提交事务,此时数据就不一致了;同时因为多引进了一个阶段,性能方面也会有所下降
SAGE协议
Saga模式
Saga模式属于长事务解决方案,其核心思想把一个分布式事务拆分为多个本地事务,每个本地事务都有相应的执行模块和补偿模块,当Saga事务中任意一个本地事务出错时,可以通过调用相关的补偿方法恢复之前的事务,达到事务最终一致性。
Saga模式由三部分组成:
LLT(Long Live Transaction):由一个个本地事务组成的事务链。
本地事务:事务链由一个个子事务(本地事务)组成,LLT = T1+T2+T3+…+Ti。
补偿:每个本地事务 Ti 有对应的补偿 Ci。
在业务流程中,正常情况下每个参与者都在一阶段提交本地事务,按照T1->T2->T3->…->Ti的顺序执行。当出现异常时,则会发起补偿,将之前提交的事务回滚,执行顺序变成T1->T2->T3->C3->C2->C1。如下图所示:
Saga模式的恢复其实有两种:向后恢复(Backward Recovery)和向前恢复(Forward Recovery)
向后恢复(Backward Recovery):撤销掉之前所有成功子事务。如果任意本地子事务失败,则补偿已完成的事务。如异常情况的执行顺序T1,T2,T3,..Ti,Ci,…C3,C2,C1。
向前恢复(Forward Recovery):即重试失败的事务,适用于必须要成功的场景,该情况下不需要Ci。执行顺序:T1,T2,…,Tj(失败),Tj(重试),…,Ti。
Saga模式满足事务的ACD三个特性:
原子性:Saga协调器协调事务链中的本地事务要么全部提交,要么全部回滚
一致性:Saga事务可以实现最终一致性
持久性:基于本地事务,所以这个特性可以很好实现
但是缺乏隔离性会引发脏读、幻读和不可重复读等问题,因此需要在业务设计上去解决这个问题,通常在应用层面通过逻辑锁或者串行化操作来确保读取数据的准确性。
实现Saga的注意事项:
(1) Ti和Ci必须是幂等的。如向后恢复和向前恢复时候如果不是幂等操作会导致数据不一致。
(2) Ci必须是能够成功的,如果无法成功则需要人工介入。
(3) Ti->Ci和Ci->Ti的执行结果必须是一样的。
TCC补偿性事务解决方案
TCC分别对应Try、Confirm和Cancel三种操作,含义如下:
Try:预留业务资源
Confirm:确认执行业务操作,执行事务
Cancel:取消执行业务操作
TCC解决了跨应用业务操作的原子性问题,在诸如组合支付、账务拆分场景非常实用。
TCC实际上把数据库层的二阶段提交上提到了应用层来实现,对于数据库来说是一阶段提交,规避了数据库层的2PC性能低下问题。
TCC需要业务提供使用,代码侵入性强,开发复杂和成本高。
非强一致性,属于补偿事务,实现最终一致
对于 TCC 的工作机制,我们举一个比较简单的例子。在一个理财 App 中,用户通过账户余额购买一个理财产品,这里涉及两个事务操作:
在账户服务中,对用户账户余额进行扣减。
在理财产品服务中,对指定理财产品可申购金额进行扣减。
这两个事务操作在微服务架构下分别对应的是两个不同的微服务,以及独立的数据库操作,在 TCC 的工作机制中,首先针对账户服务和理财产品服务分别提供 Try、Confirm 和 Cancel 三个方法。
在账户服务的 Try 方法中对实际申购金额进行冻结,Confirm 方法把 Try 方法冻结的资金进行实际的扣减,Cancel 方法把 Try 方法冻结的资金进行解冻。
理财产品服务的 Try 方法中将本次申购的部分额度进行冻结,Confirm 方法把 Try 方法冻结的额度进行实际扣减,Cancel 方法把 Try 方法中冻结的额度进行释放。
在一个主业务方法中,分别调用这两个服务对外提供的处理方法(资金扣减、理财产品可申购额度扣减),这两个服务处理实际业务时,会先调用 Try 方法来做资源预留,如果这两个方法处理都正常,TCC 事务协调器就会调用 Confirm 方法对预留资源进行实际应用。否则 TCC 事务协调器一旦感知到任何一个服务的 Try 方法处理失败,就会调用各个服务的 Cancel 方法进行回滚,从而保证数据的一致性。
在一些特殊情况下,比如理财产品服务宕机或者出现异常,导致该服务并没有收到 TCC 事务协调器的 Cancel 或者 Confirm 请求,怎么办呢?没关系,TCC 事务框架会记录一些分布式事务的操作日志,保存分布式事务运行的各个阶段和状态。TCC 事务协调器会根据操作日志来进行重试,以达到数据的最终一致性。
需要注意的是,TCC 服务支持接口调用失败发起重试,所以 TCC 暴露的接口都需要满足幂等性。
本地消息表
本地消息表的核心思想是将分布式事务拆分成本地事务进行处理,本质上是一个消息异步处理的过程,其处理流程如下:
事务发起方在同一个事务里同时写业务表和消息表
事务发起方本地有个定时任务,定时查询消息表的状态,将未处理的消息通过消息中间件发送到事务消费方进行处理。
事务消费方读取消息队列中的消息,并写入本地的业务表中
消息正常处理完成后,事务消费方会通过消息中间件,通知事务发起方消息已处理
事务发起方在接收到结果后,会更新消息表中的状态为已处理
从上述流程上看,本地消息表方案是基于消息中间件的可靠性来达到事务的最终一致性。在这个过程中,当中间出现一些异常时,这里进一步分析:
当①和②处理出错,由于写业务数据和写消息表仍然在本地事务中,直接回滚即可
当③处理出错,发送消息失败,由于消息数据在本地消息表中有保存,只需要通过轮询任务定时发送到事务消费方,重新读取消息处理业务即可
如果是业务上⑥执行失败,事务消费方需发消息到事务发起方主动回滚事务
如果是事务发起方回滚事务但是消息已经被消费了,则需要通知到事务消费方发起回滚
总之,本地消息表方案整体简单易于实现,从应用设计开发的角度实现了消息数据的可靠性。但是本地消息表有以下缺点:
消息数据和业务数据耦合,占用业务库资源
消息表需要根据具体的业务场景制定,不具备通用性
性能上受限于数据库的性能,高并发场景下会有性能瓶颈
只适用于最终一致性的场景
一致性事务消息
基于消息中间件的事务消息来完成分布式事务。事务消息可以确保本地执行事务与消息发送是原子的:先发送一条消息到消息中间件(例如RabbitMQ),然后执行本地事务(本地DB操作),当本地事务成功后再发送提交确认(Confirm)到消息中间件,然后这条消息才能被其他业务消费者所能感知,从而确保原子性;如果本地事务失败,则Rollback消息中间件。
所以,我们这里最核心的就是A银行通过本地事务保证日志记录+后台线程轮询保证消息不丢失。B银行通过本地事务保证日志记录从而保证消息不重复消费!B银行在回调A银行的接口时会通知处理结果,如果转账失败,A银行会根据处理结果进行回滚。
rabbit MQ通过query本地事务状态来实现消息事务。
当然,分布式事务最好的解决方案是尽量避免出现分布式事务!
kafka实现的多条消息的事务性,要么一起被消费到,要么一起丢失。
参考:
https://blog.csdn.net/weixin_39785970/article/details/110844239
https://xie.infoq.cn/article/7b55d37b619256f9fa4ca4777
https://www.modb.pro/db/424506