跳到主要内容

· 阅读需 26 分钟

前言

在分布式系统中,分布式事务是一个必须要解决的问题,目前使用较多的是最终一致性方案。自年初阿里开源了Fescar(四月初更名为Seata)后,该项目受到了极大的关注度,目前已接近8000Star。Seata以高性能和零侵入的方式为目标解决微服务领域的分布式事务难题,目前正处于快速迭代中,近期小目标是生产可用的Mysql版本。关于Seata的总体介绍,可以查看官方WIKI获得更多更全面的内容介绍。

本文主要基于spring cloud+spring jpa+spring cloud alibaba fescar+mysql+seata的结构,搭建一个分布式系统的demo,通过seata的debug日志和源代码,从client端(RM、TM)的角度分析说明其工作流程及原理。

文中代码基于fescar-0.4.1,由于项目刚更名为seata不久,例如一些包名、类名、jar包名称还都是fescar的命名,故下文中仍使用fescar进行表述。

示例项目:https://github.com/fescar-group/fescar-samples/tree/master/springcloud-jpa-seata

相关概念

  • XID:全局事务的唯一标识,由ip:port:sequence组成
  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
  • Transaction Manager (TM ):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
  • Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

分布式框架支持

Fescar使用XID表示一个分布式事务,XID需要在一次分布式事务请求所涉的系统中进行传递,从而向feacar-server发送分支事务的处理情况,以及接收feacar-server的commit、rollback指令。 Fescar官方已支持全版本的dubbo协议,而对于spring cloud(spring-boot)的分布式项目社区也提供了相应的实现

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-fescar</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>

该组件实现了基于RestTemplate、Feign通信时的XID传递功能。

业务逻辑

业务逻辑是经典的下订单、扣余额、减库存流程。 根据模块划分为三个独立的服务,且分别连接对应的数据库

  • 订单:order-server
  • 账户:account-server
  • 库存:storage-server

另外还有发起分布式事务的业务系统

  • 业务:business-server

项目结构如下图 在这里插入图片描述

正常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额

异常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额异常

正常流程下2、3、4步的数据正常更新全局commit,异常流程下的数据则由于第4步的异常报错全局回滚。

配置文件

fescar的配置入口文件是registry.conf,查看代码ConfigurationFactory得知目前还不能指定该配置文件,所以配置文件名称只能为registry.conf

private static final String REGISTRY_CONF = "registry.conf";
public static final Configuration FILE_INSTANCE = new FileConfiguration(REGISTRY_CONF);

registry中可以指定具体配置的形式,默认使用file类型,在file.conf中有3部分配置内容

  1. transport transport部分的配置对应NettyServerConfig类,用于定义Netty相关的参数,TM、RM与fescar-server之间使用Netty进行通信
  2. service
	 service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#配置Client连接TC的地址
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
是否启用seata的分布式事务
disableGlobalTransaction = false
}
  1. client
	client {
#RM接收TC的commit通知后缓冲上限
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}

数据源Proxy

除了前面的配置文件,fescar在AT模式下稍微有点代码量的地方就是对数据源的代理指定,且目前只能基于DruidDataSource的代理。 注:在最新发布的0.4.2版本中已支持任意数据源类型

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

使用DataSourceProxy的目的是为了引入ConnectionProxy,fescar无侵入的一方面就体现在ConnectionProxy的实现上,即分支事务加入全局事务的切入点是在本地事务的commit阶段,这样设计可以保证业务数据与undo_log是在一个本地事务中。

undo_log是需要在业务库上创建的一个表,fescar依赖该表记录每笔分支事务的状态及二阶段rollback的回放数据。不用担心该表的数据量过大形成单点问题,在全局事务commit的场景下事务对应的undo_log会异步删除。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动Server

前往https://github.com/apache/incubator-seata/releases 下载与Client版本对应的fescar-server,避免由于版本的不同导致的协议不一致问题 进入解压之后的 bin 目录,执行

./fescar-server.sh 8091 ../data

启动成功输出

2019-04-09 20:27:24.637 INFO [main]c.a.fescar.core.rpc.netty.AbstractRpcRemotingServer.start:152 -Server started ... 

启动Client

fescar的加载入口类位于GlobalTransactionAutoConfiguration,对基于spring boot的项目能够自动加载,当然也可以通过其他方式示例化GlobalTransactionScanner

@Configuration
@EnableConfigurationProperties({FescarProperties.class})
public class GlobalTransactionAutoConfiguration {
private final ApplicationContext applicationContext;
private final FescarProperties fescarProperties;

public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, FescarProperties fescarProperties) {
this.applicationContext = applicationContext;
this.fescarProperties = fescarProperties;
}

/**
* 示例化GlobalTransactionScanner
* scanner为client初始化的发起类
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
String txServiceGroup = this.fescarProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
this.fescarProperties.setTxServiceGroup(txServiceGroup);
}

return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
}

可以看到支持一个配置项FescarProperties,用于配置事务分组名称

spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group

如果不指定服务组,则默认使用spring.application.name+ -fescar-service-group生成名称,所以不指定spring.application.name启动会报错

@ConfigurationProperties("spring.cloud.alibaba.fescar")
public class FescarProperties {
private String txServiceGroup;

public FescarProperties() {
}

public String getTxServiceGroup() {
return this.txServiceGroup;
}

public void setTxServiceGroup(String txServiceGroup) {
this.txServiceGroup = txServiceGroup;
}
}

获取applicationId和txServiceGroup后,创建GlobalTransactionScanner对象,主要看类中initClient方法

private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
//init TM
TMClient.init(applicationId, txServiceGroup);

//init RM
RMClient.init(applicationId, txServiceGroup);

}

方法中可以看到初始化了TMClientRMClient,对于一个服务既可以是TM角色也可以是RM角色,至于什么时候是TM或者RM则要看在一次全局事务中@GlobalTransactional注解标注在哪。 Client创建的结果是与TC的一个Netty连接,所以在启动日志中可以看到两个Netty Channel,其中标明了transactionRole分别为TMROLERMROLE

2019-04-09 13:42:57.417  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":101,"version":"0.4.1"},"transactionRole":"TMROLE"}
2019-04-09 13:42:57.505 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":103,"version":"0.4.1"},"transactionRole":"RMROLE"}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterTMRequest{applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='null', applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:1
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:2
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@3b06d101 msgId:1, future :com.alibaba.fescar.core.protocol.MessageFuture@28bb1abd, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.TmRpcClient@65fc3fb7 msgId:2, future :com.alibaba.fescar.core.protocol.MessageFuture@9a1e3df, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 114 ms, version:0.4.1,role:TMROLE,channel:[id: 0xd22fe0c5, L:/127.0.0.1:57398 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.711 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 125 ms, version:0.4.1,role:RMROLE,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]

日志中可以看到

  1. 创建Netty连接
  2. 发送注册请求
  3. 得到响应结果
  4. RmRpcClientTmRpcClient成功实例化

TM处理流程

在本例中,TM的角色是business-service,BusinessService的purchase方法标注了@GlobalTransactional注解

@Service
public class BusinessService {

@Autowired
private StorageFeignClient storageFeignClient;
@Autowired
private OrderFeignClient orderFeignClient;

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount){
storageFeignClient.deduct(commodityCode, orderCount);

orderFeignClient.create(userId, commodityCode, orderCount);
}
}

方法调用后将会创建一个全局事务,首先关注@GlobalTransactional注解的作用,在GlobalTransactionalInterceptor中被拦截处理

/**
* AOP拦截方法调用
*/
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

//获取方法GlobalTransactional注解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

//如果方法有GlobalTransactional注解,则拦截到相应方法处理
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}

handleGlobalTransaction方法中对TransactionalTemplate的execute进行了调用,从类名可以看到这是一个标准的模版方法,它定义了TM对全局事务处理的标准步骤,注释已经比较清楚了

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}

通过DefaultGlobalTransaction的begin方法开启全局事务

public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
check();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
//具体开启事务的方法,获取TC返回的XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Begin a NEW global transaction [" + xid + "]");
}
}

方法开头处if (role != GlobalTransactionRole.Launcher)对role的判断有关键的作用,表明当前是全局事务的发起者(Launcher)还是参与者(Participant)。如果在分布式事务的下游系统方法中也加上@GlobalTransactional注解,那么它的角色就是Participant,会忽略后面的begin直接return,而判断是Launcher还是Participant是根据当前上下文是否已存在XID来判断,没有XID的就是Launcher,已经存在XID的就是Participant. 由此可见,全局事务的创建只能由Launcher执行,而一次分布式事务中也只有一个Launcher存在。

DefaultTransactionManager负责TM与TC通讯,发送begin、commit、rollback指令

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}

至此拿到fescar-server返回的XID表示一个全局事务创建成功,日志中也反应了上述流程

