跳到主要内容

· 阅读需 29 分钟

Seata 意为:Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事务解决方案,提供了 AT、TCC、Saga 和 XA 事务模式,本文详解其中的 Saga 模式。
项目地址:https://github.com/apache/incubator-seata

本文作者:屹远(陈龙),蚂蚁金服分布式事务核心研发。

金融分布式应用开发的痛点

分布式系统有一个比较明显的问题就是,一个业务流程需要组合一组服务。这样的事情在微服务下就更为明显了,因为这需要业务上的一致性的保证。也就是说,如果一个步骤失败了,那么要么回滚到以前的服务调用,要么不断重试保证所有的步骤都成功。---《左耳听风-弹力设计之“补偿事务”》

而在金融领域微服务架构下的业务流程往往会更复杂,流程很长,比如一个互联网微贷业务流程调十几个服务很正常,再加上异常处理的流程那就更复杂了,做过金融业务开发的同学会很有体感。

所以在金融分布式应用开发过程中我们面临一些痛点:

  • 业务一致性难以保障

我们接触到的大多数业务(比如在渠道层、产品层、集成层的系统),为了保障业务最终一致性,往往会采用“补偿”的方式来做,如果没有一个协调器来支持,开发难度是比较大的,每一步都要在 catch 里去处理前面所有的“回滚”操作,这将会形成“箭头形”的代码,可读性及维护性差。或者重试异常的操作,如果重试不成功可能要转异步重试,甚至最后转人工处理。这些都给开发人员带来极大的负担,开发效率低,且容易出错。

  • 业务状态难以管理

业务实体很多、实体的状态也很多,往往做完一个业务活动后就将实体的状态更新到了数据库里,没有一个状态机来管理整个状态的变迁过程,不直观,容易出错,造成业务进入一个不正确的状态。

  • 幂等性难以保障

服务的幂等性是分布式环境下的基本要求,为了保证服务的幂等性往往需要服务开发者逐个去设计,有用数据库唯一键实现的,有用分布式缓存实现的,没有一个统一的方案,开发人员负担大,也容易遗漏,从而造成资损。

  • 业务监控运维难,缺乏统一的差错守护能力

业务的执行情况监控一般通过打印日志,再基于日志监控平台查看,大多数情况是没有问题的,但是如果业务出错,这些监控缺乏当时的业务上下文,对排查问题不友好,往往需要再去数据库里查。同时日志的打印也依赖于开发,容易遗漏。对于补偿事务往往需要有“差错守护触发补偿”、“工人触发补偿”操作,没有统一的差错守护和处理规范,这些都要开发者逐个开发,负担沉重。

理论基础

一些场景下,我们对数据有强一致性的需求时,会采用在业务层上需要使用“两阶段提交”这样的分布式事务方案。而在另外一些场景下,我们并不需要这么强的一致性,那就只需要保证最终一致性就可以了。

例如蚂蚁金服目前在金融核心系统使用的就是 TCC 模式,金融核心系统的特点是一致性要求高(业务上的隔离性)、短流程、并发高。

而在很多金融核心以上的业务(比如在渠道层、产品层、集成层的系统),这些系统的特点是最终一致即可、流程多、流程长、还可能要调用其它公司的服务(如金融网络)。这是如果每个服务都开发 Try、Confirm、Cancel 三个方法成本高。如果事务中有其它公司的服务,也无法要求其它公司的服务也遵循 TCC 这种开发模式。同时流程长,事务边界太长会影响性能。

对于事务我们都知道 ACID,也很熟悉 CAP 理论最多只能满足其中两个,所以,为了提高性能,出现了 ACID 的一个变种 BASE。ACID 强调的是一致性(CAP 中的 C),而 BASE 强调的是可用性(CAP 中的 A)。我们知道,在很多情况下,我们是无法做到强一致性的 ACID 的。特别是我们需要跨多个系统的时候,而且这些系统还不是由一个公司所提供的。BASE 的系统倾向于设计出更加有弹力的系统,在短时间内,就算是有数据不同步的风险,我们也应该允许新的交易可以发生,而后面我们在业务上将可能出现问题的事务通过补偿的方式处理掉,以保证最终的一致性。

所以我们在实际开发中会进行取舍,对于更多的金融核心以上的业务系统可以采用补偿事务,补偿事务处理方面在 30 年前就提出了  Saga 理论,随着微服务的发展,近些年才逐步受到大家的关注。目前业界比较也公认 Saga 是作为长事务的解决方案。

https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf[1] > http://microservices.io/patterns/data/saga.html[2]

社区和业界的方案

Apache Camel Saga

Camel 是实现 EIP(Enterprise Integration Patterns)企业集成模式的一款开源产品,它基于事件驱动的架构,有着良好的性能和吞吐量,它在 2.21 版本新增加了 Saga EIP。

Saga EIP 提供了一种方式可以通过 camel route 定义一系列有关联关系的 Action,这些 Action 要么都执行成功,要么都回滚,Saga 可以协调任何通讯协议的分布式服务或本地服务,并达到全局的最终一致性。Saga 不要求整个处理在短时间内完成,因为它不占用任何数据库锁,它可以支持需要长时间处理的请求,从几秒到几天,Camel 的 Saga EIP 是基于  Microprofile 的 LRA[3](Long Running Action),同样也是支持协调任何通讯协议任何语言实现的分布式服务。

Saga 的实现不会对数据进行加锁,而是在给操作定义它的“补偿操作”,当正常流程执行出错的时候触发那些已经执行过的操作的“补偿操作”,将流程回滚掉。“补偿操作”可以在 Camel route 上用 Java 或 XML DSL(Definition Specific Language)来定义。

下面是一个 Java DSL 示例:

// action
from("direct:reserveCredit")
.bean(idService, "generateCustomId") // generate a custom Id and set it in the body
.to("direct:creditReservation")

// delegate action
from("direct:creditReservation")
.saga()
.propagation(SagaPropagation.SUPPORTS)
.option("CreditId", body()) // mark the current body as needed in the compensating action
.compensation("direct:creditRefund")
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved. Custom Id used is ${body}");

// called only if the saga is cancelled
from("direct:creditRefund")
.transform(header("CreditId")) // retrieve the CreditId option from headers
.bean(creditService, "refundCredit")
.log("Credit for Custom Id ${body} refunded");

XML DSL 示例:

<route>
<from uri="direct:start"/>
<saga>
<compensation uri="direct:compensation" />
<completion uri="direct:completion" />
<option optionName="myOptionKey">
<constant>myOptionValue</constant>
</option>
<option optionName="myOptionKey2">
<constant>myOptionValue2</constant>
</option>
</saga>
<to uri="direct:action1" />
<to uri="direct:action2" />
</route>

Eventuate Tram Saga

Eventuate Tram Saga[4]  框架是使用 JDBC / JPA 的 Java 微服务的一个 Saga 框架。它也和 Camel Saga 一样采用了  Java DSL 来定义补偿操作:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.withCompensation(this::reject)
.step()
.invokeParticipant(this::reserveCredit)
.step()
.invokeParticipant(this::approve)
.build();


@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}


private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
.to("customerService")
.build();

...

Apache ServiceComb Saga

ServiceComb Saga[5]  也是一个微服务应用的数据最终一致性解决方案。相对于  TCC  而言,在 try 阶段,Saga 会直接提交事务,后续 rollback 阶段则通过反向的补偿操作来完成。与前面两种不同是它是采用 Java 注解+拦截器的方式来进行“补偿”服务的定义。

架构:

Saga 是由  alpha  和  **omega **组成,其中:

  • alpha 充当协调者的角色,主要负责对事务进行管理和协调;
  • omega 是微服务中内嵌的一个 agent,负责对网络请求进行拦截并向 alpha 上报事务事件;

下图展示了 alpha,omega 以及微服务三者的关系:
ServiceComb Saga

使用示例:

public class ServiceA extends AbsService implements IServiceA {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Autowired
private IServiceB serviceB;

@Autowired
private IServiceC serviceC;

@Override
public String getServiceName() {
return "servicea";
}

@Override
public String getTableName() {
return "testa";
}

@Override
@SagaStart
@Compensable(compensationMethod = "cancelRun")
@Transactional(rollbackFor = Exception.class)
public Object run(InvokeContext invokeContext) throws Exception {
LOG.info("A.run called");
doRunBusi();
if (invokeContext.isInvokeB(getServiceName())) {
serviceB.run(invokeContext);
}
if (invokeContext.isInvokeC(getServiceName())) {
serviceC.run(invokeContext);
}
if (invokeContext.isException(getServiceName())) {
LOG.info("A.run exception");
throw new Exception("A.run exception");
}
return null;
}

public void cancelRun(InvokeContext invokeContext) {
LOG.info("A.cancel called");
doCancelBusi();
}

蚂蚁金服的实践

蚂蚁金服内部大规模在使用 TCC 模式分布式事务,主要用于金融核心等对一致性要求高、性能要求高的场景。在更上层的业务系统因为流程多流程长,开发 TCC 成本比较高,大都会权衡采用 Saga 模式来到达业务最终一致性,由于历史的原因不同的 BU 有自己的一套“补偿”事务的方案,基本上是两种:

  • 一种是当一个服务在失败时需要“重试”或“补偿”时,在执行服务前在数据库插入一条记录,记录状态,当异常时通过定时任务去查询数据库记录并进行“重试”或“补偿”,当业务流程执行成功则删除记录;
  • 另一种是设计一个状态机引擎和简单的 DSL,编排业务流程和记录业务状态,状态机引擎可以定义“补偿服务”,当异常时由状态机引擎反向调用“补偿服务”进行回滚,同时还会有一个“差错守护”平台,监控那些执行失败或补偿失败的业务流水,并不断进行“补偿”或“重试”;

方案对比

社区和业界的解决方案一般是两种,一种基本状态机或流程引擎通过 DSL 方式编排流程程和补偿定义,一种是基于 Java 注解+拦截器实现补偿,那么这两种方案有什么优缺点呢?

方式优点缺点
状态机+DSL
- 可以用可视化工具来定义业务流程,标准化,可读性高,可实现服务编排的功能
- 提高业务分析人员与程序开发人员的沟通效率
- 业务状态管理:流程本质就是一个状态机,可以很好的反映业务状态的流转
- 提高异常处理灵活性:可以实现宕机恢复后的“向前重试”或“向后补偿”
- 天然可以使用 Actor 模型或 SEDA 架构等异步处理引擎来执行,提高整体吞吐量

- 业务流程实际是由 JAVA 程序与 DSL 配置组成,程序与配置分离,开发起来比较繁琐
- 如果是改造现有业务,对业务侵入性高
- 引擎实现成本高
拦截器+java 注解
- 程序与注解是在一起的,开发简单,学习成本低
- 方便接入现有业务
- 基于动态代理拦截器,框架实现成本低

- 框架无法提供 Actor 模型或 SEDA 架构等异步处理模式来提高系统吞吐量
- 框架无法提供业务状态管理
- 难以实现宕机恢复后的“向前重试”,因为无法恢复线程上下文

Seata Saga 的方案

Seata Saga 的简介可以看一下《Seata Saga 官网文档》[6]。

Seata Saga 采用了状态机+DSL 方案来实现,原因有以下几个:

  • 状态机+DSL 方案在实际生产中应用更广泛;
  • 可以使用 Actor 模型或 SEDA 架构等异步处理引擎来执行,提高整体吞吐量;
  • 通常在核心系统以上层的业务系统会伴随有“服务编排”的需求,而服务编排又有事务最终一致性要求,两者很难分割开,状态机+DSL 方案可以同时满足这两个需求;
  • 由于 Saga 模式在理论上是不保证隔离性的,在极端情况下可能由于脏写无法完成回滚操作,比如举一个极端的例子, 分布式事务内先给用户 A 充值,然后给用户 B 扣减余额,如果在给 A 用户充值成功,在事务提交以前,A 用户把线消费掉了,如果事务发生回滚,这时则没有办法进行补偿了,有些业务场景可以允许让业务最终成功,在回滚不了的情况下可以继续重试完成后面的流程,状态机+DSL 的方案可以实现“向前”恢复上下文继续执行的能力, 让业务最终执行成功,达到最终一致性的目的。

在不保证隔离性的情况下:业务流程设计时要遵循“宁可长款, 不可短款”的原则,长款意思是客户少了线机构多了钱,以机构信誉可以给客户退款,反之则是短款,少的线可能追不回来了。所以在业务流程设计上一定是先扣款。

状态定义语言(Seata State Language)

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件;

  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点;

  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚;

    注意: 异常发生时是否进行补偿也可由用户自定义决定

  4. 可以实现服务编排需求,支持单项选择、并发、异步、子状态机、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

假设有一个业务流程要调两个服务,先调库存扣减(InventoryService),再调余额扣减(BalanceService),保证在一个分布式内要么同时成功,要么同时回滚。两个参与者服务都有一个 reduce 方法,表示库存扣减或余额扣减,还有一个 compensateReduce 方法,表示补偿扣减操作。以  InventoryService 为例看一下它的接口定义:

public interface InventoryService {

/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}

这个业务流程对应的状态图:

示例状态图
对应的 JSON

{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]", "$.[count]"],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": ["java.lang.Throwable"],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}

状态语言在一定程度上参考了  AWS Step Functions[7]。

"状态机" 属性简介:

  • Name: 表示状态机的名称,必须唯一;
  • Comment: 状态机的描述;
  • Version: 状态机定义版本;
  • StartState: 启动时运行的第一个"状态";
  • States: 状态列表,是一个 map 结构,key 是"状态"的名称,在状态机内必须唯一;

