Seata Saga 模式
概述
Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
适用场景:
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
缺点:
- 不保证隔离性(应对方案见后面文档)
Saga 的实现:
基于状态机引擎的 Saga 实现:
目前 SEATA 提供的 Saga 模式是基于状态机引擎来实现的,机制是:
- 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
- 状态图 json 由状态机引擎驱动执行,当出现异常时  状态引擎反向执行已成功节点对应的补偿节点将事务回滚
注意: 异常发生时是否进行补偿也可由用户自定义决定 
- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
示例状态图:

快速开始
Demo 简介
基于 dubbo 构建的微服务下,使用 Saga 模式演示分布式事务的提交和回滚;
业务流程图如下图所示:

先下载 seata-samples 工程:https://github.com/apache/incubator-seata-samples.git
注意 SEATA 版本需要 0.9.0 以上
在 dubbo-saga-sample 中一个分布式事务内会有 2 个 Saga 事务参与者,分别是: InventoryAction 和 BalanceAction ;分布式事务提交则两者均提交,分布式事务回滚则两者均回滚;
这 2 个 Saga 参与者均是 dubbo 服务,两个参与都有一个 reduce 方法,表示库存扣减或余额扣减,还有一个 compensateReduce 方法,表示补偿扣减操作。
- InventoryAction 接口定义如下:
public interface InventoryAction {
    /**
     * reduce
     * @param businessKey
     * @param amount
     * @param params
     * @return
     */
    boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
    /**
     * compensateReduce
     * @param businessKey
     * @param params
     * @return
     */
    boolean compensateReduce(String businessKey, Map<String, Object> params);
}
- 这个场景用状态语言定义就是下面的 json:src/main/resources/statelang/reduce_inventory_and_balance.json
{
  "Name": "reduceInventoryAndBalance",
  "Comment": "reduce inventory then reduce balance in a transaction",
  "StartState": "ReduceInventory",
  "Version": "0.0.1",
  "States": {
    "ReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceInventory",
      "Next": "ChoiceState",
      "Input": ["$.[businessKey]", "$.[count]"],
      "Output": {
        "reduceInventoryResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      }
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[reduceInventoryResult] == true",
          "Next": "ReduceBalance"
        }
      ],
      "Default": "Fail"
    },
    "ReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceBalance",
      "Input": [
        "$.[businessKey]",
        "$.[amount]",
        {
          "throwException": "$.[mockReduceBalanceFail]"
        }
      ],
      "Output": {
        "compensateReduceBalanceResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": ["java.lang.Throwable"],
          "Next": "CompensationTrigger"
        }
      ],
      "Next": "Succeed"
    },
    "CompensateReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "compensateReduce",
      "Input": ["$.[businessKey]"]
    },
    "CompensateReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "compensateReduce",
      "Input": ["$.[businessKey]"]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_FAILED",
      "Message": "purchase failed"
    }
  }
}
该 json 表示的状态图:

状态语言在一定程度上参考了AWS Step Functions
"状态机" 属性简介:
- Name: 表示状态机的名称,必须唯一
- Comment: 状态机的描述
- Version: 状态机定义版本
- StartState: 启动时运行的第一个"状态"
- States: 状态列表,是一个 map 结构,key 是"状态"的名称,在状态机内必须唯一
- IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
- IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新
"状态" 属性简介:
- Type: "状态" 的类型,比如有:
- ServiceTask: 执行调用服务任务
- Choice: 单条件选择路由
- CompensationTrigger: 触发补偿流程
- Succeed: 状态机正常结束
- Fail: 状态机异常结束
- SubStateMachine: 调用子状态机
- CompensateSubMachine: 用于补偿一个子状态机
 