2019-04-09 13:46:57.417 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.417 DEBUG 31326 --- [geSend_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int), channel:[id: 0xa148545e, L:/127.0.0.1:56120 - R:/127.0.0.1:8091],active?true,writable?true,isopen?true
2019-04-09 13:46:57.418 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.421 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.GlobalBeginResponse@2dc480dc,messageId:1196
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.224.93:8091:2008502699
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.tm.api.DefaultGlobalTransaction : Begin a NEW global transaction [192.168.224.93:8091:2008502699]

全局事务创建后,就开始执行business.execute(),即业务代码storageFeignClient.deduct(commodityCode, orderCount)进入RM处理流程,此处的业务逻辑为调用storage-service的扣减库存接口。

RM处理流程

@GetMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count){
storageService.deduct(commodityCode,count);
return true;
}

@Transactional
public void deduct(String commodityCode, int count){
Storage storage = storageDAO.findByCommodityCode(commodityCode);
storage.setCount(storage.getCount()-count);

storageDAO.save(storage);
}

storage的接口和service方法并未出现fescar相关的代码和注解,体现了fescar的无侵入。那它是如何加入到这次全局事务中的呢?答案在ConnectionProxy中,这也是前面说为什么必须要使用DataSourceProxy的原因,通过DataSourceProxy才能在业务代码的本地事务提交时,fescar通过该切入点,向TC注册分支事务并发送RM的处理结果。

由于业务代码本身的事务提交被ConnectionProxy代理实现,所以在提交本地事务时,实际执行的是ConnectionProxy的commit方法

public void commit() throws SQLException {
//如果当前是全局事务,则执行全局事务的提交
//判断是不是全局事务,就是看当前上下文是否存在XID
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}

private void processGlobalTransactionCommit() throws SQLException {
try {
//首先是向TC注册RM,拿到TC分配的branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}

try {
if (context.hasUndoLog()) {
//写入undolog
UndoLogManager.flushUndoLogs(this);
}

//提交本地事务,写入undo_log和业务数据在同一个本地事务中
targetConnection.commit();
} catch (Throwable ex) {
//向TC发送RM的事务处理失败的通知
report(false);
if (ex instanceof SQLException) {
throw new SQLException(ex);
}
}
//向TC发送RM的事务处理成功的通知
report(true);
context.reset();
}

private void register() throws TransactionException {
//注册RM,构建request通过netty向TC发送注册指令
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
//将返回的branchId存在上下文中
context.setBranchId(branchId);
}

通过日志印证一下上面的流程

2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : xid in RootContext null xid in RpcContext 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : bind 192.168.0.2:8091:2008546211 to RootContext
2019-04-09 21:57:48.386 INFO 38933 --- [nio-8081-exec-1] o.h.h.i.QueryTranslatorFactoryInitiator : HHH000397: Using ASTQueryTranslatorFactory
Hibernate: select storage0_.id as id1_0_, storage0_.commodity_code as commodit2_0_, storage0_.count as count3_0_ from storage_tbl storage0_ where storage0_.commodity_code=?
Hibernate: update storage_tbl set count=? where id=?
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : will connect to 192.168.0.2:8091
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"192.168.0.2:8091","message":{"applicationId":"storage-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"resourceIds":"jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false","transactionServiceGroup":"hello-service-fescar-service-group","typeCode":103,"version":"0.4.0"},"transactionRole":"RMROLE"}
2019-04-09 21:57:48.677 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false', applicationId='storage-service', transactionServiceGroup='hello-service-fescar-service-group'}
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:9
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:9, future :com.alibaba.fescar.core.protocol.MessageFuture@186cd3e0, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 3 ms, version:0.4.1,role:RMROLE,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.681 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.681 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.687 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage BranchRegisterResponse: transactionId=2008546211,branchId=2008546212,result code =Success,getMsg =null,messageId:11
2019-04-09 21:57:48.702 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.rm.datasource.undo.UndoLogManager : Flushing UNDO LOG: {"branchId":2008546212,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":993}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":994}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"192.168.0.2:8091:2008546211"}
2019-04-09 21:57:48.755 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.755 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.756 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.758 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.BranchReportResponse@582a08cf,messageId:13
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : unbind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : unbind 192.168.0.2:8091:2008546211 from RootContext
  1. 获取business-service传来的XID
  2. 绑定XID到当前上下文中
  3. 执行业务逻辑sql
  4. 向TC创建本次RM的Netty连接
  5. 向TC发送分支事务的相关信息
  6. 获得TC返回的branchId
  7. 记录Undo Log数据
  8. 向TC发送本次事务PhaseOne阶段的处理结果
  9. 从当前上下文中解绑XID

其中第1步和第9步,是在FescarHandlerInterceptor中完成的,该类并不属于fescar,是前面提到的spring-cloud-alibaba-fescar,它实现了基于feign、rest通信时将xid bind和unbind到当前请求上下文中。到这里RM完成了PhaseOne阶段的工作,接着看PhaseTwo阶段的处理逻辑。

事务提交

各分支事务执行完成后,TC对各RM的汇报结果进行汇总,给各RM发送commit或rollback的指令

2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null,messageId:1
2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:1, body:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.814 INFO 38933 --- [atch_RMROLE_1_8] c.a.f.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch committing: 192.168.0.2:8091:2008546211 2008546212 jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
2019-04-09 21:57:49.817 INFO 38933 --- [atch_RMROLE_1_8] c.a.fescar.core.rpc.netty.RmRpcClient : RmRpcClient sendResponse branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null
2019-04-09 21:57:49.817 DEBUG 38933 --- [atch_RMROLE_1_8] c.a.f.c.rpc.netty.AbstractRpcRemoting : send response:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:49.817 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null

从日志中可以看到

  1. RM收到XID=192.168.0.2:8091:2008546211,branchId=2008546212的commit通知
  2. 执行commit动作
  3. 将commit结果发送给TC,branchStatus为PhaseTwo_Committed

具体看下二阶段commit的执行过程,在AbstractRMHandler类的doBranchCommit方法

/**
* 拿到通知的xid、branchId等关键参数
* 然后调用RM的branchCommit
*/
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch commit result: " + status);
}

最终会将branchCommit的请求调用到AsyncWorker的branchCommit方法。AsyncWorker的处理方式是fescar架构的一个关键部分,因为大部分事务都是会正常提交的,所以在PhaseOne阶段就已经结束了,这样就可以将锁最快的释放。PhaseTwo阶段接收commit的指令后,异步处理即可。将PhaseTwo的时间消耗排除在一次分布式事务之外。

private static final List<Phase2Context> ASYNC_COMMIT_BUFFER = Collections.synchronizedList( new ArrayList<Phase2Context>());

/**
* 将需要提交的XID加入list
*/
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
ASYNC_COMMIT_BUFFER.add(new Phase2Context(branchType, xid, branchId, resourceId, applicationData));
} else {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}

/**
* 通过定时任务消费list中的XID
*/
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... " + e.getMessage());
}
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.size() == 0) {
return;
}
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();

//一次定时循环取出ASYNC_COMMIT_BUFFER中的所有待办数据
//以resourceId作为key分组待commit数据,resourceId是一个数据库的连接url
//在前面的日志中可以看到,目的是为了覆盖应用的多数据源创建
while (iterator.hasNext()) {
Phase2Context commitContext = iterator.next();
List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
if (contextsGroupedByResourceId == null) {
contextsGroupedByResourceId = new ArrayList<>();
mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
}
contextsGroupedByResourceId.add(commitContext);

iterator.remove();

}

for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
try {
try {
//根据resourceId获取数据源以及连接
DataSourceProxy dataSourceProxy = DataSourceManager.get().get(entry.getKey());
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
continue;
}
List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
for (Phase2Context commitContext : contextsGroupedByResourceId) {
try {
//执行undolog的处理,即删除xid、branchId对应的记录
UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
} catch (Exception ex) {
LOGGER.warn(
"Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
}
}

} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
}
}
}
}
}

所以对于commit动作的处理,RM只需删除xid、branchId对应的undo_log即可。

事务回滚

对于rollback场景的触发有两种情况

  1. 分支事务处理异常,即ConnectionProxyreport(false)的情况
  2. TM捕获到下游系统上抛的异常,即发起全局事务标有@GlobalTransactional注解的方法捕获到的异常。在前面TransactionalTemplate类的execute模版方法中,对business.execute()的调用进行了catch,catch后会调用rollback,由TM通知TC对应XID需要回滚事务
 public void rollback() throws TransactionException {
//只有Launcher能发起这个rollback
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid == null) {
throw new IllegalStateException();
}

status = transactionManager.rollback(xid);
if (RootContext.getXID() != null) {
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
}

TC汇总后向参与者发送rollback指令,RM在AbstractRMHandler类的doBranchRollback方法中接收这个rollback的通知

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch rolling back: " + xid + " " + branchId + " " + resourceId);
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch rollback result: " + status);
}

然后将rollback请求传递到DataSourceManager类的branchRollback方法

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
//根据resourceId获取对应的数据源
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}