"状态" 属性简介:

  • Type:"状态" 的类型,比如有:
    • ServiceTask: 执行调用服务任务;
    • Choice: 单条件选择路由;
    • CompensationTrigger: 触发补偿流程;
    • Succeed: 状态机正常结束;
    • Fail: 状态机异常结束;
    • SubStateMachine: 调用子状态机;
  • ServiceName: 服务名称,通常是服务的 beanId;
  • ServiceMethod: 服务方法名称;
  • CompensateState: 该"状态"的补偿"状态";
  • Input: 调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的  SpringEL[8], 如果是常量直接写值即可;
  • Output: 将服务返回的参数赋值到状态机上下文中,是一个 map 结构,key 为放入到状态机上文时的 key(状态机上下文也是一个 map),value 中 $. 是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数;
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知,我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构,key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式判断服务返回参数,带 $Exception{开头表示判断异常类型,value 是当这个条件表达式成立时则将服务执行状态映射成这个值;
  • Catch: 捕获到异常后的路由;
  • Next: 服务执行完成后下一个执行的"状态";
  • Choices: Choice 类型的"状态"里, 可选的分支列表, 分支中的 Expression 为 SpringEL 表达式,Next 为当表达式成立时执行的下一个"状态";
  • ErrorCode: Fail 类型"状态"的错误码;
  • Message: Fail 类型"状态"的错误信息;

更多详细的状态语言解释请看《Seata Saga 官网文档》[6http://seata.io/zh-cn/docs/user/saga.html]。

状态机引擎原理:

状态机引擎原理

  • 图中的状态图是先执行 stateA, 再执行 stataB,然后执行 stateC;
  • "状态"的执行是基于事件驱动的模型,stataA 执行完成后,会产生路由消息放入 EventQueue,事件消费端从 EventQueue 取出消息,执行 stateB;
  • 在整个状态机启动时会调用 Seata Server 开启分布式事务,并生产 xid, 然后记录"状态机实例"启动事件到本地数据库;
  • 当执行到一个"状态"时会调用 Seata Server 注册分支事务,并生产 branchId, 然后记录"状态实例"开始执行事件到本地数据库;
  • 当一个"状态"执行完成后会记录"状态实例"执行结束事件到本地数据库, 然后调用 Seata Server 上报分支事务的状态;
  • 当整个状态机执行完成,会记录"状态机实例"执行完成事件到本地数据库, 然后调用 Seata Server 提交或回滚分布式事务;

状态机引擎设计:

状态机引擎设计

状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:

  • Eventing 层:

    • 实现事件驱动架构, 可以压入事件, 并由消费端消费事件, 本层不关心事件是什么消费端执行什么,由上层实现;
  • ProcessController 层:

    • 由于上层的 Eventing 驱动一个“空”流程执行的执行,"state"的行为和路由都未实现,由上层实现;

      基于以上两层理论上可以自定义扩展任何"流程"引擎。这两层的设计是参考了内部金融网络平台的设计。

  • StateMachineEngine 层:

    • 实现状态机引擎每种 state 的行为和路由逻辑;
    • 提供 API、状态机语言仓库;

Saga 模式下服务设计的实践经验

下面是实践中总结的在 Saga 模式下微服务设计的一些经验,当然这是推荐做法,并不是说一定要 100% 遵循,没有遵循也有“绕过”方案。

好消息:Seata Saga 模式对微服务的接口参数没有任务要求,这使得 Saga 模式可用于集成遗留系统或外部机构的服务。

允许空补偿

  • 空补偿:原服务未执行,补偿服务执行了;
  • 出现原因:
    • 原服务 超时(丢包);
    • Saga 事务触发 回滚;
    • 未收到原服务请求,先收到补偿请求;

所以服务设计时需要允许空补偿,即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来。

防悬挂控制

  • 悬挂:补偿服务 比 原服务 先执行;
  • 出现原因:
    • 原服务 超时(拥堵);
    • Saga 事务回滚,触发 回滚;
    • 拥堵的原服务到达;

所以要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行。

幂等控制

  • 原服务与补偿服务都需要保证幂等性, 由于网络可能超时,可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新。

总结

很多时候我们不需要强调强一性,我们基于 BASE 和 Saga 理论去设计更有弹性的系统,在分布式架构下获得更好的性能和容错能力。分布式架构没有银弹,只有适合特定场景的方案,事实上 Seata Saga 是一个具备“服务编排”和“Saga 分布式事务”能力的产品,总结下来它的适用场景是:

  • 适用于微服务架构下的“长事务”处理;
  • 适用于微服务架构下的“服务编排”需求;
  • 适用于金融核心系统以上的有大量组合服务的业务系统(比如在渠道层、产品层、集成层的系统);
  • 适用于业务流程中需要集成遗留系统或外部机构提供的服务的场景(这些服务不可变不能对其提出改造要求)。

文中涉及相关链接

[1]https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf
[2]http://microservices.io/patterns/data/saga.html
[3]Microprofile 的 LRAhttps://github.com/eclipse/microprofile-sandbox/tree/master/proposals/0009-LRA
[4]Eventuate Tram Sagahttps://github.com/eventuate-tram/eventuate-tram-sagas
[5]ServiceComb Sagahttps://github.com/apache/servicecomb-pack
[6]Seata Saga 官网文档http://seata.io/zh-cn/docs/user/saga.html
[7]AWS Step Functionshttps://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/tutorial-creating-lambda-state-machine.html
[8]SpringELhttps://docs.spring.io/spring/docs/4.3.10.RELEASE/spring-framework-reference/html/expressions.html

· 阅读需 26 分钟

作者:屹远(陈龙),蚂蚁金服分布式事务框架核心研发。
本文根据 8 月 11 日 SOFA Meetup#3 广州站 《分布式事务 Seata 及其三种模式详解》主题分享整理,着重分享分布式事务产生的背景、理论基础,以及 Seata 分布式事务的原理以及三种模式(AT、TCC、Saga)的分布式事务实现。

现场回顾视频以及 PPT 见文末链接。

3 分布式事务 Seata 三种模式详解-屹远.jpg

一、分布式事务产生的背景

1.1 分布式架构演进之 - 数据库的水平拆分

蚂蚁金服的业务数据库起初是单库单表,但随着业务数据规模的快速发展,数据量越来越大,单库单表逐渐成为瓶颈。所以我们对数据库进行了水平拆分,将原单库单表拆分成数据库分片。

如下图所示,分库分表之后,原来在一个数据库上就能完成的写操作,可能就会跨多个数据库,这就产生了跨数据库事务问题。

image.png

1.2 分布式架构演进之 - 业务服务化拆分

在业务发展初期,“一块大饼”的单业务系统架构,能满足基本的业务需求。但是随着业务的快速发展,系统的访问量和业务复杂程度都在快速增长,单系统架构逐渐成为业务发展瓶颈,解决业务系统的高耦合、可伸缩问题的需求越来越强烈。

如下图所示,蚂蚁金服按照面向服务架构(SOA)的设计原则,将单业务系统拆分成多个业务系统,降低了各系统之间的耦合度,使不同的业务系统专注于自身业务,更有利于业务的发展和系统容量的伸缩。

image.png

业务系统按照服务拆分之后,一个完整的业务往往需要调用多个服务,如何保证多个服务间的数据一致性成为一个难题。

二、分布式事务理论基础

2.1 两阶段提交协议

16_16_18__08_13_2019.jpg

两阶段提交协议:事务管理器分两个阶段来协调资源管理器,第一阶段准备资源,也就是预留事务所需的资源,如果每个资源管理器都资源预留成功,则进行第二阶段资源提交,否则协调资源管理器回滚资源。

2.2 TCC

16_16_51__08_13_2019.jpg

TCC(Try-Confirm-Cancel) 实际上是服务化的两阶段提交协议,业务开发者需要实现这三个服务接口,第一阶段服务由业务代码编排来调用 Try 接口进行资源预留,所有参与者的 Try 接口都成功了,事务管理器会提交事务,并调用每个参与者的 Confirm 接口真正提交业务操作,否则调用每个参与者的 Cancel 接口回滚事务。

2.3 Saga

3 分布式事务 Seata 三种模式详解-屹远-9.jpg

Saga 是一种补偿协议,在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

Saga 理论出自 Hector & Kenneth 1987发表的论文 Sagas。

Saga 正向服务与补偿服务也需要业务开发者实现。

三、Seata 及其三种模式详解

3.1 分布式事务 Seata 介绍

Seata(Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架)是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。Seata 开源半年左右,目前已经有超过 1.1 万 star,社区非常活跃。我们热忱欢迎大家参与到 Seata 社区建设中,一同将 Seata 打造成开源分布式事务标杆产品。

Seata:https://github.com/apache/incubator-seata

image.png

3.2 分布式事务 Seata 产品模块

如下图所示,Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

image.png

在 Seata 中,分布式事务的执行流程:

  • TM 开启分布式事务(TM 向 TC 注册全局事务记录);
  • 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
  • TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
  • TC 汇总事务信息,决定分布式事务是提交还是回滚;
  • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

3.3 分布式事务 Seata 解决方案

Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。

15_49_23__08_13_2019.jpg

2.3.1 AT 模式

今年 1 月份,Seata 开源了 AT 模式。AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。

image.png

AT 模式如何做到对业务的无侵入 :
  • 一阶段:

在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

图片3.png

  • 二阶段提交:

二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

图片4.png

  • 二阶段回滚:

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

图片5.png

AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务 SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。

2.3.2 TCC 模式

2019 年 3 月份,Seata 开源了 TCC 模式,该模式由蚂蚁金服贡献。TCC 模式需要用户根据自己的业务场景实现 Try、Confirm 和 Cancel 三个操作;事务发起方在一阶段执行 Try 方式,在二阶段提交执行 Confirm 方法,二阶段回滚执行 Cancel 方法。

图片6.png

TCC 三个方法描述:

  • Try:资源的检测和预留;
  • Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功;
  • Cancel:预留资源释放;

蚂蚁金服在 TCC 的实践经验
**
16_48_02__08_13_2019.jpg

1 TCC 设计 - 业务模型分 2 阶段设计:

用户接入 TCC ,最重要的是考虑如何将自己的业务模型拆成两阶段来实现。

以“扣钱”场景为例,在接入 TCC 前,对 A 账户的扣钱,只需一条更新账户余额的 SQL 便能完成;但是在接入 TCC 之后,用户就需要考虑如何将原来一步就能完成的扣钱操作,拆成两阶段,实现成三个方法,并且保证一阶段 Try  成功的话 二阶段 Confirm 一定能成功。

图片7.png

如上图所示,

Try 方法作为一阶段准备方法,需要做资源的检查和预留。在扣钱场景下,Try 要做的事情是就是检查账户余额是否充足,预留转账资金,预留的方式就是冻结 A 账户的 转账资金。Try 方法执行之后,账号 A 余额虽然还是 100,但是其中 30 元已经被冻结了,不能被其他事务使用。

二阶段 Confirm 方法执行真正的扣钱操作。Confirm 会使用 Try 阶段冻结的资金,执行账号扣款。Confirm 方法执行之后,账号 A 在一阶段中冻结的 30 元已经被扣除,账号 A 余额变成 70 元 。

如果二阶段是回滚的话,就需要在 Cancel 方法内释放一阶段 Try 冻结的 30 元,使账号 A 的回到初始状态,100 元全部可用。

用户接入 TCC 模式,最重要的事情就是考虑如何将业务模型拆成 2 阶段,实现成 TCC 的 3 个方法,并且保证 Try 成功 Confirm 一定能成功。相对于 AT 模式,TCC 模式对业务代码有一定的侵入性,但是 TCC 模式无 AT 模式的全局行锁,TCC 性能会比 AT 模式高很多。

2 TCC 设计 - 允许空回滚:
**
16_51_44__08_13_2019.jpg

Cancel 接口设计时需要允许空回滚。在 Try 接口因为丢包时没有收到,事务管理器会触发回滚,这时会触发 Cancel 接口,这时 Cancel 执行时发现没有对应的事务 xid 或主键时,需要返回回滚成功。让事务服务管理器认为已回滚,否则会不断重试,而 Cancel 又没有对应的业务数据可以进行回滚。

3 TCC 设计 - 防悬挂控制:
**
16_51_56__08_13_2019.jpg

悬挂的意思是:Cancel 比 Try 接口先执行,出现的原因是 Try 由于网络拥堵而超时,事务管理器生成回滚,触发 Cancel 接口,而最终又收到了 Try 接口调用,但是 Cancel 比 Try 先到。按照前面允许空回滚的逻辑,回滚会返回成功,事务管理器认为事务已回滚成功,则此时的 Try 接口不应该执行,否则会产生数据不一致,所以我们在 Cancel 空回滚返回成功之前先记录该条事务 xid 或业务主键,标识这条记录已经回滚过,Try 接口先检查这条事务xid或业务主键如果已经标记为回滚成功过,则不执行 Try 的业务操作。

4 TCC 设计 - 幂等控制:
**
16_52_07__08_13_2019.jpg

幂等性的意思是:对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。因为网络抖动或拥堵可能会超时,事务管理器会对资源进行重试操作,所以很可能一个业务操作会被重复调用,为了不因为重复调用而多次占用资源,需要对服务设计时进行幂等控制,通常我们可以用事务 xid 或业务主键判重来控制。

2.3.3 Saga 模式

Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

图片8.png

Saga 模式下分布式事务通常是由事件驱动的,各个参与者之间是异步执行的,Saga 模式是一种长事务解决方案。

1 Saga 模式使用场景
**
16_44_58__08_13_2019.jpg

Saga 模式适用于业务流程长且需要保证事务最终一致性的业务系统,Saga 模式一阶段就会提交本地事务,无锁、长流程情况下可以保证性能。

事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造和提供 TCC 要求的接口,可以使用 Saga 模式。

Saga模式的优势是:

  • 一阶段提交本地数据库事务,无锁,高性能;
  • 参与者可以采用事务驱动异步执行,高吞吐;
  • 补偿服务即正向服务的“反向”,易于理解,易于实现;

缺点:Saga 模式由于一阶段已经提交本地数据库事务,且没有进行“预留”动作,所以不能保证隔离性。后续会讲到对于缺乏隔离性的应对措施。
2 基于状态机引擎的 Saga 实现

17_13_19__08_13_2019.jpg

目前 Saga 的实现一般有两种,一种是通过事件驱动架构实现,一种是基于注解加拦截器拦截业务的正向服务实现。Seata 目前是采用事件驱动的机制来实现的,Seata 实现了一个状态机,可以编排服务的调用流程及正向服务的补偿服务,生成一个 json 文件定义的状态图,状态机引擎驱动到这个图的运行,当发生异常的时候状态机触发回滚,逐个执行补偿服务。当然在什么情况下触发回滚用户是可以自定义决定的。该状态机可以实现服务编排的需求,它支持单项选择、并发、异步、子状态机调用、参数转换、参数映射、服务执行状态判断、异常捕获等功能。

3 状态机引擎原理

16_45_32__08_13_2019.jpg

该状态机引擎的基本原理是,它基于事件驱动架构,每个步骤都是异步执行的,步骤与步骤之间通过事件队列流转,
极大的提高系统吞吐量。每个步骤执行时会记录事务日志,用于出现异常时回滚时使用,事务日志会记录在与业务表所在的数据库内,提高性能。

4 状态机引擎设计

16_45_46__08_13_2019.jpg

该状态机引擎分成了三层架构的设计,最底层是“事件驱动”层,实现了 EventBus 和消费事件的线程池,是一个 Pub-Sub 的架构。第二层是“流程控制器”层,它实现了一个极简的流程引擎框架,它驱动一个“空”的流程执行,“空”的意思是指它不关心流程节点做什么事情,它只执行每个节点的 process 方法,然后执行 route 方法流转到下一个节点。这是一个通用框架,基于这两层,开发者可以实现任何流程引擎。最上层是“状态机引擎”层,它实现了每种状态节点的“行为”及“路由”逻辑代码,提供 API 和状态图仓库,同时还有一些其它组件,比如表达式语言、逻辑计算器、流水生成器、拦截器、配置管理、事务日志记录等。

5 Saga 服务设计经验

和TCC类似,Saga的正向服务与反向服务也需求遵循以下设计原则:

1)Saga 服务设计 - 允许空补偿
**
16_52_22__08_13_2019.jpg

2)Saga 服务设计 - 防悬挂控制
**
16_52_52__08_13_2019.jpg

3)Saga 服务设计 - 幂等控制
**
3 分布式事务 Seata 三种模式详解-屹远-31.jpg

4)Saga 设计 - 自定义事务恢复策略
**
16_53_07__08_13_2019.jpg

前面讲到 Saga 模式不保证事务的隔离性,在极端情况下可能出现脏写。比如在分布式事务未提交的情况下,前一个服务的数据被修改了,而后面的服务发生了异常需要进行回滚,可能由于前面服务的数据被修改后无法进行补偿操作。这时的一种处理办法可以是“重试”继续往前完成这个分布式事务。由于整个业务流程是由状态机编排的,即使是事后恢复也可以继续往前重试。所以用户可以根据业务特点配置该流程的事务处理策略是优先“回滚”还是“重试”,当事务超时的时候,Server 端会根据这个策略不断进行重试。