- ServiceName: 服务名称,通常是服务的 beanId
- ServiceMethod: 服务方法名称
- CompensateState: 该"状态"的补偿"状态"
- Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
- Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可
- Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个 map 结构,key 为放入到状态机上文时的 key(状态机上下文也是一个 map),value 中$.是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数
- Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构,key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value 是当这个条件表达式成立时则将服务执行状态映射成这个值
- Catch: 捕获到异常后的路由
- Next: 服务执行完成后下一个执行的"状态"
- Choices: Choice 类型的"状态"里, 可选的分支列表, 分支中的 Expression 为 SpringEL 表达式, Next 为当表达式成立时执行的下一个"状态"
- ErrorCode: Fail 类型"状态"的错误码
- Message: Fail 类型"状态"的错误信息
更多详细的状态语言解释请看State language reference章节
更多详细的状态语言使用示例见https://github.com/apache/incubator-seata/tree/develop/test/src/test/java/io/seata/saga/engine
Demo 运行指南
step 1 启动 SEATA Server
运行 SeataServerStarter ,启动 Seata Server;
step 2 启动 dubbo provider Demo
运行 DubboSagaProviderStarter ,启动 dubbo provider;
step 3 启动 Saga Demo
运行 DubboSagaTransactionStarter , 启动 demo 工程;
Demo 中的数据库使用的是 H2 内存数据库, 生产上建议使用与业务相同的库, 目前支持 Oracle, Mysql, DB2. 建表语句在 https://github.com/apache/incubator-seata/tree/develop/saga/seata-saga-engine-store/src/main/resources/sql
Demo 中还有调用本地服务和调用 SOFA RPC 服务的示例
状态机设计器
在线访问Seata Saga 提供了一个可视化的状态机设计器方便用户使用,代码和运行指南请参考: https://github.com/apache/incubator-seata/tree/refactor_designer/saga/seata-saga-statemachine-designer
状态机设计器截图:

最佳实践
Saga 服务设计的实践经验
允许空补偿
- 空补偿:原服务未执行,补偿服务执行了
- 出现原因:
- 原服务 超时(丢包)
- Saga 事务触发 回滚
- 未收到 原服务请求,先收到 补偿请求
 
所以服务设计时需要允许空补偿, 即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来
防悬挂控制
- 悬挂:补偿服务 比 原服务 先执行
- 出现原因:
- 原服务 超时(拥堵)
- Saga 事务回滚,触发 回滚
- 拥堵的 原服务 到达
 
所以要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行
幂等控制
- 原服务与补偿服务都需要保证幂等性, 由于网络可能超时, 可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新
缺乏隔离性的应对
- 由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户 A 充值, 然后给用户 B 扣减余额, 如果在给 A 用户充值成功, 在事务提交以前, A 用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:
- 业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。
- 有些业务场景可以允许让业务最终成功, 在回滚不了的情况下可以继续重试完成后面的流程, 所以状态机引擎除了提供“回滚”能力还需要提供“向前”恢复上下文继续执行的能力, 让业务最终执行成功, 达到最终一致性的目的。
 
性能优化
- 配置客户端参数client.rm.report.success.enable=false,可以在当分支事务执行成功时不上报分支状态到 server,从而提升性能。当上一个分支事务的状态还没有上报的时候,下一个分支事务已注册,可以认为上一个实际已成功 
API referance
StateMachineEngine API
public interface StateMachineEngine {
    /**
     * start a state machine instance
     * @param stateMachineName
     * @param tenantId
     * @param startParams
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException;
    /**
     * start a state machine instance with businessKey
     * @param stateMachineName
     * @param tenantId
     * @param businessKey
     * @param startParams
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException;
    /**
     * start a state machine instance asynchronously
     * @param stateMachineName
     * @param tenantId
     * @param startParams
     * @param callback
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
    /**
     * start a state machine instance asynchronously with businessKey
     * @param stateMachineName
     * @param tenantId
     * @param businessKey
     * @param startParams
     * @param callback
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
    /**
     * forward restart a failed state machine instance
     * @param stateMachineInstId
     * @param replaceParams
     * @return
     * @throws ForwardInvalidException
     */
    StateMachineInstance forward(String stateMachineInstId, Map<String, Object> replaceParams) throws ForwardInvalidException;
    /**
     * forward restart a failed state machine instance asynchronously
     * @param stateMachineInstId
     * @param replaceParams
     * @param callback
     * @return
     * @throws ForwardInvalidException
     */
    StateMachineInstance forwardAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws ForwardInvalidException;
    /**
     * compensate a state machine instance
     * @param stateMachineInstId
     * @param replaceParams
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance compensate(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
    /**
     * compensate a state machine instance asynchronously
     * @param stateMachineInstId
     * @param replaceParams
     * @param callback
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance compensateAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws EngineExecutionException;
    /**
     * skip current failed state instance and forward restart state machine instance
     * @param stateMachineInstId
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance skipAndForward(String stateMachineInstId) throws EngineExecutionException;
    /**
     * skip current failed state instance and forward restart state machine instance asynchronously
     * @param stateMachineInstId
     * @param callback
     * @return
     * @throws EngineExecutionException
     */
    StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallback callback) throws EngineExecutionException;
    /**
     * get state machine configurations
     * @return
     */
    StateMachineConfig getStateMachineConfig();
}
StateMachine Execution Instance API:
StateLogRepository stateLogRepository = stateMachineEngine.getStateMachineConfig().getStateLogRepository();
StateMachineInstance stateMachineInstance = stateLogRepository.getStateMachineInstanceByBusinessKey(businessKey, tenantId);
/**
 * State Log Repository
 *
 * @author lorne.cl
 */