最终会执行UndoLogManager类的undo方法,因为是纯jdbc操作代码比较长就不贴出来了,可以通过连接到github查看源码,说一下undo的具体流程

  1. 根据xid和branchId查找PhaseOne阶段提交的undo_log
  2. 如果找到了就根据undo_log中记录的数据生成回放sql并执行,即还原PhaseOne阶段修改的数据
  3. 第2步处理完后,删除该条undo_log数据
  4. 如果第1步没有找到对应的undo_log,就插入一条状态为GlobalFinished的undo_log. 出现没找到的原因可能是PhaseOne阶段的本地事务异常了,导致没有正常写入。 因为xid和branchId是唯一索引,所以第4步的插入,可以防止PhaseOne阶段恢复后的成功写入,那么PhaseOne阶段就会异常,这样一来业务数据也就不会提交成功,数据达到了最终回滚了的效果

总结

本地结合分布式业务场景,分析了fescar client侧的主要处理流程,对TM和RM角色的主要源码进行了解析,希望能对大家理解fescar的工作原理有所帮助。

随着fescar的快速迭代以及后期的Roadmap规划,假以时日相信fescar能够成为开源分布式事务的标杆解决方案。

· 阅读需 30 分钟

再前不久,我写了一篇关于分布式事务中间件 Fescar 的解析,没过几天 Fescar 团队对其进行了品牌升级,取名为 Seata(Simpe Extensible Autonomous Transaction Architecture),而以前的 Fescar 的英文全称为 Fast & EaSy Commit And Rollback。可以看见 Fescar 从名字上来看更加局限于 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分布式事务解决方案。更换名字之后,我对其未来的发展更有信心。

这里先大概回忆一下 Seata 的整个过程模型:

  • TM:事务的发起者。用来告诉 TC,全局事务的开始,提交,回滚。
  • RM:具体的事务资源,每一个 RM 都会作为一个分支事务注册在 TC。
  • TC:事务的协调者。也可以看做是 Fescar-server,用于接收我们的事务的注册,提交和回滚。

在之前的文章中对整个角色有个大体的介绍,在这篇文章中我将重点介绍其中的核心角色 TC,也就是事务协调器。

2.Transcation Coordinator

为什么之前一直强调 TC 是核心呢?那因为 TC 这个角色就好像上帝一样,管控着云云众生的 RM 和 TM。如果 TC 一旦不好使,那么 RM 和 TM 一旦出现小问题,那必定会乱的一塌糊涂。所以要想了解 Seata,那么必须要了解它的 TC。

那么一个优秀的事务协调者应该具备哪些能力呢?我觉得应该有以下几个:

  • 正确的协调:能正确的协调 RM 和 TM 接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
  • 高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
  • 高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的 RM 和 TM 那么会经常遇到超时,从而引起回滚频繁。
  • 高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。

下面我也将逐步阐述 Seata 是如何做到上面四点。

2.1 Seata-Server 的设计

Seata-Server 整体的模块图如上所示:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否 commit,rollback 等协调活动。
  • Store:存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
  • Discovery: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。
  • Config: 用来存储和查找我们服务端的配置。
  • Lock: 锁模块,用于给 Seata 提供全局锁的功能。
  • RPC:用于和其它端通信。
  • HA-Cluster:高可用集群,目前还没开源,为 Seata 提供可靠的高可用服务,预计将会在 0.6 版本开源。

2.2 Discovery

首先来讲讲比较基础的 Discovery 模块,又称服务注册/发现模块。我们将 Seata-Sever 启动之后,需要将自己的地址暴露给其它使用者,那么就需要我们这个模块帮忙。

这个模块有个核心接口 RegistryService,如上图所示:

  • register:服务端使用,进行服务注册。
  • unregister:服务端使用,一般在 JVM 关闭钩子,ShutdownHook 中调用。
  • subscribe:客户端使用,注册监听事件,用来监听地址的变化。
  • unsubscribe:客户端使用,取消注册监听事件。
  • lookup:客户端使用,根据 key 查找服务地址列表。
  • close:都可以使用,用于关闭 Registry 资源。

如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有五种服务注册/发现,分别是 redis、zk、nacos、eruka 和 consul。下面简单介绍下 Nacos 的实现:

2.2.1 register 接口:

step1:校验地址是否合法

step2:获取 Nacos 的 Naming 实例,然后将地址注册到服务名为 serverAddr(固定服务名) 的对应集群分组(registry.conf 文件配置)上面。

unregister 接口类似,这里不做详解。

2.2.2 lookup 接口:

step1:获取当前 clusterName 名字。

step2:判断当前集群名对应的服务是否已经订阅过了,如果是直接从 map 中取订阅返回的数据。

step3:如果没有订阅先主动查询一次服务实例列表,然后添加订阅并将订阅返回的数据存放到 map 中,之后直接从 map 获取最新数据。

2.2.3 subscribe 接口

这个接口比较简单,具体分两步:

step1:对将要订阅的 cluster-> listener 存放到 map 中,此处 nacos 未提交单机已订阅列表,所以需要自己实现。

step2:使用 Nacos api 订阅。

2.3 Config

配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty 的 select 线程数量,work 线程数量,session 允许最大为多少等等,当然这些参数再 Seata 中都有自己的默认设置。

同样的在 Seata 中也提供了一个接口 Configuration,用来自定义我们需要的获取配置的地方:

  • getInt/Long/Boolean/getConfig():通过 dataId 来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
  • putConfig:用于添加配置。
  • removeConfig:删除一个配置。
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

目前为止有四种方式获取 Config:File(文件获取)、Nacos、Apollo 和 ZK(不推荐)。在 Seata 中首先需要配置 registry.conf,来配置 config.type 。实现 conf 比较简单这里就不深入分析。

2.4 Store

存储层的实现对于 Seata 是否高性能,是否可靠非常关键。 如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。

在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的 TM 创造的全局事务操作数据叫 GloabSession,RM 创造的分支事务操作数据叫 BranchSession,一个 GloabSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。

在 FileTransactionStoreManager#writeSession 代码中:

上面的代码主要分为下面几步:

  • step1:生成一个 TransactionWriteFuture。
  • step2:将这个 futureRequest 丢进一个 LinkedBlockingQueue 中。为什么需要将所有数据都丢进队列中呢?当然这里其实也可以用锁来实现,再另外一个阿里开源的 RocketMQ 中,使用的锁。不论是队列还是锁它们的目的是为了保证单线程写,这又是为什么呢?有人会解释说,需要保证顺序写,这样速度就很快,这个理解是错误的,我们的 FileChannel 的写方法是线程安全的,已经能保证顺序写了。保证单线程写其实是为了让我们这个写逻辑都是单线程的,因为可能有些文件写满或者记录写数据位置等等逻辑,当然这些逻辑都可以主动加锁去做,但是为了实现简单方便,直接再整个写逻辑排队处理是最为合适的。
  • step3:调用 future.get,等待我们该条数据写逻辑完成通知。

我们将数据提交到队列之后,我们接下来需要对其进行消费,代码如下:

这里将一个 WriteDataFileRunnable()提交进我们的线程池,这个 Runnable 的 run()方法如下:

分为下面几步:

step1: 判断是否停止,如果 stopping 为 true 则返回 null。

step2:从我们的队列中获取数据。

step3:判断 future 是否已经超时了,如果超时,则设置结果为 false,此时我们生产者 get()方法会接触阻塞。

step4:将我们的数据写进文件,此时数据还在 pageCahce 层并没有刷新到磁盘,如果写成功然后根据条件判断是否进行刷盘操作。

step5:当写入数量到达一定的时候,或者写入时间到达一定的时候,需要将我们当前的文件保存为历史文件,删除以前的历史文件,然后创建新的文件。这一步是为了防止我们文件无限增长,大量无效数据浪费磁盘资源。

在我们的 writeDataFile 中有如下代码:

step1:首先获取我们的 ByteBuffer,如果超出最大循环 BufferSize 就直接创建一个新的,否则就使用我们缓存的 Buffer。这一步可以很大的减少 GC。

step2:然后将数据添加进入 ByteBuffer。

step3:最后将 ByteBuffer 写入我们的 fileChannel,这里会重试三次。此时的数据还在 pageCache 层,受两方面的影响,OS 有自己的刷新策略,但是这个业务程序不能控制,为了防止宕机等事件出现造成大量数据丢失,所以就需要业务自己控制 flush。下面是 flush 的代码:

这里 flush 的条件写入一定数量或者写的时间超过一定时间,这样也会有个小问题如果是停电,那么 pageCache 中有可能还有数据并没有被刷盘,会导致少量的数据丢失。目前还不支持同步模式,也就是每条数据都需要做刷盘操作,这样可以保证每条消息都落盘,但是性能也会受到极大的影响,当然后续会不断的演进支持。

我们的 store 核心流程主要是上面几个方法,当然还有一些比如,session 重建等,这些比较简单,读者可以自行阅读。

2.5 Lock

大家知道数据库实现隔离级别主要是通过锁来实现的,同样的再分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。

Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:

其中有三个方法:

  • acquireLock:用于对我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务的资源加锁,成功返回 true。
  • isLockable:根据事务 ID,资源 Id,锁住的 Key 来查询是否已经加锁。
  • cleanAllLocks:清除所有的锁。 对于锁我们可以在本地实现,也可以通过 redis 或者 mysql 来帮助我们实现。官方默认提供了本地全局锁的实现:

在本地锁的实现中有两个常量需要关注:

  • BUCKET_PER_TABLE:用来定义每个 table 有多少个 bucket,目的是为了后续对同一个表加锁的时候减少竞争。
  • LOCK_MAP:这个 map 从定义上来看非常复杂,里里外外套了很多层 Map,这里用个表格具体说明一下:
层数keyvalue
1-LOCK_MAPresourceId(jdbcUrl)dbLockMap
2- dbLockMaptableName (表名)tableLockMap
3- tableLockMapPK.hashcode%Bucket (主键值的 hashcode%bucket)bucketLockMap
4- bucketLockMapPKtrascationId

可以看见实际上的加锁在 bucketLockMap 这个 map 中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到 bucketLockMap,然后将当前 trascationId 塞进去,如果这个主键当前有 TranscationId,那么比较是否是自己,如果不是则加锁失败。

2.6 RPC

保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:

如果采用默认的基本配置那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,再这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。

Seata 目前允许配置的传输层配置如图所示,用户可根据需要进行 Netty 传输层面的调优,配置通过配置中心配置,首次加载时生效。

这里需要提一下的是 Seata 的心跳机制,这里是使用 Netty 的 IdleStateHandler 完成的,如下:

在 Sever 端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为 15s(客户端默认写空闲为 5s,发送 ping 消息),如果超过 15s 则会将链接断开,关闭资源。

step1:判断是否是读空闲的检测事件。

step2:如果是则断开链接,关闭资源。
另外 Seata 做了内存池、客户端做了批量小包合并发送、Netty 连接池(减少连接创建时的服务不可用时间)等功能,以下为批量小包合并功能。

客户端的消息发送并不是真正的消息发送通过 AbstractRpcRemoting#sendAsyncRequest 包装成 RpcMessage 存储至 basket 中并唤醒合并发送线程。合并发送线程通过 while true 的形式 最长等待 1ms 对 basket 的消息取出包装成 merge 消息进行真正发送,此时若 channel 出现异常则会通过 fail-fast 快速失败返回结果。merge 消息发送前在 map 中标识,收到结果后批量确认(AbstractRpcRemotingClient#channelRead),并通过 dispatch 分发至 messageListener 和 handler 去处理。同时,timerExecutor 定时对已发送消息进行超时检测,若超时置为失败。具体消息协议设计将会在后续的文章中给出,敬请关注。
Seata 的 Netty Client 由 TMClient 和 RMClient 组成,根据事务角色功能区分,都继承 AbstractRpcRemotingClient,AbstractRpcRemotingClient 实现了 RemotingService(服务启停), RegisterMsgListener(netty 连接池连接创建回调)和 ClientMessageSender(消息发送)继承了 AbstractRpcRemoting( Client 和 Server 顶层消息发送和处理的模板)。
RMClient 类关系图如下图所示: TMClient 和 RMClient 又会根据自身的 poolConfig 配置与 NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> 进行 channel 连接的交互,channel 连接池根据角色 key+ip 作为连接池的 key 来定位各个连接池 ,连接池对 channel 进行统一的管理。TMClient 和 RMClient 在发送过程中对于每个 ip 只会使用一个长连接,但连接不可用时,会从连接池中快速取出已经创建好并可用的连接,减少服务的不可用时间。

2.7 HA-Cluster

目前官方没有公布 HA-Cluster,但是通过一些其它中间件和官方的一些透露,可以将 HA-Cluster 用如下方式设计:

具体的流程如下:

step1:客户端发布信息的时候根据 transcationId 保证同一个 transcation 是在同一个 master 上,通过多个 Master 水平扩展,提供并发处理性能。

step2:在 server 端中一个 master 有多个 slave,master 中的数据近实时同步到 slave 上,保证当 master 宕机的时候,还能有其它 slave 顶上来可以用。

当然上述一切都是猜测,具体的设计实现还得等 0.5 版本之后。目前有一个 Go 版本的 Seata-Server 也捐赠给了 Seata(还在流程中),其通过 raft 实现副本一致性,其它细节不是太清楚。

2.8 Metrics

这个模块也是一个没有具体公布实现的模块,当然有可能会提供插件口,让其它第三方 metric 接入进来,最近 Apache SkyWalking 正在和 Seata 小组商讨如何接入进来。

3.Coordinator Core

上面我们讲了很多 Server 基础模块,想必大家对 Seata 的实现已经有个大概,接下来我会讲解事务协调器具体逻辑是如何实现的,让大家更加了解 Seata 的实现内幕。

3.1 启动流程

启动方法在 Server 类有个 main 方法,定义了我们启动流程:

step1:创建一个 RpcServer,再这个里面包含了我们网络的操作,用 Netty 实现了服务端。

step2:解析端口号、本地文件地址(用户 Server 宕机未处理完成事务恢复)、IP(可选,本机只能获取内网 ip,在跨网络时需要一个对外的 vip 注册服务)。

step3:初始化 SessionHoler,其中最重要的重要就是重我们 dataDir 这个文件夹中恢复我们的数据,重建我们的 Session。

step4:创建一个 CoorDinator,这个也是我们事务协调器的逻辑核心代码,然后将其初始化,其内部初始化的逻辑会创建四个定时任务:

  • retryRollbacking:重试 rollback 定时任务,用于将那些失败的 rollback 进行重试的,每隔 5ms 执行一次。
  • retryCommitting:重试 commit 定时任务,用于将那些失败的 commit 进行重试的,每隔 5ms 执行一次。
  • asyncCommitting:异步 commit 定时任务,用于执行异步的 commit,每隔 10ms 一次。
  • timeoutCheck:超时定时任务检测,用于检测超时的任务,然后执行超时的逻辑,每隔 2ms 执行一次。

step5: 初始化 UUIDGenerator 这个也是我们生成各种 ID(transcationId,branchId)的基本类。

step6:将本地 IP 和监听端口设置到 XID 中,初始化 rpcServer 等待客户端的连接。

启动流程比较简单,下面我会介绍分布式事务框架中的常见的一些业务逻辑 Seata 是如何处理的。

3.2 Begin-开启全局事务

一次分布式事务的起始点一定是开启全局事务,首先我们看看全局事务 Seata 是如何实现的:

step1: 根据应用 ID,事务分组,名字,超时时间创建一个 GloabSession,这个在前面也提到过它和 branchSession 分别是什么。

step2:对其添加一个 RootSessionManager 用于监听一些事件,这里要说一下目前在 Seata 里面有四种类型的 Listener(这里要说明的是所有的 sessionManager 都实现了 SessionLifecycleListener):

  • ROOT_SESSION_MANAGER:最全,最大的,拥有所有的 Session。
  • ASYNC_COMMITTING_SESSION_MANAGER:用于管理需要做异步 commit 的 Session。
  • RETRY_COMMITTING_SESSION_MANAGER:用于管理重试 commit 的 Session。
  • RETRY_ROLLBACKING_SESSION_MANAGER:用于管理重试回滚的 Session。 由于这里是开启事务,其它 SessionManager 不需要关注,我们只添加 RootSessionManager 即可。

step3:开启 Globalsession

这一步会把状态变为 Begin,记录开始时间,并且调用 RootSessionManager 的 onBegin 监听方法,将 Session 保存到 map 并写入到我们的文件。

step4:最后返回 XID,这个 XID 是由 ip+port+transactionId 组成的,非常重要,当 TM 申请到之后需要将这个 ID 传到 RM 中,RM 通过 XID 来决定到底应该访问哪一台 Server。

3.3 BranchRegister-分支事务注册

当我们全局事务在 TM 开启之后,我们 RM 的分支事务也需要注册到我们的全局事务之上,这里看看是如何处理的:

step1:通过 transactionId 获取并校验全局事务是否是开启状态。

step2:创建一个新的分支事务,也就是我们的 BranchSession。

step3:对分支事务进行加全局锁,这里的逻辑就是使用的我们锁模块的逻辑。

step4:添加 branchSession,主要是将其添加到 globalSession 对象中,并写入到我们的文件中。

step5:返回 branchId,这个 ID 也很重要,我们后续需要用它来回滚我们的事务,或者对我们分支事务状态更新。

分支事务注册之后,还需要汇报分支事务的本地事务的执行到底是成功还是失败,在 Server 目前只是简单的做一下保存记录,汇报的目的是,就算这个分支事务失败,如果 TM 还是执意要提交全局事务(catch 异常不抛出),那么再遍历提交分支事务的时候,这个失败的分支事务就不需要提交(用户选择性跳过)。

3.4 GlobalCommit - 全局提交

当我们分支事务执行完成之后,就轮到我们的 TM-事务管理器来决定是提交还是回滚,如果是提交,那么就会走到下面的逻辑:

step1:首先找到我们的 globalSession。如果它为 null 证明已经被 commit 过了,那么直接幂等操作,返回成功。

step2:关闭我们的 GloabSession 防止再次有新的 branch 进来(跨服务调用超时回滚,provider 在继续执行)。