由于 Saga 不保证隔离性,所以我们在业务设计的时候需要做到“宁可长款,不可短款”的原则,长款是指在出现差错的时候站在我方的角度钱多了的情况,钱少了则是短款,因为如果长款可以给客户退款,而短款则可能钱追不回来了,也就是说在业务设计的时候,一定是先扣客户帐再入帐,如果因为隔离性问题造成覆盖更新,也不会出现钱少了的情况。

6 基于注解和拦截器的 Saga 实现
**
17_13_37__08_13_2019.jpg

还有一种 Saga 的实现是基于注解+拦截器的实现,Seata 目前没有实现,可以看上面的伪代码来理解一下,one 方法上定义了 @SagaCompensable 的注解,用于定义 one 方法的补偿方法是 compensateOne 方法。然后在业务流程代码 processA 方法上定义 @SagaTransactional 注解,启动 Saga 分布式事务,通过拦截器拦截每个正向方法当出现异常的时候触发回滚操作,调用正向方法的补偿方法。

7 两种 Saga 实现优劣对比

两种 Saga 的实现各有又缺点,下面表格是一个对比:

17_13_49__08_13_2019.jpg

状态机引擎的最大优势是可以通过事件驱动的方法异步执行提高系统吞吐,可以实现服务编排需求,在 Saga 模式缺乏隔离性的情况下,可以多一种“向前重试”的事情恢复策略。注解加拦截器的的最大优势是,开发简单、学习成本低。

总结

本文先回顾了分布式事务产生的背景及理论基础,然后重点讲解了 Seata 分布式事务的原理以及三种模式(AT、TCC、Saga)的分布式事务实现。

Seata 的定位是分布式事全场景解决方案,未来还会有 XA 模式的分布式事务实现,每种模式都有它的适用场景,AT 模式是无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本。TCC 模式是高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景。Saga 模式是长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统,Saga 模式一阶段就会提交本地事务,无锁,长流程情况下可以保证性能,多用于渠道层、集成层业务系统。事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造和提供 TCC 要求的接口,也可以使用 Saga 模式。

本次分享的视频回顾以及PPT 查看地址:https://tech.antfin.com/community/activities/779/review

· 阅读需 15 分钟

在微服务架构体系下,我们可以按照业务模块分层设计,单独部署,减轻了服务部署压力,也解耦了业务的耦合,避免了应用逐渐变成一个庞然怪物,从而可以轻松扩展,在某些服务出现故障时也不会影响其它服务的正常运行。总之,微服务在业务的高速发展中带给我们越来越多的优势,但是微服务并不是十全十美,因此不能盲目过度滥用,它有很多不足,而且会给系统带来一定的复杂度,其中伴随而来的分布式事务问题,是微服务架构体系下必然需要处理的一个痛点,也是业界一直关注的一个领域,因此也出现了诸如 CAP 和 BASE 等理论。

在今年年初,阿里开源了一个分布式事务中间件,起初起名为 Fescar,后改名为 Seata,在它开源之初,我就知道它肯定要火,因为这是一个解决痛点的开源项目,Seata 一开始就是冲着对业务无侵入与高性能方向走,这正是我们对解决分布式事务问题迫切的需求。因为待过的几家公司,用的都是微服务架构,但是在解决分布式事务的问题上都不太优雅,所以我也在一直关注 Seata 的发展,今天就简要说说它的一些设计上的原理,后续我将会对它的各个模块进行深入源码分析,感兴趣的可以持续关注我的公众号或者博客,不要跟丢。

分布式事务解决的方案有哪些?

目前分布式事务解决的方案主要有对业务无入侵和有入侵的方案,无入侵方案主要有基于数据库 XA 协议的两段式提交(2PC)方案,它的优点是对业务代码无入侵,但是它的缺点也是很明显:必须要求数据库对 XA 协议的支持,且由于 XA 协议自身的特点,它会造成事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,因此它性能很差,它的存在相当于七伤拳那样“伤人七分,损己三分”,因此在互联网项目中并不是很流行这种解决方案。

为了这个弥补这种方案带来性能低的问题,大佬们又想出了很多种方案来解决,但这无一例外都需要通过在应用层做手脚,即入侵业务的方式,比如很出名的 TCC 方案,基于 TCC 也有很多成熟的框架,如 ByteTCC、tcc-transaction 等。以及基于可靠消息的最终一致性来实现,如 RocketMQ 的事务消息。

入侵代码的方案是基于现有情形“迫不得已”才推出的解决方案,实际上它们实现起来非常不优雅,一个事务的调用通常伴随而来的是对该事务接口增加一系列的反向操作,比如 TCC 三段式提交,提交逻辑必然伴随着回滚的逻辑,这样的代码会使得项目非常臃肿,维护成本高。

Seata 各模块之间的关系

针对上面所说的分布式事务解决方案的痛点,那很显然,我们理想的分布式事务解决方案肯定是性能要好而且要对业务无入侵,业务层上无需关心分布式事务机制的约束,Seata 正是往这个方向发展的,因此它非常值得期待,它将给我们的微服务架构带来质的提升。

那 Seata 是怎么做到的呢?下面说说它的各个模块之间的关系。

Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。

Seata 内部定义了 3个模块来处理全局事务和分支事务的关系和处理过程,这三个组件分别是:

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

简要说说整个全局事务的执行步骤:

  1. TM 向 TC 申请开启一个全局事务,TC 创建全局事务后返回全局唯一的 XID,XID 会在全局事务的上下文中传播;
  2. RM 向 TC 注册分支事务,该分支事务归属于拥有相同 XID 的全局事务;
  3. TM 向 TC 发起全局提交或回滚;
  4. TC 调度 XID 下的分支事务完成提交或者回滚。

与 XA 方案有什么不同?

Seata 的事务提交方式跟 XA 协议的两段式提交在总体上来说基本是一致的,那它们之间有什么不同呢?

我们都知道 XA 协议它依赖的是数据库层面来保障事务的一致性,也即是说 XA 的各个分支事务是在数据库层面上驱动的,由于 XA 的各个分支事务需要有 XA 的驱动程序,一方面会导致数据库与 XA 驱动耦合,另一方面它会导致各个分支的事务资源锁定周期长,这也是它没有在互联网公司流行的重要因素。

基于 XA 协议以上的问题,Seata 另辟蹊径,既然在依赖数据库层会导致这么多问题,那我就从应用层做手脚,这还得从 Seata 的 RM 模块说起,前面也说过 RM 的主要作用了,其实 RM 在内部做了对数据库操作的代理层,如下:

Seata 在数据源做了一层代理层,所以我们使用 Seata 时,我们使用的数据源实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,主要是解析 SQL,把业务数据在更新前后的数据镜像组织成回滚日志,并将 undo log 日志插入 undo_log 表中,保证每条更新数据的业务 sql 都有对应的回滚日志存在。

这样做的好处就是,本地事务执行完可以立即释放本地事务锁定的资源,然后向 TC 上报分支状态。当 TM 决议全局提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成;当 TM 决议全局回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后执行回滚日志完成回滚操作。

如上图所示,XA 方案的 RM 是放在数据库层的,它依赖了数据库的 XA 驱动程序。

如上图所示,Seata 的 RM 实际上是已中间件的形式放在应用层,不用依赖数据库对协议的支持,完全剥离了分布式事务方案对数据库在协议支持上的要求。

分支事务如何提交和回滚?

下面详细说说分支事务是如何提交和回滚的:

  • 第一阶段:

分支事务利用 RM 模块中对 JDBC 数据源代理,加入了若干流程,对业务 SQL 进行解释,把业务数据在更新前后的数据镜像组织成回滚日志,并生成 undo log 日志,对全局事务锁的检查以及分支事务的注册等,利用本地事务 ACID 特性,将业务 SQL 和 undo log 写入同一个事物中一同提交到数据库中,保证业务 SQL 必定存在相应的回滚日志,最后对分支事务状态向 TC 进行上报。

  • 第二阶段:

TM决议全局提交:

当 TM 决议提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成。这个机制对于性能提升非常关键,我们知道正常的业务运行过程中,事务执行的成功率是非常高的,因此可以直接在本地事务中提交,这步对于提升性能非常显著。

TM决议全局回滚:

当 TM 决议回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后利用本地事务 ACID 特性,执行回滚日志完成回滚操作并删除 undo log 日志,最后向 TC 进行回滚结果上报。

业务对以上所有的流程都无感知,业务完全不关心全局事务的具体提交和回滚,而且最重要的一点是 Seata 将两段式提交的同步协调分解到各个分支事务中了,分支事务与普通的本地事务无任何差异,这意味着我们使用 Seata 后,分布式事务就像使用本地事务一样,完全将数据库层的事务协调机制交给了中间件层 Seata 去做了,这样虽然事务协调搬到应用层了,但是依然可以做到对业务的零侵入,从而剥离了分布式事务方案对数据库在协议支持上的要求,且 Seata 在分支事务完成之后直接释放资源,极大减少了分支事务对资源的锁定时间,完美避免了 XA 协议需要同步协调导致资源锁定时间过长的问题。

其它方案的补充

上面说的其实是 Seata 的默认模式,也叫 AT 模式,它是类似于 XA 方案的两段式提交方案,并且是对业务无侵入,但是这种机制依然是需要依赖数据库本地事务的 ACID 特性,有没有发现,我在上面的图中都强调了必须是支持 ACID 特性的关系型数据库,那么问题就来了,非关系型或者不支持 ACID 的数据库就无法使用 Seata 了,别慌,Seata 现阶段为我们准备了另外一种模式,叫 MT 模式,它是一种对业务有入侵的方案,提交回滚等操作需要我们自行定义,业务逻辑需要被分解为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务,它存在的意义是为 Seata 触达更多的场景。

只不过,它不是 Seata “主打”的模式,它的存在仅仅作为补充的方案,从以上官方的发展远景就可以看出来,Seata 的目标是始终是对业务无入侵的方案。

注:本文图片设计参考Seata官方图

作者简介:

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 7 分钟

前言

