Seata Saga 模式
概述
Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
适用场景:
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
- 一阶段提交本地事务,无锁, 高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
缺点:
- 不保证隔离性(应对方案见后面文档)
Saga 的实现:
基于状态机引擎的 Saga 实现:
目前 SEATA 提供的 Saga 模式是基于状态机引擎来实现的,机制是:
- 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
- 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
注意: 异常发生时是否进行补偿也可由用户自定义决定
- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
示例状态图:
快速开始
Demo 简介
基于 dubbo 构建的微服务下,使用 Saga 模式演示分布式事务的提交和回滚;
业务流程图如下图所示:
先下载 seata-samples 工程:https://github.com/apache/incubator-seata-samples.git
注意 SEATA 版本需要 0.9.0 以上
在 dubbo-saga-sample 中一个分布式事务内会有 2 个 Saga 事务参与者,分别是: InventoryAction 和 BalanceAction ;分布式事务提交则两者均提交,分布式事务回滚则两者均回滚;
这 2 个 Saga 参与者均是 dubbo 服务,两个参与都有一个 reduce 方法,表示库存扣减或余额扣减,还有一个 compensateReduce 方法,表示补偿扣减操作。
- InventoryAction 接口定义如下:
public interface InventoryAction {
/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}
- 这个场景用状态语言定义就是下面的 json:src/main/resources/statelang/reduce_inventory_and_balance.json
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]", "$.[count]"],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": ["java.lang.Throwable"],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
该 json 表示的状态图:
状态语言在一定程度上参考了AWS Step Functions
"状态机" 属性简介:
- Name: 表示状态机的名称,必须唯一
- Comment: 状态机的描述
- Version: 状态机定义版本
- StartState: 启动时运行的第一个"状态"
- States: 状态列表,是一个 map 结构,key 是"状态"的名称,在状态机内必须唯一
- IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
- IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新
"状态" 属性简介:
- Type: "状态" 的类型,比如有:
- ServiceTask: 执行调用服务任务
- Choice: 单条件选择路由
- CompensationTrigger: 触发补偿流程
- Succeed: 状态机正常结束
- Fail: 状态机异常结束
- SubStateMachine: 调用子状态机
- CompensateSubMachine: 用于补偿一个子状态机
- ServiceName: 服务名称,通常是服务的 beanId
- ServiceMethod: 服务方法名称
- CompensateState: 该"状态"的补偿"状态"
- Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
- Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可
- Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个 map 结构,key 为放入到状态机上文时的 key(状态机上下文也是一个 map),value 中$.是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数
- Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构,key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value 是当这个条件表达式成立时则将服务执行状态映射成这个值
- Catch: 捕获到异常后的路由
- Next: 服务执行完成后下一个执行的"状态"
- Choices: Choice 类型的"状态"里, 可选的分支列表, 分支中的 Expression 为 SpringEL 表达式, Next 为当表达式成立时执行的下一个"状态"
- ErrorCode: Fail 类型"状态"的错误码
- Message: Fail 类型"状态"的错误信息
更多详细的状态语言解释请看State language referance章节
更多详细的状态语言使用示例见https://github.com/apache/incubator-seata/tree/develop/test/src/test/java/io/seata/saga/engine
Demo 运行指南
step 1 启动 SEATA Server
运行 SeataServerStarter ,启动 Seata Server;
step 2 启动 dubbo provider Demo
运行 DubboSagaProviderStarter ,启动 dubbo provider;
step 3 启动 Saga Demo
运行 DubboSagaTransactionStarter , 启动 demo 工程;
Demo 中的数据库使用的是 H2 内存数据库, 生产上建议使用与业务相同的库, 目前支持 Oracle, Mysql, DB2. 建表语句在 https://github.com/apache/incubator-seata/tree/develop/saga/seata-saga-engine-store/src/main/resources/sql
Demo 中还有调用本地服务和调用 SOFA RPC 服务的示例
状态机设计器
在线访问Seata Saga 提供了一个可视化的状态机设计器方便用户使用,代码和运行指南请参考: https://github.com/apache/incubator-seata/tree/refactor_designer/saga/seata-saga-statemachine-designer
状态机设计器截图: