跳到主要内容

· 阅读需 12 分钟

RPC 模块是我最初研究 Seata 源码开始的地方,因此我对 Seata 的 RPC 模块有过一些深刻研究,在我研究了一番后,发现 RPC 模块中的代码需要进行优化,使得代码更加优雅,交互逻辑更加清晰易懂,本着 “让天下没有难懂的 RPC 通信代码” 的初衷,我开始了 RPC 模块的重构之路。

这里建议想要深入了解 Seata 交互细节的,不妨从 RPC 模块的源码入手,RPC 模块相当于 Seata 的中枢,Seata 所有的交互逻辑在 RPC 模块中表现得淋漓尽致。

这次 RPC 模块的重构将会使得 Seata 的中枢变得更加健壮和易于解读。

重构继承关系

在 Seata 的旧版本中,RPC 模块的整体结构有点混乱,尤其是在各个类的继承关系上,主要体现在:

  1. 直接在 Remoting 类继承 Netty Handler,使得 Remoting 类与 Netty Handler 处理逻辑耦合在一起;
  2. 客户端和服务端的 Reomting 类继承关系不统一;
  3. RemotingClient 被 RpcClientBootstrap 实现,而 RemotingServer 却被 RpcServer 实现,没有一个独立的 ServerBootstrap,这个看起来关系非常混乱;
  4. 有些接口没必要抽取出来,比如 ClientMessageSender、ClientMessageListener、ServerMessageSender 等接口,因这些接口会增加整体结构继承关系的复杂性。

针对上面发现的问题,在重构过程中我大致做了如下事情:

  1. 将 Netty Handler 抽象成一个内部类放在 Remoting 类中;
  2. 将 RemotingClient 为客户端顶级接口,定义客户端与服务端交互的基本方法,抽象一层 AbstractNettyRemotingClient,下面分别有 RmNettyRemotingClient、TmNettyRemotingClient;将 RemotingServer 为服务端顶级接口,定义服务端与客户端交互的基本方法,实现类 NettyRemotingServer;
  3. 同时将 ClientMessageSender、ClientMessageListener、ServerMessageSender 等接口方法归入到 RemotingClient、RemotingServer 中,由 Reomting 类实现 RemotingClient、RemotingServer,统一 Remoting 类继承关系;
  4. 新建 RemotingBootstrap 接口,客户端和服务端分别实现 NettyClientBootstrap、NettyServerBootstrap,将引导类逻辑从 Reomting 类抽离出来。

在最新的 RPC 模块中的继承关系简单清晰,用如下类关系图表示:

  1. AbstractNettyRemoting:Remoting 类的最顶层抽象,包含了客户端和服务端公用的成员变量与公用方法,拥有通用的请求方法(文章后面会讲到),Processor 处理器调用逻辑(文章后面会讲到);
  2. RemotingClient:客户端最顶级接口,定义客户端与服务端交互的基本方法;
  3. RemotingServer:服务端最顶级接口,定义服务端与客户端交互的基本方法;
  4. AbstractNettyRemotingClient:客户端抽象类,继承 AbstractNettyRemoting 类并实现了 RemotingClient 接口;
  5. NettyRemotingServer:服务端实现类,继承 AbstractNettyRemoting 类并实现了 RemotingServer 接口;
  6. RmNettyRemotingClient:Rm 客户端实现类,继承 AbstractNettyRemotingClient 类;
  7. TmNettyRemotingClient:Tm 客户端实现类,继承 AbstractNettyRemotingClient 类。

同时将客户端和服务端的引导类逻辑抽象出来,如下类关系图表示:

  1. RemotingBootstrap:引导类接口,有 start 和 stop 两个抽象方法;
  2. NettyClientBootstrap:客户端引导实现类;
  3. NettyServerBootstrap:服务端引导实现类。

解耦处理逻辑

解耦处理逻辑即是将 RPC 交互的处理逻辑从 Netty Handler 中抽离出来,并将处理逻辑抽象成一个个 Processor,为什么要这么做呢?我大致讲下现在存在的一些问题:

  1. Netty Handler 与 处理逻辑是糅合在一起的,由于客户端与服务端都共用了一套处理逻辑,因此为了兼容更多的交互,在处理逻辑中你可以看到非常多难以理解的判断逻辑;
  2. 在 Seata 的交互中有些请求是异步处理的,也有一些请求是同步处理的,但是在旧的处理代码逻辑中对同步异步处理的表达非常隐晦,而且难以看明白;
  3. 无法从代码逻辑当中清晰地表达出请求消息类型与对应的处理逻辑关系;
  4. 在 Seata 后面的更新迭代中,如果不将处理处理逻辑抽离出来,这部分代码想要增加新的交互逻辑,将会非常困难。

在将处理逻辑从 Netty Handler 进行抽离之前,我们先梳理一下 Seata 现有的交互逻辑:

  • RM 客户端请求服务端的交互逻辑:

  • TM 客户端请求服务端的交互逻辑:

  • 服务端请求 RM 客户端的交互逻辑:

从以上的交互图中可以清晰地看到了 Seata 的交互逻辑。

客户端总共接收服务端的消息:

1)服务端请求消息

  1. BranchCommitRequest、BranchRollbackRequest、UndoLogDeleteRequest

2)服务端响应消息

  1. RegisterRMResponse、BranchRegisterResponse、BranchReportResponse、GlobalLockQueryResponse

RegisterTMResponse、GlobalBeginResponse、GlobalCommitResponse、GlobalRollbackResponse、GlobalStatusResponse、GlobalReportResponse 3. HeartbeatMessage(PONG)

服务端总共接收客户端的消息:

1)客户端请求消息:

  1. RegisterRMRequest、BranchRegisterRequest、BranchReportRequest、GlobalLockQueryRequest

RegisterTMRequest、GlobalBeginRequest、GlobalCommitRequest、GlobalRollbackRequest、GlobalStatusRequest、GlobalReportRequest 3. HeartbeatMessage(PING)

2)客户端响应消息:

  1. BranchCommitResponse、BranchRollbackResponse

基于以上的交互逻辑分析,我们可以将处理消息的逻辑抽象成若干个 Processor,一个 Processor 可以处理一个或者多个消息类型的消息,只需在 Seata 启动时注册将消息类型注册到 ProcessorTable 中即可,形成一个映射关系,这样就可以根据消息类型调用对应的 Processor 对消息进行处理,用如下图表示:

在抽象 Remoting 类中定一个 processMessage 方法,方法逻辑是根据消息类型从 ProcessorTable 中拿到消息类型对应的 Processor。

这样就成功将处理逻辑从 Netty Handler 中彻底抽离出来了,Handler#channelRead 方法只需要调用 processMessage 方法即可,且还可以灵活根据消息类型动态注册 Processor 到 ProcessorTable 中,处理逻辑的可扩展性得到了极大的提升。

以下是 Processor 的调用流程:

1)客户端

  1. RmBranchCommitProcessor:处理服务端全局提交请求;
  2. RmBranchRollbackProcessor:处理服务端全局回滚请求;
  3. RmUndoLogProcessor:处理服务端 undo log 删除请求;
  4. ClientOnResponseProcessor:客户端处理服务端响应请求,如:BranchRegisterResponse、GlobalBeginResponse、GlobalCommitResponse 等;
  5. ClientHeartbeatProcessor:处理服务端心跳响应。

2)服务端

  1. RegRmProcessor:处理 RM 客户端注册请求;
  2. RegTmProcessor:处理 TM 客户端注册请求;
  3. ServerOnRequestProcessor:处理客户端相关请求,如:BranchRegisterRequest、GlobalBeginRequest、GlobalLockQueryRequest 等;
  4. ServerOnResponseProcessor:处理客户端相关响应,如:BranchCommitResponse、BranchRollbackResponse 等;
  5. ServerHeartbeatProcessor:处理客户端心跳响应。

下面我以 TM 发起全局事务提交请求为例子,让大家感受下 Processor 在整个交互中所处的位置:

重构请求方法

在 Seata 的旧版本当中,RPC 的请求方法也是欠缺优雅,主要体现在:

  1. 请求方法过于杂乱无章,没有层次感;
  2. sendAsyncRequest 方法耦合的代码太多,逻辑过于混乱,客户端与服务端都共用了一套请求逻辑,方法中决定是否批量发送是根据参数 address 是否为 null 决定,决定是否同步请求是根据 timeout 是否大于 0 决定,显得极为不合理,且批量请求只有客户端有用到,服务端并没有批量请求,共用一套请求逻辑还会导致服务端异步请求也会创建 MessageFuture 放入 futures 中;
  3. 请求方法名称风格不统一,比如客户端 sendMsgWithResponse,服务端却叫 sendSyncRequest;

针对以上旧版本 RPC 请求方法的各种缺点,我作了以下改动:

  1. 将请求方法统一放入 RemotingClient、RemotingServer 接口当中,并作为顶级接口;
  2. 分离客户端与服务端请求逻辑,将批量请求逻辑单独抽到客户端相关请求方法中,使得是否批量发送不再根据参数 address 是否为 null 决定;
  3. 由于 Seata 自身的逻辑特点,客户端服务端请求方法的参数无法统一,可通过抽取通用的同步/异步请求方法,客户端和服务端根据自身请求逻辑特点实现自身的同步/异步请求逻辑,最后再调用通用的同步/异步请求方法,使得同步/异步请求都有明确的方法,不再根据 timeout 是否大于 0 决定;
  4. 统一请求名称风格。