TaaS 是 Seata 服务端(TC, Transaction Coordinator)的一种高可用实现,使用 Golang 编写。Taas 由InfiniVision (http://infinivision.cn) 贡献给Seata开源社区。现已正式开源,并贡献给 Seata 社区。

在Seata开源之前,我们内部开始借鉴GTS以及一些开源项目来实现分布式事务的解决方案TaaS(Transaction as a Service)。

在我们完成TaaS的服务端的开发工作后,Seata(当时还叫Fescar)开源了,并且引起了开源社区的广泛关注,加上阿里巴巴的平台影响力以及社区活跃度,我们认为Seata会成为今后开源分布式事务的标准,我们决定TaaS兼容Seata。

在发现Seata的服务端的实现是单机的,高可用等并没有实现,于是我们与Seata社区负责人取得联系,并且决定把TaaS开源,回馈开源社区。 同时,我们会长期维护,并且和Seata版本保持同步。

目前,Seata官方的Java高可用版本也在开发中,TaaS和该高可用版本的设计思想不同,在今后会长期共存。

TaaS已经开源, github (https://github.com/apache/incubator-seata-go-server),欢迎大家试用。

设计原则

  1. 高性能,性能和机器数量成正比,即通过加入新机器到集群中,就可以提升性能
  2. 高可用,一台机器出现故障,系统能依旧可以对外提供服务,或者在较短的时间内恢复对外服务(Leader切换的时间)
  3. Auto-Rebalance,集群中增加新的机器,或者有机器下线,系统能够自动的做负载均衡
  4. 强一致,系统的元数据强一致在多个副本中存储

设计

高性能

TaaS的性能和机器数量成正比,为了支持这个特性,在TaaS中处理全局事务的最小单元称为Fragment,系统在启动的时候会设定每个Fragment支持的活跃全局事务的并发数,同时系统会对每个Fragment进行采样,一旦发现Fragment超负荷,会生成新的Fragment来处理更多的并发。

高可用

每个Fragment有多个副本和一个Leader,由Leader来处理请求。当Leader出现故障,系统会产生一个新的Leader来处理请求,在新Leader的选举过程中,这个Fragment对外不提供服务,通常这个间隔时间是几秒钟。

强一致

TaaS本身不存储全局事务的元数据,元数据存储在Elasticell (https://github.com/deepfabric/elasticell) 中,Elasticell是一个兼容redis协议的分布式的KV存储,它基于Raft协议来保证数据的一致性。

Auto-Rebalance

随着系统的运行,在系统中会存在许多Fragment以及它们的副本,这样会导致在每个机器上,Fragment的分布不均匀,特别是当旧的机器下线或者新的机器上线的时候。TaaS在启动的时候,会选择3个节点作为调度器的角色,调度器负责调度这些Fragment,用来保证每个机器上的Fragment的数量以及Leader个数大致相等,同时还会保证每个Fragment的副本数维持在指定的副本个数。

Fragment副本创建

  1. t0时间点,Fragment1在Seata-TC1机器上创建
  2. t1时间点,Fragment1的副本Fragment1'在Seata-TC2机器上创建
  3. t2时间点,Fragment1的副本Fragment1"在Seata-TC3机器上创建

在t2时间点,Fragment1的三个副本创建完毕。

Fragment副本迁移

  1. t0时刻点,系统一个存在4个Fragment,分别存在于Seata-TC1,Seata-TC2,Seata-TC3三台机器上
  2. t1时刻,加入新机器Seata-TC4
  3. t2时刻,有3个Fragment的副本被迁移到了Seata-TC4这台机器上

在线快速体验

我们在公网搭建了一个体验的环境:

本地快速体验

使用docker-compose快速体验TaaS的功能。

git clone https://github.com/seata/taas.git
docker-compse up -d

由于组件依赖较多,docker-compose启动30秒后,可以对外服务

Seata服务地址

服务默认监听在8091端口,修改Seata对应的服务端地址体验

Seata UI

访问WEB UI http://127.0.0.1:8084/ui/index.html

关于InfiniVision

深见网络是一家技术驱动的企业级服务提供商,致力于利用人工智能、云计算、区块链、大数据,以及物联网边缘计算技术助力传统企业的数字化转型和升级。深见网络积极拥抱开源文化并将核心算法和架构开源,知名人脸识别软件 InsightFace (https://github.com/deepinsight/insightface) (曾多次获得大规模人脸识别挑战冠军),以及分布式存储引擎 Elasticell (https://github.com/deepfabric/elasticell) 等均是深见网络的开源产品。

关于作者

作者张旭,开源网关Gateway (https://github.com/fagongzi/gateway) 作者,目前就职于InfiniVision,负责基础架构相关的研发工作。

· 阅读需 21 分钟

Fescar 简介

常见的分布式事务方式有基于 2PC 的 XA (e.g. atomikos),从业务层入手的 TCC( e.g. byteTCC)、事务消息 ( e.g. RocketMQ Half Message) 等等。XA 是需要本地数据库支持的分布式事务的协议,资源锁在数据库层面导致性能较差,而支付宝作为布道师引入的 TCC 模式需要大量的业务代码保证,开发维护成本较高。

分布式事务是业界比较关注的领域,这也是短短时间 Fescar 能收获 6k Star 的原因之一。Fescar 名字取自 Fast & Easy Commit And Rollback ,简单来说 Fescar 通过对本地 RDBMS 分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是相对于 XA 模式是性能较好不长时间占用连接资源,相对于 TCC 方式开发成本和业务侵入性较低。

类似于 XA,Fescar 将角色分为 TC、RM、TM,事务整体过程模型如下:

Fescar事务过程

1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
2. XID 在微服务调用链路的上下文中传播。
3. RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
4. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

其中在目前的实现版本中 TC 是独立部署的进程,维护全局事务的操作记录和全局锁记录,负责协调并驱动全局事务的提交或回滚。TM RM 则与应用程序工作在同一应用进程。RM 对 JDBC 数据源采用代理的方式对底层数据库做管理,利用语法解析,在执行事务时保留快照,并生成 undo log。大概的流程和模型划分就介绍到这里,下面开始对 Fescar 事务传播机制的分析。

Fescar 事务传播机制

Fescar 事务传播包括应用内事务嵌套调用和跨服务调用的事务传播。Fescar 事务是怎么在微服务调用链中传播的呢?Fescar 提供了事务 API 允许用户手动绑定事务的 XID 并加入到全局事务中,所以我们根据不同的服务框架机制,将 XID 在链路中传递即可实现事务的传播。

RPC 请求过程分为调用方与被调用方两部分,我们需要对 XID 在请求与响应时做相应的处理。大致过程为:调用方即请求方将当前事务上下文中的 XID 取出,通过 RPC 协议传递给被调用方;被调用方从请求中的将 XID 取出,并绑定到自己的事务上下文中,纳入全局事务。微服务框架一般都有相应的 Filter 和 Interceptor 机制,我们来具体分析下 Spring Cloud 与 Fescar 的整合过程。

Fescar 与 Spring Cloud Alibaba 集成部分源码解析

本部分源码全部来自于 spring-cloud-alibaba-fescar. 源码解析部分主要包括 AutoConfiguration、微服务被调用方和微服务调用方三大部分。对于微服务调用方方式具体分为 RestTemplate 和 Feign,其中对于 Feign 请求方式又进一步细分为结合 Hystrix 和 Sentinel 的使用模式。

Fescar AutoConfiguration

对于 AutoConfiguration 的解析此处只介绍与 Fescar 启动相关的部分,其他部分的解析将穿插于【微服务被调用方】和 【微服务调用方】章节进行介绍。

Fescar 的启动需要配置 GlobalTransactionScanner,GlobalTransactionScanner 负责初始化 Fescar 的 RM client、TM client 和 自动代理标注 GlobalTransactional 注解的类。GlobalTransactionScanner bean 的启动通过 GlobalTransactionAutoConfiguration 加载并注入 FescarProperties。
FescarProperties 包含了 Fescar 的重要属性 txServiceGroup ,此属性的可通过 application.properties 文件中的 key: spring.cloud.alibaba.fescar.txServiceGroup 读取,默认值为 ${spring.application.name}-fescar-service-group 。txServiceGroup 表示 Fescar 的逻辑事务分组名,此分组名通过配置中心(目前支持文件、Apollo)获取逻辑事务分组名对应的 TC 集群名称,进一步通过集群名称构造出 TC 集群的服务名,通过注册中心(目前支持 nacos、redis、zk 和 eureka)和服务名找到可用的 TC 服务节点,然后 RM client、TM client 与 TC 进行 rpc 交互。

微服务被调用方

由于调用方的逻辑比较多一点,我们先分析被调用方的逻辑。针对于 Spring Cloud 项目,默认采用的 RPC 传输协议是 HTTP 协议,所以使用了 HandlerInterceptor 机制来对 HTTP 的请求做拦截。

HandlerInterceptor 是 Spring 提供的接口, 它有以下三个方法可以被覆写。

    /**
* Intercept the execution of a handler. Called after HandlerMapping determined
* an appropriate handler object, but before HandlerAdapter invokes the handler.
*/
default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {

return true;
}

/**
* Intercept the execution of a handler. Called after HandlerAdapter actually
* invoked the handler, but before the DispatcherServlet renders the view.
* Can expose additional model objects to the view via the given ModelAndView.
*/
default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable ModelAndView modelAndView) throws Exception {
}

/**
* Callback after completion of request processing, that is, after rendering
* the view. Will be called on any outcome of handler execution, thus allows
* for proper resource cleanup.
*/
default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable Exception ex) throws Exception {
}

根据注释,我们可以很明确的看到各个方法的作用时间和常用用途。对于 Fescar 集成来讲,它根据需要重写了 preHandle、afterCompletion 方法。

FescarHandlerInterceptor 的作用是将服务链路传递过来的 XID,绑定到服务节点的事务上下文中,并且在请求完成后清理相关资源。FescarHandlerInterceptorConfiguration 中配置了所有的 url 均进行拦截,对所有的请求过来均会执行该拦截器,进行 XID 的转换与事务绑定。

/**
* @author xiaojing
*
* Fescar HandlerInterceptor, Convert Fescar information into
* @see com.alibaba.fescar.core.context.RootContext from http request's header in
* {@link org.springframework.web.servlet.HandlerInterceptor#preHandle(HttpServletRequest , HttpServletResponse , Object )},
* And clean up Fescar information after servlet method invocation in
* {@link org.springframework.web.servlet.HandlerInterceptor#afterCompletion(HttpServletRequest, HttpServletResponse, Object, Exception)}
*/
public class FescarHandlerInterceptor implements HandlerInterceptor {

private static final Logger log = LoggerFactory
.getLogger(FescarHandlerInterceptor.class);

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {

String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}

if (xid == null && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) throws Exception {

String rpcXid = request.getHeader(RootContext.KEY_XID);

if (StringUtils.isEmpty(rpcXid)) {
return;
}

String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}

}

preHandle 在请求执行前被调用,xid 为当前事务上下文已经绑定的全局事务的唯一标识,rpcXid 为请求通过 HTTP Header 传递过来需要绑定的全局事务标识。preHandle 方法中判断如果当前事务上下文中没有 XID,且 rpcXid 不为空,那么就将 rpcXid 绑定到当前的事务上下文。

afterCompletion 在请求完成后被调用,该方法用来执行资源的相关清理动作。Fescar 通过 RootContext.unbind() 方法对事务上下文涉及到的 XID 进行解绑。下面 if 中的逻辑是为了代码的健壮性考虑,如果遇到 rpcXid 和 unbindXid 不相等的情况,再将 unbindXid 重新绑定回去。

对于 Spring Cloud 来讲,默认采用的 RPC 方式是 HTTP 的方式,所以对被调用方来讲,它的请求拦截方式不用做任何区分,只需要从 Header 中将 XID 就可以取出绑定到自己的事务上下文中即可。但是对于调用方由于请求组件的多样化,包括熔断隔离机制,所以要区分不同的情况做处理,后面我们来具体分析一下。

微服务调用方

Fescar 将请求方式分为:RestTemplate、Feign、Feign+Hystrix 和 Feign+Sentinel 。不同的组件通过 Spring Boot 的 Auto Configuration 来完成自动的配置,具体的配置类清单可以看 spring.factories ,下文也会介绍相关的配置类。

RestTemplate

先来看下如果调用方如果是是基于 RestTemplate 的请求,Fescar 是怎么传递 XID 的。

public class FescarRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);

String xid = RootContext.getXID();

if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}

FescarRestTemplateInterceptor 实现了 ClientHttpRequestInterceptor 接口的 intercept 方法,对调用的请求做了包装,在发送请求时若存在 Fescar 事务上下文 XID 则取出并放到 HTTP Header 中。

FescarRestTemplateInterceptor 通过 FescarRestTemplateAutoConfiguration 实现将 FescarRestTemplateInterceptor 配置到 RestTemplate 中去。

@Configuration
public class FescarRestTemplateAutoConfiguration {

@Bean
public FescarRestTemplateInterceptor fescarRestTemplateInterceptor() {
return new FescarRestTemplateInterceptor();
}

@Autowired(required = false)
private Collection<RestTemplate> restTemplates;

@Autowired
private FescarRestTemplateInterceptor fescarRestTemplateInterceptor;

@PostConstruct
public void init() {
if (this.restTemplates != null) {
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
interceptors.add(this.fescarRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}

}

init 方法遍历所有的 restTemplate ,并将原来 restTemplate 中的拦截器取出,增加 fescarRestTemplateInterceptor 后置入并重排序。

Feign

Feign 类关系图

接下来看下 Feign 的相关代码,该包下面的类还是比较多的,我们先从其 AutoConfiguration 入手。

@Configuration
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FescarFeignClientAutoConfiguration {

@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
return FescarHystrixFeignBuilder.builder(beanFactory);
}

@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
return FescarSentinelFeignBuilder.builder(beanFactory);
}

@Bean
@ConditionalOnMissingBean
@Scope("prototype")
Feign.Builder feignBuilder(BeanFactory beanFactory) {
return FescarFeignBuilder.builder(beanFactory);
}

@Configuration
protected static class FeignBeanPostProcessorConfiguration {

@Bean
FescarBeanPostProcessor fescarBeanPostProcessor(
FescarFeignObjectWrapper fescarFeignObjectWrapper) {
return new FescarBeanPostProcessor(fescarFeignObjectWrapper);
}

@Bean
FescarContextBeanPostProcessor fescarContextBeanPostProcessor(
BeanFactory beanFactory) {
return new FescarContextBeanPostProcessor(beanFactory);
}

@Bean
FescarFeignObjectWrapper fescarFeignObjectWrapper(BeanFactory beanFactory) {
return new FescarFeignObjectWrapper(beanFactory);
}
}

}

FescarFeignClientAutoConfiguration 在存在 Client.class 时生效,且要求作用在 FeignAutoConfiguration 之前。由于 FeignClientsConfiguration 是在 FeignAutoConfiguration 生成 FeignContext 生效的,所以根据依赖关系, FescarFeignClientAutoConfiguration 同样早于 FeignClientsConfiguration。

FescarFeignClientAutoConfiguration 自定义了 Feign.Builder,针对于 feign.sentinel,feign.hystrix 和 feign 的情况做了适配,目的是自定义 feign 中 Client 的真正实现为 FescarFeignClient。

HystrixFeign.builder().retryer(Retryer.NEVER_RETRY)
.client(new FescarFeignClient(beanFactory))
SentinelFeign.builder().retryer(Retryer.NEVER_RETRY)
.client(new FescarFeignClient(beanFactory));
Feign.builder().client(new FescarFeignClient(beanFactory));

FescarFeignClient 是对原来的 Feign 客户端代理增强,具体代码见下图:

public class FescarFeignClient implements Client {

private final Client delegate;
private final BeanFactory beanFactory;

FescarFeignClient(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.delegate = new Client.Default(null, null);
}

FescarFeignClient(BeanFactory beanFactory, Client delegate) {
this.delegate = delegate;
this.beanFactory = beanFactory;
}

@Override
public Response execute(Request request, Request.Options options) throws IOException {

Request modifiedRequest = getModifyRequest(request);

try {
return this.delegate.execute(modifiedRequest, options);
}
finally {

}
}

private Request getModifyRequest(Request request) {

String xid = RootContext.getXID();

if (StringUtils.isEmpty(xid)) {
return request;
}

Map<String, Collection<String>> headers = new HashMap<>();
headers.putAll(request.headers());

List<String> fescarXid = new ArrayList<>();
fescarXid.add(xid);
headers.put(RootContext.KEY_XID, fescarXid);

return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}

上面的过程中我们可以看到,FescarFeignClient 对原来的 Request 做了修改,它首先将 XID 从当前的事务上下文中取出,在 XID 不为空的情况下,将 XID 放到了 Header 中。

FeignBeanPostProcessorConfiguration 定义了 3 个 bean:FescarContextBeanPostProcessor、FescarBeanPostProcessor 和 FescarFeignObjectWrapper。其中 FescarContextBeanPostProcessor FescarBeanPostProcessor 实现了 Spring BeanPostProcessor 接口。 以下为 FescarContextBeanPostProcessor 实现。

    @Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
if (bean instanceof FeignContext && !(bean instanceof FescarFeignContext)) {
return new FescarFeignContext(getFescarFeignObjectWrapper(),
(FeignContext) bean);
}
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}

BeanPostProcessor 中的两个方法可以对 Spring 容器中的 Bean 做前后处理,postProcessBeforeInitialization 处理时机是初始化之前,postProcessAfterInitialization 的处理时机是初始化之后,这 2 个方法的返回值可以是原先生成的实例 bean,或者使用 wrapper 包装后的实例。

FescarContextBeanPostProcessor 将 FeignContext 包装成 FescarFeignContext。
FescarBeanPostProcessor 将 FeignClient 根据是否继承了 LoadBalancerFeignClient 包装成 FescarLoadBalancerFeignClient 和 FescarFeignClient。

FeignAutoConfiguration 中的 FeignContext 并没有加 ConditionalOnXXX 的条件,所以 Fescar 采用预置处理的方式将 FeignContext 包装成 FescarFeignContext。

    @Bean
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}

而对于 Feign Client,FeignClientFactoryBean 中会获取 FeignContext 的实例对象。对于开发者采用 @Configuration 注解的自定义配置的 Feign Client 对象,这里会被配置到 builder,导致 FescarFeignBuilder 中增强后的 FescarFeignCliet 失效。FeignClientFactoryBean 中关键代码如下:

	/**
* @param <T> the target type of the Feign client
* @return a {@link Feign} client created with the specified data and the context information
*/
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);

if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}

上述代码根据是否指定了注解参数中的 URL 来选择直接调用 URL 还是走负载均衡,targeter.target 通过动态代理创建对象。大致过程为:将解析出的 feign 方法放入 map ,再通过将其作为参数传入生成 InvocationHandler,进而生成动态代理对象。
FescarContextBeanPostProcessor 的存在,即使开发者对 FeignClient 自定义操作,依旧可以完成 Fescar 所需的全局事务的增强。

对于 FescarFeignObjectWrapper,我们重点关注下 Wrapper 方法:

	Object wrap(Object bean) {
if (bean instanceof Client && !(bean instanceof FescarFeignClient)) {
if (bean instanceof LoadBalancerFeignClient) {
LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
return new FescarLoadBalancerFeignClient(client.getDelegate(), factory(),
clientFactory(), this.beanFactory);
}
return new FescarFeignClient(this.beanFactory, (Client) bean);
}
return bean;
}

wrap 方法中,如果 bean 是 LoadBalancerFeignClient 的实例对象,那么首先通过 client.getDelegate() 方法将 LoadBalancerFeignClient 代理的实际 Client 对象取出后包装成 FescarFeignClient,再生成 LoadBalancerFeignClient 的子类 FescarLoadBalancerFeignClient 对象。如果 bean 是 Client 的实例对象且不是 FescarFeignClient LoadBalancerFeignClient,那么 bean 会直接包装生成 FescarFeignClient。

上面的流程设计还是比较巧妙的,首先根据 Spring boot 的 Auto Configuration 控制了配置的先后顺序,同时自定义了 Feign Builder 的 Bean,保证了 Client 均是经过增强后的 FescarFeignClient 。再通过 BeanPostProcessor 对 Spring 容器中的 Bean 做了一遍包装,保证容器内的 Bean 均是增强后 FescarFeignClient ,避免 FeignClientFactoryBean getTarget 方法的替换动作。

Hystrix 隔离

下面我们再来看下 Hystrix 部分,为什么要单独把 Hystrix 拆出来看呢,而且 Fescar 代码也单独实现了个策略类。目前事务上下文 RootContext 的默认实现是基于 ThreadLocal 方式的 ThreadLocalContextCore,也就是上下文其实是和线程绑定的。Hystrix 本身有两种隔离状态的模式,基于信号量或者基于线程池进行隔离。Hystrix 官方建议是采取线程池的方式来充分隔离,也是一般情况下在采用的模式:

Thread or Semaphore
The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

service 层的业务代码和请求发出的线程肯定不是同一个,那么 ThreadLocal 的方式就没办法将 XID 传递给 Hystrix 的线程并传递给被调用方的。怎么处理这件事情呢,Hystrix 提供了机制让开发者去自定义并发策略,只需要继承 HystrixConcurrencyStrategy 重写 wrapCallable 方法即可。

public class FescarHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

private HystrixConcurrencyStrategy delegate;

public FescarHystrixConcurrencyStrategy() {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
}

@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
if (c instanceof FescarContextCallable) {
return c;
}

Callable<K> wrappedCallable;
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
wrappedCallable = c;
}
if (wrappedCallable instanceof FescarContextCallable) {
return wrappedCallable;
}

return new FescarContextCallable<>(wrappedCallable);
}

private static class FescarContextCallable<K> implements Callable<K> {

private final Callable<K> actual;
private final String xid;

FescarContextCallable(Callable<K> actual) {
this.actual = actual;
this.xid = RootContext.getXID();
}

@Override
public K call() throws Exception {
try {
RootContext.bind(xid);
return actual.call();
}
finally {
RootContext.unbind();
}
}

}
}

Fescar 也提供一个 FescarHystrixAutoConfiguration,在存在 HystrixCommand 的时候生成 FescarHystrixConcurrencyStrategy

@Configuration
@ConditionalOnClass(HystrixCommand.class)
public class FescarHystrixAutoConfiguration {

@Bean
FescarHystrixConcurrencyStrategy fescarHystrixConcurrencyStrategy() {
return new FescarHystrixConcurrencyStrategy();
}

}

参考文献

本文作者

郭树抗,社区昵称 ywind,曾就职于华为终端云,现搜狐智能媒体中心 Java 工程师,目前主要负责搜狐号相关开发,对分布式事务、分布式系统和微服务架构有异常浓厚的兴趣。
季敏(清铭),社区昵称 slievrly,Fescar 开源项目负责人,阿里巴巴中间件 TXC/GTS 核心研发成员,长期从事于分布式中间件核心研发工作,在分布式事务领域有着较丰富的技术积累。

· 阅读需 9 分钟

针对Fescar 相信很多开发者已经对他并不陌生,当然Fescar 已经成为了过去时,为什么说它是过去时,因为Fescar 已经华丽的变身为Seata。如果还不知道Seata 的朋友,请登录下面网址查看。

SEATA GITHUB:[https://github.com/apache/incubator-seata]

对于阿里各位同学的前仆后继,给我们广大开发者带来很多开源软件,在这里对他们表示真挚的感谢与问候。

今天在这里和大家分享下Spring Cloud 整合Seata 的相关心得。也让更多的朋友在搭建的道路上少走一些弯路,少踩一些坑。

2.工程内容

本次搭建流程为:client->网关->服务消费者->服务提供者.

                        技术框架:spring cloud gateway

spring cloud fegin

nacos1.0.RC2

fescar-server0.4.1(Seata)

关于nacos的启动方式请参考:Nacos启动参考

首先seata支持很多种注册服务方式,在 fescar-server-0.4.1\conf 目录下

    file.conf
logback.xml
nacos-config.sh
nacos-config.text
registry.conf

总共包含五个文件,其中 file.conf和 registry.conf 分别是我们在 服务消费者 & 服务提供者 代码段需要用到的文件。 注:file.conf和 registry.conf 必须在当前使用的应用程序中,即: 服务消费者 & 服务提供者 两个应用在都需要包含。 如果你采用了配置中心 是nacos 、zk ,file.cnf 是可以忽略的。但是type=“file” 如果是为file 就必须得用file.cnf

下面是registry.conf 文件中的配置信息,其中 registry 是注册服务中心配置。config为配置中心的配置地方。

从下面可知道目前seata支持nacos,file eureka redis zookeeper 等注册配置方式,默认下载的type=“file” 文件方式,当然这里选用什么方式,取决于

每个人项目的实际情况,这里我选用的是nacos,eureka的也是可以的,我这边分别对这两个版本进行整合测试均可以通过。

注:如果整合eureka请选用官方最新版本。

3.核心配置

registry {
# file 、nacos 、eureka、redis、zk
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
apollo {
app.id = "fescar-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}

这里要说明的是nacos-config.sh 是针对采用nacos配置中心的话,需要执行的一些默认初始化针对nacos的脚本。

SEATA的启动方式参考官方: 注意,这里需要说明下,命令启动官方是通过 空格区分参数,所以要注意。这里的IP 是可选参数,因为涉及到DNS解析,在部分情况下,有的时候在注册中心fescar 注入nacos的时候会通过获取地址,如果启动报错注册发现是计算机名称,需要指定IP。或者host配置IP指向。不过这个问题,在最新的SEATA中已经进行了修复。

sh fescar-server.sh 8091 /home/admin/fescar/data/ IP(可选)

上面提到过,在我们的代码中也是需要file.conf 和registry.conf 这里着重的地方要说的是file.conf,file.conf只有当registry中 配置file的时候才会进行加载,如果采用ZK、nacos、作为配置中心,可以忽略。因为type指定其他是不加载file.conf的,但是对应的 service.localRgroup.grouplist 和 service.vgroupMapping 需要在支持配置中心 进行指定,这样你的client 在启动后会通过自动从配置中心获取对应的 SEATA 服务 和地址。如果不配置会出现无法连接server的错误。当然如果你采用的eureka在config的地方就需要采用type="file" 目前SEATA config暂时不支持eureka的形势

transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
service {
#vgroup->rgroup
vgroup_mapping.service-provider-fescar-service-group = "default"
#only support single node
localRgroup.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}

4.服务相关

这里有两个地方需要注意

    grouplist IP,这里是当前fescar-sever的IP端口,
vgroup_mapping的配置。

vgroup_mapping.服务名称-fescar-service-group,这里 要说下服务名称其实是你当前的consumer 或者provider application.properties的配置的应用名称:spring.application.name=service-provider,源代码中是 获取应用名称与 fescar-service-group 进行拼接,做key值。同理value是当前fescar的服务名称, cluster = "default" / application = "default"

     vgroup_mapping.service-provider-fescar-service-group = "default"
#only support single node
localRgroup.grouplist = "127.0.0.1:8091"

同理无论是provider 还是consumer 都需要这两个文件进行配置。

如果你采用nacos做配置中心,需要在nacos通过添加配置方式进行配置添加。

5.事务使用

我这里的代码逻辑是请求通过网关进行负载转发到我的consumer上,在consumer 中通过fegin进行provider请求。官方的例子中是通过fegin进行的,而我们这边直接通过网关转发,所以全局事务同官方的demo一样 也都是在controller层。

@RestController
public class DemoController {
@Autowired
private DemoFeignClient demoFeignClient;

@Autowired
private DemoFeignClient2 demoFeignClient2;
@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
@GetMapping("/getdemo")
public String demo() {

// 调用A 服务 简单save
ResponseData<Integer> result = demoFeignClient.insertService("test",1);
if(result.getStatus()==400) {
System.out.println(result+"+++++++++++++++++++++++++++++++++++++++");
throw new RuntimeException("this is error1");
}

// 调用B 服务。报错测试A 服务回滚
ResponseData<Integer> result2 = demoFeignClient2.saveService();

if(result2.getStatus()==400) {
System.out.println(result2+"+++++++++++++++++++++++++++++++++++++++");
throw new RuntimeException("this is error2");
}

return "SUCCESS";
}
}

到此为止核心的事务整合基本到此结束了,我这里是针对A,B 两个provider进行调用,当B发生报错后,进行全局事务回滚。当然每个事务内部都可以通过自己的独立本地事务去处理自己本地事务方式。

SEATA是通过全局的XID方式进行事务统一标识方式。这里就不列出SEATA需要用的数据库表。具体参考:spring-cloud-fescar 官方DEMO

5.数据代理

这里还有一个重要的说明就是,在分库服务的情况下,每一个数据库内都需要有一个undo_log的数据库表进行XID统一存储处理。

同事针对每个提供服务的项目,需要进行数据库连接池的代理。也就是:

目前只支持Druid连接池,后续会继续支持。

@Configuration
public class DatabaseConfiguration {


@Bean(destroyMethod = "close", initMethod = "init")
@ConfigurationProperties(prefix="spring.datasource")
public DruidDataSource druidDataSource() {

return new DruidDataSource();
}


@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {

return new DataSourceProxy(druidDataSource);
}


@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
return factoryBean.getObject();
}
}

大家要注意的就是配置文件和数据代理。如果没有进行数据源代理,undo_log是无数据的,也就是没办进行XID的管理。

本文作者:大菲.Fei

· 阅读需 26 分钟

前言

在分布式系统中,分布式事务是一个必须要解决的问题,目前使用较多的是最终一致性方案。自年初阿里开源了Fescar(四月初更名为Seata)后,该项目受到了极大的关注度,目前已接近8000Star。Seata以高性能和零侵入的方式为目标解决微服务领域的分布式事务难题,目前正处于快速迭代中,近期小目标是生产可用的Mysql版本。关于Seata的总体介绍,可以查看官方WIKI获得更多更全面的内容介绍。

本文主要基于spring cloud+spring jpa+spring cloud alibaba fescar+mysql+seata的结构,搭建一个分布式系统的demo,通过seata的debug日志和源代码,从client端(RM、TM)的角度分析说明其工作流程及原理。

文中代码基于fescar-0.4.1,由于项目刚更名为seata不久,例如一些包名、类名、jar包名称还都是fescar的命名,故下文中仍使用fescar进行表述。

示例项目:https://github.com/fescar-group/fescar-samples/tree/master/springcloud-jpa-seata

相关概念

  • XID:全局事务的唯一标识,由ip:port:sequence组成
  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
  • Transaction Manager (TM ):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
  • Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

分布式框架支持

Fescar使用XID表示一个分布式事务,XID需要在一次分布式事务请求所涉的系统中进行传递,从而向feacar-server发送分支事务的处理情况,以及接收feacar-server的commit、rollback指令。 Fescar官方已支持全版本的dubbo协议,而对于spring cloud(spring-boot)的分布式项目社区也提供了相应的实现

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-fescar</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>

该组件实现了基于RestTemplate、Feign通信时的XID传递功能。

业务逻辑

业务逻辑是经典的下订单、扣余额、减库存流程。 根据模块划分为三个独立的服务,且分别连接对应的数据库

  • 订单:order-server
  • 账户:account-server
  • 库存:storage-server

另外还有发起分布式事务的业务系统

  • 业务:business-server

项目结构如下图 在这里插入图片描述

正常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额

异常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额异常

正常流程下2、3、4步的数据正常更新全局commit,异常流程下的数据则由于第4步的异常报错全局回滚。

配置文件

fescar的配置入口文件是registry.conf,查看代码ConfigurationFactory得知目前还不能指定该配置文件,所以配置文件名称只能为registry.conf

private static final String REGISTRY_CONF = "registry.conf";
public static final Configuration FILE_INSTANCE = new FileConfiguration(REGISTRY_CONF);

registry中可以指定具体配置的形式,默认使用file类型,在file.conf中有3部分配置内容

  1. transport transport部分的配置对应NettyServerConfig类,用于定义Netty相关的参数,TM、RM与fescar-server之间使用Netty进行通信
  2. service
	 service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#配置Client连接TC的地址
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
是否启用seata的分布式事务
disableGlobalTransaction = false
}
  1. client
	client {
#RM接收TC的commit通知后缓冲上限
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}

数据源Proxy

除了前面的配置文件,fescar在AT模式下稍微有点代码量的地方就是对数据源的代理指定,且目前只能基于DruidDataSource的代理。 注:在最新发布的0.4.2版本中已支持任意数据源类型

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

使用DataSourceProxy的目的是为了引入ConnectionProxy,fescar无侵入的一方面就体现在ConnectionProxy的实现上,即分支事务加入全局事务的切入点是在本地事务的commit阶段,这样设计可以保证业务数据与undo_log是在一个本地事务中。

undo_log是需要在业务库上创建的一个表,fescar依赖该表记录每笔分支事务的状态及二阶段rollback的回放数据。不用担心该表的数据量过大形成单点问题,在全局事务commit的场景下事务对应的undo_log会异步删除。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动Server

前往https://github.com/apache/incubator-seata/releases 下载与Client版本对应的fescar-server,避免由于版本的不同导致的协议不一致问题 进入解压之后的 bin 目录,执行

./fescar-server.sh 8091 ../data

启动成功输出

2019-04-09 20:27:24.637 INFO [main]c.a.fescar.core.rpc.netty.AbstractRpcRemotingServer.start:152 -Server started ... 

启动Client

fescar的加载入口类位于GlobalTransactionAutoConfiguration,对基于spring boot的项目能够自动加载,当然也可以通过其他方式示例化GlobalTransactionScanner

@Configuration
@EnableConfigurationProperties({FescarProperties.class})
public class GlobalTransactionAutoConfiguration {
private final ApplicationContext applicationContext;
private final FescarProperties fescarProperties;

public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, FescarProperties fescarProperties) {
this.applicationContext = applicationContext;
this.fescarProperties = fescarProperties;
}

/**
* 示例化GlobalTransactionScanner
* scanner为client初始化的发起类
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
String txServiceGroup = this.fescarProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
this.fescarProperties.setTxServiceGroup(txServiceGroup);
}

return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
}

可以看到支持一个配置项FescarProperties,用于配置事务分组名称

spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group

如果不指定服务组,则默认使用spring.application.name+ -fescar-service-group生成名称,所以不指定spring.application.name启动会报错

@ConfigurationProperties("spring.cloud.alibaba.fescar")
public class FescarProperties {
private String txServiceGroup;

public FescarProperties() {
}

public String getTxServiceGroup() {
return this.txServiceGroup;
}

public void setTxServiceGroup(String txServiceGroup) {
this.txServiceGroup = txServiceGroup;
}
}

获取applicationId和txServiceGroup后,创建GlobalTransactionScanner对象,主要看类中initClient方法

private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
//init TM
TMClient.init(applicationId, txServiceGroup);

//init RM
RMClient.init(applicationId, txServiceGroup);

}

方法中可以看到初始化了TMClientRMClient,对于一个服务既可以是TM角色也可以是RM角色,至于什么时候是TM或者RM则要看在一次全局事务中@GlobalTransactional注解标注在哪。 Client创建的结果是与TC的一个Netty连接,所以在启动日志中可以看到两个Netty Channel,其中标明了transactionRole分别为TMROLERMROLE

2019-04-09 13:42:57.417  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":101,"version":"0.4.1"},"transactionRole":"TMROLE"}
2019-04-09 13:42:57.505 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":103,"version":"0.4.1"},"transactionRole":"RMROLE"}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterTMRequest{applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='null', applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:1
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:2
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@3b06d101 msgId:1, future :com.alibaba.fescar.core.protocol.MessageFuture@28bb1abd, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.TmRpcClient@65fc3fb7 msgId:2, future :com.alibaba.fescar.core.protocol.MessageFuture@9a1e3df, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 114 ms, version:0.4.1,role:TMROLE,channel:[id: 0xd22fe0c5, L:/127.0.0.1:57398 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.711 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 125 ms, version:0.4.1,role:RMROLE,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]

日志中可以看到

  1. 创建Netty连接
  2. 发送注册请求
  3. 得到响应结果
  4. RmRpcClientTmRpcClient成功实例化

TM处理流程

在本例中,TM的角色是business-service,BusinessService的purchase方法标注了@GlobalTransactional注解

@Service
public class BusinessService {

@Autowired
private StorageFeignClient storageFeignClient;
@Autowired
private OrderFeignClient orderFeignClient;

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount){
storageFeignClient.deduct(commodityCode, orderCount);

orderFeignClient.create(userId, commodityCode, orderCount);
}
}

方法调用后将会创建一个全局事务,首先关注@GlobalTransactional注解的作用,在GlobalTransactionalInterceptor中被拦截处理

/**
* AOP拦截方法调用
*/
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

//获取方法GlobalTransactional注解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

//如果方法有GlobalTransactional注解,则拦截到相应方法处理
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}