step3:如果 status 是等于 Begin,那么久证明还没有提交过,改变其状态为 Committing 也就是正在提交。

step4:判断是否是可以异步提交,目前只有 AT 模式可以异步提交,二阶段全局提交时只是删除 undolog 并无严格顺序,此处使用定时任务,客户端收到后批量合并删除。

step5:如果是异步提交,直接将其放进我们 ASYNC_COMMITTING_SESSION_MANAGER,让其再后台线程异步去做我们的 step6,如果是同步的那么直接执行我们的 step6。

step6:遍历我们的 BranchSession 进行提交,如果某个分支事务失败,根据不同的条件来判断是否进行重试,可异步执行此 branchSession 不成功可以继续执行下一个,因为其本身都在 manager 中,只要没有成功就不会被删除会一直重试,如果是同步提交的会放进重试队列进行定时重试并卡住按照顺序提交。

3.5 GlobalRollback - 全局回滚

如果我们的 TM 决定全局回滚,那么会走到下面的逻辑:

这个逻辑和提交流程基本一致,可以看作是它的反向,这里就不展开讲了。

4.总结

最后在总结一下开始我们提出了分布式事务的关键 4 点,Seata 到底是怎么解决的:

  • 正确的协调:通过后台定时任务各种正确的重试,并且未来会推出监控平台有可能可以手动回滚。
  • 高可用: 通过 HA-Cluster 保证高可用。
  • 高性能:文件顺序写,RPC 通过 netty 实现,Seata 未来可以水平扩展,提高处理性能。
  • 高扩展性:提供给用户可以自由实现的地方,比如配置,服务发现和注册,全局锁等等。

最后希望大家能从这篇文章能了解 Seata-Server 的核心设计原理,当然你也可以想象如果你自己去实现一个分布式事务的 Server 应该怎样去设计?

Seata GitHub 地址:https://github.com/apache/incubator-seata

本文作者:

李钊,GitHub ID @CoffeeLatte007,公众号「咖啡拿铁」作者,Seata 社区 Committer,猿辅导 Java 工程师,曾就职于美团。对分布式中间件,分布式系统有浓厚的兴趣。
季敏(清铭),GitHub ID @slievrly,Seata 开源项目负责人,阿里巴巴中间件 TXC/GTS 核心研发成员,长期从事于分布式中间件核心研发工作,在分布式事务领域有着较丰富的技术积累。

· 阅读需 17 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,文末也提供了项目后续的 Roadmap,欢迎关注。

前言:基于 TCC 模型的应用场景

 
1.png

TCC 分布式事务模型直接作用于服务层。不与具体的服务框架耦合,与底层 RPC 协议无关,与底层存储介质无关,可以灵活选择业务资源的锁定粒度,减少资源锁持有时间,可扩展性好,可以说是为独立部署的 SOA 服务而设计的。

一、TCC 模型优势

对于 TCC 分布式事务模型,笔者认为其在业务场景应用上,有两方面的意义。

1.1 跨服务的分布式事务

服务的拆分,也可以认为是资源的横向扩展,只不过方向不同而已。

横向扩展可能沿着两个方向发展:

  1. 功能扩展,根据功能对数据进行分组,并将不同的功能组分布在多个不同的数据库上,这实际上就是 SOA 架构下的服务化。
  2. 数据分片,在功能组内部将数据拆分到多个数据库上,为横向扩展增加一个新的维度。

下图简要阐释了横向数据扩展策略:

2.png

因此,TCC 的其中一个作用就是在按照功能横向扩展资源时,保证多资源访问的事务属性。

1.2 两阶段拆分

TCC 另一个作用就是把两阶段拆分成了两个独立的阶段,通过资源业务锁定的方式进行关联。资源业务锁定方式的好处在于,既不会阻塞其他事务在第一阶段对于相同资源的继续使用,也不会影响本事务第二阶段的正确执行。

传统模型的并发事务:
3.png

TCC 模型的并发事务:
4.png

这对业务有什么好处呢?拿支付宝的担保交易场景来说,简化情况下,只需要涉及两个服务,交易服务和账务服务。交易作为主业务服务,账务作为从业务服务,提供 Try、Commit、Cancel 接口:

  1. Try 接口扣除用户可用资金,转移到预冻结资金。预冻结资金就是业务锁定方案,每个事务第二阶段只能使用本事务的预冻结资金,在第一阶段执行结束后,其他并发事务也可以继续处理用户的可用资金。
  2. Commit 接口扣除预冻结资金,增加中间账户可用资金(担保交易不能立即把钱打给商户,需要有一个中间账户来暂存)。

假设只有一个中间账户的情况下,每次调用支付服务的 Commit 接口,都会锁定中间账户,中间账户存在热点性能问题。 但是,在担保交易场景中,七天以后才需要将资金从中间账户划拨给商户,中间账户并不需要对外展示。因此,在执行完支付服务的第一阶段后,就可以认为本次交易的支付环节已经完成,并向用户和商户返回支付成功的结果,并不需要马上执行支付服务二阶段的 Commit 接口,等到低锋期时,再慢慢消化,异步地执行。
5.png

这就是 TCC 分布式事务模型的二阶段异步化功能,从业务服务的第一阶段执行成功,主业务服务就可以提交完成,然后再由框架异步的执行各从业务服务的第二阶段。

二、通用型 TCC 解决方案

通用型 TCC 解决方案就是最典型的 TCC 分布式事务模型实现,所有从业务服务都需要参与到主业务服务的决策当中。
6.png
 

适用场景

由于从业务服务是同步调用,其结果会影响到主业务服务的决策,因此通用型 TCC 分布式事务解决方案适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务:
7.png
 
当用户发起一笔交易时,首先访问交易服务,创建交易订单;然后交易服务调用支付服务为该交易创建支付订单,执行收款动作,最后支付服务调用账务服务记录账户流水和记账。

为了保证三个服务一起完成一笔交易,要么同时成功,要么同时失败,可以使用通用型 TCC 解决方案,将这三个服务放在一个分布式事务中,交易作为主业务服务,支付作为从业务服务,账务作为支付服务的嵌套从业务服务,由 TCC 模型保证事务的原子性。
8.png

支付服务的 Try 接口创建支付订单,开启嵌套分布式事务,并调用账务服务的 Try 接口;账务服务在 Try 接口中冻结买家资金。一阶段调用完成后,交易完成,提交本地事务,由 TCC 框架完成分布式事务各从业务服务二阶段的调用。

支付服务二阶段先调用账务服务的 Confirm 接口,扣除买家冻结资金;增加卖家可用资金。调用成功后,支付服务修改支付订单为完成状态,完成支付。

当支付和账务服务二阶段都调用完成后,整个分布式事务结束。

三、异步确保型 TCC 解决方案

异步确保型 TCC 解决方案的直接从业务服务是可靠消息服务,而真正的从业务服务则通过消息服务解耦,作为消息服务的消费端,异步地执行。
9.png
 
可靠消息服务需要提供 Try,Confirm,Cancel 三个接口。Try 接口预发送,只负责持久化存储消息数据;Confirm 接口确认发送,这时才开始真正的投递消息;Cancel 接口取消发送,删除消息数据。

消息服务的消息数据独立存储,独立伸缩,降低从业务服务与消息系统间的耦合,在消息服务可靠的前提下,实现分布式事务的最终一致性。

此解决方案虽然增加了消息服务的维护成本,但由于消息服务代替从业务服务实现了 TCC 接口,从业务服务不需要任何改造,接入成本非常低。

适用场景

由于从业务服务消费消息是一个异步的过程,执行时间不确定,可能会导致不一致时间窗口增加。因此,异步确保性 TCC 分布式事务解决方案只适用于对最终一致性时间敏感度较低的一些被动型业务(从业务服务的处理结果不影响主业务服务的决策,只被动的接收主业务服务的决策结果)。比如会员注册服务和邮件发送服务:
10.png
 
当用户注册会员成功,需要给用户发送一封邮件,告诉用户注册成功,并提示用户激活该会员。但要注意两点:

  1. 如果用户注册成功,一定要给用户发送一封邮件;
  2. 如果用户注册失败,一定不能给用户发送邮件。

因此,这同样需要会员服务和邮件服务保证原子性,要么都执行,要么都不执行。不一样的是,邮件服务只是一种被动型的业务,并不影响用户是否能够注册成功,它只需要在用户注册成功以后发送邮件给用户即可,邮件服务不需要参与到会员服务的活动决策中。

对于此种业务场景,可以使用异步确保型TCC分布式事务解决方案,如下:
11.png
 
 
由可靠消息服务来解耦会员和邮件服务,会员服务与消息服务组成 TCC 事务模型,保证事务原子性。然后通过消息服务的可靠特性,确保消息一定能够被邮件服务消费,从而使得会员与邮件服务在同一个分布式事务中。同时,邮件服务也不会影响会员服务的执行过程,只在会员服务执行成功后被动接收发送邮件的请求。

四、补偿型 TCC 解决方案