最终,Seata RPC 的请求方法终于看起来更加优雅且有层次感了。

同步请求:

异步请求:

其它

  1. 类目录调整:RPC 模块目录中还有一个 netty 目录,也可以从目录结构中发现 Seata 的初衷是兼容多个 RPC 框架,目前只实现了 netty,但发现 netty 模块中有些类并不 ”netty“,且 RPC 跟目录的类也并不通用,因此需要将相关类的位置进行调整;
  2. 某些类重新命名,比如 netty 相关类包含 「netty」;

最终 RPC 模块看起来是这样的:

作者简介

张乘辉,目前就职于蚂蚁集团,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 19 分钟

【分布式事务Seata源码解读一】Server端启动流程

实现分布式事务的核心要点:

  1. 事务的持久化,事务所处的各种状态事务参与方的各种状态都需要持久化,当实例宕机时才能基于持久化的数据对事务回滚或提交,实现最终一致性
  2. 定时对超时未完成事务的处理(继续尝试提交或回滚),即通过重试机制实现事务的最终一致性
  3. 分布式事务的跨服务实例传播,当分布式事务跨多个实例时需要实现事务的传播,一般需要适配不同的rpc框架
  4. 事务的隔离级别:大多数分布式事务为了性能,默认的隔离级别是读未提交
  5. 幂等性:对于XA或者seata的AT这样的分布式事务来说,都已经默认实现了幂等性,而TCC、Saga这种接口级别实现的分布式事务都还需要业务开发者自己实现幂等性。

本片文章主要从seata-server的启动流程的角度介绍一下seata-server的源码,启动流程图如下:

在这里插入图片描述

1. 启动类Server

seata-server的入口类在Server类中,源码如下:

public static void main(String[] args) throws IOException {
// 从环境变量或运行时参数中获取监听端口,默认端口8091
int port = PortHelper.getPort(args);

// 把监听端口设置到SystemProperty中,Logback的LoggerContextListener实现类
// SystemPropertyLoggerContextListener会把Port写入到Logback的Context中,
// 在logback.xml文件中会使用Port变量来构建日志文件名称。
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 创建Logger
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}

// 解析启动以及配置文件的各种配置参数
ParameterParser parameterParser = new ParameterParser(args);

// metrics相关,这里是使用SPI机制获取Registry实例对象
MetricsManager.get().init();

// 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用。
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

// 创建NettyRemotingServer实例,NettyRemotingServer是一个基于Netty实现的Rpc框架,
// 此时并没有初始化,NettyRemotingServer负责与客户端SDK中的TM、RM进行网络通信。
nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);

// 设置监听端口
nettyRemotingServer.setListenPort(parameterParser.getPort());

// UUIDGenerator初始化,UUIDGenerator基于雪花算法实现,
// 用于生成全局事务、分支事务的id。
// 多个Server实例配置不同的ServerNode,保证id的唯一性
UUIDGenerator.init(parameterParser.getServerNode());

// SessionHodler负责事务日志(状态)的持久化存储,
// 当前支持file、db、redis三种存储模式,集群部署模式要使用db或redis模式
SessionHolder.init(parameterParser.getStoreMode());

// 创建初始化DefaultCoordinator实例,DefaultCoordinator是TC的核心事务逻辑处理类,
// 底层包含了AT、TCC、SAGA等不同事务类型的逻辑处理。
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

// 127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

try {
// 初始化Netty,开始监听端口并阻塞在这里,等待程序关闭
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}

System.exit(0);
}

2. 解析配置

参数解析的实现代码在ParameterParser类中,init方法源码如下:

private void init(String[] args) {
try {
// 判断是否运行在容器中,如果运行在容器中则配置从环境变量中获取
if (ContainerHelper.isRunningInContainer()) {
this.seataEnv = ContainerHelper.getEnv();
this.host = ContainerHelper.getHost();
this.port = ContainerHelper.getPort();
this.serverNode = ContainerHelper.getServerNode();
this.storeMode = ContainerHelper.getStoreMode();
} else {
// 基于JCommander获取启动应用程序时配置的参数,
// JCommander通过注解、反射的方式把参数赋值到当前类的字段上。
JCommander jCommander = JCommander.newBuilder().addObject(this).build();
jCommander.parse(args);
if (help) {
jCommander.setProgramName(PROGRAM_NAME);
jCommander.usage();
System.exit(0);
}
}
// serverNode用于雪花算中的实例的唯一标识,需要保证唯一。
// 如果没有指定基于当前服务器的I随机生成一个
if (this.serverNode == null) {
this.serverNode = IdWorker.initWorkerId();
}
if (StringUtils.isNotBlank(seataEnv)) {
System.setProperty(ENV_PROPERTY_KEY, seataEnv);
}
if (StringUtils.isBlank(storeMode)) {
// 这里牵扯到一个重要的Configuration类,ParameterParser只负责获取ip、port、storeMode等核心参数,
// 其他的参数都是从Configuration中获取的。这里如果没有启动参数没有指定storeMode,
// 就从Configuration类中获取。
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}
} catch (ParameterException e) {
printError(e);
}

}

在ParameterParser的init方法中第一次调用了ConfigurationFactory.getInstance(),初始化了一个单例的Configuration对象,Configuration负责初始化所有的其他配置参数信息。从Seata Server端的源码中我们能看到两个配置文件file.conf、registry.conf。那么这两个配置文件的区别是什么,两个文件都是必须的吗?我们继续看代码。

ConfigurationFactory.getInstance方法其实就是获取一个单例对象,核心在buildConfiguration方法中,不过在buidlConfiguration方法前,ConfigurationFactory类有一段static代码块会先执行。

// 获取Configuration的单例对象
public static Configuration getInstance() {
if (instance == null) {
synchronized (Configuration.class) {
if (instance == null) {
instance = buildConfiguration();
}
}
}
return instance;
}

// ConfigurationFactory的static代码块
static {
// 获取配置文件的名称,默认为registry.conf
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_PREFIX;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}

// 读取registry.conf文件的配置,构建基础的Configuration对象
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,
false) : new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX, false);
Configuration extConfiguration = null;
try {
// ExtConfigurationProvider当前只有一个SpringBootConfigurationProvider实现类
// 用于支持客户端SDK SpringBoot的配置文件方式,对于Server端来说这段逻辑可以忽略。
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

ConfigurationFactory中的static代码块是从registry.conf中读取配置信息。registry.conf中主有两个配置信息,注册中心配置源配置源用来指定其他更详细的配置项是file.conf或者是apollo等其他配置源。所以registry.conf配置文件时必须的,registry.conf配置文件中指定其他详细配置的配置源,当前配置源支持file、zk、apollo、nacos、etcd3等。所以file.conf不是必须的,只有当设置配置源为file类型时才会读取file.conf文件中的内容。

接下来ConfigurationFactory中的buildConfiguration就是根据registry.conf中设置的配置源来加载更多的配置项。

private static Configuration buildConfiguration() {
ConfigType configType;
String configTypeName;
try {
// 从registry.conf配置文件中读取config.type字段值,并解析为枚举ConfigType
configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);

if (StringUtils.isBlank(configTypeName)) {
throw new NotSupportYetException("config type can not be null");
}

configType = ConfigType.getType(configTypeName);
} catch (Exception e) {
throw e;
}
Configuration extConfiguration = null;
Configuration configuration;
if (ConfigType.File == configType) {
// 如果配置文件为file类型,则从registry.conf中读取config.file.name配置项,
// 即file类型配置文件的路径,示例中默认为file.conf
String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);

// 根据file配置文件的路径构建FileConfuguration对象
configuration = new FileConfiguration(name);
try {
// configuration的额外扩展,也是只对客户端SpringBoot的SDK才生效
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null
? configuration.getClass().getSimpleName() : extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
// 如果配置文件的类型不是file,如:nacos、zk等,
// 则通过SPI的方式生成对应的ConfigurationProvider对象
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}
try {
// ConfigurationCache是对Configuration做了一次层代理内存缓存,提升获取配置的性能
Configuration configurationCache;
if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}
if (null != configurationCache) {
extConfiguration = configurationCache;
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
}
return null == extConfiguration ? configuration : extConfiguration;
}

3. 初始化UUIDGenerator

UUIDGenertor初始化接收一个serverNode参数,UUIDGenertor当前是使用了雪花算法来生成唯一Id,该serverNode用来保证多个seata-server实例生成的唯一id不重复。

public class UUIDGenerator {

/**
* Generate uuid long.
*
* @return the long
*/
public static long generateUUID() {
return IdWorker.getInstance().nextId();
}

/**
* Init.
*
* @param serverNode the server node id
*/
public static void init(Long serverNode) {
IdWorker.init(serverNode);
}
}

UUIDGenerator是对IdWorker做了封装,唯一id的核心实现逻辑在IdWoker类中,IdWorker是一个雪花算法实现的。此处的IdWorker又是一个单例