handleGlobalTransaction方法中对TransactionalTemplate的execute进行了调用,从类名可以看到这是一个标准的模版方法,它定义了TM对全局事务处理的标准步骤,注释已经比较清楚了

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}

通过DefaultGlobalTransaction的begin方法开启全局事务

public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
check();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
//具体开启事务的方法,获取TC返回的XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Begin a NEW global transaction [" + xid + "]");
}
}

方法开头处if (role != GlobalTransactionRole.Launcher)对role的判断有关键的作用,表明当前是全局事务的发起者(Launcher)还是参与者(Participant)。如果在分布式事务的下游系统方法中也加上@GlobalTransactional注解,那么它的角色就是Participant,会忽略后面的begin直接return,而判断是Launcher还是Participant是根据当前上下文是否已存在XID来判断,没有XID的就是Launcher,已经存在XID的就是Participant. 由此可见,全局事务的创建只能由Launcher执行,而一次分布式事务中也只有一个Launcher存在。

DefaultTransactionManager负责TM与TC通讯,发送begin、commit、rollback指令

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}

至此拿到fescar-server返回的XID表示一个全局事务创建成功,日志中也反应了上述流程

2019-04-09 13:46:57.417 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.417 DEBUG 31326 --- [geSend_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int), channel:[id: 0xa148545e, L:/127.0.0.1:56120 - R:/127.0.0.1:8091],active?true,writable?true,isopen?true
2019-04-09 13:46:57.418 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.421 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.GlobalBeginResponse@2dc480dc,messageId:1196
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.224.93:8091:2008502699
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.tm.api.DefaultGlobalTransaction : Begin a NEW global transaction [192.168.224.93:8091:2008502699]

全局事务创建后,就开始执行business.execute(),即业务代码storageFeignClient.deduct(commodityCode, orderCount)进入RM处理流程,此处的业务逻辑为调用storage-service的扣减库存接口。

RM处理流程

@GetMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count){
storageService.deduct(commodityCode,count);
return true;
}

@Transactional
public void deduct(String commodityCode, int count){
Storage storage = storageDAO.findByCommodityCode(commodityCode);
storage.setCount(storage.getCount()-count);

storageDAO.save(storage);
}

storage的接口和service方法并未出现fescar相关的代码和注解,体现了fescar的无侵入。那它是如何加入到这次全局事务中的呢?答案在ConnectionProxy中,这也是前面说为什么必须要使用DataSourceProxy的原因,通过DataSourceProxy才能在业务代码的本地事务提交时,fescar通过该切入点,向TC注册分支事务并发送RM的处理结果。

由于业务代码本身的事务提交被ConnectionProxy代理实现,所以在提交本地事务时,实际执行的是ConnectionProxy的commit方法

public void commit() throws SQLException {
//如果当前是全局事务,则执行全局事务的提交
//判断是不是全局事务,就是看当前上下文是否存在XID
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}

private void processGlobalTransactionCommit() throws SQLException {
try {
//首先是向TC注册RM,拿到TC分配的branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}

try {
if (context.hasUndoLog()) {
//写入undolog
UndoLogManager.flushUndoLogs(this);
}

//提交本地事务,写入undo_log和业务数据在同一个本地事务中
targetConnection.commit();
} catch (Throwable ex) {
//向TC发送RM的事务处理失败的通知
report(false);
if (ex instanceof SQLException) {
throw new SQLException(ex);
}
}
//向TC发送RM的事务处理成功的通知
report(true);
context.reset();
}

private void register() throws TransactionException {
//注册RM,构建request通过netty向TC发送注册指令
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
//将返回的branchId存在上下文中
context.setBranchId(branchId);
}

通过日志印证一下上面的流程

2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : xid in RootContext null xid in RpcContext 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : bind 192.168.0.2:8091:2008546211 to RootContext
2019-04-09 21:57:48.386 INFO 38933 --- [nio-8081-exec-1] o.h.h.i.QueryTranslatorFactoryInitiator : HHH000397: Using ASTQueryTranslatorFactory
Hibernate: select storage0_.id as id1_0_, storage0_.commodity_code as commodit2_0_, storage0_.count as count3_0_ from storage_tbl storage0_ where storage0_.commodity_code=?
Hibernate: update storage_tbl set count=? where id=?
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : will connect to 192.168.0.2:8091
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"192.168.0.2:8091","message":{"applicationId":"storage-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"resourceIds":"jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false","transactionServiceGroup":"hello-service-fescar-service-group","typeCode":103,"version":"0.4.0"},"transactionRole":"RMROLE"}
2019-04-09 21:57:48.677 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false', applicationId='storage-service', transactionServiceGroup='hello-service-fescar-service-group'}
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:9
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:9, future :com.alibaba.fescar.core.protocol.MessageFuture@186cd3e0, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 3 ms, version:0.4.1,role:RMROLE,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.681 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.681 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.687 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage BranchRegisterResponse: transactionId=2008546211,branchId=2008546212,result code =Success,getMsg =null,messageId:11
2019-04-09 21:57:48.702 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.rm.datasource.undo.UndoLogManager : Flushing UNDO LOG: {"branchId":2008546212,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":993}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":994}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"192.168.0.2:8091:2008546211"}
2019-04-09 21:57:48.755 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.755 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.756 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.758 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.BranchReportResponse@582a08cf,messageId:13
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : unbind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : unbind 192.168.0.2:8091:2008546211 from RootContext
  1. 获取business-service传来的XID
  2. 绑定XID到当前上下文中
  3. 执行业务逻辑sql
  4. 向TC创建本次RM的Netty连接
  5. 向TC发送分支事务的相关信息
  6. 获得TC返回的branchId
  7. 记录Undo Log数据
  8. 向TC发送本次事务PhaseOne阶段的处理结果
  9. 从当前上下文中解绑XID

其中第1步和第9步,是在FescarHandlerInterceptor中完成的,该类并不属于fescar,是前面提到的spring-cloud-alibaba-fescar,它实现了基于feign、rest通信时将xid bind和unbind到当前请求上下文中。到这里RM完成了PhaseOne阶段的工作,接着看PhaseTwo阶段的处理逻辑。

事务提交

各分支事务执行完成后,TC对各RM的汇报结果进行汇总,给各RM发送commit或rollback的指令

2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null,messageId:1
2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:1, body:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.814 INFO 38933 --- [atch_RMROLE_1_8] c.a.f.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch committing: 192.168.0.2:8091:2008546211 2008546212 jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
2019-04-09 21:57:49.817 INFO 38933 --- [atch_RMROLE_1_8] c.a.fescar.core.rpc.netty.RmRpcClient : RmRpcClient sendResponse branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null
2019-04-09 21:57:49.817 DEBUG 38933 --- [atch_RMROLE_1_8] c.a.f.c.rpc.netty.AbstractRpcRemoting : send response:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:49.817 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null

从日志中可以看到

  1. RM收到XID=192.168.0.2:8091:2008546211,branchId=2008546212的commit通知
  2. 执行commit动作
  3. 将commit结果发送给TC,branchStatus为PhaseTwo_Committed

具体看下二阶段commit的执行过程,在AbstractRMHandler类的doBranchCommit方法

/**
* 拿到通知的xid、branchId等关键参数
* 然后调用RM的branchCommit
*/
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch commit result: " + status);
}

最终会将branchCommit的请求调用到AsyncWorker的branchCommit方法。AsyncWorker的处理方式是fescar架构的一个关键部分,因为大部分事务都是会正常提交的,所以在PhaseOne阶段就已经结束了,这样就可以将锁最快的释放。PhaseTwo阶段接收commit的指令后,异步处理即可。将PhaseTwo的时间消耗排除在一次分布式事务之外。

private static final List<Phase2Context> ASYNC_COMMIT_BUFFER = Collections.synchronizedList( new ArrayList<Phase2Context>());

/**
* 将需要提交的XID加入list
*/
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
ASYNC_COMMIT_BUFFER.add(new Phase2Context(branchType, xid, branchId, resourceId, applicationData));
} else {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}

/**
* 通过定时任务消费list中的XID
*/
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... " + e.getMessage());
}
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.size() == 0) {
return;
}
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();

//一次定时循环取出ASYNC_COMMIT_BUFFER中的所有待办数据
//以resourceId作为key分组待commit数据,resourceId是一个数据库的连接url
//在前面的日志中可以看到,目的是为了覆盖应用的多数据源创建
while (iterator.hasNext()) {
Phase2Context commitContext = iterator.next();
List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
if (contextsGroupedByResourceId == null) {
contextsGroupedByResourceId = new ArrayList<>();
mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
}
contextsGroupedByResourceId.add(commitContext);

iterator.remove();

}

for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
try {
try {
//根据resourceId获取数据源以及连接
DataSourceProxy dataSourceProxy = DataSourceManager.get().get(entry.getKey());
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
continue;
}
List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
for (Phase2Context commitContext : contextsGroupedByResourceId) {
try {
//执行undolog的处理,即删除xid、branchId对应的记录
UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
} catch (Exception ex) {
LOGGER.warn(
"Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
}
}

} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
}
}
}
}
}

所以对于commit动作的处理,RM只需删除xid、branchId对应的undo_log即可。

事务回滚

对于rollback场景的触发有两种情况

  1. 分支事务处理异常,即ConnectionProxyreport(false)的情况
  2. TM捕获到下游系统上抛的异常,即发起全局事务标有@GlobalTransactional注解的方法捕获到的异常。在前面TransactionalTemplate类的execute模版方法中,对business.execute()的调用进行了catch,catch后会调用rollback,由TM通知TC对应XID需要回滚事务
 public void rollback() throws TransactionException {
//只有Launcher能发起这个rollback
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid == null) {
throw new IllegalStateException();
}

status = transactionManager.rollback(xid);
if (RootContext.getXID() != null) {
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
}

TC汇总后向参与者发送rollback指令,RM在AbstractRMHandler类的doBranchRollback方法中接收这个rollback的通知

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch rolling back: " + xid + " " + branchId + " " + resourceId);
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch rollback result: " + status);
}

然后将rollback请求传递到DataSourceManager类的branchRollback方法

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
//根据resourceId获取对应的数据源
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}

最终会执行UndoLogManager类的undo方法,因为是纯jdbc操作代码比较长就不贴出来了,可以通过连接到github查看源码,说一下undo的具体流程

  1. 根据xid和branchId查找PhaseOne阶段提交的undo_log
  2. 如果找到了就根据undo_log中记录的数据生成回放sql并执行,即还原PhaseOne阶段修改的数据
  3. 第2步处理完后,删除该条undo_log数据
  4. 如果第1步没有找到对应的undo_log,就插入一条状态为GlobalFinished的undo_log. 出现没找到的原因可能是PhaseOne阶段的本地事务异常了,导致没有正常写入。 因为xid和branchId是唯一索引,所以第4步的插入,可以防止PhaseOne阶段恢复后的成功写入,那么PhaseOne阶段就会异常,这样一来业务数据也就不会提交成功,数据达到了最终回滚了的效果

总结

本地结合分布式业务场景,分析了fescar client侧的主要处理流程,对TM和RM角色的主要源码进行了解析,希望能对大家理解fescar的工作原理有所帮助。

随着fescar的快速迭代以及后期的Roadmap规划,假以时日相信fescar能够成为开源分布式事务的标杆解决方案。

· 阅读需 30 分钟

再前不久,我写了一篇关于分布式事务中间件 Fescar 的解析,没过几天 Fescar 团队对其进行了品牌升级,取名为 Seata(Simpe Extensible Autonomous Transaction Architecture),而以前的 Fescar 的英文全称为 Fast & EaSy Commit And Rollback。可以看见 Fescar 从名字上来看更加局限于 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分布式事务解决方案。更换名字之后,我对其未来的发展更有信心。

这里先大概回忆一下 Seata 的整个过程模型:

  • TM:事务的发起者。用来告诉 TC,全局事务的开始,提交,回滚。
  • RM:具体的事务资源,每一个 RM 都会作为一个分支事务注册在 TC。
  • TC:事务的协调者。也可以看做是 Fescar-server,用于接收我们的事务的注册,提交和回滚。

在之前的文章中对整个角色有个大体的介绍,在这篇文章中我将重点介绍其中的核心角色 TC,也就是事务协调器。

2.Transcation Coordinator

为什么之前一直强调 TC 是核心呢?那因为 TC 这个角色就好像上帝一样,管控着云云众生的 RM 和 TM。如果 TC 一旦不好使,那么 RM 和 TM 一旦出现小问题,那必定会乱的一塌糊涂。所以要想了解 Seata,那么必须要了解它的 TC。

那么一个优秀的事务协调者应该具备哪些能力呢?我觉得应该有以下几个:

  • 正确的协调:能正确的协调 RM 和 TM 接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
  • 高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
  • 高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的 RM 和 TM 那么会经常遇到超时,从而引起回滚频繁。
  • 高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。

下面我也将逐步阐述 Seata 是如何做到上面四点。

2.1 Seata-Server 的设计

Seata-Server 整体的模块图如上所示:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否 commit,rollback 等协调活动。
  • Store:存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
  • Discovery: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。
  • Config: 用来存储和查找我们服务端的配置。
  • Lock: 锁模块,用于给 Seata 提供全局锁的功能。
  • RPC:用于和其它端通信。
  • HA-Cluster:高可用集群,目前还没开源,为 Seata 提供可靠的高可用服务,预计将会在 0.6 版本开源。

2.2 Discovery

首先来讲讲比较基础的 Discovery 模块,又称服务注册/发现模块。我们将 Seata-Sever 启动之后,需要将自己的地址暴露给其它使用者,那么就需要我们这个模块帮忙。

这个模块有个核心接口 RegistryService,如上图所示:

  • register:服务端使用,进行服务注册。
  • unregister:服务端使用,一般在 JVM 关闭钩子,ShutdownHook 中调用。
  • subscribe:客户端使用,注册监听事件,用来监听地址的变化。
  • unsubscribe:客户端使用,取消注册监听事件。
  • lookup:客户端使用,根据 key 查找服务地址列表。
  • close:都可以使用,用于关闭 Registry 资源。

如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有五种服务注册/发现,分别是 redis、zk、nacos、eruka 和 consul。下面简单介绍下 Nacos 的实现:

2.2.1 register 接口:

step1:校验地址是否合法

step2:获取 Nacos 的 Naming 实例,然后将地址注册到服务名为 serverAddr(固定服务名) 的对应集群分组(registry.conf 文件配置)上面。

unregister 接口类似,这里不做详解。

2.2.2 lookup 接口:

step1:获取当前 clusterName 名字。