补偿型 TCC 解决方案与通用型 TCC 解决方案的结构相似,其从业务服务也需要参与到主业务服务的活动决策当中。但不一样的是,前者的从业务服务只需要提供 Do 和 Compensate 两个接口,而后者需要提供三个接口。
12.png
 
Do 接口直接执行真正的完整业务逻辑,完成业务处理,业务执行结果外部可见;Compensate 操作用于业务补偿,抵消或部分抵消正向业务操作的业务结果,Compensate操作需满足幂等性。
与通用型解决方案相比,补偿型解决方案的从业务服务不需要改造原有业务逻辑,只需要额外增加一个补偿回滚逻辑即可,业务改造量较小。但要注意的是,业务在一阶段就执行完整个业务逻辑,无法做到有效的事务隔离,当需要回滚时,可能存在补偿失败的情况,还需要额外的异常处理机制,比如人工介入。

适用场景

由于存在回滚补偿失败的情况,补偿型 TCC 分布式事务解决方案只适用于一些并发冲突较少或者需要与外部交互的业务,这些外部业务不属于被动型业务,其执行结果会影响主业务服务的决策,比如机票代理商的机票预订服务:
13.png
 
该机票服务提供多程机票预订服务,可以同时预订多趟行程航班机票,比如从北京到圣彼得堡,需要第一程从北京到莫斯科,以及第二程从莫斯科到圣彼得堡。

当用户预订机票时,肯定希望能同时预订这两趟航班的机票,只预订一趟航班对用户来说没有意义。因此,对于这样的业务服务同样提出了原子性要求,如果其中一趟航班的机票预订失败,另外一趟需要能够取消预订。

但是,由于航空公司相对于机票代理商来说属于外部业务,只提供订票接口和取消预订接口,想要推动航空公司改造是极其困难的。因此,对于此类业务服务,可以使用补偿型 TCC 分布式事务解决方案,如下:
14.png

网关服务在原有逻辑基础上增加 Compensate 接口,负责调用对应航空公司的取消预订接口。

在用户发起机票预订请求时,机票服务先通过网关 Do 接口,调用各航空公司的预订接口,如果所有航班都预订成功,则整个分布式事务直接执行成功;一旦某趟航班机票预订失败,则分布式事务回滚,由 TCC 事务框架调用各网关的 Compensate 补偿接口,其再调用对应航空公司的取消预订接口。通过这种方式,也可以保证多程机票预订服务的原子性。

五. 总结

对于现在的互联网应用来说,资源横向扩展提供了更多的灵活性,是一种比较容易实现的向外扩展方案,但是同时也明显增加了复杂度,引入一些新的挑战,比如资源之间的数据一致性问题。

横向数据扩展既可以按数据分片扩展,也可以按功能扩展。TCC 模型能在功能横向扩展资源的同时,保证多资源访问的事务属性。

TCC 模型除了跨服务的分布式事务这一层作用之外,还具有两阶段划分的功能,通过业务资源锁定,允许第二阶段的异步执行,而异步化思想正是解决热点数据并发性能问题的利器之一。
 

Roadmap

当前已经发布到 0.4.0,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。


图片1.png

· 阅读需 9 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,
Sample 地址:https://github.com/fescar-group/fescar-samples/tree/master/tcc
文末也提供了项目后续的 Roadmap,欢迎关注。

一、TCC 简介

在两阶段提交协议(2PC,Two Phase Commitment Protocol)中,资源管理器(RM, resource manager)需要提供“准备”、“提交”和“回滚” 3 个操作;而事务管理器(TM, transaction manager)分 2 阶段协调所有资源管理器,在第一阶段询问所有资源管理器“准备”是否成功,如果所有资源均“准备”成功则在第二阶段执行所有资源的“提交”操作,否则在第二阶段执行所有资源的“回滚”操作,保证所有资源的最终状态是一致的,要么全部提交要么全部回滚。

资源管理器有很多实现方式,其中 TCC(Try-Confirm-Cancel)是资源管理器的一种服务化的实现;TCC 是一种比较成熟的分布式事务解决方案,可用于解决跨数据库、跨服务业务操作的数据一致性问题;TCC 其 Try、Confirm、Cancel 3 个方法均由业务编码实现,故 TCC 可以被称为是服务化的资源管理器。

TCC 的 Try 操作作为一阶段,负责资源的检查和预留;Confirm 操作作为二阶段提交操作,执行真正的业务;Cancel 是二阶段回滚操作,执行预留资源的取消,使资源回到初始状态。

如下图所示,用户实现 TCC 服务之后,该 TCC 服务将作为分布式事务的其中一个资源,参与到整个分布式事务中;事务管理器分 2 阶段协调 TCC 服务,在第一阶段调用所有 TCC 服务的 Try 方法,在第二阶段执行所有 TCC 服务的 Confirm 或者 Cancel 方法;最终所有 TCC 服务要么全部都是提交的,要么全部都是回滚的。

image.png

二、TCC 设计

用户在接入 TCC 时,大部分工作都集中在如何实现 TCC 服务上,经过蚂蚁金服多年的 TCC 应用,总结如下主要的TCC 设计和实现主要事项:

1、业务操作分两阶段完成

接入 TCC 前,业务操作只需要一步就能完成,但是在接入 TCC 之后,需要考虑如何将其分成 2 阶段完成,把资源的检查和预留放在一阶段的 Try 操作中进行,把真正的业务操作的执行放在二阶段的 Confirm 操作中进行。

以下举例说明业务模式如何分成两阶段进行设计,举例场景:“账户A的余额中有 100 元,需要扣除其中 30 元”;

在接入 TCC 之前,用户编写 SQL:“update 账户表 set 余额 = 余额 - 30 where 账户 = A”,便能一步完成扣款操作。

在接入 TCC 之后,就需要考虑如何将扣款操作分成 2 步完成:

  • Try 操作:资源的检查和预留;

在扣款场景,Try 操作要做的事情就是先检查 A 账户余额是否足够,再冻结要扣款的 30 元(预留资源);此阶段不会发生真正的扣款。

  • Confirm 操作:执行真正业务的提交;

在扣款场景下,Confirm 阶段走的事情就是发生真正的扣款,把A账户中已经冻结的 30 元钱扣掉。

  • Cancel 操作:预留资源的是否释放;

在扣款场景下,扣款取消,Cancel 操作执行的任务是释放 Try 操作冻结的 30 元钱,是 A 账户回到初始状态。

image.png

2、并发控制

用户在实现 TCC 时,应当考虑并发性问题,将锁的粒度降到最低,以最大限度的提高分布式事务的并发性。

以下还是以A账户扣款为例,“账户 A 上有 100 元,事务 T1 要扣除其中的 30 元,事务 T2 也要扣除 30 元,出现并发”。

在一阶段 Try 操作中,分布式事务 T1 和分布式事务 T2 分别冻结资金的那一部分资金,相互之间无干扰;这样在分布式事务的二阶段,无论 T1 是提交还是回滚,都不会对 T2 产生影响,这样 T1 和 T2 在同一笔业务数据上并行执行。

image.png

3、允许空回滚

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因为丢包而导致的网络超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,而 Cancel 操作调用未出现超时。

TCC 服务在未收到 Try 请求的情况下收到 Cancel 请求,这种场景被称为空回滚;空回滚在生产环境经常出现,用户在实现TCC服务时,应允许允许空回滚的执行,即收到空回滚时返回成功。

image.png

4、防悬挂控制

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因网络拥堵而导致的超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,Cancel 调用未超时;在此之后,拥堵在网络上的一阶段 Try 数据包被 TCC 服务收到,出现了二阶段 Cancel 请求比一阶段 Try 请求先执行的情况,此 TCC 服务在执行晚到的 Try 之后,将永远不会再收到二阶段的 Confirm 或者 Cancel ,造成 TCC 服务悬挂。

用户在实现  TCC 服务时,要允许空回滚,但是要拒绝执行空回滚之后 Try 请求,要避免出现悬挂。

image.png

5、幂等控制

无论是网络数据包重传,还是异常事务的补偿执行,都会导致 TCC 服务的 Try、Confirm 或者 Cancel 操作被重复执行;用户在实现 TCC 服务时,需要考虑幂等控制,即 Try、Confirm、Cancel 执行一次和执行多次的业务结果是一样的。
image.png

Roadmap

当前已经发布到 0.4.0 版本,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。

图片1.png

· 阅读需 4 分钟

案例

用户采购商品业务,整个业务包含3个微服务:

  • 库存服务: 扣减给定商品的库存数量。
  • 订单服务: 根据采购请求生成订单。
  • 账户服务: 用户账户金额扣减。

业务结构图

Architecture

StorageService

public interface StorageService {

/**
* deduct storage count
*/
void deduct(String commodityCode, int count);
}

OrderService

public interface OrderService {

/**
* create order
*/
Order create(String userId, String commodityCode, int orderCount);
}

AccountService

public interface AccountService {

/**
* debit balance of user's account
*/
void debit(String userId, int money);
}

主要的业务逻辑:

public class BusinessServiceImpl implements BusinessService {

private StorageService storageService;

private OrderService orderService;

/**
* purchase
*/
public void purchase(String userId, String commodityCode, int orderCount) {

storageService.deduct(commodityCode, orderCount);

orderService.create(userId, commodityCode, orderCount);
}
}
public class StorageServiceImpl implements StorageService {

private StorageDAO storageDAO;

@Override
public void deduct(String commodityCode, int count) {
Storage storage = new Storage();
storage.setCount(count);
storage.setCommodityCode(commodityCode);
storageDAO.update(storage);
}
}
public class OrderServiceImpl implements OrderService {

private OrderDAO orderDAO;

private AccountService accountService;

public Order create(String userId, String commodityCode, int orderCount) {

int orderMoney = calculate(commodityCode, orderCount);

accountService.debit(userId, orderMoney);

Order order = new Order();
order.userId = userId;
order.commodityCode = commodityCode;
order.count = orderCount;
order.money = orderMoney;

return orderDAO.insert(order);
}
}

Seata 分布式事务解决方案

undefined

此处仅仅需要一行注解 @GlobalTransactional 写在业务发起方的方法上:


@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}

Dubbo 与 Seata 结合的例子

Step 1: 安装数据库

  • 要求: MySQL (InnoDB 存储引擎)。

提示: 事实上例子中3个微服务需要3个独立的数据库,但为了方便我们使用同一物理库并配置3个逻辑连接串。

更改以下xml文件中的数据库url、username和password

dubbo-account-service.xml dubbo-order-service.xml dubbo-storage-service.xml

    <property name="url" value="jdbc:mysql://x.x.x.x:3306/xxx" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />

Step 2: 为 Seata 创建 UNDO_LOG 表