public class IdWorker
/**
* Constructor
*
* @param workerId就是上面提到的ServerNode, 取值范围在0·1023,也就是在64位的UUID中占10位
*/
public IdWorker(long workerId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
this.workerId = workerId;
}

/**
* Get the next ID (the method is thread-safe)
*
* @return SnowflakeId
*/
public long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format(
"clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

synchronized (this) {
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
}
//雪花算法64位唯一id组成:第一位0 + 41位时间戳 + 10位workerId + 12位自增序列化(同一时间戳内自增)
return ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
}

4. SessionHolder初始化

SessionHolder负责Session的持久化,一个Session对象对应一个事务,事务分为两种:全局事务(GlobalSession)和分支事务(BranchSession)。 SessionHolder支持file和db两种持久化方式,其中db支持集群模式,推荐使用db。SessionHolder中最主要的四个字段如下:

// ROOT_SESSION_MANAGER用于获取所有的Setssion,以及Session的创建、更新、删除等。
private static SessionManager ROOT_SESSION_MANAGER;
// 用于获取、更新所有的异步commit的Session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试commit的Session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试rollback的Session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

SessionHolder的init方法

public static void init(String mode) throws IOException {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
StoreMode storeMode = StoreMode.get(mode);
if (StoreMode.DB.equals(storeMode)) {
// 这里又用到了SPI的方式加载SessionManager,
// 其实下面获取的四个SessionManager实例都是同一个类DataBaseSessionManager的不同实例,
// 只是给DataBaseSessionManager的构造函数传参不同。
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
} else if (StoreMode.FILE.equals(storeMode)) {
//file模式可以先不关心
...
} else {
throw new IllegalArgumentException("unknown store mode:" + mode);
}
// reload方法对于db模式可以忽略
reload();
}

上面看到SessionHolder中的四个SessionManager本质都是类DataBaseSessionManager的实例,只是给构造函数传参不同,看下DataBaseSessionManager的定义:

public DataBaseSessionManager(String name) {
super();
this.taskName = name;
}

// 根据实例的taskName来决定allSessions返回的事务列表,
// 如taskName等于ASYNC_COMMITTING_SESSION_MANAGER_NAME的
// 就返回所有状态为AsyncCommitting的事务。
public Collection<GlobalSession> allSessions() {
// get by taskName
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying}));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
} else {
// taskName为null,则对应ROOT_SESSION_MANAGER,即获取所有状态的事务
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying,
GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying,
GlobalStatus.AsyncCommitting}));
}
}

5. 初始化DefaultCoordinator

DefaultCoordinator是事务协调器的核心,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。

public DefaultCoordinator(ServerMessageSender messageSender) {
// 接口messageSender的实现类就是上文提到的RpcServer
this.messageSender = messageSender;

// DefaultCore封装了AT、TCC、Saga等分布式事务模式的具体实现类
this.core = new DefaultCore(messageSender);
}