step2:判断当前集群名对应的服务是否已经订阅过了,如果是直接从 map 中取订阅返回的数据。

step3:如果没有订阅先主动查询一次服务实例列表,然后添加订阅并将订阅返回的数据存放到 map 中,之后直接从 map 获取最新数据。

2.2.3 subscribe 接口

这个接口比较简单,具体分两步:

step1:对将要订阅的 cluster-> listener 存放到 map 中,此处 nacos 未提交单机已订阅列表,所以需要自己实现。

step2:使用 Nacos api 订阅。

2.3 Config

配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty 的 select 线程数量,work 线程数量,session 允许最大为多少等等,当然这些参数再 Seata 中都有自己的默认设置。

同样的在 Seata 中也提供了一个接口 Configuration,用来自定义我们需要的获取配置的地方:

  • getInt/Long/Boolean/getConfig():通过 dataId 来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
  • putConfig:用于添加配置。
  • removeConfig:删除一个配置。
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

目前为止有四种方式获取 Config:File(文件获取)、Nacos、Apollo 和 ZK(不推荐)。在 Seata 中首先需要配置 registry.conf,来配置 config.type 。实现 conf 比较简单这里就不深入分析。

2.4 Store

存储层的实现对于 Seata 是否高性能,是否可靠非常关键。 如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。

在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的 TM 创造的全局事务操作数据叫 GloabSession,RM 创造的分支事务操作数据叫 BranchSession,一个 GloabSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。

在 FileTransactionStoreManager#writeSession 代码中:

上面的代码主要分为下面几步:

  • step1:生成一个 TransactionWriteFuture。
  • step2:将这个 futureRequest 丢进一个 LinkedBlockingQueue 中。为什么需要将所有数据都丢进队列中呢?当然这里其实也可以用锁来实现,再另外一个阿里开源的 RocketMQ 中,使用的锁。不论是队列还是锁它们的目的是为了保证单线程写,这又是为什么呢?有人会解释说,需要保证顺序写,这样速度就很快,这个理解是错误的,我们的 FileChannel 的写方法是线程安全的,已经能保证顺序写了。保证单线程写其实是为了让我们这个写逻辑都是单线程的,因为可能有些文件写满或者记录写数据位置等等逻辑,当然这些逻辑都可以主动加锁去做,但是为了实现简单方便,直接再整个写逻辑排队处理是最为合适的。
  • step3:调用 future.get,等待我们该条数据写逻辑完成通知。

我们将数据提交到队列之后,我们接下来需要对其进行消费,代码如下:

这里将一个 WriteDataFileRunnable()提交进我们的线程池,这个 Runnable 的 run()方法如下:

分为下面几步:

step1: 判断是否停止,如果 stopping 为 true 则返回 null。

step2:从我们的队列中获取数据。

step3:判断 future 是否已经超时了,如果超时,则设置结果为 false,此时我们生产者 get()方法会接触阻塞。

step4:将我们的数据写进文件,此时数据还在 pageCahce 层并没有刷新到磁盘,如果写成功然后根据条件判断是否进行刷盘操作。

step5:当写入数量到达一定的时候,或者写入时间到达一定的时候,需要将我们当前的文件保存为历史文件,删除以前的历史文件,然后创建新的文件。这一步是为了防止我们文件无限增长,大量无效数据浪费磁盘资源。

在我们的 writeDataFile 中有如下代码:

step1:首先获取我们的 ByteBuffer,如果超出最大循环 BufferSize 就直接创建一个新的,否则就使用我们缓存的 Buffer。这一步可以很大的减少 GC。

step2:然后将数据添加进入 ByteBuffer。

step3:最后将 ByteBuffer 写入我们的 fileChannel,这里会重试三次。此时的数据还在 pageCache 层,受两方面的影响,OS 有自己的刷新策略,但是这个业务程序不能控制,为了防止宕机等事件出现造成大量数据丢失,所以就需要业务自己控制 flush。下面是 flush 的代码:

这里 flush 的条件写入一定数量或者写的时间超过一定时间,这样也会有个小问题如果是停电,那么 pageCache 中有可能还有数据并没有被刷盘,会导致少量的数据丢失。目前还不支持同步模式,也就是每条数据都需要做刷盘操作,这样可以保证每条消息都落盘,但是性能也会受到极大的影响,当然后续会不断的演进支持。

我们的 store 核心流程主要是上面几个方法,当然还有一些比如,session 重建等,这些比较简单,读者可以自行阅读。

2.5 Lock

大家知道数据库实现隔离级别主要是通过锁来实现的,同样的再分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。

Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:

其中有三个方法:

  • acquireLock:用于对我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务的资源加锁,成功返回 true。
  • isLockable:根据事务 ID,资源 Id,锁住的 Key 来查询是否已经加锁。
  • cleanAllLocks:清除所有的锁。 对于锁我们可以在本地实现,也可以通过 redis 或者 mysql 来帮助我们实现。官方默认提供了本地全局锁的实现:

在本地锁的实现中有两个常量需要关注:

  • BUCKET_PER_TABLE:用来定义每个 table 有多少个 bucket,目的是为了后续对同一个表加锁的时候减少竞争。
  • LOCK_MAP:这个 map 从定义上来看非常复杂,里里外外套了很多层 Map,这里用个表格具体说明一下:
层数keyvalue
1-LOCK_MAPresourceId(jdbcUrl)dbLockMap
2- dbLockMaptableName (表名)tableLockMap
3- tableLockMapPK.hashcode%Bucket (主键值的 hashcode%bucket)bucketLockMap
4- bucketLockMapPKtrascationId

可以看见实际上的加锁在 bucketLockMap 这个 map 中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到 bucketLockMap,然后将当前 trascationId 塞进去,如果这个主键当前有 TranscationId,那么比较是否是自己,如果不是则加锁失败。

2.6 RPC

保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:

如果采用默认的基本配置那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,再这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。

Seata 目前允许配置的传输层配置如图所示,用户可根据需要进行 Netty 传输层面的调优,配置通过配置中心配置,首次加载时生效。

这里需要提一下的是 Seata 的心跳机制,这里是使用 Netty 的 IdleStateHandler 完成的,如下:

在 Sever 端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为 15s(客户端默认写空闲为 5s,发送 ping 消息),如果超过 15s 则会将链接断开,关闭资源。

step1:判断是否是读空闲的检测事件。

step2:如果是则断开链接,关闭资源。
另外 Seata 做了内存池、客户端做了批量小包合并发送、Netty 连接池(减少连接创建时的服务不可用时间)等功能,以下为批量小包合并功能。

客户端的消息发送并不是真正的消息发送通过 AbstractRpcRemoting#sendAsyncRequest 包装成 RpcMessage 存储至 basket 中并唤醒合并发送线程。合并发送线程通过 while true 的形式 最长等待 1ms 对 basket 的消息取出包装成 merge 消息进行真正发送,此时若 channel 出现异常则会通过 fail-fast 快速失败返回结果。merge 消息发送前在 map 中标识,收到结果后批量确认(AbstractRpcRemotingClient#channelRead),并通过 dispatch 分发至 messageListener 和 handler 去处理。同时,timerExecutor 定时对已发送消息进行超时检测,若超时置为失败。具体消息协议设计将会在后续的文章中给出,敬请关注。
Seata 的 Netty Client 由 TMClient 和 RMClient 组成,根据事务角色功能区分,都继承 AbstractRpcRemotingClient,AbstractRpcRemotingClient 实现了 RemotingService(服务启停), RegisterMsgListener(netty 连接池连接创建回调)和 ClientMessageSender(消息发送)继承了 AbstractRpcRemoting( Client 和 Server 顶层消息发送和处理的模板)。
RMClient 类关系图如下图所示: TMClient 和 RMClient 又会根据自身的 poolConfig 配置与 NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> 进行 channel 连接的交互,channel 连接池根据角色 key+ip 作为连接池的 key 来定位各个连接池 ,连接池对 channel 进行统一的管理。TMClient 和 RMClient 在发送过程中对于每个 ip 只会使用一个长连接,但连接不可用时,会从连接池中快速取出已经创建好并可用的连接,减少服务的不可用时间。

2.7 HA-Cluster

目前官方没有公布 HA-Cluster,但是通过一些其它中间件和官方的一些透露,可以将 HA-Cluster 用如下方式设计:

具体的流程如下:

step1:客户端发布信息的时候根据 transcationId 保证同一个 transcation 是在同一个 master 上,通过多个 Master 水平扩展,提供并发处理性能。

step2:在 server 端中一个 master 有多个 slave,master 中的数据近实时同步到 slave 上,保证当 master 宕机的时候,还能有其它 slave 顶上来可以用。

当然上述一切都是猜测,具体的设计实现还得等 0.5 版本之后。目前有一个 Go 版本的 Seata-Server 也捐赠给了 Seata(还在流程中),其通过 raft 实现副本一致性,其它细节不是太清楚。

2.8 Metrics

这个模块也是一个没有具体公布实现的模块,当然有可能会提供插件口,让其它第三方 metric 接入进来,最近 Apache SkyWalking 正在和 Seata 小组商讨如何接入进来。

3.Coordinator Core

上面我们讲了很多 Server 基础模块,想必大家对 Seata 的实现已经有个大概,接下来我会讲解事务协调器具体逻辑是如何实现的,让大家更加了解 Seata 的实现内幕。

3.1 启动流程

启动方法在 Server 类有个 main 方法,定义了我们启动流程:

step1:创建一个 RpcServer,再这个里面包含了我们网络的操作,用 Netty 实现了服务端。

step2:解析端口号、本地文件地址(用户 Server 宕机未处理完成事务恢复)、IP(可选,本机只能获取内网 ip,在跨网络时需要一个对外的 vip 注册服务)。

step3:初始化 SessionHoler,其中最重要的重要就是重我们 dataDir 这个文件夹中恢复我们的数据,重建我们的 Session。

step4:创建一个 CoorDinator,这个也是我们事务协调器的逻辑核心代码,然后将其初始化,其内部初始化的逻辑会创建四个定时任务:

  • retryRollbacking:重试 rollback 定时任务,用于将那些失败的 rollback 进行重试的,每隔 5ms 执行一次。
  • retryCommitting:重试 commit 定时任务,用于将那些失败的 commit 进行重试的,每隔 5ms 执行一次。
  • asyncCommitting:异步 commit 定时任务,用于执行异步的 commit,每隔 10ms 一次。
  • timeoutCheck:超时定时任务检测,用于检测超时的任务,然后执行超时的逻辑,每隔 2ms 执行一次。

step5: 初始化 UUIDGenerator 这个也是我们生成各种 ID(transcationId,branchId)的基本类。

step6:将本地 IP 和监听端口设置到 XID 中,初始化 rpcServer 等待客户端的连接。

启动流程比较简单,下面我会介绍分布式事务框架中的常见的一些业务逻辑 Seata 是如何处理的。

3.2 Begin-开启全局事务

一次分布式事务的起始点一定是开启全局事务,首先我们看看全局事务 Seata 是如何实现的:

step1: 根据应用 ID,事务分组,名字,超时时间创建一个 GloabSession,这个在前面也提到过它和 branchSession 分别是什么。

step2:对其添加一个 RootSessionManager 用于监听一些事件,这里要说一下目前在 Seata 里面有四种类型的 Listener(这里要说明的是所有的 sessionManager 都实现了 SessionLifecycleListener):

  • ROOT_SESSION_MANAGER:最全,最大的,拥有所有的 Session。
  • ASYNC_COMMITTING_SESSION_MANAGER:用于管理需要做异步 commit 的 Session。
  • RETRY_COMMITTING_SESSION_MANAGER:用于管理重试 commit 的 Session。
  • RETRY_ROLLBACKING_SESSION_MANAGER:用于管理重试回滚的 Session。 由于这里是开启事务,其它 SessionManager 不需要关注,我们只添加 RootSessionManager 即可。

step3:开启 Globalsession

这一步会把状态变为 Begin,记录开始时间,并且调用 RootSessionManager 的 onBegin 监听方法,将 Session 保存到 map 并写入到我们的文件。

step4:最后返回 XID,这个 XID 是由 ip+port+transactionId 组成的,非常重要,当 TM 申请到之后需要将这个 ID 传到 RM 中,RM 通过 XID 来决定到底应该访问哪一台 Server。

3.3 BranchRegister-分支事务注册

当我们全局事务在 TM 开启之后,我们 RM 的分支事务也需要注册到我们的全局事务之上,这里看看是如何处理的:

step1:通过 transactionId 获取并校验全局事务是否是开启状态。

step2:创建一个新的分支事务,也就是我们的 BranchSession。

step3:对分支事务进行加全局锁,这里的逻辑就是使用的我们锁模块的逻辑。

step4:添加 branchSession,主要是将其添加到 globalSession 对象中,并写入到我们的文件中。

step5:返回 branchId,这个 ID 也很重要,我们后续需要用它来回滚我们的事务,或者对我们分支事务状态更新。

分支事务注册之后,还需要汇报分支事务的本地事务的执行到底是成功还是失败,在 Server 目前只是简单的做一下保存记录,汇报的目的是,就算这个分支事务失败,如果 TM 还是执意要提交全局事务(catch 异常不抛出),那么再遍历提交分支事务的时候,这个失败的分支事务就不需要提交(用户选择性跳过)。

3.4 GlobalCommit - 全局提交

当我们分支事务执行完成之后,就轮到我们的 TM-事务管理器来决定是提交还是回滚,如果是提交,那么就会走到下面的逻辑:

step1:首先找到我们的 globalSession。如果它为 null 证明已经被 commit 过了,那么直接幂等操作,返回成功。

step2:关闭我们的 GloabSession 防止再次有新的 branch 进来(跨服务调用超时回滚,provider 在继续执行)。

step3:如果 status 是等于 Begin,那么久证明还没有提交过,改变其状态为 Committing 也就是正在提交。

step4:判断是否是可以异步提交,目前只有 AT 模式可以异步提交,二阶段全局提交时只是删除 undolog 并无严格顺序,此处使用定时任务,客户端收到后批量合并删除。

step5:如果是异步提交,直接将其放进我们 ASYNC_COMMITTING_SESSION_MANAGER,让其再后台线程异步去做我们的 step6,如果是同步的那么直接执行我们的 step6。

step6:遍历我们的 BranchSession 进行提交,如果某个分支事务失败,根据不同的条件来判断是否进行重试,可异步执行此 branchSession 不成功可以继续执行下一个,因为其本身都在 manager 中,只要没有成功就不会被删除会一直重试,如果是同步提交的会放进重试队列进行定时重试并卡住按照顺序提交。

3.5 GlobalRollback - 全局回滚

如果我们的 TM 决定全局回滚,那么会走到下面的逻辑:

这个逻辑和提交流程基本一致,可以看作是它的反向,这里就不展开讲了。

4.总结

最后在总结一下开始我们提出了分布式事务的关键 4 点,Seata 到底是怎么解决的:

  • 正确的协调:通过后台定时任务各种正确的重试,并且未来会推出监控平台有可能可以手动回滚。
  • 高可用: 通过 HA-Cluster 保证高可用。
  • 高性能:文件顺序写,RPC 通过 netty 实现,Seata 未来可以水平扩展,提高处理性能。
  • 高扩展性:提供给用户可以自由实现的地方,比如配置,服务发现和注册,全局锁等等。

最后希望大家能从这篇文章能了解 Seata-Server 的核心设计原理,当然你也可以想象如果你自己去实现一个分布式事务的 Server 应该怎样去设计?