UNDO_LOG 此表用于 Seata 的AT模式。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_unionkey` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=159 DEFAULT CHARSET=utf8

Step 3: 创建相关业务表


DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step 4: 启动 Seata-Server 服务

  • 下载Server package, 并解压。
  • 运行bin目录下的启动脚本。
sh seata-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA

e.g.

sh seata-server.sh 8091 /home/admin/seata/data/

Step 5: 运行例子

相关项目

· 阅读需 23 分钟

fescar发布已有时日,分布式事务一直是业界备受关注的领域,fescar发布一个月左右便受到了近5000个star足以说明其热度。当然,在fescar出来之前, 已经有比较成熟的分布式事务的解决方案开源了,比较典型的方案如 LCN 的2pc型无侵入事务, 目前lcn已发展到5.0,已支持和fescar事务模型类似的TCX型事务。还有如TCC型事务实现 hmily tcc-transaction 等。 在微服务架构流行的当下、阿里这种开源大户背景下,fescar的发布无疑又掀起了研究分布式事务的热潮。fescar脱胎于阿里云商业分布式事务服务GTS,在线上环境提供这种公共服务其模式肯定经受了非常严苛的考验。其分布式事务模型TXC又仿于传统事务模型XA方案,主要区别在于资源管理器的定位一个在应用层一个在数据库层。博主觉得fescar的txc模型实现非常有研究的价值,所以今天我们来好好翻一翻fescar项目的代码。本文篇幅较长,浏览并理解本文大概耗时30~60分钟左右。

项目地址

fescar:https://github.com/alibaba/fescar

本博文所述代码为fescar的0.1.2-SNAPSHOT版本,根据fescar后期的迭代计划,其项目结构和模块实现都可能有很大的改变,特此说明。

fescar的TXC模型

上图为fescar官方针对TXC模型制作的示意图。不得不说大厂的图制作的真的不错,结合示意图我们可以看到TXC实现的全貌。TXC的实现通过三个组件来完成。也就是上图的三个深黄色部分,其作用如下:

  1. TM:全局事务管理器,在标注开启fescar分布式事务的服务端开启,并将全局事务发送到TC事务控制端管理
  2. TC:事务控制中心,控制全局事务的提交或者回滚。这个组件需要独立部署维护,目前只支持单机版本,后续迭代计划会有集群版本
  3. RM:资源管理器,主要负责分支事务的上报,本地事务的管理

一段话简述其实现过程:服务起始方发起全局事务并注册到TC。在调用协同服务时,协同服务的事务分支事务会先完成阶段一的事务提交或回滚,并生成事务回滚的undo_log日志,同时注册当前协同服务到TC并上报其事务状态,归并到同一个业务的全局事务中。此时若没有问题继续下一个协同服务的调用,期间任何协同服务的分支事务回滚,都会通知到TC,TC在通知全局事务包含的所有已完成一阶段提交的分支事务回滚。如果所有分支事务都正常,最后回到全局事务发起方时,也会通知到TC,TC在通知全局事务包含的所有分支删除回滚日志。在这个过程中为了解决写隔离和度隔离的问题会涉及到TC管理的全局锁。

本博文的目标是深入代码细节,探究其基本思路是如何实现的。首先会从项目的结构来简述每个模块的作用,继而结合官方自带的examples实例来探究整个分布式事务的实现过程。

项目结构解析

项目拉下来,用IDE打开后的目录结构如下,下面先大致的看下每个模块的实现

  • common :公共组件,提供常用辅助类,静态变量、扩展机制类加载器、以及定义全局的异常等
  • config : 配置加载解析模块,提供了配置的基础接口,目前只有文件配置实现,后续会有nacos等配置中心的实现
  • core : 核心模块主要封装了TM、RM和TC通讯用RPC相关内容
  • dubbo :dubbo模块主要适配dubbo通讯框架,使用dubbo的filter机制来传统全局事务的信息到分支
  • examples :简单的演示实例模块,等下从这个模块入手探索
  • rm-datasource :资源管理模块,比较核心的一个模块,个人认为这个模块命名为core要更合理一点。代理了JDBC的一些类,用来解析sql生成回滚日志、协调管理本地事务
  • server : TC组件所在,主要协调管理全局事务,负责全局事务的提交或者回滚,同时管理维护全局锁。
  • spring :和spring集成的模块,主要是aop逻辑,是整个分布式事务的入口,研究fescar的突破口
  • tm : 全局事务事务管理模块,管理全局事务的边界,全局事务开启回滚点都在这个模块控制

通过【examples】模块的实例看下效果

第一步、先启动TC也就是【Server】模块,main方法直接启动就好,默认服务端口8091

第二步、回到examples模块,将订单,业务,账户、仓库四个服务的配置文件配置好,主要是mysql数据源和zookeeper连接地址,这里要注意下,默认dubbo的zk注册中心依赖没有,启动的时候回抛找不到class的异常,需要添加如下的依赖:

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

第三步、在BusinessServiceImpl中的模拟抛异常的地方打个断点,依次启动OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四个服务、等进断点后,查看数据库account_tbl表,金额已减去400元,变成了599元。然后放开断点、BusinessServiceImpl模块模拟的异常触发,全局事务回滚,account_tbl表的金额就又回滚到999元了

如上,我们已经体验到fescar事务的控制能力了,下面我们具体看下它是怎么控制的。

fescar事务过程分析

首先分析配置文件

这个是一个铁律,任何一个技术或框架要集成,配置文件肯定是一个突破口。从上面的例子我们了解到,实例模块的配置文件中配置了一个全局事务扫描器实例,如:

<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="dubbo-demo-app"/>
<constructor-arg value="my\_test\_tx_group"/>
</bean>

这个实例在项目启动时会扫描所有实例,具体实现见【spring】模块。并将标注了@GlobalTransactional注解的方法织入GlobalTransactionalInterceptor的invoke方法逻辑。同时应用启动时,会初始化TM(TmRpcClient)和RM(RmRpcClient)的实例,这个时候,服务已经和TC事务控制中心勾搭上了。在往下看就涉及到TM模块的事务模板类TransactionalTemplate。

【TM】模块启动全局事务

全局事务的开启,提交、回滚都被封装在TransactionalTemplate中完成了,代码如:


public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. begin transaction
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
tx.rollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
tx.commit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
}

更详细的实现在【TM】模块中被分成了两个Class实现,如下:

DefaultGlobalTransaction :全局事务具体的开启,提交、回滚动作

DefaultTransactionManager :负责使用TmRpcClient向TC控制中心发送指令,如开启全局事务(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滚(GlobalRollbackRequest)、查询状态(GlobalStatusRequest)等。

以上是TM模块核心内容点,TM模块完成全局事务开启后,接下来就开始看看全局事务iD,xid是如何传递、RM组件是如何介入的

【dubbo】全局事务xid的传递

首先是xid的传递,目前已经实现了dubbo框架实现的微服务架构下的传递,其他的像spring cloud和motan等的想要实现也很容易,通过一般RPC通讯框架都有的filter机制,将xid从全局事务的发起节点传递到服务协从节点,从节点接收到后绑定到当前线程上线文环境中,用于在分支事务执行sql时判断是否加入全局事务。fescar的实现见【dubbo】模块如下:

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext\[" + xid + "\] xid in RpcContext\[" + rpcXid + "\]");
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind\[" + rpcXid + "\] to RootContext");
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind\[" + unbindXid + "\] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind \[" + unbindXid + "\] back to RootContext");
}
}
}
}
}
}

上面代码rpcXid不为空时,就加入到了RootContext的ContextCore中,这里稍微深入讲下。ContextCore是一个可扩展实现的接口,目前默认的实现是ThreadLocalContextCore,基于ThreadLocal来保存维护当前的xid。这里fescar提供了可扩展的机制,实现在【common】模块中,通过一个自定义的类加载器EnhancedServiceLoader加载需要扩展的服务类,这样只需要在扩展类加上@LoadLevel注解。标记order属性声明高优先级别,就可以达到扩展实现的目的。

【RM】模块本地资源管理的介入

fescar针对本地事务相关的接口,通过代理机制都实现了一遍代理类,如数据源(DataSourceProxy)、ConnectionProxy、StatementProxy等。这个在配置文件中也可以看出来,也就是说,我们要使用fescar分布式事务,一定要配置fescar提供的代理数据源。如:

配置好代理数据源后,从DataSourceProxy出发,本地针对数据库的所有操作过程我们就可以随意控制了。从上面xid传递,已经知道了xid被保存在RootContext中了,那么请看下面的代码,就非常清楚了:

首先看StatementProxy的一段代码

在看ExecuteTemplate中的代码

和【TM】模块中的事务管理模板类TransactionlTemplate类似,这里非常关键的逻辑代理也被封装在了ExecuteTemplate模板类中。因重写了Statement有了StatementProxy实现,在执行原JDBC的executeUpdate方法时,会调用到ExecuteTemplate的execute逻辑。在sql真正执行前,会判断RootCOntext当前上下文中是否包含xid,也就是判断当前是否是全局分布式事务。如果不是,就直接使用本地事务,如果是,这里RM就会增加一些分布式事务相关的逻辑了。这里根据sql的不同的类型,fescar封装了五个不同的执行器来处理,分别是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,结构如下图:

PlainExecutor:

原生的JDBC接口实现,未做任何处理,提供给全局事务中的普通的select查询使用

UpdateExecutor、DeleteExecutor、InsertExecutor:

三个DML增删改执行器实现,主要在sql执行的前后对sql语句进行了解析,实现了如下两个抽象接口方法:

protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

在这个过程中通过解析sql生成了提供回滚操作的undo_log日志,日志目前是保存在msyql中的,和业务sql操作共用同一个事务。表的结构如下:

rollback_info保存的undo_log详细信息,是longblob类型的,结构如下:

{
    "branchId":3958194,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":98
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"storage_tbl"
        }
    ],
    "xid":"192.168.7.77:8091:3958193"
}


这里贴的是一个update的操作,undo_log记录的非常的详细,通过全局事务xid关联branchid,记录数据操作的表名,操作字段名,以及sql执行前后的记录数,如这个记录,表名=storage_tbl,sql执行前ID=10,count=100,sql执行后id=10,count=98。如果整个全局事务失败,需要回滚的时候就可以生成:

update storage_tbl set count = 100 where id = 10;

这样的回滚sql语句执行了。

SelectForUpdateExecutor:

fescar的AT模式在本地事务之上默认支持读未提交的隔离级别,但是通过SelectForUpdateExecutor执行器,可以支持读已提交的隔离级别。代码如:

@Override
public Object doExecute(Object... args) throws Throwable {
SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;

Connection conn = statementProxy.getConnection();
ResultSet rs = null;
Savepoint sp = null;
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();

StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
selectSQLAppender.append(getTableMeta().getPkName());
selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
if (!StringUtils.isEmpty(whereCondition)) {
selectSQLAppender.append(" WHERE " + whereCondition);
}
selectSQLAppender.append(" FOR UPDATE");
String selectPKSQL = selectSQLAppender.toString();

try {
if (originalAutoCommit) {
conn.setAutoCommit(false);
}
sp = conn.setSavepoint();
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

while (true) {
// Try to get global lock of those rows selected
Statement stPK = null;
PreparedStatement pstPK = null;
ResultSet rsPK = null;
try {
if (paramAppender.isEmpty()) {
stPK = statementProxy.getConnection().createStatement();
rsPK = stPK.executeQuery(selectPKSQL);
} else {
pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
for (int i = 0; i < paramAppender.size(); i++) {
pstPK.setObject(i + 1, paramAppender.get(i));
}
rsPK = pstPK.executeQuery();
}

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
break;

} catch (LockConflictException lce) {
conn.rollback(sp);
lockRetryController.sleep(lce);

} finally {
if (rsPK != null) {
rsPK.close();
}
if (stPK != null) {
stPK.close();
}
if (pstPK != null) {
pstPK.close();
}
}
}

} finally {
if (sp != null) {
conn.releaseSavepoint(sp);
}
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}

关键代码见:

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);

通过selectPKRows表操作记录拿到lockKeys,然后到TC控制器端查询是否被全局锁定了,如果被锁定了,就重新尝试,直到锁释放返回查询结果。

分支事务的注册和上报

在本地事务提交前,fescar会注册和上报分支事务相关的信息,见ConnectionProxy类的commit部分代码:

@Override
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}

try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
report(true);
context.reset();

} else {
targetConnection.commit();
}
}

从这段代码我们可以看到,首先是判断是了是否是全局事务,如果不是,就直接提交了,如果是,就先向TC控制器注册分支事务,为了写隔离,在TC端会涉及到全局锁的获取。然后保存了用于回滚操作的undo_log日志,继而真正提交本地事务,最后向TC控制器上报事务状态。此时,阶段一的本地事务已完成了。

【server】模块协调全局

关于server模块,我们可以聚焦在DefaultCoordinator这个类,这个是AbstractTCInboundHandler控制处理器默认实现。主要实现了全局事务开启,提交,回滚,状态查询,分支事务注册,上报,锁检查等接口,如:

回到一开始的TransactionlTemplate,如果整个分布式事务失败需要回滚了,首先是TM向TC发起回滚的指令,然后TC接收到后,解析请求后会被路由到默认控制器类的doGlobalRollback方法内,最终在TC控制器端执行的代码如下:

@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
continue;
}
try {
BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
branchSession.getResourceId(), branchSession.getApplicationData());

switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.error("Successfully rolled back branch " + branchSession);
continue;
case PhaseTwo\_RollbackFailed\_Unretryable:
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeStatus(GlobalStatus.RollbackFailed);
}
globalSession.end();
LOGGER.error("Failed to rollback global\[" + globalSession.getTransactionId() + "\] since branch\[" + branchSession.getBranchId() + "\] rollback failed");
return;
default:
LOGGER.info("Failed to rollback branch " + branchSession);
if (!retrying) {
queueToRetryRollback(globalSession);
}
return;

}
} catch (Exception ex) {
LOGGER.info("Exception rollbacking branch " + branchSession, ex);
if (!retrying) {
queueToRetryRollback(globalSession);
if (ex instanceof TransactionException) {
throw (TransactionException) ex;
} else {
throw new TransactionException(ex);
}
}

}

}
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeStatus(GlobalStatus.Rollbacked);
}
globalSession.end();
}

如上代码可以看到,回滚时从全局事务会话中迭代每个分支事务,然后通知每个分支事务回滚。分支服务接收到请求后,首先会被路由到RMHandlerAT中的doBranchRollback方法,继而调用了RM中的branchRollback方法,代码如下:

@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo\_RollbackFailed\_Unretryable;
} else {
return BranchStatus.PhaseTwo\_RollbackFailed\_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}

RM分支事务端最后执行的是UndoLogManager的undo方法,通过xid和branchid从数据库查询出回滚日志,完成数据回滚操作,整个过程都是同步完成的。如果全局事务是成功的,TC也会有类似的上述协调过程,只不过是异步的将本次全局事务相关的undo_log清除了而已。至此,就完成了2阶段的提交或回滚,也就完成了完整的全局事务事务的控制。

结语

如果你看到这里,那么非常感谢你,在繁忙工作之余耐心的花时间来学习。同时,我相信花的时间没白费,完整的浏览理解估计对fescar实现的大致流程了解的十之八九了。本文从构思立题到完成大概耗时1人天左右,博主在这个过程中,对fescar的实现也有了更加深入的了解。由于篇幅原因,并没有面面俱到的对每个实现的细节去深究,如sql是如何解析的等,更多的是在fescar的TXC模型的实现过程的关键点做了详细阐述。本文已校对,但由于个人知识水平及精力有限,文中不免出现错误或理解不当的地方,欢迎指正。

作者简介:

陈凯玲,2016年5月加入凯京科技。曾任职高级研发和项目经理,现任凯京科技研发中心架构&运维部负责人。pmp项目管理认证,阿里云MVP。热爱开源,先后开源过多个热门项目。热爱分享技术点滴,独立博客KL博客(http://www.kailing.pub)博主。

· 阅读需 2 分钟

回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

Overview of a global transaction

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction ModeManual (Branch) Transaction Mode.

AT 模式基于 支持本地 ACID 事务关系型数据库

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,MT 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 MT 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。