// init方法初始化了5个定时器,主要用于分布式事务的重试机制,
// 因为分布式环境的不稳定性会造成事务处于中间状态,
// 所以要通过不断的重试机制来实现事务的最终一致性。
// 下面的定时器除了undoLogDelete之外,其他的定时任务默认都是1秒执行一次。
public void init() {
// 处理处于回滚状态可重试的事务
retryRollbacking.scheduleAtFixedRate(() -> {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 处理二阶段可以重试提交的状态可重试的事务
retryCommitting.scheduleAtFixedRate(() -> {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 处理异步提交的事务
asyncCommitting.scheduleAtFixedRate(() -> {
try {
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 检查事务的第一阶段已经超时的事务,设置事务状态为TimeoutRollbacking,
// 该事务会由其他定时任务执行回滚操作
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

// 根据unlog的保存天数调用RM删除unlog
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

6. 初始化NettyRemotingServer

NettyRemotingServer是基于Netty实现的简化版的Rpc服务端,NettyRemotingServer初始化时主要做了两件事:

  1. registerProcessor:注册与Client通信的Processor。
  2. super.init():super.init()方法中负责初始化Netty,并把当前实例的IP端口注册到注册中心中
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

private void registerProcessor() {
// 1. 注册核心的ServerOnRequestProcessor,即与事务处理相关的Processor,
// 如:全局事务开始、提交,分支事务注册、反馈当前状态等。
// ServerOnRequestProcessor的构造函数中传入getHandler()返回的示例,这个handler
// 就是前面提到的DefaultCoordinator,DefaultCoordinator是分布式事务的核心处理类
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

// 2.注册ResponseProcessor,ResponseProcessor用于处理当Server端主动发起请求时,
// Client端回复的消息,即Response。如:Server向Client端发送分支事务提交或者回滚的请求时,
// Client返回提交/回滚的结果
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

// 3. Client端发起RM注册请求时对应的Processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

// 4. Client端发起TM注册请求时对应的Processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

// 5. Client端发送心跳请求时对应的Processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

在NettyRemotingServer中有调用基类AbstractNettyRemotingServer的init方法,代码如下:

public void init() {
// super.init()方法中启动了一个定时清理超时Rpc请求的定时任务,3S执行一次。
super.init();
// 配置Netty Server端,开始监听端口。
serverBootstrap.start();
}

// serverBootstrap.start();
public void start() {
// Netty server端的常规配置,其中添加了两个ChannelHandler:
// ProtocolV1Decoder、ProtocolV1Encoder,
// 分别对应Seata自定义RPC协议的解码器和编码器
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}

}
});

try {
// 开始监听配置的端口
ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
LOGGER.info("Server started, listen port: {}", listenPort);
// Netty启动成功之后把当前实例注册到registry.conf配置文件配置的注册中心上
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

· 阅读需 21 分钟

作者简介:煊檍,GitHub ID:sharajava,阿里巴巴中件间 GTS 研发团队负责人,SEATA 开源项目发起人,曾在 Oracle 北京研发中心多年,从事 WebLogic 核心研发工作。长期专注于中间件,尤其是分布式事务领域的技术实践。

Seata 1.2.0 版本重磅发布新的事务模式:XA 模式,实现对 XA 协议的支持。

这里,我们从三个方面来深入解读这个新的特性:

  • 是什么(What):XA 模式是什么?
  • 为什么(Why):为什么支持 XA?
  • 怎么做(How):XA 模式是如何实现的,以及怎样使用?

1. XA 模式是什么?

这里有两个基本的前置概念:

  1. 什么是 XA?
  2. 什么是 Seata 定义的所谓 事务模式?

基于这两点,再来理解 XA 模式就很自然了。

1.1 什么是 XA?

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。

XA 规范 描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范 的目的是允许的多个资源(如数据库,应用服务器,消息队列等)在同一事务中访问,这样可以使 ACID 属性跨越应用程序而保持有效。

XA 规范 使用两阶段提交(2PC,Two-Phase Commit)来保证所有资源同时提交或回滚任何特定的事务。

XA 规范 在上世纪 90 年代初就被提出。目前,几乎所有主流的数据库都对 XA 规范 提供了支持。

1.2 什么是 Seata 的事务模式?

Seata 定义了全局事务的框架。

全局事务 定义为若干 分支事务 的整体协调:

  1. TM 向 TC 请求发起(Begin)、提交(Commit)、回滚(Rollback)全局事务。
  2. TM 把代表全局事务的 XID 绑定到分支事务上。
  3. RM 向 TC 注册,把分支事务关联到 XID 代表的全局事务中。
  4. RM 把分支事务的执行结果上报给 TC。(可选)
  5. TC 发送分支提交(Branch Commit)或分支回滚(Branch Rollback)命令给 RM。
seata-mod

Seata 的 全局事务 处理过程,分为两个阶段:

  • 执行阶段 :执行 分支事务,并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 根据 执行阶段 结果形成的决议,应用通过 TM 发出的全局提交或回滚的请求给 TC,TC 命令 RM 驱动 分支事务 进行 Commit 或 Rollback。

Seata 的所谓 事务模式 是指:运行在 Seata 全局事务框架下的 分支事务 的行为模式。准确地讲,应该叫作 分支事务模式。

不同的 事务模式 区别在于 分支事务 使用不同的方式达到全局事务两个阶段的目标。即,回答以下两个问题:

  • 执行阶段 :如何执行并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 收到 TC 的命令后,如何做到分支的提交或回滚?

以我们 Seata 的 AT 模式和 TCC 模式为例来理解:

AT 模式

at-mod
  • 执行阶段:

    • 可回滚:根据 SQL 解析结果,记录回滚日志
    • 持久化:回滚日志和业务 SQL 在同一个本地事务中提交到数据库
  • 完成阶段:

    • 分支提交:异步删除回滚日志记录
    • 分支回滚:依据回滚日志进行反向补偿更新

TCC 模式

tcc-mod
  • 执行阶段:

    • 调用业务定义的 Try 方法(完全由业务层面保证 可回滚持久化
  • 完成阶段:

    • 分支提交:调用各事务分支定义的 Confirm 方法
    • 分支回滚:调用各事务分支定义的 Cancel 方法

1.3 什么是 Seata 的 XA 模式?

XA 模式:

在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。

xa-mod
  • 执行阶段:

    • 可回滚:业务 SQL 操作放在 XA 分支中进行,由资源对 XA 协议的支持来保证 可回滚
    • 持久化:XA 分支完成后,执行 XA prepare,同样,由资源对 XA 协议的支持来保证 持久化(即,之后任何意外都不会造成无法回滚的情况)
  • 完成阶段:

    • 分支提交:执行 XA 分支的 commit
    • 分支回滚:执行 XA 分支的 rollback

2. 为什么支持 XA?

为什么要在 Seata 中增加 XA 模式呢?支持 XA 的意义在哪里呢?

2.1 补偿型事务模式的问题

本质上,Seata 已经支持的 3 大事务模式:AT、TCC、Saga 都是 补偿型 的。

补偿型 事务处理机制构建在 事务资源 之上(要么在中间件层面,要么在应用层面),事务资源 本身对分布式事务是无感知的。

img

事务资源 对分布式事务的无感知存在一个根本性的问题:无法做到真正的 全局一致性 。

比如,一条库存记录,处在 补偿型 事务处理过程中,由 100 扣减为 50。此时,仓库管理员连接数据库,查询统计库存,就看到当前的 50。之后,事务因为异外回滚,库存会被补偿回滚为 100。显然,仓库管理员查询统计到的 50 就是 脏 数据。

可以看到,补偿型 分布式事务机制因为不要求 事务资源 本身(如数据库)的机制参与,所以无法保证从事务框架之外的全局视角的数据一致性。

2.2 XA 的价值

与 补偿型 不同,XA 协议 要求 事务资源 本身提供对规范和协议的支持。

nct

因为 事务资源 感知并参与分布式事务处理过程,所以 事务资源(如数据库)可以保障从任意视角对数据的访问有效隔离,满足全局数据一致性。

比如,上一节提到的库存更新场景,XA 事务处理过程中,中间态数据库存 50 由数据库本身保证,是不会仓库管理员的查询统计 到的。(当然隔离级别需要 读已提交 以上)

除了 全局一致性 这个根本性的价值外,支持 XA 还有如下几个方面的好处:

  1. 业务无侵入:和 AT 一样,XA 模式将是业务无侵入的,不给应用设计和开发带来额外负担。
  2. 数据库的支持广泛:XA 协议被主流关系型数据库广泛支持,不需要额外的适配即可使用。
  3. 多语言支持容易:因为不涉及 SQL 解析,XA 模式对 Seata 的 RM 的要求比较少,为不同语言开发 SDK 较之 AT 模式将更 ,更容易。
  4. 传统基于 XA 应用的迁移:传统的,基于 XA 协议的应用,迁移到 Seata 平台,使用 XA 模式将更平滑。

2.3 XA 广泛被质疑的问题

不存在某一种分布式事务机制可以完美适应所有场景,满足所有需求。

XA 规范早在上世纪 90 年代初就被提出,用以解决分布式事务处理这个领域的问题。

现在,无论 AT 模式、TCC 模式还是 Saga 模式,这些模式的提出,本质上都源自 XA 规范对某些场景需求的无法满足。

XA 规范定义的分布式事务处理机制存在一些被广泛质疑的问题,针对这些问题,我们是如何思考的呢?

  1. 数据锁定:数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来。

思考:

数据锁定是获得更高隔离性和全局一致性所要付出的代价。

补偿型 的事务处理机制,在 执行阶段 即完成分支(本地)事务的提交,(资源层面)不锁定数据。而这是以牺牲 隔离性 为代价的。

另外,AT 模式使用 全局锁 保障基本的 写隔离,实际上也是锁定数据的,只不过锁在 TC 侧集中管理,解锁效率高且没有阻塞的问题。

  1. 协议阻塞:XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待。

思考:

协议的阻塞机制本身并不是问题,关键问题在于 协议阻塞 遇上 数据锁定。

如果一个参与全局事务的资源 “失联” 了(收不到分支事务结束的命令),那么它锁定的数据,将一直被锁定。进而,甚至可能因此产生死锁。

这是 XA 协议的核心痛点,也是 Seata 引入 XA 模式要重点解决的问题。

基本思路是两个方面:避免 “失联” 和 增加 “自解锁” 机制。(这里涉及非常多技术细节,暂时不展开,在后续 XA 模式演进过程中,会专门拿出来讨论)

  1. 性能差:性能的损耗主要来自两个方面:一方面,事务协调过程,增加单个事务的 RT;另一方面,并发事务数据的锁冲突,降低吞吐。

思考:

和不使用分布式事务支持的运行场景比较,性能肯定是下降的,这点毫无疑问。

本质上,事务(无论是本地事务还是分布式事务)机制就是拿部分 性能的牺牲 ,换来 编程模型的简单 。

与同为 业务无侵入 的 AT 模式比较:

首先,因为同样运行在 Seata 定义的分布式事务框架下,XA 模式并没有产生更多事务协调的通信开销。

其次,并发事务间,如果数据存在热点,产生锁冲突,这种情况,在 AT 模式(默认使用全局锁)下同样存在的。

所以,在影响性能的两个主要方面,XA 模式并不比 AT 模式有非常明显的劣势。

AT 模式性能优势主要在于:集中管理全局数据锁,锁的释放不需要 RM 参与,释放锁非常快;另外,全局提交的事务,完成阶段 异步化。

3. XA 模式如何实现以及怎样用?

3.1 XA 模式的设计

3.1.1 设计目标

XA 模式的基本设计目标,两个主要方面:

  1. 从 场景 上,满足 全局一致性 的需求。
  2. 从 应用上,保持与 AT 模式一致的无侵入。
  3. 从 机制 上,适应分布式微服务架构的特点。

整体思路:

  1. 与 AT 模式相同的:以应用程序中 本地事务 的粒度,构建到 XA 模式的 分支事务。
  2. 通过数据源代理,在应用程序本地事务范围外,在框架层面包装 XA 协议的交互机制,把 XA 编程模型 透明化。
  3. 把 XA 的 2PC 拆开,在分支事务 执行阶段 的末尾就进行 XA prepare,把 XA 协议完美融合到 Seata 的事务框架,减少一轮 RPC 交互。

3.1.2 核心设计

1. 整体运行机制

XA 模式 运行在 Seata 定义的事务框架内:

xa-fw
  • 执行阶段(E xecute):

    • XA start/XA end/XA prepare + SQL + 注册分支
  • 完成阶段(F inish):

    • XA commit/XA rollback

2. 数据源代理

XA 模式需要 XAConnection。

获取 XAConnection 两种方式:

  • 方式一:要求开发者配置 XADataSource
  • 方式二:根据开发者的普通 DataSource 来创建

第一种方式,给开发者增加了认知负担,需要为 XA 模式专门去学习和使用 XA 数据源,与 透明化 XA 编程模型的设计目标相违背。

第二种方式,对开发者比较友好,和 AT 模式使用一样,开发者完全不必关心 XA 层面的任何问题,保持本地编程模型即可。

我们优先设计实现第二种方式:数据源代理根据普通数据源中获取的普通 JDBC 连接创建出相应的 XAConnection。

类比 AT 模式的数据源代理机制,如下:

img

但是,第二种方法有局限:无法保证兼容的正确性。

实际上,这种方法是在做数据库驱动程序要做的事情。不同的厂商、不同版本的数据库驱动实现机制是厂商私有的,我们只能保证在充分测试过的驱动程序上是正确的,开发者使用的驱动程序版本差异很可能造成机制的失效。

这点在 Oracle 上体现非常明显。参见 Druid issue:https://github.com/alibaba/druid/issues/3707

综合考虑,XA 模式的数据源代理设计需要同时支持第一种方式:基于 XA 数据源进行代理。

类比 AT 模式的数据源代理机制,如下:

img

3. 分支注册

XA start 需要 Xid 参数。

这个 Xid 需要和 Seata 全局事务的 XID 和 BranchId 关联起来,以便由 TC 驱动 XA 分支的提交或回滚。

目前 Seata 的 BranchId 是在分支注册过程,由 TC 统一生成的,所以 XA 模式分支注册的时机需要在 XA start 之前。

将来一个可能的优化方向:

把分支注册尽量延后。类似 AT 模式在本地事务提交之前才注册分支,避免分支执行失败情况下,没有意义的分支注册。

这个优化方向需要 BranchId 生成机制的变化来配合。BranchId 不通过分支注册过程生成,而是生成后再带着 BranchId 去注册分支。

4. 小结

这里只通过几个重要的核心设计,说明 XA 模式的基本工作机制。

此外,还有包括 连接保持异常处理 等重要方面,有兴趣可以从项目代码中进一步了解。

以后会陆续写出来和大家交流。

3.1.3 演进规划

XA 模式总体的演进规划如下:

  1. 第 1 步(已经完成):首个版本(1.2.0),把 XA 模式原型机制跑通。确保只增加,不修改,不给其他模式引入的新问题。
  2. 第 2 步(计划 5 月完成):与 AT 模式必要的融合、重构。
  3. 第 3 步(计划 7 月完成):完善异常处理机制,进行上生产所必需的打磨。
  4. 第 4 步(计划 8 月完成):性能优化。
  5. 第 5 步(计划 2020 年内完成):结合 Seata 项目正在进行的面向云原生的 Transaction Mesh 设计,打造云原生能力。

3.2 XA 模式的使用

从编程模型上,XA 模式与 AT 模式保持完全一致。

可以参考 Seata 官网的样例:seata-xa

样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

    @Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);

// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}

4. 总结

在当前的技术发展阶段,不存一个分布式事务处理机制可以完美满足所有场景的需求。

一致性、可靠性、易用性、性能等诸多方面的系统设计约束,需要用不同的事务处理机制去满足。

Seata 项目最核心的价值在于:构建一个全面解决分布式事务问题的 标准化 平台。

基于 Seata,上层应用架构可以根据实际场景的需求,灵活选择合适的分布式事务解决方案。

img

XA 模式的加入,补齐了 Seata 在 全局一致性 场景下的缺口,形成 AT、TCC、Saga、XA 四大 事务模式 的版图,基本可以满足所有场景的分布式事务处理诉求。

当然 XA 模式和 Seata 项目本身都还不尽完美,有很多需要改进和完善的地方。非常欢迎大家参与到项目的建设中,共同打造一个标准化的分布式事务平台。

· 阅读需 14 分钟

Seata阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

1.1 四种事务模式

Seata 目标打造一站式的分布事务的解决方案,最终会提供四种事务模式:

目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习 Seata 的时候,可以花更多精力在 AT 模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。

友情提示:具体的流行度,朋友可以选择看看 Wanted: who's using Seata 每个公司登记的使用方式。

1.2 三种角色

在 Seata 的架构中,一共有三个角色:

三个角色

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。

在 Seata 中,一个分布式事务的生命周期如下:

架构图

友情提示:看下艿艿添加的红色小勾。

  • TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。

    XID,会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。

  • RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。

  • TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。

  • TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

1.3 框架支持情况

Seata 目前提供了对主流的微服务框架的支持:

同时方便我们集成到 Java 项目当中,Seata 也提供了相应的 Starter 库:

因为 Seata 是基于 DataSource 数据源进行代理来拓展,所以天然对主流的 ORM 框架提供了非常好的支持:

  • MyBatis、MyBatis-Plus
  • JPA、Hibernate

1.4 案例情况

Wanted: who's using Seata 的登记情况,Seata 已经在国内很多团队开始落地,其中不乏有滴滴、韵达等大型公司。可汇总如下图:

汇总图

另外,在 awesome-seata 仓库中,艿艿看到了滴滴等等公司的落地时的技术分享,还是非常真实可靠的。如下图所示:awesome-seata 滴滴

从案例的情况来说,Seata 可能给是目前已知最可靠的分布式事务解决方案,至少对它进行技术投入是非常不错的选择。

2. 部署单机 TC Server

本小节,我们来学习部署单机 Seata TC Server,常用于学习或测试使用,不建议在生产环境中部署单机。

因为 TC 需要进行全局事务和分支事务的记录,所以需要对应的存储。目前,TC 有两种存储模式( store.mode ):

  • file 模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
  • db 模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。

显然,我们将采用 file 模式,最终我们部署单机 TC Server 如下图所示:单机 TC Server

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

2.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

2.2 启动 TC Server

执行 nohup sh bin/seata-server.sh & 命令,启动 TC Server 在后台。在 nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 File 存储器
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[FILE] extension by class[io.seata.server.store.file.FileTransactionStoreManager]
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[FILE] extension by class[io.seata.server.session.file.FileBasedSessionManager]
# 启动成功
2020-04-02 08:36:01.597 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
  • 默认配置下,Seata TC Server 启动在 8091 端点。

因为我们使用 file 模式,所以可以看到用于持久化的本地文件 root.data。操作命令如下:

$ ls -ls sessionStore/
total 0
0 -rw-r--r-- 1 yunai staff 0 Apr 2 08:36 root.data

后续,朋友可以阅读「4. 接入 Java 应用」小节,开始使用 Seata 实现分布式事务。

3. 部署集群 TC Server

本小节,我们来学习部署集群 Seata TC Server,实现高可用,生产环境下必备。在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

同时,每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。最终我们部署 集群 TC Server 如下图所示:集群 TC Server

Seata TC Server 对主流的注册中心都提供了集成,具体可见 discovery 目录。考虑到国内使用 Nacos 作为注册中心越来越流行,这里我们就采用它。

友情提示:如果对 Nacos 不了解的朋友,可以参考《Nacos 安装部署》文章。

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

3.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

3.2 初始化数据库

① 使用 mysql.sql 脚本,初始化 Seata TC Server 的 db 数据库。脚本内容如下:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

在 MySQL 中,创建 seata 数据库,并在该库下执行该脚本。最终结果如下图:seata 数据库 - MySQL 5.X

② 修改 conf/file 配置文件,修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。如下图所示:conf/file 配置文件

③ MySQL8 的支持

如果朋友使用的 MySQL 是 8.X 版本,则需要看该步骤。否则,可以直接跳过。

首先,需要下载 MySQL 8.X JDBC 驱动,命令行操作如下:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用该 MySQL 8.X JDBC 驱动。如下图所示:seata 数据库 - MySQL 8.X

3.3 设置使用 Nacos 注册中心

修改 conf/registry.conf 配置文件,设置使用 Nacos 注册中心。如下图所示:conf/registry.conf 配置文件

3.4 启动 TC Server

① 执行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,启动第一个 TC Server 在后台。

  • -p:Seata TC Server 监听的端口。
  • -n:Server node。在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突。

nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 DB 存储器
2020-04-05 16:54:12.793 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load DataSourceGenerator[dbcp] extension by class[io.seata.server.store.db.DbcpDataSourceGenerator]
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load LogStore[DB] extension by class[io.seata.core.store.db.LogStoreDataBaseDAO]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[DB] extension by class[io.seata.server.store.db.DatabaseTransactionStoreManager]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[DB] extension by class[io.seata.server.session.db.DataBaseSessionManager]
# 启动成功
2020-04-05 16:54:13.779 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
# 使用 Nacos 注册中心
2020-04-05 16:54:13.788 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

② 执行 nohup sh bin/seata-server.sh -p 28091 -n 2 & 命令,启动第二个 TC Server 在后台。

③ 打开 Nacos 注册中心的控制台,我们可以看到有两个 Seata TC Server 示例。如下图所示:Nacos 控制台

4. 接入 Java 应用

4.1 AT 模式

① Spring Boot

1、《芋道 Spring Boot 分布式事务 Seata 入门》「2. AT 模式 + 多数据源」小节,实现单体 Spring Boot 项目在多数据源下的分布式事务。

整体图

2、《芋道 Spring Boot 分布式事务 Seata 入门》「AT 模式 + HttpClient 远程调用」小节,实现多个 Spring Boot 项目的分布事务。

整体图

② Dubbo

《Dubbo 分布式事务 Seata 入门》「2. AT 模式」小节,实现多个 Dubbo 服务下的分布式事务。

整体图

③ Spring Cloud

《芋道 Spring Cloud Alibaba 分布式事务 Seata 入门》「3. AT 模式 + Feign」小节,实现多个 Spring Cloud 服务下的分布式事务。

整体图

4.2 TCC 模式

4.3 Saga 模式

4.4 XA 模式

Seata 正在开发中...

· 阅读需 7 分钟

使用配置中心和数据库来实现 Seata 的高可用,以 Nacos 和 MySQL 为例,将cloud-seata-nacos应用部署到 Kubernetes 集群中

该应用使用 Nacos 作为配置和注册中心,总共有三个服务: order-service, pay-service, storage-service, 其中 order-service 对外提供下单接口,当余额和库存充足时,下单成功,会提交事务,当不足时会抛出异常,下单失败,回滚事务

准备工作

需要准备可用的注册中心、配置中心 Nacos 和 MySQL,通常情况下,注册中心、配置中心和数据库都是已有的,不需要特别配置,在这个实践中,为了简单,只部署单机的注册中心、配置中心和数据库,假设他们是可靠的

  • 部署 Nacos

在服务器部署 Nacos,开放 8848 端口,用于 seata-server 注册,服务器地址为 192.168.199.2

docker run --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server
  • 部署 MySQL

部署一台MySQL 数据库,用于保存事务数据,服务器地址为 192.168.199.2

docker run --name mysql -p 30060:3306-e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

部署 seata-server

  • 创建seata-server需要的表

具体的 SQL 参考 script/server/db,这里使用的是 MySQL 的脚本,数据库名称为 seata

同时,也需要创建 undo_log 表, 可以参考 script/client/at/db/

  • 修改seata-server配置

将以下配置添加到 Nacos 配置中心,具体添加方法可以参考 script/config-center

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true
store.db.user=root
store.db.password=123456

部署 seata-server 到 Kubernetes

  • seata-server.yaml

需要将 ConfigMap 的注册中心和配置中心地址改成相应的地址

apiVersion: v1
kind: Service
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
type: ClusterIP
ports:
- port: 8091
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-server

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
serviceName: seata-ha-server
replicas: 3
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-server
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-server
spec:
containers:
- name: seata-ha-server
image: docker.io/seataio/seata-server:latest
imagePullPolicy: IfNotPresent
env:
- name: SEATA_CONFIG_NAME
value: file:/root/seata-config/registry
ports:
- name: http
containerPort: 8091
protocol: TCP
volumeMounts:
- name: seata-config
mountPath: /root/seata-config
volumes:
- name: seata-config
configMap:
name: seata-ha-server-config


---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-server-config
data:
registry.conf: |
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.199.2"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.199.2"
group = "SEATA_GROUP"
}
}
  • 部署
kubectl apply -f seata-server.yaml

部署完成后,会有三个 pod

kubectl get pod | grep seata-ha-server

seata-ha-server-645844b8b6-9qh5j 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-pzczs 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-wkpw8 1/1 Running 0 3m14s

待启动完成后,可以在 Nacos 的服务列表中发现三个 seata-server 的实例,至此,已经完成 seata-server 的高可用部署

  • 查看服务日志
kubelet logs -f seata-ha-server-645844b8b6-9qh5j
[0.012s][info   ][gc] Using Serial
2020-04-15 00:55:09.880 INFO [main]io.seata.server.ParameterParser.init:90 -The server is running in container.
2020-04-15 00:55:10.013 INFO [main]io.seata.config.FileConfiguration.<init>:110 -The configuration file used is file:/root/seata-config/registry.conf
2020-04-15 00:55:12.426 INFO [main]com.alibaba.druid.pool.DruidDataSource.init:947 -{dataSource-1} inited
2020-04-15 00:55:13.127 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started

其中{dataSource-1} 说明使用了数据库,并正常初始化完成

  • 查看注册中心,此时seata-serve 这个服务会有三个实例

seata-ha-nacos-list.png

部署业务服务

  • 创建业务表并初始化数据

具体的业务表可以参考 cloud-seata-nacos/README.md

  • 添加 Nacos 配置

在 public 的命名空间下,分别创建 data-id 为 order-service.properties, pay-service.properties, storage-service.properties 的配置,内容相同,需要修改数据库的地址、用户名和密码

# MySQL
spring.datasource.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Seata
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
  • 部署服务

通过 application.yaml 配置文件部署服务,需要注意的是修改 ConfigMap 的 NACOS_ADDR为自己的 Nacos 地址

apiVersion: v1
kind: Service
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
type: NodePort
ports:
- port: 8081
nodePort: 30081
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-service

---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-service-config
data:
NACOS_ADDR: 192.168.199.2:8848

---
apiVersion: v1
kind: ServiceAccount
metadata:
name: seata-ha-account
namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: seata-ha-account
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: seata-ha-account
namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-service
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-service
spec:
serviceAccountName: seata-ha-account
containers:
- name: seata-ha-order-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-order-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8081
protocol: TCP
- name: seata-ha-pay-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-pay-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8082
protocol: TCP
- name: seata-ha-storage-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-storage-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8083
protocol: TCP

通过以下命令,将应用部署到集群中

kubectl apply -f application.yaml

然后查看创建的 pod,seata-ha-service 这个服务下有三个 pod

kubectl get pod | grep seata-ha-service

seata-ha-service-7dbdc6894b-5r8q4 3/3 Running 0 12m

待应用启动后,在 Nacos 的服务列表中,会有相应的服务

seata-ha-service-list.png

此时查看服务的日志,会看到服务向每一个 TC 都注册了

kubectl logs -f seata-ha-service-7dbdc6894b-5r8q4 seata-ha-order-service

seata-ha-service-register.png

查看任意的 TC 日志,会发现每一个服务都向 TC 注册了

kubelet logs -f seata-ha-server-645844b8b6-9qh5j

seata-ha-tc-register.png

测试

测试成功场景

调用下单接口,将 price 设置为 1,因为初始化的余额为 10,可以下单成功

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 1
}'

此时返回结果为:

{"success":true,"message":null,"data":null}

查看TC 的日志,事务成功提交:

seata-ha-commit-tc-success.png

查看 order-service 服务日志 seata-ha-commit-success.png

测试失败场景

设置 price 为 100,此时余额不足,会下单失败抛出异常,事务会回滚

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 100
}'

查看 TC 的日志: seata-ha-commit-tc-rollback.png

查看服务的日志 : seata-ha-commit-service-rollback.png

多次调用查看服务日志,发现会随机的向其中某台TC发起事务注册,当扩容或缩容后,有相应的 TC 参与或退出,证明高可用部署生效

· 阅读需 8 分钟

一 . 导读

根据大佬定义的分类,配置可以有三种:环境配置、描述配置、扩展配置。

环境配置:像一些组件启动时的参数等,通常是离散的简单值,多是 key-value 型数据。

描述配置:与业务逻辑相关,比如:事务发起方和参与方,通常会嵌到业务的生命周期管理中。描述配置信息较多,甚至有层次关系。

扩展配置:产品需要发现第三方实现,对配置的聚合要求比较高,比如:各种配置中心和注册中心,通常做法是在 jar 包的 META-INF/services 下放置接口类全名文件,内容为每行一个实现类类名。

二. 环境配置

seata server 在加载的时候,会使用 resources/registry.conf 来确定配置中心和注册中心的类型。而 seata client 在 1.0 版本后,不仅能使用 conf 文件进行配置的加载,也可以在 springboot 的 yml 配置文件中,使用 seata.config.{type} 来进行配置中心的选择,注册中心与之类似。通过 yml 加载配置的源码在 io.seata.spring.boot.autoconfigure.properties.registry 包下。

如果 seata 客户端的使用者既在 resources 下放了 conf 配置文件又在 yml 文件中配置,那么会优先使用 yml 中配置的。代码:

CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;

这里 extConfiguration 是外部配置实例,即 ExtConfigurationProvider#provide() 外部配置提供类提供的,而 configuration 是另一个配置提供类提供的 ConfigurationProvider#provide(),这两个配置提供类是在 config 模块 ConfigurationFactory 静态块中,通过 SPI 的方式加载。

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

上面说的是配置中心类型的选择,而配置环境的加载,是在确定了使用什么配置中心类型后,再通过相应的配置中心加载环境配置。File 即文本方式配置也是一种配置中心。

client 和 server 获取配置参数,是通过 ConfigurationFactory#getInstance() 获取配置类实例,再使用配置类实例获取配置参数,配置的 key 这些常量的定义,主要在 core 模块下 config 文件中。

一些重要的环境配置属性的意义,官网都有介绍

在实例化的时候通过 ConfigurationFactory 获取后注入构造函数中的,需要重启才能生效,而在使用时通过 ConfigurationFactory 实时获取的,配置改了就可以生效。

但是 config 模块提供了 ConfigurationChangeListener#onChangeEvent 接口方法来修改实例内部的属性。即在这个方法中,监听动态变化的属性,如果检测到自身使用的属性和刚开始注入时不一样了,就修改实例中保存的属性,和配置中心保持一致,这样就实现了动态配置。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener {
private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,false);
@Override public Object invoke(Param param) {
if(disable){//事务业务处理}
}
@Override public void onChangeEvent(Param param) {
disable = param;
}}

上面是 spring 模块下的 GlobalTransactionalInterceptor 与降级属性相关的伪代码。 GlobalTrarnsactionalScanner 在上面的 interceptor 类被实例化时,把 interceptor 注册到了配置变化监听列表中,当配置被改变的时候,会调用监听器:

ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);

降级的意思是,当服务某一项功能不可用的时候,通过动态配置的属性,把某一项功能给关了,这样就可以避免一直尝试失败的处理。interceptor#invoke() 只有当这个 disable 属性为 true 时,才会执行 seata 事务相关业务。

三. 描述配置

一般性框架描述性配置通常信息比较多,甚至有层次关系,用 xml 配置比较方便,因为树结构描述性更强。而现在的习惯都在提倡去繁琐的约束性配置,采用约定的方式。

seata AT 模式是通过代理数据源的方式来进行事务处理,对业务方入侵较小,只需让 seata 在启动时,识别哪些业务方需要开启全局事务,所以用注解就可以实现描述性配置。

@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
public String doBiz(String msg) {}

如果是 tcc 模式,事务参与方还需使用注解标识:

@TwoPhaseBusinessAction(name = "tccActionForSpringTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int i);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);

四 .扩展配置

扩展配置,通常对产品的聚合要求比较高,因为产品需要发现第三方实现,将其加入产品内部。

在这里插入图片描述 这是一个自定义配置中心提供类的例子,在 META-INF/services 下放置一个接口同名的文本文件,文件的内容为接口的实现类。这是标准的 spi 方式。然后修改配置文件 registry.conf 中的 config.type=test 。

但是如果你认为这样就可以被 seata 识别到,并且替换掉配置中心,那你就错了。seata 在加载配置中心的时候,使用 enum ConfigType 包裹了一下配置文件中配置的配置中心的类型的值:

private static Configuration buildConfiguration() {
configTypeName = "test";//registry.conf中配置的config.type
configType = ConfigType.getType(configTypeName);//ConfigType获取不到会抛异常
}

如果在 ConfigType 中没有定义 test 这种配置中心类型,那么会抛异常。所以单纯的修改配置文件而不改变源码是无法使用 ConfigType 中定义的配置中心提供类以外的配置中心提供类。

目前 1.0 版本在 ConfigType 中定义的配置中心类型有:File,ZK,Nacos,Apollo,Consul,Etcd3,SpringCloudConfig,Custom。如果用户想使用自定义的配置中心类型,可以使用 Custom 这种类型。

在这里插入图片描述 这里可以使用不优雅的方式,即提供一个指定名称 ZK 但是级别 order=3 更高的实现类(ZK 默认 order=1),就可以让 ConfigurationFactory 使用 TestConfigurationProvider 作为配置中心提供类。

通过上面的步骤,就可以让 seata 使用我们自己提供的代码。seata 中 codec、compressor、discovery、integration 等模块,都是使用 spi 机制加载功能类,符合微内核 + 插件化,平等对待第三方的设计思想。

五 . seata 源码分析系列地址

作者:赵润泽,系列地址

· 阅读需 4 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

​ 1.首先来看下包结构,在 seata-dubbo 和 seata-dubbo-alibaba 下有统一由 TransactionPropagationFilter 这个类,分别对应 apache-dubbo 跟 alibaba-dubbo.

20200101203229

分析源码

package io.seata.integration.dubbo;

import io.seata.core.context.RootContext;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//获取本地XID
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
//获取Dubbo隐式传参中的XID
String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
//传递XID
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
} else {
if (rpcXid != null) {
//绑定XID
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
//进行剔除已完成事务的XID
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
}
//如果发现解绑的XID并不是当前接收到的XID
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
if (unbindXid != null) {
//重新绑定XID
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
}
}
}
}
}

