从上一篇文章「分布式事务中间件Seata的设计原理」讲了下 Seata AT 模式的一些设计原理,从中也知道了 AT 模式的三个角色(RM、TM、TC),接下来我会更新 Seata 源码分析系列文章。今天就来分析 Seata AT 模式在启动的时候都做了哪些操作。
客户端启动逻辑
TM 是负责整个全局事务的管理器,因此一个全局事务是由 TM 开启的,TM 有个全局管理类 GlobalTransaction,结构如下:
io.seata.tm.api.GlobalTransaction
public interface GlobalTransaction {
void begin() throws TransactionException;
void begin(int timeout) throws TransactionException;
void begin(int timeout, String name) throws TransactionException;
void commit() throws TransactionException;
void rollback() throws TransactionException;
GlobalStatus getStatus() throws TransactionException;
// ...
}
可以通过 GlobalTransactionContext 创建一个 GlobalTransaction,然后用 GlobalTransaction 进行全局事务的开启、提交、回滚等操作,因此我们直接用 API 方式使用 Seata AT 模式:
//init seata;
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
//trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
// 事务处理
// ...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}
如果每次使用全局事务都这样写,难免会造成代码冗余,我们的项目都是基于 Spring 容器,这时我们可以利用 Spring AOP 的特性,用模板模式把这些冗余代码封装模版里,参考 Mybatis-spring 也是做了这么一件事情,那么接下来我们来分析一下基于 Spring 的项目启动 Seata 并注册全局事务时都做了哪些工作。
我们开启一个全局事务是在方法上加上 @GlobalTransactional
注解,Seata 的 Spring 模块中,有个 GlobalTransactionScanner,它的继承关系如下:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean {
// ...
}
在基于 Spring 项目的启动过程中,对该类会有如下初始化流程:
InitializingBean 的 afterPropertiesSet() 方法调用了 initClient() 方法:
io.seata.spring.annotation.GlobalTransactionScanner#initClient
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
对 TM 和 RM 做了初始化操作。
- TM 初始化
io.seata.tm.TMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
// 获取 TmRpcClient 实例
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 初始化 TM Client
tmRpcClient.init();
}
调用 TmRpcClient.getInstance() 方法会获取一个 TM 客户端实例,在获取过程中,会创建 Netty 客户端配置文件对象,以及创建 messageExecutor 线程池,该线程池用于在处理各种与服务端的消息交互,在创建 TmRpcClient 实例时,创建 ClientBootstrap,用于管理 Netty 服务的启停,以及 ClientChannelManager,它是专门用于管理 Netty 客户端对象池,Seata 的 Netty 部分配合使用了对象池,后面在分析网络模块会讲到。
io.seata.core.rpc.netty.AbstractRpcRemotingClient#init
public void init() {
clientBootstrap.start();
// 定时尝试连接服务端
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}
调用 TM 客户端 init() 方法,最终会启动 netty 客户端(此时还未真正启动,在对象池被调用时才会被真正启动);开启一个定时任务,定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端,具体逻辑是在 NettyClientChannelManager 中的 channels 中缓存了客户端 channel,如果此时 channels 不存在获取已过期,那么就会尝试连接服务端以重新获取 channel 并将其缓存到 channels 中;开启一条单独线程,用于处理异步请求发送,这里用得很巧妙,之后在分析网络模块在具体对其进行分析。
io.seata.core.rpc.netty.AbstractRpcRemoting#init
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
在 AbstractRpcRemoting 的 init 方法中,又是开启了一个定时任务,该定时任务主要是用于定时清除 futures 已过期的 futrue,futures 是保存发送请求需要返回结果的 future 对象,该对象有个超时时间,过了超时时间就会自动抛异常,因此需要定时清除已过期的 future 对象。
- RM 初始化
io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}