Seata GitHub 地址:https://github.com/apache/incubator-seata

本文作者:

李钊,GitHub ID @CoffeeLatte007,公众号「咖啡拿铁」作者,Seata 社区 Committer,猿辅导 Java 工程师,曾就职于美团。对分布式中间件,分布式系统有浓厚的兴趣。
季敏(清铭),GitHub ID @slievrly,Seata 开源项目负责人,阿里巴巴中间件 TXC/GTS 核心研发成员,长期从事于分布式中间件核心研发工作,在分布式事务领域有着较丰富的技术积累。

· 阅读需 17 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,文末也提供了项目后续的 Roadmap,欢迎关注。

前言:基于 TCC 模型的应用场景

 
1.png

TCC 分布式事务模型直接作用于服务层。不与具体的服务框架耦合,与底层 RPC 协议无关,与底层存储介质无关,可以灵活选择业务资源的锁定粒度,减少资源锁持有时间,可扩展性好,可以说是为独立部署的 SOA 服务而设计的。

一、TCC 模型优势

对于 TCC 分布式事务模型,笔者认为其在业务场景应用上,有两方面的意义。

1.1 跨服务的分布式事务

服务的拆分,也可以认为是资源的横向扩展,只不过方向不同而已。

横向扩展可能沿着两个方向发展:

  1. 功能扩展,根据功能对数据进行分组,并将不同的功能组分布在多个不同的数据库上,这实际上就是 SOA 架构下的服务化。
  2. 数据分片,在功能组内部将数据拆分到多个数据库上,为横向扩展增加一个新的维度。

下图简要阐释了横向数据扩展策略:

2.png

因此,TCC 的其中一个作用就是在按照功能横向扩展资源时,保证多资源访问的事务属性。

1.2 两阶段拆分

TCC 另一个作用就是把两阶段拆分成了两个独立的阶段,通过资源业务锁定的方式进行关联。资源业务锁定方式的好处在于,既不会阻塞其他事务在第一阶段对于相同资源的继续使用,也不会影响本事务第二阶段的正确执行。

传统模型的并发事务:
3.png

TCC 模型的并发事务:
4.png

这对业务有什么好处呢?拿支付宝的担保交易场景来说,简化情况下,只需要涉及两个服务,交易服务和账务服务。交易作为主业务服务,账务作为从业务服务,提供 Try、Commit、Cancel 接口:

  1. Try 接口扣除用户可用资金,转移到预冻结资金。预冻结资金就是业务锁定方案,每个事务第二阶段只能使用本事务的预冻结资金,在第一阶段执行结束后,其他并发事务也可以继续处理用户的可用资金。
  2. Commit 接口扣除预冻结资金,增加中间账户可用资金(担保交易不能立即把钱打给商户,需要有一个中间账户来暂存)。

假设只有一个中间账户的情况下,每次调用支付服务的 Commit 接口,都会锁定中间账户,中间账户存在热点性能问题。 但是,在担保交易场景中,七天以后才需要将资金从中间账户划拨给商户,中间账户并不需要对外展示。因此,在执行完支付服务的第一阶段后,就可以认为本次交易的支付环节已经完成,并向用户和商户返回支付成功的结果,并不需要马上执行支付服务二阶段的 Commit 接口,等到低锋期时,再慢慢消化,异步地执行。
5.png

这就是 TCC 分布式事务模型的二阶段异步化功能,从业务服务的第一阶段执行成功,主业务服务就可以提交完成,然后再由框架异步的执行各从业务服务的第二阶段。

二、通用型 TCC 解决方案

通用型 TCC 解决方案就是最典型的 TCC 分布式事务模型实现,所有从业务服务都需要参与到主业务服务的决策当中。
6.png
 

适用场景

由于从业务服务是同步调用,其结果会影响到主业务服务的决策,因此通用型 TCC 分布式事务解决方案适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务:
7.png
 
当用户发起一笔交易时,首先访问交易服务,创建交易订单;然后交易服务调用支付服务为该交易创建支付订单,执行收款动作,最后支付服务调用账务服务记录账户流水和记账。

为了保证三个服务一起完成一笔交易,要么同时成功,要么同时失败,可以使用通用型 TCC 解决方案,将这三个服务放在一个分布式事务中,交易作为主业务服务,支付作为从业务服务,账务作为支付服务的嵌套从业务服务,由 TCC 模型保证事务的原子性。
8.png

支付服务的 Try 接口创建支付订单,开启嵌套分布式事务,并调用账务服务的 Try 接口;账务服务在 Try 接口中冻结买家资金。一阶段调用完成后,交易完成,提交本地事务,由 TCC 框架完成分布式事务各从业务服务二阶段的调用。

支付服务二阶段先调用账务服务的 Confirm 接口,扣除买家冻结资金;增加卖家可用资金。调用成功后,支付服务修改支付订单为完成状态,完成支付。

当支付和账务服务二阶段都调用完成后,整个分布式事务结束。

三、异步确保型 TCC 解决方案

异步确保型 TCC 解决方案的直接从业务服务是可靠消息服务,而真正的从业务服务则通过消息服务解耦,作为消息服务的消费端,异步地执行。
9.png
 
可靠消息服务需要提供 Try,Confirm,Cancel 三个接口。Try 接口预发送,只负责持久化存储消息数据;Confirm 接口确认发送,这时才开始真正的投递消息;Cancel 接口取消发送,删除消息数据。

消息服务的消息数据独立存储,独立伸缩,降低从业务服务与消息系统间的耦合,在消息服务可靠的前提下,实现分布式事务的最终一致性。

此解决方案虽然增加了消息服务的维护成本,但由于消息服务代替从业务服务实现了 TCC 接口,从业务服务不需要任何改造,接入成本非常低。

适用场景

由于从业务服务消费消息是一个异步的过程,执行时间不确定,可能会导致不一致时间窗口增加。因此,异步确保性 TCC 分布式事务解决方案只适用于对最终一致性时间敏感度较低的一些被动型业务(从业务服务的处理结果不影响主业务服务的决策,只被动的接收主业务服务的决策结果)。比如会员注册服务和邮件发送服务:
10.png
 
当用户注册会员成功,需要给用户发送一封邮件,告诉用户注册成功,并提示用户激活该会员。但要注意两点:

  1. 如果用户注册成功,一定要给用户发送一封邮件;
  2. 如果用户注册失败,一定不能给用户发送邮件。

因此,这同样需要会员服务和邮件服务保证原子性,要么都执行,要么都不执行。不一样的是,邮件服务只是一种被动型的业务,并不影响用户是否能够注册成功,它只需要在用户注册成功以后发送邮件给用户即可,邮件服务不需要参与到会员服务的活动决策中。

对于此种业务场景,可以使用异步确保型TCC分布式事务解决方案,如下:
11.png
 
 
由可靠消息服务来解耦会员和邮件服务,会员服务与消息服务组成 TCC 事务模型,保证事务原子性。然后通过消息服务的可靠特性,确保消息一定能够被邮件服务消费,从而使得会员与邮件服务在同一个分布式事务中。同时,邮件服务也不会影响会员服务的执行过程,只在会员服务执行成功后被动接收发送邮件的请求。

四、补偿型 TCC 解决方案

补偿型 TCC 解决方案与通用型 TCC 解决方案的结构相似,其从业务服务也需要参与到主业务服务的活动决策当中。但不一样的是,前者的从业务服务只需要提供 Do 和 Compensate 两个接口,而后者需要提供三个接口。
12.png
 
Do 接口直接执行真正的完整业务逻辑,完成业务处理,业务执行结果外部可见;Compensate 操作用于业务补偿,抵消或部分抵消正向业务操作的业务结果,Compensate操作需满足幂等性。
与通用型解决方案相比,补偿型解决方案的从业务服务不需要改造原有业务逻辑,只需要额外增加一个补偿回滚逻辑即可,业务改造量较小。但要注意的是,业务在一阶段就执行完整个业务逻辑,无法做到有效的事务隔离,当需要回滚时,可能存在补偿失败的情况,还需要额外的异常处理机制,比如人工介入。

适用场景

由于存在回滚补偿失败的情况,补偿型 TCC 分布式事务解决方案只适用于一些并发冲突较少或者需要与外部交互的业务,这些外部业务不属于被动型业务,其执行结果会影响主业务服务的决策,比如机票代理商的机票预订服务:
13.png
 
该机票服务提供多程机票预订服务,可以同时预订多趟行程航班机票,比如从北京到圣彼得堡,需要第一程从北京到莫斯科,以及第二程从莫斯科到圣彼得堡。

当用户预订机票时,肯定希望能同时预订这两趟航班的机票,只预订一趟航班对用户来说没有意义。因此,对于这样的业务服务同样提出了原子性要求,如果其中一趟航班的机票预订失败,另外一趟需要能够取消预订。

但是,由于航空公司相对于机票代理商来说属于外部业务,只提供订票接口和取消预订接口,想要推动航空公司改造是极其困难的。因此,对于此类业务服务,可以使用补偿型 TCC 分布式事务解决方案,如下:
14.png

网关服务在原有逻辑基础上增加 Compensate 接口,负责调用对应航空公司的取消预订接口。

在用户发起机票预订请求时,机票服务先通过网关 Do 接口,调用各航空公司的预订接口,如果所有航班都预订成功,则整个分布式事务直接执行成功;一旦某趟航班机票预订失败,则分布式事务回滚,由 TCC 事务框架调用各网关的 Compensate 补偿接口,其再调用对应航空公司的取消预订接口。通过这种方式,也可以保证多程机票预订服务的原子性。

五. 总结

对于现在的互联网应用来说,资源横向扩展提供了更多的灵活性,是一种比较容易实现的向外扩展方案,但是同时也明显增加了复杂度,引入一些新的挑战,比如资源之间的数据一致性问题。

横向数据扩展既可以按数据分片扩展,也可以按功能扩展。TCC 模型能在功能横向扩展资源的同时,保证多资源访问的事务属性。

TCC 模型除了跨服务的分布式事务这一层作用之外,还具有两阶段划分的功能,通过业务资源锁定,允许第二阶段的异步执行,而异步化思想正是解决热点数据并发性能问题的利器之一。
 

Roadmap

当前已经发布到 0.4.0,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。


图片1.png

· 阅读需 9 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,
Sample 地址:https://github.com/fescar-group/fescar-samples/tree/master/tcc
文末也提供了项目后续的 Roadmap,欢迎关注。

一、TCC 简介

在两阶段提交协议(2PC,Two Phase Commitment Protocol)中,资源管理器(RM, resource manager)需要提供“准备”、“提交”和“回滚” 3 个操作;而事务管理器(TM, transaction manager)分 2 阶段协调所有资源管理器,在第一阶段询问所有资源管理器“准备”是否成功,如果所有资源均“准备”成功则在第二阶段执行所有资源的“提交”操作,否则在第二阶段执行所有资源的“回滚”操作,保证所有资源的最终状态是一致的,要么全部提交要么全部回滚。

资源管理器有很多实现方式,其中 TCC(Try-Confirm-Cancel)是资源管理器的一种服务化的实现;TCC 是一种比较成熟的分布式事务解决方案,可用于解决跨数据库、跨服务业务操作的数据一致性问题;TCC 其 Try、Confirm、Cancel 3 个方法均由业务编码实现,故 TCC 可以被称为是服务化的资源管理器。

TCC 的 Try 操作作为一阶段,负责资源的检查和预留;Confirm 操作作为二阶段提交操作,执行真正的业务;Cancel 是二阶段回滚操作,执行预留资源的取消,使资源回到初始状态。

如下图所示,用户实现 TCC 服务之后,该 TCC 服务将作为分布式事务的其中一个资源,参与到整个分布式事务中;事务管理器分 2 阶段协调 TCC 服务,在第一阶段调用所有 TCC 服务的 Try 方法,在第二阶段执行所有 TCC 服务的 Confirm 或者 Cancel 方法;最终所有 TCC 服务要么全部都是提交的,要么全部都是回滚的。

image.png

二、TCC 设计

用户在接入 TCC 时,大部分工作都集中在如何实现 TCC 服务上,经过蚂蚁金服多年的 TCC 应用,总结如下主要的TCC 设计和实现主要事项:

1、业务操作分两阶段完成

接入 TCC 前,业务操作只需要一步就能完成,但是在接入 TCC 之后,需要考虑如何将其分成 2 阶段完成,把资源的检查和预留放在一阶段的 Try 操作中进行,把真正的业务操作的执行放在二阶段的 Confirm 操作中进行。

以下举例说明业务模式如何分成两阶段进行设计,举例场景:“账户A的余额中有 100 元,需要扣除其中 30 元”;

在接入 TCC 之前,用户编写 SQL:“update 账户表 set 余额 = 余额 - 30 where 账户 = A”,便能一步完成扣款操作。

在接入 TCC 之后,就需要考虑如何将扣款操作分成 2 步完成:

  • Try 操作:资源的检查和预留;

在扣款场景,Try 操作要做的事情就是先检查 A 账户余额是否足够,再冻结要扣款的 30 元(预留资源);此阶段不会发生真正的扣款。

  • Confirm 操作:执行真正业务的提交;

在扣款场景下,Confirm 阶段走的事情就是发生真正的扣款,把A账户中已经冻结的 30 元钱扣掉。

  • Cancel 操作:预留资源的是否释放;

在扣款场景下,扣款取消,Cancel 操作执行的任务是释放 Try 操作冻结的 30 元钱,是 A 账户回到初始状态。

image.png

2、并发控制

用户在实现 TCC 时,应当考虑并发性问题,将锁的粒度降到最低,以最大限度的提高分布式事务的并发性。

以下还是以A账户扣款为例,“账户 A 上有 100 元,事务 T1 要扣除其中的 30 元,事务 T2 也要扣除 30 元,出现并发”。

在一阶段 Try 操作中,分布式事务 T1 和分布式事务 T2 分别冻结资金的那一部分资金,相互之间无干扰;这样在分布式事务的二阶段,无论 T1 是提交还是回滚,都不会对 T2 产生影响,这样 T1 和 T2 在同一笔业务数据上并行执行。

image.png

3、允许空回滚

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因为丢包而导致的网络超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,而 Cancel 操作调用未出现超时。

TCC 服务在未收到 Try 请求的情况下收到 Cancel 请求,这种场景被称为空回滚;空回滚在生产环境经常出现,用户在实现TCC服务时,应允许允许空回滚的执行,即收到空回滚时返回成功。

image.png

4、防悬挂控制

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因网络拥堵而导致的超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,Cancel 调用未超时;在此之后,拥堵在网络上的一阶段 Try 数据包被 TCC 服务收到,出现了二阶段 Cancel 请求比一阶段 Try 请求先执行的情况,此 TCC 服务在执行晚到的 Try 之后,将永远不会再收到二阶段的 Confirm 或者 Cancel ,造成 TCC 服务悬挂。

用户在实现  TCC 服务时,要允许空回滚,但是要拒绝执行空回滚之后 Try 请求,要避免出现悬挂。

image.png

5、幂等控制

无论是网络数据包重传,还是异常事务的补偿执行,都会导致 TCC 服务的 Try、Confirm 或者 Cancel 操作被重复执行;用户在实现 TCC 服务时,需要考虑幂等控制,即 Try、Confirm、Cancel 执行一次和执行多次的业务结果是一样的。
image.png

Roadmap

当前已经发布到 0.4.0 版本,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。

图片1.png