/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}

}

​ 1.根据源码,我们可以推出相应的逻辑处理

20200101213336

要点知识

​ 1.Dubbo @Activate 注解:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};

/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p/>
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};

/**
* 排序信息,可以不提供。
*/
String[] before() default {};

/**
* 排序信息,可以不提供。
*/
String[] after() default {};

/**
* 排序信息,可以不提供。
*/
int order() default 0;
}

可以分析得知,Seata 的 dubbo 过滤器上的注解@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100),表示 dubbo 的服务提供方跟消费方都会触发到这个过滤器,所以我们的 Seata 发起者会产生一个 XID 的传递,上述流程图跟代码已经很清晰的表示了.

​ 2.Dubbo 隐式传参可以通过 RpcContext 上的 setAttachmentgetAttachment 在服务消费方和提供方之间进行参数的隐式传递。

获取:RpcContext.getContext().getAttachment(RootContext.KEY_XID);

传递:RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);

总结

更多源码阅读请访问Seata 官网

· 阅读需 11 分钟

一 .导读

spring 模块分析中讲到,Seata 的 spring 模块会对涉及到分布式业务的 bean 进行处理。项目启动时,当 GlobalTransactionalScanner 扫描到 TCC 服务的 reference 时(即tcc事务参与方),会对其进行动态代理,即给 bean 织入 TCC 模式下的 MethodInterceptor 的实现类。tcc 事务发起方依然使用 @GlobalTransactional 注解开启,织入的是通用的 MethodInterceptor 的实现类。

