Seata Saga 模式
概述
Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
适用场景:
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
缺点:
- 不保证隔离性(应对方案见后面文档)
Saga 的实现:
基于状态机引擎的 Saga 实现:
目前 SEATA 提供的 Saga 模式是基于状态机引擎来实现的,机制是:
- 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
- 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
注意: 异常发生时是否进行补偿也可由用户自定义决定
- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
示例状态图:
快速开始
Demo 简介
基于 dubbo 构建的微服务下,使用 Saga 模式演示分布式事务的提交和回滚;
业务流程图如下图 所示:
先下载 seata-samples 工程:https://github.com/apache/incubator-seata-samples.git
注意 SEATA 版本需要 0.9.0 以上
在 dubbo-saga-sample 中一个分布式事务内会有 2 个 Saga 事务参与者,分别是: InventoryAction 和 BalanceAction ;分布式事务提交则两者均提交,分布式事务回滚则两者均回滚;
这 2 个 Saga 参与者均是 dubbo 服务,两个参与都有一个 reduce 方法,表示库存扣减或余额扣减,还有一个 compensateReduce 方法,表示补偿扣减操作。
- InventoryAction 接口定义如下:
public interface InventoryAction {
/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}
- 这个场景用状态语言定义就是下面的 json:src/main/resources/statelang/reduce_inventory_and_balance.json
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]", "$.[count]"],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": ["java.lang.Throwable"],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
该 json 表示的状态图:
状态语言在一定程度上参考了AWS Step Functions