public interface StateLogRepository {
    /**
     * Get state machine instance
     *
     * @param stateMachineInstanceId
     * @return
     */
    StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
    /**
     * Get state machine instance by businessKey
     *
     * @param businessKey
     * @param tenantId
     * @return
     */
    StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
    /**
     * Query the list of state machine instances by parent id
     *
     * @param parentId
     * @return
     */
    List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
    /**
     * Get state instance
     *
     * @param stateInstanceId
     * @param machineInstId
     * @return
     */
    StateInstance getStateInstance(String stateInstanceId, String machineInstId);
    /**
     * Get a list of state instances by state machine instance id
     *
     * @param stateMachineInstanceId
     * @return
     */
    List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
}
StateMachine Definition API:
StateMachineRepository stateMachineRepository = stateMachineEngine.getStateMachineConfig().getStateMachineRepository();
StateMachine stateMachine = stateMachineRepository.getStateMachine(stateMachineName, tenantId);
/**
 * StateMachineRepository
 *
 * @author lorne.cl
 */
public interface StateMachineRepository {
    /**
     * Gets get state machine by id.
     *
     * @param stateMachineId the state machine id
     * @return the get state machine by id
     */
    StateMachine getStateMachineById(String stateMachineId);
    /**
     * Gets get state machine.
     *
     * @param stateMachineName the state machine name
     * @param tenantId         the tenant id
     * @return the get state machine
     */
    StateMachine getStateMachine(String stateMachineName, String tenantId);
    /**
     * Gets get state machine.
     *
     * @param stateMachineName the state machine name
     * @param tenantId         the tenant id
     * @param version          the version
     * @return the get state machine
     */
    StateMachine getStateMachine(String stateMachineName, String tenantId, String version);
    /**
     * Register the state machine to the repository (if the same version already exists, return the existing version)
     *
     * @param stateMachine
     */
    StateMachine registryStateMachine(StateMachine stateMachine);
    /**
     * registry by resources
     *
     * @param resources
     * @param tenantId
     */
    void registryByResources(Resource[] resources, String tenantId) throws IOException;
}
Config referance
在 Spring Bean 配置文件中配置一个 StateMachineEngine
<bean id="dataSource" class="...">
...
<bean>
<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
        <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
</bean>
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
    <property name="dataSource" ref="dataSource" />
    <property name="resources" value="statelang/*.json" />
    <property name="enableAsync" value="true" />
    <!-- 事件驱动执行时使用的线程池, 如果所有状态机都同步执行且不存在循环任务可以不需要 -->
    <property name="threadPoolExecutor" ref="threadExecutor" />
    <property name="applicationId" value="saga_sample" />
    <property name="txServiceGroup" value="my_test_tx_group" />
    <property name="sagaBranchRegisterEnable" value="false" />
    <property name="sagaJsonParser" value="fastjson" />
    <property name="sagaRetryPersistModeUpdate" value="false" />
    <property name="sagaCompensatePersistModeUpdate" value="false" />
</bean>
<bean id="threadExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
    <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="20" />
</bean>
<!-- Seata Server进行事务恢复时需要通过这个Holder拿到stateMachineEngine实例 -->
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
    <property name="stateMachineEngine" ref="stateMachineEngine"/>
</bean>