TCC 模式下的 MethodInterceptor 实现类即 TccActionInterceptor(spring模块) ,这个类中调用了 ActionInterceptorHandler(tcc模块) 进行 TCC 模式下事务流程的处理。

TCC 动态代理的主要功能是:生成TCC运行时上下文、透传业务参数、注册分支事务记录。

二 .TCC模式介绍

在2PC(两阶段提交)协议中,事务管理器分两阶段协调资源管理,资源管理器对外提供三个操作,分别是一阶段的准备操作,和二阶段的提交操作和回滚操作。

public interface TccAction {

@TwoPhaseBusinessAction(name = "tccActionForTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "a") int a,
@BusinessActionContextParameter(paramName = "b", index = 0) List b,
@BusinessActionContextParameter(isParamInProperty = true) TccParam tccParam);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);
}

这是 TCC 参与者实例,参与者需要实现三个方法,第一个参数必须是 BusinessActionContext ,方法返回类型固定,对外发布成微服务,供事务管理器调用。

prepare:资源的检查和预留。例:扣减账户的余额,并增加相同的冻结余额。

commit:使用预留的资源,完成真正的业务操作。例:减少冻结余额,扣减资金业务完成。

cancel:释放预留资源。例:冻结余额加回账户的余额。

其中 BusinessActionContext 封装了本次事务的上下文环境:xid、branchId、actionName 和被 @BusinessActionContextParam 注解的参数等。

参与方业务有几个需要注意的地方: 1.控制业务幂等性,需要支持同一笔事务的重复提交和重复回滚。 2.防悬挂,即二阶段的回滚,比一阶段的 try 先执行。 3.放宽一致性协议,最终一致,所以是读已修改

三 . remoting 包解析

在这里插入图片描述

包中所有的类都是为包中的 DefaultRemotingParser 服务,Dubbo、LocalTCC、SofaRpc 分别负责解析各自RPC协议下的类。

DefaultRemotingParser 的主要方法: 1.判断 bean 是否是 remoting bean,代码:

    @Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

2.远程 bean 解析,把 rpc类 解析成 RemotingDesc,,代码:

@Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

利用 allRemotingParsers 来解析远程 bean 。allRemotingParsers是在:initRemotingParser() 中调用EnhancedServiceLoader.loadAll(RemotingParser.class) 动态进行 RemotingParser 子类的加载,即 SPI 加载机制。

如果想扩展,比如实现一个feign远程调用的解析类,只要把RemotingParser相关实现类写在 SPI 的配置中就可以了,扩展性很强。

RemotingDesc 事务流程需要的远程 bean 的一些具体信息,比如 targetBean、interfaceClass、interfaceClassName、protocol、isReference等等。

3.TCC资源注册

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName) {
RemotingDesc remotingBeanDesc = getServiceDesc(bean, beanName);
if (remotingBeanDesc == null) {
return null;
}
remotingServiceMap.put(beanName, remotingBeanDesc);

Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if (isService(bean, beanName)) {
try {
//service bean, registry resource
Object targetBean = remotingBeanDesc.getTargetBean();
for (Method m : methods) {
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
new Class[] {BusinessActionContext.class}));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
new Class[] {BusinessActionContext.class}));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}
} catch (Throwable t) {
throw new FrameworkException(t, "parser remoting service error");
}
}
if (isReference(bean, beanName)) {
//reference bean, TCC proxy
remotingBeanDesc.setReference(true);
}
return remotingBeanDesc;
}

首先判断是否是事务参与方,如果是,拿到 RemotingDesc 中的 interfaceClass,遍历接口中的方法,判断方法上是否有@TwoParserBusinessAction 注解,如果有,把参数封装成 TCCRecource,通过 DefaultResourceManager 进行 TCC 资源的注册。

这里 DefaultResourceManager 会根据 Resource 的 BranchType 来寻找对应的资源管理器,TCC 模式下资源管理类,在 tcc 模块中。

这个 rpc 解析类主要提供给 spring 模块进行使用。parserRemotingServiceInfo() 被封装到了 spring 模块的 TCCBeanParserUtils 工具类中。spring 模块的 GlobalTransactionScanner 在项目启动的时候,通过工具类解析 TCC bean,工具类 TCCBeanParserUtils 会调用 TCCResourceManager 进行资源的注册,并且如果是全局事务的服务提供者,会织入 TccActionInterceptor 代理。这些个流程是 spring 模块的功能,tcc 模块是提供功能类给 spring 模块使用。

三 .tcc 资源管理器

TCCResourceManager 负责管理 TCC 模式下资源的注册、分支的注册、提交、和回滚。

1.在项目启动时, spring 模块的 GlobalTransactionScanner 扫描到 bean 是 tcc bean 时,会本地缓存资源,并向 server 注册:

    @Override
public void registerResource(Resource resource) {
TCCResource tccResource = (TCCResource)resource;
tccResourceCache.put(tccResource.getResourceId(), tccResource);
super.registerResource(tccResource);
}

与server通信的逻辑被封装在了父类 AbstractResourceManage 中,这里根据 resourceId 对 TCCResource 进行缓存。父类 AbstractResourceManage 注册资源的时候,使用 resourceGroupId + actionName,actionName 就是 @TwoParseBusinessAction 注解中的 name,resourceGroupId 默认是 DEFAULT。

2.事务分支的注册在 rm-datasource 包下的 AbstractResourceManager 中,注册时参数 lockKeys 为 null,和 AT 模式下事务分支的注册还是有些不一样的。

3.分支的提交或者回滚:

    @Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
try {
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}

通过参数 xid、branchId、resourceId、applicationData 恢复业务的上下文 businessActionContext。

根据获取到的上下文通过反射执行 commit 方法,并返回执行结果。回滚方法类似。

这里 branchCommit() 和 branchRollback() 提供给 rm 模块资源处理的抽象类 AbstractRMHandler 调用,这个 handler 是 core 模块定义的模板方法的进一步实现类。和 registerResource() 不一样,后者是 spring 扫描时主动注册资源。

四 . tcc 模式事务处理

spring 模块中的 TccActionInterceptor 的 invoke() 方法在被代理的 rpc bean 被调用时执行。该方法先获取 rpc 拦截器透传过来的全局事务 xid ,然后 TCC 模式下全局事务参与者的事务流程还是交给 tcc 模块 ActionInterceptorHandler 处理。

也就是说,事务参与者,在项目启动的时候,被代理。真实的业务方法,在 ActionInterceptorHandler 中,通过回调执行。

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(4);

//TCC name
String actionName = businessAction.name();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName);

//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);

//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}

这里有两个重要操作:

1.doTccActionLogStore() 这个方法中,调用了两个比较重要的方法: fetchActionRequestContext(method, arguments),这个方法把被 @BusinessActionContextParam 注解的参数取出来,在下面的 init 方法中塞入 BusinessActionComtext ,同时塞入的还有事务相关参数。 DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null),这个方法执行 TCC 模式下事务参与者事务分支的注册。

2.回调执行 targetCallback.execute() ,被代理的 bean 具体的业务,即 prepare() 方法。

五 .总结

tcc模块,主要提供以下功能 :

  1. 定义两阶段协议注解,提供 tcc 模式下事务流程需要的属性。
  2. 提供解析不同 rpc 框架 remoting bean 的 ParserRemoting 实现,供 spring 模块调用。
  3. 提供 TCC 模式下资源管理器,进行资源注册、事务分支注册提交回滚等。
  4. 提供 TCC 模式下事务流程的处理类,让 MethodInterceptor 代理类不执行具体模式的事务流程,而是下放到 tcc 模块。

五 .相关

作者:赵润泽,系列地址

· 阅读需 1 分钟

活动介绍

亮点解读

  • Seata 开源项目发起人带来《Seata 过去、现在和未来》以及 Seata 1.0 的新特性。
  • Seata 核心贡献者详解 Seata AT, TCC, Saga 模式。
  • Seata 落地互联网医疗,滴滴出行实践剖析。

如您不能前来参会

现场福利

  • 讲师 PPT 打包下载
  • 精美茶歇,阿里公仔,天猫精灵等好礼等你来拿

议程

· 阅读需 9 分钟

一 . 导读

core 模块定义了事务的类型、状态,通用的行为,client 和 server 通信时的协议和消息模型,还有异常处理方式,编译、压缩类型方式,配置信息名称,环境 context 等,还基于 netty 封装了 rpc ,供客户端和服务端使用。

按包顺序来分析一下 core 模块主要功能类:

在这里插入图片描述

codec:定义了一个 codec 的工厂类,提供了一个方法,根据序列化类型来找对应的处理类。还提供了一个接口类 Codec ,有两个抽象方法:

<T> byte[] encode(T t);
<T> T decode(byte[] bytes);

目前 1.0 版本在 codec 模块,有三种序列化的实现:SEATA、PROTOBUF、KRYO。

compressor:和 codec 包下面类一样,都是三个类,一个压缩类型类,一个工厂类,一个压缩和解压缩操作的抽象类。1.0 版本就只有一种压缩方式:Gzip

constants:两个 ClientTableColumnsName、ServerTableColumnsName 类,分别是 client 端存储事务的表和 server 端存储事务表对应的 model 类。还有定义支持的数据库类型类和一些定义配置信息属性的前缀的类。

context:环境类 RootContext 持有一个 ThreadLocalContextCore 用来存储事务的标识信息。比如 TX_XID 用来唯一的表示一个事务。TX_LOCK 如果存在,则表示本地事务对于 update/delete/insert/selectForUpdate SQL 需要用全局锁控制。

event:这里用到了 guava 中 EventBus 事件总线来进行注册和通知,监听器模式。在 server 模块的 metrics 包中,MetricsManager 在初始化的时候,对 GlobalStatus 即 server 模块处理事务的几个状态变化时,注册了监挺事件,当 server 处理事务时,会回调监听的方法,主要是为了进行统计各种状态事务的数量。

lock: server 在收到 registerBranch 消息进行分支注册的时候,会加锁。1.0 版本有两种锁的实现,DataBaseLocker 和 MemoryLocker,分别是数据库锁和内存锁,数据库锁根据 rowKey = resourceId + tableName + pk 进行加锁,内存锁直接就是根据 primary key。

model:BranchStatus、GlobalStatus、BranchType 用来定义事务的类型和全局、分支状态。还有 TransactionManager 和 ResourceManager,是 rm 和 tm 的抽象类。具体的 rm 和 tm 的实现,因为各种事务类型都不同,所以这里没有具体的实现类。

protocol:定义了 rpc 模块传输用的实体类,即每个事务状态场景下 request 和 response 的 model。

store:定了与数据库打交道的数据模型,和与数据库交互的语句。

二 . exception 包中 handler 类分析

这是 AbstractExceptionHandler 的 UML 图,Callback 、AbstractCallback 是 AbstractExceptionHandler 的内部接口和内部类,AbstractCallback 抽象类实现了接口 Callback 的三个方法,还有一个 execute() 未实现。AbstractExceptionHandler 使用了 AbstractCallback 作为模板方法的参数,并使用了其实现的三个方法,但是 execute() 方法仍留给子类实现。 在这里插入图片描述 从对外暴露的角度看 AbstractExceptionHandler 定义了一个带有异常处理的模板方法,模板中有四个行为,在不同的情况下执行,其中三种行为已经实现,执行的行为交由子类自行实现,详解:

1.使用模板方法模式,在 exceptionHandlerTemplate() 中,定义好了执行的模板

    public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request,
AbstractTransactionResponse response) {
try {
callback.execute(request, response); //执行事务业务的方法
callback.onSuccess(request, response); //设置response返回码
} catch (TransactionException tex) {
LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
callback.onTransactionException(request, response, tex); //设置response返回码并设置msg
} catch (RuntimeException rex) {
LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
callback.onException(request, response, rex); //设置response返回码并设置msg
}
}

onSuccess、onTransactionException、onException 在 AbstarctCallback 中已经被实现,execute 则由 AbstractExceptionHandler 子类即负责不同事务模式的 handler 类进行实现。 AbstractExceptionHandler 目前有两个子类:AbstractTCInboundHandler 负责处理全局事务的业务,AbstractRMHandler 负责处理分支事务的业务。

2.使用回调机制,优点是:允许 AbstractExceptionHandler 把需要调用的类 Callback 作为参数传递进来,handler 不需要知道 callback 的具体执行逻辑,只要知道 callback 的特性原型和限制条件(参数、返回值),就可以使用了。

先使用模板方法,把事务业务流程定下来,再通过回调,把具体执行事务业务的方法,留给子类实现。设计的非常巧妙。

这个 exceptionHandlerTemplate() 应该翻译成带有异常处理的模板方法。异常处理已经被抽象类实现,具体的不同模式下 commit 、rollback 的业务处理则交给子类实现。

三 . rpc 包分析

seata 对于 rpc 的封装,细节不需要纠结,可以研究一下一下对于事务业务的处理。

client 端的 rpc 类是 AbstractRpcRemotingClient: 在这里插入图片描述

重要的属性和方法都在类图中,消息发送和初始化方法没画在类图中,详细分析一下类图:

clientBootstrap:是 netty 启动类 Bootstrap 的封装类,持有了 Bootstrap 的实例,并自定义自己想要的属性。

clientChannelManager:使用 ConcurrentHashMap<serverAddress,channel> 容器维护地址和 channel 的对应关系。

clientMessageListener: 消息的处理类,根据消息的类型的不同有三种具体的处理方法

public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
Object msg = request.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("onMessage:" + msg);
}
if (msg instanceof BranchCommitRequest) {
handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
} else if (msg instanceof UndoLogDeleteRequest) {
handleUndoLogDelete((UndoLogDeleteRequest)msg);
}
}

消息类中,持有 TransactionMessageHandler 对不同类型消息进行处理,最终会根据事务类型的不同(AT、TCC、SAGE)调用具体的处理类,即第二部分说的 exceptionHandleTemplate() 的实现类。

mergeSendExecutorService:是一个线程池,只有一个线程,负责对不同地址下的消息进行和并发送。在 sendAsyncRequest() 中,会给线程池的队列 LinkedBlockingQueue<> offer 消息,然后这个线程负责 poll 和处理消息。

channelRead():处理服务端的 HeartbeatMessage.PONG 心跳消息。还有消息类型是 MergeResultMessage 即异步消息的响应消息,根据 msgId 找到对应 MessageFuture ,并设置异步消息的 result 结果。

dispatch():调用 clientMessageListener 处理 server 发送过来的消息,不同类型 request 有不同的处理类。

简单点看 netty,只需要关注序列化方式和消息处理 handler 类。seata 的 rpc 序列化方式通过工厂类找 Codec 实现类进行处理,handler 即上文说的 TransactionMessageHandler 。

四 . 总结

core 模块涉及的功能很多,其中的类大多都是其他模块的抽象类。抽象出业务模型,具体的实现分布在不同的模块。core 模块的代码非常的优秀,很多设计都是经典,比如上文分析的基于模板模式改造的,非常实用也非常美,值得仔细研究。

五 . seata 源码分析系列地址

系列地址