跳到主要内容

· 阅读需 55 分钟

在前几篇文章中,我们详细聊了聊 Seata 的 XA、AT 以及 TCC 模式,它们都是在 Seata 定义的全局框架下的不同的事务模式。

我们知道,在 Seata 中,有三类角色,TC、RM、TM,Seata Server 作为 TC 协调分支事务的提交和回滚,各个资源作为 RM 和 TM,那么这三者之间是如何通信的?

所以,这篇文章就来看看 Seata 底层是如何进行网络通信的。

整体类层次结构

我们先着眼大局,看一看 Seata 整个 RPC 的类层次结构。

image-20241217222005964

从类结构层次可以看出来,AbstractNettyRemoting 是整个 Seata 网络通信的一个顶层抽象类。

在这个类中主要实现了一些 RPC 的基础通用方法,比如同步调用 sendSync、异步调用 sendAsync 等。

事实上,就网络调用来说,无非就是同步调用和异步调用,像其他的什么请求和响应都只是报文内容的区分。

所以,在 Seata 中,我个人认为还差一个顶层的接口 Remoting,类似于下面这样的:

import io.netty.channel.Channel;
import java.util.concurrent.TimeoutException;

public interface Remoting<Req, Resp> {

/**
* 同步调用
*/
Resp sendSync(Channel channel, Req request, long timeout) throws TimeoutException;

/**
* 异步调用
*/
void sendAsync(Channel channel, Req request);
}

在 AbstractNettyRemoting 实现了通用的网络调用方法,但是不同角色在这方面还是有一些区分的,比如对于 Server 来说,它的请求调用需要知道向哪个客户端发送,而对于 TM、RM 来说,它们发送请求直接发就行,不需要指定某个特定的 TC 服务,只需要在实现类通过负载均衡算法找到合适的 Server 节点就行。

所以就区分出了 RemotingServer 和 RemotingClient,但是底层还是要依赖 AbstractNettyRemoting 进行网络调用的,所以它们各自有子类实现了 AbstractNettyRemoting。

可以说 Seata 的这种设计在我看来是非常不错的,对于这种 CS 架构的远程通信,可以算一种通用的设计方案。

如何启动 Server 和 Client

聊完了 Seata 底层的类层次,我们再分别以 Server 和 Client 的视角来看它们是如何启动的,以及在启动的时候需要做些什么事情。

Server 是怎么启动的

Seata Server 作为一个独立的 SpringBoot 项目,要怎么样才能在 SpringBoot 启动的时候自动做点事呢?

Seata 的做法是实现了 CommandLineRunner 接口,至于这里面的原理就不是本篇文章讨论的内容了。

我们主要关注它的 run 方法:

// org.apache.seata.server.ServerRunner#run
public void run(String... args) {
try {
long start = System.currentTimeMillis();
seataServer.start(args);
started = true;
long cost = System.currentTimeMillis() - start;
LOGGER.info("\r\n you can visit seata console UI on http://127.0.0.1:{}. \r\n log path: {}.", this.port, this.logPath);
LOGGER.info("seata server started in {} millSeconds", cost);
} catch (Throwable e) {
started = Boolean.FALSE;
LOGGER.error("seata server start error: {} ", e.getMessage(), e);
System.exit(-1);
}
}

这其中核心的逻辑就在 seataServer.start() 方法中:

// org.apache.seata.server.Server#start
public void start(String[] args) {
// 参数解析器,用于解析 sh 的启动参数
ParameterParser parameterParser = new ParameterParser(args);
// initialize the metrics
MetricsManager.get().init();
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
/**
* 主要做这么几件事:
* 1. 设置 workingThreads 为 AbstractNettyRemoting 的 messageExecutor 处理器
* 2. 创建 ServerBootstrap,配置 Boss 和 Worker,并且设置 Seata Server 需要监听的端口
* 3. 设置出栈、入栈处理器 ServerHandler,它是一个 ChannelDuplexHandler 复合的处理器
*/
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
XID.setPort(nettyRemotingServer.getListenPort());
UUIDGenerator.init(parameterParser.getServerNode());
ConfigurableListableBeanFactory beanFactory = ((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).getBeanFactory();
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
if (coordinator instanceof ApplicationListener) {
beanFactory.registerSingleton(NettyRemotingServer.class.getName(), nettyRemotingServer);
beanFactory.registerSingleton(DefaultCoordinator.class.getName(), coordinator);
((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).addApplicationListener((ApplicationListener<?>) coordinator);
}
// log store mode: file, db, redis
SessionHolder.init();
LockerManagerFactory.init();
// 初始化一系列定时线程池,用于重试事务提交/回滚等
coordinator.init();
// 设置事务处理 Handler 为 DefaultCoordinator
nettyRemotingServer.setHandler(coordinator);
serverInstance.serverInstanceInit();
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
// Server 初始化
nettyRemotingServer.init();
}

最后的 nettyRemotingServer.init() 是整个 Seata Server 启动的重要逻辑,主要做了这么几件事:

  1. 注册一系列处理器
  2. 初始化一个定时线程池,用于清理过期的 MessageFuture
  3. 启动 ServerBootStrap 并将 TC 服务注册到注册中心,比如 Nacos

注册处理器

在 Seata 内部,用一个 Pair 对象关联了处理器和线程池,如下:

package org.apache.seata.core.rpc.processor;

public final class Pair<T1, T2> {

private final T1 first;
private final T2 second;

public Pair(T1 first, T2 second) {
this.first = first;
this.second = second;
}

public T1 getFirst() {
return first;
}

public T2 getSecond() {
return second;
}
}

而注册处理器本质就是将报文类型、处理该报文的处理器以及具体执行的线程池关联起来,存到一张哈希表中。

// AbstractNettyRemotingServer
protected final Map<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
// org.apache.seata.core.rpc.netty.NettyRemotingServer#registerProcessor
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor
ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 3. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}


// org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer#registerProcessor
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}

你可能会注意到,在注册处理器时,有一些注册时传入的线程池是 null,那么对应的报文会由哪个线程执行呢?

后面我们会提到。

初始化定时线程池

// org.apache.seata.core.rpc.netty.AbstractNettyRemoting#init
public void init() {
timerExecutor.scheduleAtFixedRate(() -> {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String.format("msgId: %s, msgType: %s, msg: %s, request timeout",
rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}

这个没啥好说的,就是初始化了一个定时线程池定时清理那些超时的 MessageFuture,这里 MessageFuture 是 Seata 将异步调用转为同步调用的关键,我们后面也会详细说到。

启动 ServerBootStrap

最后启动 ServerBootStrap,这差不多就是 Netty 的内容了。

// org.apache.seata.core.rpc.netty.NettyServerBootstrap#start
public void start() {
int port = getListenPort();
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 多版本协议解码器
MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers);
ch.pipeline()
.addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(multiProtocolDecoder);
}
});
try {
this.serverBootstrap.bind(port).sync();
LOGGER.info("Server started, service listen port: {}", getListenPort());
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
// 注册服务
registryService.register(address);
}
initialized.set(true);
} catch (SocketException se) {
throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
} catch (Exception exx) {
throw new RuntimeException("Server start failed", exx);
}
}

ServerBootstrap 启动时的 childOption 属于网络部分的内容,我们不过多解释。

这里你可能有一点疑问,在 pipeline 中仅仅只是添加了一个 MultiProtocolDecoder 解码器,那业务处理器呢?

事实上,MultiProtocolDecoder 的构造参数中的 channelHandlers 就是 ServerHandler,它是在创建 NettyRemotingServer 时就被设置的。

至于为什么要这样做,其实是和 Seata 的多版本协议相关。

当 Seata Server 启动后第一次进行解码时,会将 MultiProtocolDecoder 从 pipeline 中移除,根据版本选择具体的 Encoder 和 Decoder 并添加到 pipeline 中,此时,也会将 ServerHandler 添加到 pipeline。

Client 是怎么启动的

对于 Client 来说,由于我们一般是在 SpringBoot 中使用 Seata,所以我们需要关注的点在 SeataAutoConfiguration 类中。

在这个类里面创建了一个 GlobalTransactionScanner 对象,我们注意到它实现了 InitializingBean,所以将目光转移到 afterPropertiesSet 方法上。

果然在这个方法里面进行了 TM 和 RM 的初始化。

TM 的初始化

对于 TM 来说,初始化的逻辑如下:

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
/**
* 主要做这么几件事
* 1. 创建线程池作为 AbstractNettyRemotingClient 的 messageExecutor
* 2. 设置事务角色 transactionRole 为 TM_ROLE
* 3. 创建 Bootstrap 并设置出栈、入栈处理器 ClientHandler
* 4. 创建客户端 Channel 管理器 NettyClientChannelManager
*/
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);

/**
* 主要做这么几件事:
* 1. 注册一系列处理器
* 2. 创建定时线程池定时对事务组内的 Server 发起连接,如果连接断开,则尝试重新建立连接
* 3. 如果客户端允许报文批量发送,则创建 mergeSendExecutorService 线程池,并提交 MergedSendRunnable 任务
* 4. 初始化一个定时线程池清理过期的 MessageFuture
* 5. 启动客户端 Bootstrap
* 6. 初始化连接 initConnection
*/
tmNettyRemotingClient.init();
}

启动客户端 Bootstrap 的逻辑如下:

@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker)
.channel(nettyClientConfig.getClientChannelClazz())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
}
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolDecoderV1())
.addLast(new ProtocolEncoderV1());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}

由于客户端的协议版本根据不同的 Seata 版本是可以确定的,所以这里直接添加了 V1 版本的编解码器,这里 channelHandlers 其实就是 ClientHandler,它也是 Netty 中的一个复合处理器。

RM 的初始化

RM 的初始化大致逻辑和 TM 是类似的,这里就不过多介绍了。

如何发送和处理报文

厘清了 Seata Server 和 Client 的大致启动流程之后,我们就可以深入的看一看 Seata 是如何进行报文发送和处理的。

前面我们也说过了,发送请求和处理报文的核心逻辑是在 AbstractNettyRemoting 中,接下来就看一看这个类。

同步和异步

先简单说一说什么是同步和异步。

同步 Synchronous 和异步 Asynchronous,本质上是描述了程序在处理多个事件或者任务时的不同行为模式。

同步是指一个过程必须等待另一个过程完成之后才能继续进行。换句话说,在同步操作中,调用方发出请求后会一直阻塞等待直到接收到响应结果、或者超时才会继续执行后续代码。

相比之下,异步则允许调用者在请求后不必等待响应就可以向下执行,但当请求完成时,会以某种方式将响应通知到调用者(如通过回调函数、Future),异步模型可以提高并发性和效率。

从另一个角度来说,同步调用需要发起调用的线程获取结果,而异步调用则是由异步线程将结果放到某个地方(Future)或者是异步线程去执行事先准备好的调用成功/失败的回调方法(回调函数)。

下面是一个简单的例子,展示了三种调用方式,同步、异步 Future、异步 Callback。

import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class AsyncTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTest.class);

public static void main(String[] args) throws InterruptedException, ExecutionException {
Result syncResponse = testSync();
LOGGER.info("同步响应结果: {}", syncResponse.getString());
CompletableFuture<Result> result = testAsyncFuture();
testAsyncCallback();
LOGGER.info("主线程继续向下执行~~");
TimeUnit.SECONDS.sleep(1); // 保证所有结果处理完毕
LOGGER.info("主线程从异步 Future 中获取结果: {}", result.get().getString());
}

public static void testAsyncCallback() {
new AsyncTask().execute(new AsyncCallback() {
@Override
public void onComplete(Result result) {
try {
TimeUnit.MILLISECONDS.sleep(50); // 模拟异步耗时
} catch (InterruptedException e) {
}
LOGGER.info("异步 Callback 获取结果: {}", result.getString());
}
});
}

public static CompletableFuture<Result> testAsyncFuture() {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(50); // 模拟异步耗时
} catch (InterruptedException e) {
}
Result asyncResponse = getResult();
LOGGER.info("异步 Future 获取结果: {}", asyncResponse.getString());
return asyncResponse;
});
}

public static Result testSync() {
return getResult();
}

@Data
static class Result {
private String string;
}

interface AsyncCallback {
void onComplete(Result result);
}

static class AsyncTask {
void execute(AsyncCallback callback) {
new Thread(() -> {
Result asyncRes = getResult();
callback.onComplete(asyncRes);
}).start();
}
}

private static Result getResult() {
Result result = new Result();
result.setString("结果");
return result;
}
}

输出:

22:26:38.788 [main] INFO  org.hein.netty.AsyncTest - 同步响应结果: 结果
22:26:38.849 [main] INFO org.hein.netty.AsyncTest - 主线程继续向下执行~~
22:26:38.911 [Thread-0] INFO org.hein.netty.AsyncTest - 异步 Callback 获取结果: 结果
22:26:38.911 [ForkJoinPool.commonPool-worker-1] INFO org.hein.netty.AsyncTest - 异步 Future 获取结果: 结果
22:26:39.857 [main] INFO org.hein.netty.AsyncTest - 主线程从异步 Future 中获取结果: 结果

从结果中,至少可以看出三点,

  • 一是异步 Future 和异步 Callback 并不会阻塞主线程向下执行。
  • 二是异步调用时处理结果的不是主线程。
  • 最后,Future 和 Callback 的区别在于 Future 只是由异步线程将结果存储在了一个地方(CompletableFuture#result),但是后续获取结果还是需要主线程(或者其他线程)调用 get 方法,而 Callback 的话,其实就相当于预先设定了结果的处理方式,由异步线程去执行就好了。

当然,CompletableFuture 也是可以作回调的,比如调用 whenComplete 方法。

异步调用

Netty 作为一个高性能的异步 IO 框架,它的设计核心就是异步的,所以基于 Netty 进行异步调用是比较简单的。

protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
channelWritableCheck(channel, rpcMessage.getBody());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message: {}, channel: {}, active? {}, writable? {}, isopen? {}", rpcMessage.getBody(), channel, channel.isActive(), channel.isWritable(), channel.isOpen());
}
doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
destroyChannel(future.channel());
}
});
}

只需要简单调用 channel 的 writeAndFlush 方法即可实现异步调用。

特别要注意的是,writeAndFlush 方法在调用线程是 EventLoop 线程的情况下会变成同步调用。

同步调用

在 Netty 中实现异步调用很简单,要实现同步调用就麻烦一点,需要将异步调用转换为同步调用。

从本质上来说,异步转同步就是让调用线程发起调用后,拿到响应前进入阻塞,拿到响应后再唤醒它,向下执行。

那么 Seata 的处理的核心就是 MessageFuture 类,如下:

package org.apache.seata.core.protocol;

import org.apache.seata.common.exception.ShouldNeverHappenException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MessageFuture {

private RpcMessage requestMessage;
private long timeout;
private final long start = System.currentTimeMillis();

private final transient CompletableFuture<Object> origin = new CompletableFuture<>();

public boolean isTimeout() {
return System.currentTimeMillis() - start > timeout;
}

public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
Object result;
try {
result = origin.get(timeout, unit);
if (result instanceof TimeoutException) {
throw (TimeoutException) result;
}
} catch (ExecutionException e) {
throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
} catch (TimeoutException e) {
throw new TimeoutException(String.format("%s, cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
}
if (result instanceof RuntimeException) {
throw (RuntimeException) result;
} else if (result instanceof Throwable) {
throw new RuntimeException((Throwable) result);
}
return result;
}

public void setResultMessage(Object obj) {
origin.complete(obj);
}

public RpcMessage getRequestMessage() { return requestMessage; }

public void setRequestMessage(RpcMessage requestMessage) { this.requestMessage = requestMessage;}

public long getTimeout() { return timeout; }

public void setTimeout(long timeout) { this.timeout = timeout;}
}

有了这个类之后,同步调用的过程如下,我们以客户端请求、服务端响应为例:

  • 首先客户端将请求构建为 MessageFuture,然后将请求 id 和这个 MessageFuture 对象存储到一个哈希表中。
  • 接着客户端调用 channel.writeAndFlush 发起异步调用,是的,这里还是异步。
  • 异步转同步的核心在于,此时线程需要调用 MessageFuture 对象的 get 方法进入阻塞,当然实际是调用了 CompletableFuture 的 get 方法进入同步阻塞。
  • 当服务端处理完毕,它又会发出请求(服务端视角),在客户端来看,这就是响应。
  • 当客户端收到响应之后,由 EventLoop 线程将响应结果设置到 MessageFuture 中,由于一次请求和响应的 id 是相同的,所以可以从上面的哈希表中拿到对应的 MessageFuture 对象。
  • 当响应结果被设置之后,上面阻塞的线程就可以恢复运行,这样就实现了同步的效果。

所以,Seata 的解决方案本质上来说就是利用了 CompletableFuture 对象,将它作为一个存储结果的容器。

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture); // 请求和响应的 id 是一样的
// 检查该 Channel 是否可写(Channel 中有写缓冲区,如果缓冲区达到阈值水位,则不可写)
channelWritableCheck(channel, rpcMessage.getBody());
// 获取目的 ip 地址
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
// 执行发送前钩子方法
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 发送结果,并设置回调,非阻塞
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 发送失败,移除 future,关闭 Channel
if (!future.isSuccess()) {
MessageFuture mf = futures.remove(rpcMessage.getId());
if (mf != null) {
mf.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {
// Netty 是异步发送,所以这里需要等待结果,将异步转为同步
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 执行发送后的钩子方法
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
// 超时异常
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}

报文处理

在 Netty 中,提到报文处理,我们首先应该想到的就是入栈、出栈处理器。

在 Seata Server 端,除了常见的编解码处理器之外,就是 ServerHandler 处理器了,如下:

@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// 前置了解码处理器,所以这里的消息是 RpcMessage
if (msg instanceof RpcMessage) {
processMessage(ctx, (RpcMessage) msg);
} else {
LOGGER.error("rpcMessage type error");
}
}

// ...
}

比较有业务含义的就是这个 channelRead 方法,所有发向 Server 的报文在经过解码之后都会来到这个方法。

这里的 processMessage 方法就是 AbstractNettyRemoting 中的业务处理方法,如下:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} msgId: {}, body: {}", this, rpcMessage.getId(), rpcMessage.getBody());
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 在 Server 启动的时候,向 processorTable 注册了一大堆处理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 拿到对应的线程池执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
// 找对应的处理器执行
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池满了,执行拒绝策略
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
// 导出线程栈信息
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to {}", jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
try {
// 如果没有为处理器配置线程池,则由当前线程执行,基本上就是 EventLoop 线程了
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}

这个方法的逻辑很简单。

Seata 在 Server 启动的过程中,向 processorTable 注册了一大堆处理器,那么这里就可以根据消息类型 Code 拿到对应的处理器和线程池。

如果有线程池,就在线程池内执行处理器的方法,否则就交给 EventLoop 线程去执行。

当然,对于 Client 而言,也是这样的。

批量发送

在网络程序中,有时候也需要实现批量发送,我们来看 Seata 是怎么做的,这里主要看客户端向服务端发送。

还记得我们上面在 Client 启动的过程中提到过一个线程池 mergeSendExecutorService,如果允许批量发送,那么在 Client 启动的时候就会提交一个 MergedSendRunnable 任务,我们先来看这个任务在干啥?

private class MergedSendRunnable implements Runnable {

@Override
public void run() {
// 死循环
while (true) {
synchronized (mergeLock) {
try {
// 保证线程最多只会空闲 1ms
mergeLock.wait(MAX_MERGE_SEND_MILLS); // 1
} catch (InterruptedException ignore) {
// ignore
}
}
// 正在发送中的标识
isSending = true;
// basketMap: key 是 address,value 是发向该 address 的报文队列(阻塞队列)
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
// 将同一个阻塞队列中所有 RpcMessage 进行合并
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// 批量发送报文是一个同步请求,但是无需获取返回值
// 因为 messageFuture 在将报文放入 basketMap 之前就已经被创建
// 返回值将在 ClientOnResponseProcessor 中被设置
sendChannel = clientChannelManager.acquireChannel(address);
// 内部将 mergeMessage 封装为一个普通的 RpcMessage 发送
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrorCode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
}

那么,与之相关的批量发送代码如下:

public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
if (this.isEnableClientBatchSendRequest()) {
// 如果允许客户端批量消息发送
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
// 拿到 serverAddress 对应的发送队列
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
// 将报文添加到队列中,等待 mergeSendExecutorService 进行实际的发送
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress: {}, rpcMessage: {}", serverAddress, rpcMessage);
return null;
}
if (!isSending) {
// 保证队列中一有数据,就唤醒线程,进行批量发送
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// 线程阻塞等待响应
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error: {}, ip: {}, request: {}", exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
// 普通发送,拿到 channel 调父类的同步调用方法即可
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}

可以看到,这里面也用到了对象锁的同步-等待机制,那么实现的效果就是:

  1. 最多隔 1ms 会遍历 basketMap 进行报文发送。
  2. 在 mergeSendExecutorService 内部的线程阻塞期间(mainLock.wait),如果来了需要发送的报文,那么会唤醒 mainLock 上的线程,继续进行发送。

那 Server 是怎么处理的呢?主要看 MergedWarpMessage 报文的 TypeCode,实际上就是 TYPE_SEATA_MERGE,再看 Server 启动的时候对这个 Code 注册哪个处理器,实际上就是 ServerOnRequestProcessor。

这里其实就向你展示了,如何去找某个报文是怎么处理的,授人以鱼不如授人以渔!

在 ServerOnRequestProcessor 这边,实际上对应了两种处理 MergedWarpMessage 报文的方式:

  1. MergedWarpMessage 中的所有独立请求全部处理完毕之后,统一发送 MergeResultMessage。
  2. 由 batchResponseExecutorService 线程池处理发送任务,可以保证两点,一是当有报文结果就响应,即使线程 wait,也会将它 notify,二是至少 1ms 会响应一次,因为 batchResponseExecutorService 中执行的线程最多 wait 1ms。

注意,这两种方式响应的报文类型是不同的,第一种响应的是 MergeResultMessage,第二种是 BatchResultMessage,在 Client 也会有不同的处理。

ServerOnRequestProcessor 中核心处理方法如下:

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// the batch send request message
if (message instanceof MergedWarpMessage) {
final List<AbstractMessage> msgs = ((MergedWarpMessage) message).msgs;
final List<Integer> msgIds = ((MergedWarpMessage) message).msgIds;
// 允许 TC 服务端批量返回结果 && 客户端版本号 >= 1.5.0
if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
// 由 batchResponseExecutorService 单独处理,无需等到批量请求全部处理完毕
for (int i = 0; i < msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
int finalI = i;
CompletableFuture.runAsync(
() -> handleRequestsByMergedWarpMessageBy150(msgs.get(finalI), msgIds.get(finalI), rpcMessage, ctx, rpcContext));
} else {
handleRequestsByMergedWarpMessageBy150(msgs.get(i), msgIds.get(i), rpcMessage, ctx, rpcContext);
}
}
} else {
// 每个请求都处理完毕,才能向客户端发出响应
List<AbstractResultMessage> results = new ArrayList<>();
List<CompletableFuture<AbstractResultMessage>> futures = new ArrayList<>();
for (int i = 0; i < msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
int finalI = i;
futures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(msgs.get(finalI), rpcContext)));
} else {
results.add(i, handleRequestsByMergedWarpMessage(msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(futures)) {
try {
for (CompletableFuture<AbstractResultMessage> future : futures) {
results.add(future.get()); // 阻塞等待处理结果
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("handle request error: {}", e.getMessage(), e);
}
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
}
} else {
// 处理单个报文响应
}
}

而 handleRequestsByMergedWarpMessage 和 handleRequestsByMergedWarpMessageBy150 的区别就在于后者会将结果封装为 QueueItem 加入到阻塞队列由 batchResponseExecutorService 中的线程进行实际的发送,而前者仅仅是返回处理的结果。

private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(subMessage, rpcContext);
return resultMessage;
}

private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msgId, RpcMessage rpcMessage,
ChannelHandlerContext ctx, RpcContext rpcContext) {
AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(msg, rpcContext);
// 拿到 channel 对应的发送队列
BlockingQueue<QueueItem> msgQueue = CollectionUtils.computeIfAbsent(basketMap, ctx.channel(), key -> new LinkedBlockingQueue<>());
// 将结果添加到队列中,等待 batchResponseExecutorService 线程池实际进行发送
if (!msgQueue.offer(new QueueItem(resultMessage, msgId, rpcMessage))) {
LOGGER.error("put message into basketMap offer failed, channel: {}, rpcMessage: {}, resultMessage: {}", ctx.channel(), rpcMessage, resultMessage);
}
if (!isResponding) {
// 保证队列中一有数据,就唤醒线程,进行批量发送
synchronized (batchResponseLock) {
batchResponseLock.notifyAll();
}
}
}

再来看 batchResponseExecutorService 线程池是怎么处理批量发送的任务的?

private class BatchResponseRunnable implements Runnable {
@Override
public void run() {
while (true) {
synchronized (batchResponseLock) {
try {
// 最多空闲 1ms
batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
} catch (InterruptedException e) {
LOGGER.error("BatchResponseRunnable Interrupted error", e);
}
}
isResponding = true;
// 遍历 basketMap 处理
basketMap.forEach((channel, msgQueue) -> {
if (msgQueue.isEmpty()) {
return;
}
// Because the [serialization,compressor,rpcMessageId,headMap] of the response
// needs to be the same as the [serialization,compressor,rpcMessageId,headMap] of the request.
// Assemble by grouping according to the [serialization,compressor,rpcMessageId,headMap] dimensions.
// 将队列中的响应封装为 BatchResultMessage,但是注意并不是将所有的响应报文一次发送出去
// 需要按照 [serialization,compressor,rpcMessageId,headMap] 进行分组,然后按组进行异步发送
Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();
while (!msgQueue.isEmpty()) {
QueueItem item = msgQueue.poll();
BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,
new ClientRequestRpcInfo(item.getRpcMessage()),
key -> new BatchResultMessage());
batchResultMessage.getResultMessages().add(item.getResultMessage());
batchResultMessage.getMsgIds().add(item.getMsgId());
}
batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->
remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo), channel, batchResultMessage));
});
isResponding = false;
}
}
}

最后我们来看 Client 这边是怎么处理 Server 的批量响应报文的,根据 Client 注册的处理器,处理批量报文的处理器是 ClientOnResponseProcessor,如下:

public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
// 处理 MergeResultMessage
if (rpcMessage.getBody() instanceof MergeResultMessage) {
MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
for (int i = 0; i < mergeMessage.msgs.size(); i++) {
int msgId = mergeMessage.msgIds.get(i);
MessageFuture future = futures.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, results.getMsgs()[i]);
} else {
future.setResultMessage(results.getMsgs()[i]);
}
}
} else if (rpcMessage.getBody() instanceof BatchResultMessage) {
// 处理 BatchResultMessage
try {
BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
} else {
future.setResultMessage(batchResultMessage.getResultMessages().get(i));
}
}
} finally {
// In order to be compatible with the old version, in the batch sending of version 1.5.0,
// batch messages will also be placed in the local cache of mergeMsgMap,
// but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
mergeMsgMap.clear();
}
} else {
// 处理非批量发送报文
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
}
}
}
}
}

当然,这里处理的逻辑很简单,就是将结果塞到对应的 MessageFuture 中,那么最开始发送请求的、阻塞的线程就可以拿到结果了,这样一次批量发送和响应就算处理完毕了。

我们再做一些额外的思考,Seata 的批量发送为什么有两种方式,孰优孰劣?

对于 MergeResultMessage 的这种方式来说,它必须等到所有的报文都处理完毕之后才会发送出去,所以其实它的响应速度受限于处理最长时间的报文,即使其他报文在很短时间内就可以发送出去。

而 BatchResultMessage 这种方式则不然,配合 CompletableFuture 进行并行处理,它就可以实现一有报文处理完毕就发送,而不需要等其他报文的处理,它的响应速度肯定是更快的。

而后面这种方式是 Seata 1.5 版本之后才有的,其实也可以看出来这是一种更好地处理方式。

最后,再分享一张 Seata RPC 重构作者的全局事务提交请求的交互流程图:

image-20241217222048505

Seata 如何管理 Channel

在整个 TC、TM、RM 的网络通信的过程中,Channel 是一个至关重要的通信组件,而要想知道 Seata 是怎么管理 Channel 的,最容易想到的入口就是看 Server 和 Client 发送报文时是从哪里拿到到 Channel 的。

在 AbstractNettyRemotingClient 类的 sendSyncRequest 中,我们可以看到下面的代码:

public Object sendSyncRequest(Object msg) throws TimeoutException {
// ...
// Client 通过 NettyClientChannelManager 获取 Channel
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}

而在 AbstractNettyRemotingServer 类的 sendSyncRequest 中,我们可以看到下面的代码:

public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException {
// Server 通过 ChannelManager 拿到 Channel
Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp);
if (channel == null) {
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

所以 Client 主要是通过 NettyClientChannelManager 中获取 Channel,而 Server 则是根据 resourceId 和 clientId 从 ChannelManager 中获取 Channel。

所以下面我们主要研究的就是这两个类,以及相关的一些逻辑。

Client Channel

我们先来看 Client 这边是怎么管理 Channel 的,核心类是 NettyClientChannelManager。

先简单看一下这个类的属性,

// serverAddress -> lock
private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();
// serverAddress -> NettyPoolKey
private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();
// serverAddress -> Channel
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
// 对象池,NettyPoolKey -> Channel
private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
// 函数式接口,封装了通过 serverAddress 获取 NettyPoolKey 的逻辑
private final Function<String, NettyPoolKey> poolKeyFunction;

对象池的核心类

Seata 使用了 GenericKeyedObjectPool 作为管理 Channel 的对象池。

GenericKeyedObjectPool 作为 Apache Commons Pool 库中的一个实现,它主要用于管理一组对象池,每个对象通过唯一的 Key 进行区分,可以支持多类型的对象池化需求。

在使用 GenericKeyedObjectPool 时,通常还需要配置 KeyedPoolableObjectFactory 工厂,这个工厂定义了如何创建、验证、激活、钝化以及销毁池中的对象。

当 GenericKeyedObjectPool 需要创建对象时会调用 KeyedPoolableObjectFactory 工厂的 makeObject 方法,当需要销毁时会调用 destroyObject 方法进行销毁 ……

如何池化 Channel

被池化的对象就是 Channel,而对应的 Key 是 NettyPoolKey,如下:

public class NettyPoolKey {

private TransactionRole transactionRole;
private String address;
private AbstractMessage message;

// ...
}

在 NettyPoolKey 中,维护了三个信息,事务角色(TM、RM、Server),目的 TC Server 地址,以及在 Client 连接 Server 时发送的 RPC 报文。

如何创建这个 NettyPoolKey 呢?在 Seata 中,客户端其实是有两种角色的,TM 和 RM,创建的逻辑肯定是不一样的,所以,Seata 在 AbstractNettyRemotingClient 中抽象了一个方法,它的返回值是一个函数式接口,这个函数式接口就封装了根据 serverAddress 创建 NettyPoolKey 的逻辑。

// org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#getPoolKeyFunction
protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

比如在 TM 中的实现是:

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
return severAddress -> {
RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup, getExtraData());
return new NettyPoolKey(NettyPoolKey.TransactionRole.TM_ROLE, severAddress, message);
};
}

而在 RM 中的实现是:

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
return serverAddress -> {
String resourceIds = getMergedResourceKeys();
if (resourceIds != null && LOGGER.isInfoEnabled()) {
LOGGER.info("RM will register: {}", resourceIds);
}
RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
message.setResourceIds(resourceIds);
return new NettyPoolKey(NettyPoolKey.TransactionRole.RM_ROLE, serverAddress, message);
};
}

从这里就可以看到,TM 在连接 Server 后发送的报文是 RegisterTMRequest,而 RM 是 RegisterRMRequest。

那这个函数式接口在什么时候被调用呢,后面再看。

我们前面也说到了,一个对象池,会配备对应的对象创建工厂 KeyedPoolableObjectFactory,在 Seata 中,以 NettyPoolableFactory 继承 KeyedPoolableObjectFactory 来实现。

/**
* Netty Channel 创建工厂,通过 NettyPoolKey 创建 Channel,该类的方法必须是线程安全的
*/
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {

// ...

/**
* 需要一个新的实例则调用该方法
*/
@Override
public Channel makeObject(NettyPoolKey key) {
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
// 创建 Channel,本质上就是通过 bootstrap.connect 连接到 Seata Server 返回 Channel
Channel tmpChannel = clientBootstrap.getNewChannel(address);
long start = System.currentTimeMillis();
Object response;
Channel channelToServer = null;
if (key.getMessage() == null) {
throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
}
try {
// 发送 Message,TM 就是 RegisterTMRequest,RM 就是 RegisterRMRequest
response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
// 根据 response 判断是否注册成功
if (!isRegisterSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
// 注册成功
channelToServer = tmpChannel;
// 将 serverAddress 作为 key,Channel 作为 value,添加到 NettyClientChannelManager.channels 中
// 如果是 RM 可能还需要将 Server 注册 resources
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
}
} catch (Exception exx) {
if (tmpChannel != null) {
tmpChannel.close();
}
throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
}
return channelToServer;
}

// ...

@Override
public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
if (channel != null) {
channel.disconnect();
channel.close();
}
}

/**
* 需要借用对象时会调用该方法校验对象有效性(可选)
*/
@Override
public boolean validateObject(NettyPoolKey key, Channel obj) {
if (obj != null && obj.isActive()) {
return true;
}
return false;
}

/**
* 需要借用对象时会调用该方法激活对象
*/
@Override
public void activateObject(NettyPoolKey key, Channel obj) throws Exception {}

/**
* 归还对象时会调用该方法钝化对象
*/
@Override
public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {}
}

获取 Channel

在整个 Seata 客户端,有三个口径可以获取 Channel,即初始化、定时重连,发送报文时获取 Channel。

// 口径一
private void initConnection() {
boolean failFast =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}

// 口径二
public void init() {
// 默认延时 60s 定时 10s 周期重连
timerExecutor.scheduleAtFixedRate(() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
// ...
}

// 口径三
public Object sendSyncRequest(Object msg) throws TimeoutException {
// ...
// Client 通过 NettyClientChannelManager 获取 Channel
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}

不过,这三个口径最后都会调用到 clientChannelManager 的 acquireChannel 方法获取 Channel。

/**
* 根据 serverAddress 拿到 Channel,如果 Channel 不存在或者连接已死则需要重新建立连接
*/
Channel acquireChannel(String serverAddress) {
// 从 channels 中根据 serverAddress 拿到 Channel
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null) {
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (channelToServer != null) {
return channelToServer;
}
}
// 如果 channels 没有这个 Channel 或者这个 Channel 已死,则需要对这个地址建立连接
Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
synchronized (lockObj) {
// 建立连接
return doConnect(serverAddress);
}
}

private Channel doConnect(String serverAddress) {
// 再尝试拿一次
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
// 这里就调用了函数式接口
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
poolKeyMap.put(serverAddress, currentPoolKey);
// 从对象池中 borrowObject,如果需要创建对象,则会调用工厂的 makeObject 方法,
// 该方法内部就会向 Server 进行 connect,并且发送 currentPoolKey.message 的报文
channelFromPool = nettyClientKeyPool.borrowObject(currentPoolKey);
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}

Server Channel

而在 Server 这边,基本上有关 Channe 管理的核心逻辑都在 ChannelManager 中,那 Server 这边的 Channel 是怎么来的呢?还记得在 Client 那边向 Server 发起连接,成功之后还会发送 TM 和 RM 的一个注册请求。

这里先来看看 Server 是怎么处理这些 registerRequest 的。

处理 Client 注册

与之相关的处理器是 RegRmProcessor 和 RegTmProcessor,在这两个处理器中,最核心的逻辑就是调用 ChannelManager 的 registerTMChannel 和 registerRMChannel 方法。

public static void registerTMChannel(RegisterTMRequest request, Channel channel) throws IncompatibleVersionException {
// 构建 RpcContext,这个 RpcContext 就是维护了客户端连接信息上下文
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TM_ROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
// 将 Channel 作为 key,rpcContext 作为 value,put 到 IDENTIFIED_CHANNELS 中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
// applicationId:clientIp
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR + ChannelUtil.getClientIpFromChannel(channel);
// 将 Channel 信息存储到 TM_CHANNELS 中
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS, clientIdentified, key -> new ConcurrentHashMap<>());
rpcContext.holdInClientChannels(clientIdentifiedMap);
}

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {
Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
RpcContext rpcContext;
if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
// 构建 RpcContext 和 IDENTIFIED_CHANNELS
rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RM_ROLE, resourceManagerRequest.getVersion(),
resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
resourceManagerRequest.getResourceIds(), channel);
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
} else {
rpcContext = IDENTIFIED_CHANNELS.get(channel);
rpcContext.addResources(dbkeySet);
}
if (dbkeySet == null || dbkeySet.isEmpty()) {
return;
}
for (String resourceId : dbkeySet) {
String clientIp;
// 维护 RM_CHANNELS 信息
ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
.computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());
rpcContext.holdInResourceManagerChannels(resourceId, portMap);
updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
}
}

这两个方法逻辑很简单,就是基于注册请求和 Channel 的信息构建 RpcContext,维护 Server 内的相关 Map 集合,IDENTIFIED_CHANNELS、RM_CHANNELS、TM_CHANNELS。

但是,说实话,这几个集合实在是嵌套的有点深,不知道能不能优化一下。

/**
* Channel -> RpcContext
*/
private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS = new ConcurrentHashMap<>();

/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
// resourceId applicationId ip
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
// port RpcContext
ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

/**
* applicationId:clientIp -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS = new ConcurrentHashMap<>();

获取 Channel

在 Server 这边,获取 Channel 的逻辑,真的是超长,感兴趣自己看看吧,本质上就是从 map 中拿到一个有效的 Channel。

public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) {
Channel resultChannel = null;
// 解析 ClientId,三部分组成:applicationId + clientIp + clientPort
String[] clientIdInfo = parseClientId(clientId);
if (clientIdInfo == null || clientIdInfo.length != 3) {
throw new FrameworkException("Invalid Client ID: " + clientId);
}
if (StringUtils.isBlank(resourceId)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available, resourceId is null or empty");
}
return null;
}
// applicationId
String targetApplicationId = clientIdInfo[0];
// clientIp
String targetIP = clientIdInfo[1];
// clientPort
int targetPort = Integer.parseInt(clientIdInfo[2]);
// 下面就是不断取出内层的 ConcurrentHashMap
ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);
if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available for resource[{}]", resourceId);
}
return null;
}
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
if (ipMap != null && !ipMap.isEmpty()) {
// Firstly, try to find the original channel through which the branch was registered.
// 端口 -> RpcContext
ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
/**
* 在 targetIp 上拿 Channel
*/
if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
if (exactRpcContext != null) {
Channel channel = exactRpcContext.getChannel();
if (channel.isActive()) {
// Channel 有效,则跳过下面所有的 if 返回这个 Channel
resultChannel = channel;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Just got exactly the one {} for {}", channel, clientId);
}
} else {
if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive {}", channel);
}
}
}
}
// The original channel was broken, try another one.
if (resultChannel == null) {
// 尝试当前节点上的其他端口
for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {
Channel channel = portMapOnTargetIPEntry.getValue().getChannel();
if (channel.isActive()) {
resultChannel = channel;
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Choose {} on the same IP[{}] as alternative of {}", channel, targetIP, clientId);
}
break;
} else {
if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
portMapOnTargetIPEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive {}", channel);
}
}
}
}
}
}
/**
* 在 targetApplicationId 上拿 Channel
*/
// No channel on the app node, try another one.
if (resultChannel == null) {
for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {
if (ipMapEntry.getKey().equals(targetIP)) {
continue;
}
ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
continue;
}
for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
Channel channel = portMapOnOtherIPEntry.getValue().getChannel();
if (channel.isActive()) {
resultChannel = channel;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Choose {} on the same application[{}] as alternative of {}", channel, targetApplicationId, clientId);
}
break;
} else {
if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(), portMapOnOtherIPEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive {}", channel);
}
}
}
}
if (resultChannel != null) {
break;
}
}
}
}
if (resultChannel == null && tryOtherApp) {
// 尝试其他 applicationId
resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
if (resultChannel == null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available for resource[{}] as alternative of {}", resourceId, clientId);
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Choose {} on the same resource[{}] as alternative of {}", resultChannel, resourceId, clientId);
}
}
}
return resultChannel;
}

private static Channel tryOtherApp(ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap, String myApplicationId) {
Channel chosenChannel = null;
for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMapEntry : applicationIdMap.entrySet()) {
if (!StringUtils.isNullOrEmpty(myApplicationId) && applicationIdMapEntry.getKey().equals(myApplicationId)) {
continue;
}
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> targetIPMap = applicationIdMapEntry.getValue();
if (targetIPMap == null || targetIPMap.isEmpty()) {
continue;
}
for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> targetIPMapEntry : targetIPMap.entrySet()) {
ConcurrentMap<Integer, RpcContext> portMap = targetIPMapEntry.getValue();
if (portMap == null || portMap.isEmpty()) {
continue;
}
for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
Channel channel = portMapEntry.getValue().getChannel();
if (channel.isActive()) {
chosenChannel = channel;
break;
} else {
if (portMap.remove(portMapEntry.getKey(), portMapEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive {}", channel);
}
}
}
}
if (chosenChannel != null) {
break;
}
}
if (chosenChannel != null) {
break;
}
}
return chosenChannel;
}

一图总结

最后,再以一个时序图来总结一下 Channel 的管理过程。

image-20241217222155609

Seata 如何设计协议

对于一个网络程序而言,通信协议是必不可少的,Seata 也不例外,这里我们就看看 Seata V1 版本的协议是如何实现的。

与之相关类主要有 ProtocolEncoderV1、ProtocolDecoderV1。

当然,我们前面也知道 Seata Server 启动时加入的处理器其实是 MultiProtocolDecoder,在这个类的 decode 方法中,如下:

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame;
Object decoded;
byte version;
try {
if (isV0(in)) {
decoded = in;
version = ProtocolConstants.VERSION_0;
} else {
decoded = super.decode(ctx, in);
version = decideVersion(decoded);
}
if (decoded instanceof ByteBuf) {
frame = (ByteBuf) decoded;
// 通过 MultiProtocolDecoder 进行多版本协议识别
// 通过 version 选择对应的编解码器
ProtocolDecoder decoder = protocolDecoderMap.get(version);
ProtocolEncoder encoder = protocolEncoderMap.get(version);
try {
if (decoder == null || encoder == null) {
throw new UnsupportedOperationException("Unsupported version: " + version);
}
return decoder.decodeFrame(frame);
} finally {
if (version != ProtocolConstants.VERSION_0) {
frame.release();
}
// 将选定的编解码器加入到 pipeline,并且移除 MultiProtocolDecoder
ctx.pipeline().addLast((ChannelHandler) decoder);
ctx.pipeline().addLast((ChannelHandler) encoder);
if (channelHandlers != null) {
ctx.pipeline().addLast(channelHandlers);
}
ctx.pipeline().remove(this);
}
}
} catch (Exception exx) {
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
throw new DecodeException(exx);
}
return decoded;
}

所以,这里选择好与 version 对应的编解码器,然后加入到 pipeline 中,就会将 MultiProtocolDecoder 移除。

V1 版本协议

Seata 的协议设计是比较周全并且通用的,也是主流的解决粘包半包问题的解决方案,即消息长度 + 消息内容。

协议的格式如下:

image-20241217222155609

可以看到,包括魔数、协议版本号、长度域、头长度、报文类型、序列化算法、压缩算法、请求 id、可选的 map 扩展以及报文体。

如何进行编解码

Seata 解码器使用了 Netty 内置的 LengthFieldBasedFrameDecoder,不熟悉的可以看看。

不过编解码并不难,所以简单给出代码,不过多解释。

package org.apache.seata.core.rpc.netty.v1;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |proto| full length | head | Msg |Seria|Compr| RequestId |
* | code |versi| (head+body) | length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | body |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
*/
public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolEncoder {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class);

public void encode(RpcMessage message, ByteBuf out) {
try {
ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.rpcMsgToProtocolMsg(message);
int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
int headLength = ProtocolConstants.V1_HEAD_LENGTH;
byte messageType = rpcMessage.getMessageType();
out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
out.writeByte(ProtocolConstants.VERSION_1);
// full Length(4B) and head length(2B) will fix in the end.
out.writerIndex(out.writerIndex() + 6); // 这里跳过 full length 和 head length 的位置,最后在补
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeByte(rpcMessage.getCompressor());
out.writeInt(rpcMessage.getId());
// direct write head with zero-copy
Map<String, String> headMap = rpcMessage.getHeadMap();
if (headMap != null && !headMap.isEmpty()) {
int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
headLength += headMapBytesLength;
fullLength += headMapBytesLength;
}
byte[] bodyBytes = null;
// heartbeat don't have body
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
fullLength += bodyBytes.length;
}
if (bodyBytes != null) {
out.writeBytes(bodyBytes);
}
// fix fullLength and headLength
int writeIndex = out.writerIndex();
// skip magic code(2B) + version(1B)
out.writerIndex(writeIndex - fullLength + 3);
out.writeInt(fullLength);
out.writeShort(headLength);
out.writerIndex(writeIndex);
} catch (Throwable e) {
LOGGER.error("Encode request error!", e);
throw e;
}
}

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
try {
if (msg instanceof RpcMessage) {
this.encode((RpcMessage) msg, out);
} else {
throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
}
} catch (Throwable e) {
LOGGER.error("Encode request error!", e);
}
}
}
package org.apache.seata.core.rpc.netty.v1;

import java.util.List;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.netty.ProtocolDecoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |proto| full length | head | Msg |Seria|Compr| RequestId |
* | code |versi| (head+body) | length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | body |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
*/
public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class);

private final List<SerializerType> supportDeSerializerTypes;

public ProtocolDecoderV1() {
/**
* int maxFrameLength,
* int lengthFieldOffset, 魔术 2B、版本号 1B 所以长度偏移 3B
* int lengthFieldLength, FullLength is int(4B). so values is 4
* int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7
* int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0);
supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers();
if (supportDeSerializerTypes.isEmpty()) {
throw new IllegalArgumentException("No serializer found");
}
}

@Override
public RpcMessage decodeFrame(ByteBuf frame) {
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
}
byte version = frame.readByte();
int fullLength = frame.readInt();
short headLength = frame.readShort();
byte messageType = frame.readByte();
byte codecType = frame.readByte();
byte compressorType = frame.readByte();
int requestId = frame.readInt();
ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
rpcMessage.setMessageType(messageType);
// direct read head with zero-copy
int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
if (headMapLength > 0) {
Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
rpcMessage.getHeadMap().putAll(map);
}
// read body
if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
rpcMessage.setBody(HeartbeatMessage.PING);
} else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
rpcMessage.setBody(HeartbeatMessage.PONG);
} else {
int bodyLength = fullLength - headLength;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
frame.readBytes(bs);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.supportDeSerializerTypes.contains(protocolType)) {
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
throw new IllegalArgumentException("SerializerType not match");
}
}
}
return rpcMessage.protocolMsgToRpcMsg();
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decoded;
try {
decoded = super.decode(ctx, in);
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) decoded;
try {
return decodeFrame(frame);
} finally {
frame.release();
}
}
} catch (Exception exx) {
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
throw new DecodeException(exx);
}
return decoded;
}
}

总结

就目前看来,Seata 的网络通信实现的是比较容易看懂的,不过,这篇文章的分析也仅仅只是浮于表面,对深层次的更加重要的代码健壮性、异常处理、优雅关闭等问题都没有聊到,看后面有新的理解再分析分析。

原文链接

· 阅读需 6 分钟

随着PR https://github.com/apache/incubator-seata/pull/6754 的合并,Seata Server能够做到识别并处理Grpc请求,这意味着任意语言客户端,只需要引入proto文件,就可以和部署在JVM上的Seata Server通信,进而实现分布式事务的全流程。

下面以Go语言为例,向大家演示这一过程。

环境准备

Goland 2024.2

Idea 2024.3

jdk 1.8

go 1.23.3

Seata 2.3.0-SNAPSHOT

libprotoc 3.21.0

操作过程

部署并启动 Seata Server

运行 org.apache.seata.server.ServerApplication#main,如下所示

2024121301.png

proto文件导入

在go项目中导入完成本次事务流程所需的proto文件,包括各类事务请求和响应的proto文件和发起RPC的proto文件。如下所示

2024121302.png

grpc相关文件生成

在上一步导入的proto文件目录下,执行命令

 protoc --go_out=. --go-grpc_out=. .\*.proto

执行完后会生成grpc代码,如下所示

2024121303.png

grpc调用

在main.go中完成一次分布式事务的流程,并打印Seata Server的响应,代码如下所示

func main() {
conn, err := grpc.Dial(":8091", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewSeataServiceClient(conn)
stream, err := client.SendRequest(context.Background())
if err != nil {
log.Fatalf("could not sendRequest: %v", err)
}
defer stream.CloseSend()

sendRegisterTm(stream)
xid := sendGlobalBegin(stream)
sendBranchRegister(stream, xid)
sendGlobalCommit(stream, xid)
}

func sendRegisterTm(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto]) {
abstractIdentifyRequestProto := &pb.AbstractIdentifyRequestProto{
ApplicationId: "test-applicationId",
}
registerTMRequestProto := &pb.RegisterTMRequestProto{
AbstractIdentifyRequest: abstractIdentifyRequestProto,
}

registerTMResponseProto := &pb.RegisterTMResponseProto{}
sendMessage(stream, registerTMRequestProto, registerTMResponseProto)
}

func sendGlobalBegin(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto]) string {
globalBeginRequestProto := &pb.GlobalBeginRequestProto{
TransactionName: "test-transactionName",
Timeout: 200,
}
globalBeginResponseProto := &pb.GlobalBeginResponseProto{}
sendMessage(stream, globalBeginRequestProto, globalBeginResponseProto)
return globalBeginResponseProto.Xid
}

func sendBranchRegister(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], xid string) {
branchRegisterRequestProto := &pb.BranchRegisterRequestProto{
Xid: xid,
LockKey: "1",
ResourceId: "test-resourceId",
BranchType: pb.BranchTypeProto_AT,
ApplicationData: "{\"mock\":\"mock\"}",
}

branchRegisterResponseProto := &pb.BranchRegisterResponseProto{}
sendMessage(stream, branchRegisterRequestProto, branchRegisterResponseProto)
}

func sendGlobalCommit(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], xid string) {
abstractGlobalEndRequestProto := &pb.AbstractGlobalEndRequestProto{
Xid: xid,
}
globalCommitRequestProto := &pb.GlobalCommitRequestProto{
AbstractGlobalEndRequest: abstractGlobalEndRequestProto,
}

globalCommitResponseProto := &pb.GlobalCommitResponseProto{}
sendMessage(stream, globalCommitRequestProto, globalCommitResponseProto)
}

func sendMessage(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], req proto.Message, response proto.Message) {
anyMsg, err := anypb.New(req)
if err != nil {
log.Fatalf("could not new any msg: %v", err)
}
marshal, err := proto.Marshal(anyMsg)
msg := &pb.GrpcMessageProto{
HeadMap: map[string]string{},
Body: marshal,
}
err = stream.Send(msg)
if err != nil {
log.Fatalf("could not send msg: %v", err)
}
resp, err := stream.Recv()
if err != nil {
log.Fatalf("failed to receive message: %v", err)
}

body := resp.Body
var anyMessage anypb.Any
err = proto.Unmarshal(body, &anyMessage)
if err != nil {
log.Fatalf("failed to unmarshal to any: %v", err)
}
err = anypb.UnmarshalTo(&anyMessage, response, proto.UnmarshalOptions{})
if err != nil {
log.Fatalf("failed to unmarshal to message: %v", err)
}

log.Printf("Received: %+v", response)
}

运行后,Seata Server控制台打印如下

2024121304.png

Go客户端控制台打印如下

2024121305.png

实现原理

proto设计

为了实现与多语言grpc客户端的通信,Seata Server定义了grpcMessage.proto,其中定义了装配 Seata各种Message对象的GrpcMessageProto和装配Seata各类通信请求的双向流接口sendRequest。Seata Server以grpcMessage.proto作为媒介,可以实现与多语言客户端的通信

syntax = "proto3";
package org.apache.seata.protocol.protobuf;
import "google/protobuf/any.proto";
option java_multiple_files = true;
option java_outer_classname = "GrpcMessage";
option java_package = "org.apache.seata.core.protocol.generated";

message GrpcMessageProto {
int32 id = 1;
int32 messageType = 2;
map<string, string> headMap = 3;
google.protobuf.Any body = 4;
}

service SeataService {
rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto);
}

除此之外,还定义了GrpcSerializer,适配 Seata 的序列化器SPI体系,用于实现protobuf字节流和Seata消息对象的互相转换

grpc协议识别

Seata Server实现了ProtocolDetectHandler和ProtocolDetector。ProtocolDetectHandler作为ByteToMessageDecoder,在收到消息时,会遍历ProtocolDetector列表寻找能够识别当前消息的ProtocolDetector,ProtocolDetector通过识别魔数的方式区分Seata协议,Http1.1协议,Http2协议,一旦识别成功,会将能够处理该协议的ChannelHandler加入到当前Channel的Pipeline中

2024121306.jpeg

grpc请求发送与处理

Seata Server 实现了GrpcEncoder和GrpcDecoder,GrpcEncoder负责将Seata的RpcMessage转换为grpc原生客户端可识别的GrpcMessageProto,并在header中填充status,contentType等协议头用于与grpc原生客户端通信。GrpcEncoder还负责适配grpc协议规范,将压缩位、长度、消息体按照grpc协议约定的顺序写入channel

GrpcDecoder负责处理grpc原生客户端的请求。由于grpc客户端在底层传输时通过队列的方式实现了请求的分批flush,因此GrpcDecoder还负责将一批请求进行拆包。最终GrpcDecoder将protobuf字节流转换为一个或多个RpcMessage,并传递给Seata请求处理器

grpc连接的建立和管理

Server端只需配置配置一个ProtocolDetectHandler,即可完成各种类型连接的识别和建立

@Override
public void initChannel(SocketChannel ch) {
ProtocolDetector[] defaultProtocolDetectors = {
new Http2Detector(getChannelHandlers()),
new SeataDetector(getChannelHandlers()),
new HttpDetector()
};
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolDetectHandler(defaultProtocolDetectors));
}

Client端在每次获取Channel时,如果当前配置的通信方式是Grpc,则会以NioSocketChannel作为父Channel,获取一个Http2MultiStreamChannel,并在该Channel中添加grpc相关的handler

if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) {
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel);
bootstrap.handler(new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channel.pipeline().addLast(new GrpcDecoder());
channel.pipeline().addLast(new GrpcEncoder());
if (channelHandlers != null) {
addChannelPipelineLast(channel, channelHandlers);
}
}
});
channel = bootstrap.open().get();
}

· 阅读需 11 分钟

seata目前支持多种注册中心的实现,为了提供整个链路的闭环功能,seata设计推出了原生的注册中心namingserver

2. 领域模型

2.1 命名空间与事务分组

  • Namespace:在NamingServer模型中,命名空间(Namespace)用于实现命名空间的环境隔离。它允许在不同的环境(如开发、测试、生产)中隔离各自的服务实例。
  • Cluster与Unit:Cluster(集群)负责事务分组的处理,Unit则在每个集群内部做负载均衡。事务分组(vgroup)在命名空间和集群的配合下,通过元数据定位到具体的TC节点。

2.2 事务处理流程与namingserver的交互

img 事务处理流程与namingserver的交互的流程如下:

1.在client侧配置好Namingserver的地址和相关配置

2.client启动后TM向namingserver发起服务发现的请求

3.namingserver根据TM传来的vGroup参数和内存中的事务分组映射关系返回相关的集群列表,namingserver返回的集群列表元数据如下

{
"clusterList": [
{
"clusterName": "cluster2",
"clusterType": "default",
"groupList":[group1,group2]
"unitData": [
{
"unitName": "115482ee-cf27-45d6-b17e-31b9e2d7892f",
"namingInstanceList": [
{
"ip": "172.31.31.191",
"port": 8092,
"nettyPort": 0,
"grpcPort": 0,
"weight": 1.0,
"healthy": true,
"timeStamp": 1695042063334,
"role": member,
"metadata": {
"weight": 1,
"cluster-type": "default"
}
}
]
},
{
"unitName": "097e6ab7-d2d2-47e4-a578-fae1a4f4c517",
"namingInstanceList": [
{
"ip": "172.31.31.191",
"port": 8091,
"nettyPort": 0,
"grpcPort": 0,
"weight": 1.0,
"healthy": true,
"timeStamp": 1695042076481,
"role": member,
"metadata": {
"weight": 1,
"cluster-type": "default"
}
}
]
}
]
}
],
"term": 1695042076578
}

4.客户端通过负载均衡策略找出合适的TC节点开启事务

5.TM将事务分组和TC节点传递给RM

6.RM向TC节点发起分支注册的请求

7.TC节点完成二阶段下发

3.设计思路

3.1 AP还是CP?

CAP协议又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。 分布式系统的CAP理论:理论首先把分布式系统中的三个特性进行了如下归纳:

● 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)

● 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)

● 分区容错性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

对于namingserver而言,我们更倾向于使用AP模型,即注重可用性和分区容错性,牺牲一定的一致性。NamingServer作为服务注册中心,主要职责是提供高效的服务发现与注册服务,而对短时间内的数据一致性要求可以适当放松。在分布式环境中,可能会出现多个节点短暂的注册数据不一致现象。例如,当多个NamingServer节点发生了网络分区时,某些节点获取的注册信息可能有延迟。 对于NamingServer来说,这种短暂的不一致性我们认为是可以容忍的。由于服务注册与发现的强一致性要求并不高,即使在某个时刻有部分节点的注册数据滞后或不一致,也不会立刻影响到整个系统的正常服务。通过心跳机制、周期性同步等方式,最终一致性可以逐渐得到保证。

3.2 Quorum NWR机制在NamingServer中的应用

Quorum NWR(仲裁读写)是一种在分布式系统中用于确保数据一致性的机制。该机制通过设置副本的总数(N)、写操作需要成功的副本数(W)、读操作需要访问的副本数(R)来协调数据的一致性。在NamingServer的设计中,采用了多写+补偿机制来保证多个NamingServer节点之间的信息一致性,而客户端则与某一个NamingServer节点交互以获取注册信息。

  1. 写入操作(W-写仲裁): 当有集群节点变化时,server端会将请求发送到NamingServer集群中的多个节点。 根据NWR机制,系统确保至少有W个副本成功写入注册信息。 通过多写机制,即使某些节点暂时不可用或存在网络延迟,仍能确保写操作的高可用性。一旦W个节点写入成功,客户端将收到成功响应。 补偿机制:对于没有立即成功写入的副本,系统会通过异步补偿方式,在稍后的时间段内同步这些节点,确保最终一致性。
  2. 读取操作(R-读仲裁): 客户端通过与NamingServer集群中的任意一个节点进行交互,发送读取请求以获取服务注册信息。 系统会从至少R个副本中读取数据,采用最新版本的数据作为返回结果。即使某些节点的数据存在短暂不一致,客户端通过读取多个副本并比较其版本号或一致性标记,能够确保读取到最新的注册信息。 由于客户端只与一个NamingServer节点进行交互,读取操作的效率得到了提升,并且避免了多个节点之间的复杂协调,系统依然能保证最终一致性。
  3. NWR参数的设计与权衡: 在namingserver中我们设置W=N、R=1。W=N虽然意味着写入需要发送到所有节点,但并不要求所有节点必须立即成功写入。系统允许某些节点暂时失败,通过补偿机制在后续阶段同步这些节点,从而提高系统的容错性,因为即便某些节点在写入时发生故障或网络中断,数据更新仍然可以通过补偿机制最终传播到所有节点。这既保证了系统的高可用性,也确保了数据最终在所有节点上是一致的。由于写操作要求所有节点参与,因此每个节点都会接收到最新的数据更新。 客户端在执行读取操作时,可以从任意一个NamingServer节点读取数据,而不必担心数据不一致的问题。即使某些节点未能在写入时立即成功,客户端仍能从其他已成功写入的节点获取最新的注册信息。这样,R值可以设定较低(如R=1),从而提高读取操作的效率,同时系统通过补偿机制确保所有节点最终达到一致。 img

3.2 架构图

img

namingserver的运行链路如上图所示:

  1. 通过控制台在某个cluster下创建一个事务分组vgroup。
  2. 创建vgroup->cluster的请求发送给namingserver,namingserver再传递给对应的tc节点。
  3. tc节点将vgroup->cluster映射关系持久化保存。
  4. tc节点在心跳的时候将vgroup->cluster的映射关系更新到所有的namingserver。
  5. client通过自己配置的事务分组vgroup从namingserver获取对应cluster元数据。
  6. client在事务流程中,使用cluster下的unit进行负载均衡,再进行begin,registry,commit,rollback等。
  7. 事务决议后,对应的unit下的leader节点下发二阶段,无状态节点下,每个unit的唯一node就是leader。

3.3 设计细节

3.3.1 长轮询推送集群变化情况

img

如上图所示,每隔30s client侧需要向namingserver发起一次服务发现的请求,用以拉取最新的tc列表。而在这30s的间隔中,client侧将采用HTTP长轮询的方式一直watch namingserver节点,如果namingserver 侧有如下的变化:

1.事务分组映射关系的变化;

2.集群中实例的增加或者减少;

3.集群中实例的属性的变化;

那么watch返回200状态码,告知client需要获取最新集群信息;否则namingserver将一直挂起watch方法,直到HTTP长轮询超时,然后返回304状态码, 告知client进行下一轮watch。

· 阅读需 14 分钟

目前seata支持丰富的第三方配置中心,但是考虑使用的便捷性同时为了降低使用seata的门槛,在seata-server利用现有的sofa-jraft+rocksdb构建一个配置中心功能,seata-client直接与seata-server通信,获取seata相关的配置,不需要再去第三方配置中心读取,实现配置中心自闭环。

2. 设计说明

2.1 配置中心

在现有的第三方配置中心实现中,Client端和Server端对于配置中心是解耦的,Client端和Server端直接通过Configuration实例获取配置项,且Configuration对于Client端和Server端的初始化行为是一致的,都是先连接到配置中心中间件然后获取配置,以及添加监听器等。 img

当使用Raft的实现配置中心后,由于所有的配置项信息是保存在Server端的,因此初始化Configuration实例时对于Client端和Server端的行为是不一致的。

此外为保证和原来配置中心的逻辑相同,Client端和Server端获取配置项依旧统一从RaftConfiguration实例中获取,不直接和RocksDB进行交互。 img

img

RaftConfiguration分为Server端和Client端,按照启动环境返回不同配置实例。

public class RaftConfigurationProvider implements ConfigurationProvider {
@Override
public Configuration provide() {
String applicationType = System.getProperty(APPLICATION_TYPE_KEY);
if (APPLICATION_TYPE_SERVER.equals(applicationType)){
return RaftConfigurationServer.getInstance();
}else{
return RaftConfigurationClient.getInstance();
}
}
}

@SpringBootApplication(scanBasePackages = {"org.apache.seata"})
public class ServerApplication {
public static void main(String[] args) throws IOException {
System.setProperty(APPLICATION_TYPE_KEY, APPLICATION_TYPE_SERVER);
// run the spring-boot application
SpringApplication.run(ServerApplication.class, args);
}
}

2.2 配置存储模块

img

抽象设计

为了未来支持和拓展更多的KV内存键值对数据库(如LevelDB,Caffeine),现抽象一个ConfigStoreManager接口以及抽象类AbstractConfigStoreManager,提供如下公共方法:

  • Get:获取指定namespace,dataId中名为key的单一配置项
  • GetAll:获取指定namespace,dataId中全部配置项
  • Put:新增/修改指定namespace,dataId中某一配置项<key,value>
  • Delete:删除指定namespace,dataId中名为key的配置项
  • DeleteAll:删除指定namespace,dataId中全部配置项
  • Clear:清空所有配置
  • GetAllNamespaces:获取所有命名空间
  • GetAllDataIds:获取指定namespace下的所有配置dataIds
  • ...

ConfigStoreManagerFactory、ConfigStoreManagerProvider.java:使用SPI机制实现的配置存储工厂类和提供者。

配置监听

Server端和Client端配置中心需要监听配置项的变化。

在Server端,由于配置本身存储在Server端,我们直接拦截配置变更的方法即可。我们在抽象接口中定义了addConfigListenerremoveConfigListener方法用户添加和删除配置监听器。监听的逻辑由具体的实现类负责。

RocksDBConfigStoreManager中,定义了notifyConfigChange方法来触发监听,当调用写相关操作时(如Put、Delete)触发该方法来通知配置的变更。从而触发回调事件通知Server端配置中心。

在Client端,我们定义了配置版本号长连接机制来实现配置的监听。具体的Client端在启动时,与Server建立长连接并定期刷新该连接。Server端内部维护一个watchMap存放所有客户端的监听信息。每当Raft状态机执行配置更新的操作会发送一个ApplicationEvent事件,该事件被ClusterConfigWatcherManager监听,从而通知watchMap中所有客户端配置变更。此外使用了配置版本号来优化实现,在建立长连接时,客户端需要传入版本号,当版本号低于Server端对应配置的版本号时直接返回最新配置。反之,若Server端配置版本号低于本地,则Client端认为该Server的配置已经过期(可能宕机或集群发生脑裂)会重试请求集群中其他的节点。

多租户方案

在Seata-Server端存放配置时,需要实现多租户的配置隔离,要求不同租户间的配置是相互独立、物理/逻辑上是隔离的。

首先调研了业内使用RocksDB的开源项目的实现,总结如下。

  1. JRaft,单RocksDB实例,两个列族,一个用来存Raft条目,一个用来存元信息。
  2. TiKV,两个RocksDB实例,分别为raftDB,kvDB。kvDB中使用了多个列族存放元数据,用户数据,锁数据等。
  3. Pika,为每个数据结构(String,Hash,List,Set,Zset等)创建了一个RocksDB实例,每个RocksDB实例分别用多个列族存储数据,比如Data、Meta

考虑到无法提前知道租户数量(无法在启动时创建指定数量的RocksDB),因此使用单个RocksDB实例,多列族存储。不同租户使用namespace区分,在Rocksdb中通过列族(ColumnFamily)进行逻辑隔离,一个namespace对应一个列族。列族相当于关系型数据库中表的概念。在配置的增删改查时指定namespace操作具体的列族,实现多租户的隔离。此外名为config_version的列族是内置的,用于对现有的配置进行版本号跟踪。 img

3. 使用方式

3.0 准备配置文件

首先准备好配置文件,具体可以参考:配置文件示例。 并将配置文件置于Seata server项目资源目录下。

3.1 Server端配置

application.yml 中加入Raft配置中心配置,其余配置参考

config:
# support: nacos, consul, apollo, zk, etcd3, raft
type: raft
raft:
db:
type: rocksdb # db类型,目前只支持rocksdb
dir: configStore # db文件存储目录
destroy-on-shutdown: false #应用关闭时是否清除db文件, 默认false
namespace: 'default' # 命名空间
dataId: 'seata.properties' # 配置文件id
file:
name: 'file' # 初始配置文件名

server:
raft:
group: default #此值代表该raft集群的group,client的事务分组对应的值要与之对应
server-addr: 192.168.241.1:9091, 192.168.241.2:9091 ,192.168.241.3:9091 # 其他Raft节点的ip和端口,端口为该节点的netty端口+1000,默认netty端口为8091
snapshot-interval: 600 # 600秒做一次数据的快照,以便raftlog的快速滚动,但是每次做快照如果内存中事务数据过多会导致每600秒产生一次业务rt的抖动,但是对于故障恢复比较友好,重启节点较快,可以调整为30分钟,1小时都行,具体按业务来,可以自行压测看看是否有抖动,在rt抖动和故障恢复中自行找个平衡点
apply-batch: 32 # 最多批量32次动作做一次提交raftlog
max-append-bufferSize: 262144 #日志存储缓冲区最大大小,默认256K
max-replicator-inflight-msgs: 256 #在启用 pipeline 请求情况下,最大 in-flight 请求数,默认256
disruptor-buffer-size: 16384 #内部 disruptor buffer 大小,如果是写入吞吐量较高场景,需要适当调高该值,默认 16384
election-timeout-ms: 1000 #超过多久没有leader的心跳开始重选举
reporter-enabled: false # raft自身的监控是否开启
reporter-initial-delay: 60 # 监控的区间间隔
serialization: jackson # 序列化方式,不要改动
compressor: none # raftlog的压缩方式,如gzip,zstd等
sync: true # raft日志的刷盘方式,默认是同步刷盘

在Seata-Server下,需要一个初始配置文件作为Server端的配置文件(也就是上一步所述的配置文件),file.name配置项需要和该文件名保持一致。在Server初次启动时,会将该配置文件作为Raft配置中心的初始配置。目前支持的文件类型有:conf、yaml、properties、txt。

注意:Raft集群内节点的初始配置文件需要保持一致。

3.2 控制台配置管理界面

当Server端使用Raft模式的配置中心后,可通过在Seata Console中内置的配置管理页面,进行配置中心的管理。用户可以通过在该页面对存储于Seata-Server集群的配置进行增删改查,注意该操作是对于集群生效的,因此可以在集群中的任意节点进行修改,所有操作会通过Raft在集群内进行同步。

注意:该配置管理页面仅在配置中心为Raft模式下开启,对其他类型的配置中心不开放。

3.2.1 配置隔离

Raft配置中心提供了namespace命名空间的机制来实现多租户的配置隔离。不同namespace下的配置通过底层存储机制实现逻辑隔离。在同一namespace下可以有多套配置文件,不同的配置文件用dataId进行区分。一套配置以namespacedataId来唯一标识。

例如:

  • namespace=default(默认),dataId=seata.properties(默认)

  • namespace=dev,dataId=seata-server.properties,dataId=seata-client.yaml

  • namespace=prop,dataId=seata-server.properties,dataId=seata-client.txt

img

3.2.2 配置上传

在Sever启动时,Server端配置的初始文件会自动上传到配置中心。此外用户还可以通过点击"上传(Upload)"按钮上传配置文件到指定的namespacedataId。 当配置上传到Server端配置中心后,Client端就可以通过namespacedataId来获取具体的配置文件了。

img

目前支持上传的配置文件类型有:txt、text、yaml、properties类型。具体的配置文件可参考示例:配置文件示例

3.2.3 配置查询

选择namespacedataId后,可点击"搜索(Search)"按钮查询该配置下的所有配置项信息。

配置以配置项列表的形式呈现,每一行都代表一个配置项,以KeyValue展示。

img

3.2.4 配置删除

当不再需要某一套配置时,用户可以删除指定namespacedataId的配置数据。

注意该操作一旦完成,会清空该配置下的所有配置项信息,且无法恢复。请避免删除正在使用中的配置。

img

3.2.5 配置修改

在配置项列表中,用户可以对该配置下的某个配置项进行新增修改删除操作。操作成功后Server端和Client端会及时收到配置变更,从而获取到最新的值。

  • 新增:在当前配置下添加配置项

img

  • 修改:修改指定配置项的值。

img

  • 删除:删除指定的配置项

img

3.3 Client端配置

Client需要添加如下的配置项。其中raft.server-addr需要和Server端Raft集群的IP地址列表一致。

config:
type: raft # Raft模式
raft:
server-addr: 192.168.241.1:7091, 192.168.241.2:7091 ,192.168.241.3:7091 # 配置raft相关元数据的获取地址
username: 'seata' # 鉴权
password: 'seata' # 鉴权
db:
namespace: 'default' # 命名空间
dataId: 'seata.properties' # 配置文件Id

此外,client需要引入HttpClient依赖,用于通过Http请求向Seata-Server集群获取配置信息

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

配置完成后,Client应用启动时就会从raft.server-addr配置的Server中订阅并获取指定namespacedataId的配置,并通过监听机制在配置发生变更时获取获取最新配置。

· 阅读需 9 分钟

引言和概述

在分布式系统中,通信协议的设计直接影响系统的可靠性和可扩展性。Apache Seata的RPC通信协议为各组件间的数据传输提供了基础,对这方面的源码分析是深入理解seata的又一个好方式。在最近的2.2.0版本里,我为Seata的通信机制进行了重构,以支持多版本协议的兼容性,现在改造完成了,我将从传输机制和通信协议两个方面去分析新版本里源码。 本文是第一篇,介绍Seata传输机制。

seata里的RPC通信主角是TCTMRM三者,当然过程中还可能会涉及注册中心甚至配置中心等其他网络交互,但这些相对内容所使用的通信机制是相对独立的,本文不作讨论。

接下来我将按照我最早了解源码时的几个直觉提问,带大家进行探索。

Seata中的Netty(谁在传输)

第一个问题:seata通信的底层是什么在进行发送请求报文和接收请求报文?答案是netty,而netty在seata里面是如何工作的呢?我们将去到core包的org.apache.seata.core.rpc.netty去探索

从这个继承关系我们可以看到,AbstractNettyRemoting作为核心的父类,RM和TM和Server(TC)都实现了他,实际上这个类里面已经实现了核心的发送和接收

sendSync实现了同步发送逻辑,异步发送sendAsync的逻辑相似且更简单这里不再重复,只要拿到channel进行发送即可

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
// 此处省略非关键代码

MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);

channelWritableCheck(channel, rpcMessage.getBody());

String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);

// netty的写方法(netty write)
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});

try {
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
// 此处省略非关键代码
}
}

而接收报文的方式,主要在processMessage方法里,这个方法被AbstractNettyRemotingClient.ClientHandlerAbstractNettyRemotingServer.ServerHandler这两个类的channelRead调用,这两个内部类都是ChannelDuplexHandler的子类,他们各自注册在client和server的Bootstrap里(为什么注册到bootstrap就能进行接收操作?这个要移步netty的原理)

收到信息后就会调进父类的processMessage方法里,我们来看看源码

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
// 此处省略非关键代码
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 这里可读性稍微差点,first是Processor,表示普通处理,而second是线程池,表示池化处理。
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 此处省略非关键代码
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}

实际上这些processor和executor是client和server注册进来的处理器:下面是一部分的处理器,他们对应着不同的MessageType,以下是部分处理器的注册举例(他们在NettyRemotingServer#registerProcessor)

        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

可以看到这些processor实际上就是seata各种提交回滚等等的处理器

Seata中的NettyChannel(channel怎么管理)

那第二个问题,既然上面是netty依靠着channel在进行着收发,那这个channel怎么来呢?会一直持有吗?如果断了怎么重连?答案在ChannelManager和上面的两个reg的processor

当RM/TM取得了server的地址,进行注册的时候(第一次通信),如果server能成功解析报文并发现是REG信息,就会进入regRmProcessor/regTmProcessor,这里以TM为例子

// server接收 RegTmProcessor
private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Version.putChannelVersion(ctx.channel(), message.getVersion());
boolean isSuccess = false;
String errorInfo = StringUtils.EMPTY;
try {
if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
// 在ChannelManager中注册channel,这里可以预见到注册之后,server进行sendSync(channel,xxx)的时候就可以拿到这个channel了
ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
}
} catch (Exception exx) {
isSuccess = false;
errorInfo = exx.getMessage();
LOGGER.error("TM register fail, error message:{}", errorInfo);
}
RegisterTMResponse response = new RegisterTMResponse(isSuccess);
// 异步回复
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
// 此处省略非关键代码
}

// ChannelManager
public static void registerTMChannel(RegisterTMRequest request, Channel channel)
throws IncompatibleVersionException {
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR + ChannelUtil.getClientIpFromChannel(channel);
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS, clientIdentified, key -> new ConcurrentHashMap<>());
rpcContext.holdInClientChannels(clientIdentifiedMap);
}

在ChannelManager里管理着RM_CHANNELSRM_CHANNELS两个比较复杂的map,特别是RM_CHANNELS里面有4层(resourceId -> applicationId -> ip -> port -> RpcContext)

说完了server对channel的管理,那client呢?这个map管理更简单一些,就是注册成功后在onRegisterMsgSuccess里面也用一个NettyClientChannelManager里registerChannel,后续跟server交互尽量都用这个channel。

那么第三个问题又来了,client的channel不可用了可以自行新建,可是server接收后发现这是新channel怎么办?或者server在异步回复的时候发现channel不可用了怎么办? 答案依然在NettyClientChannelManager,这里面相对复杂的是,client方面需要用到channel的时候,实际上由一个对象池nettyClientKeyPool管理着,这是个apache的objectPool,所以当channel不可用时也会由这个池子去新增并在使用完后入池。这个对象池实际上是一直持有着RegisterTMRequest,跟第一次进来时一样,每次创建需要创建channel的时候,实际上都发生了一次注册

// NettyClientChannelManager
public Channel makeObject(NettyPoolKey key) {
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("NettyPool create channel to " + key);
}
Channel tmpChannel = clientBootstrap.getNewChannel(address);
Object response;
Channel channelToServer = null;
// key持有RegisterTMRequest
if (key.getMessage() == null) {
throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
}
try {
// 实际上是在进行reg操作
response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
if (!isRegisterSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
channelToServer = tmpChannel;
// 成功后会入池
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
}
}
// 此处省略非关键代码

return channelToServer;
}

最后

这一篇我们了解了seata是怎样借助netty来传输数据的,为了更好的看懂netty处理的全貌,我画了个层级图

上面已经讲了请求发送时,serverHandler/clientHandler和NettyRemoting(包括RM、TM、TC)的处理,知道了从外部到netty处理器再到内部的DefaultCoordinator的过程,但我们还缺Decoder/Encoder没讲,这里面会进行协议的解析/封装,也会进行序列化和反序列化,请看 Seata的RPC通信源码分析02:协议篇

· 阅读需 11 分钟

引言和概述

上一篇Seata的RPC通信源码分析01:传输篇已经介绍了RPC通信的传输机制,这一篇我们继续来看协议部分的内容,把这个图里没解析清楚的encode/decode部分给补充完整。

同样的,我们以提问来深入的方式去探究它。在本文中,我们不仅要了解二进制如何解析成rpcMsg类型,还要知道如何兼容不同版本的协议,那么第一个问题:协议长什么样?

协议结构

上图展示了协议在0.7.1之前和之后的变化,(在ProtocolDecoderV1的注释也可以看到,更旧版本的要看ProtocolV1Decoder),可以看到新版本的有以下这些构成部分

  • magic-code:0xdada
  • protocal-verson:版本号
  • full-length:总长度
  • head-length:头部长度
  • msgtype:消息类型
  • serializer/codecType:序列化方式
  • compress:压缩方式
  • requestid:请求id

这里我们说明一下seata各版本的server之间对协议的处理差异

  • version<0.7.1 : 只能处理v0版本的协议(上图中的上半部分,带有flag段的),无法识别其他版本协议
  • 0.7.1<=version<2.2.0 : 只能处理v1版本的协议(上图中的下半部分),无法识别其他版本协议
  • version>=2.2.0 : 可以同时识别v0和v1版本的协议,并处理

那么2.2.0是怎样做到兼容的呢?先卖个关子,在说明这个之前我们先看看v1的encoder和decoder分别都是怎样运作的。需要注意的是,和之前提到的传输机制一样,协议处理也是client和server共用的,所以下面提到的都是通用逻辑。

从ByteBuf到RpcMessage(Encoder/Decoder做了什么)

先来看ProtocolDecoderV1

    public RpcMessage decodeFrame(ByteBuf frame) {
byte b0 = frame.readByte();
byte b1 = frame.readByte();

// 获取version
byte version = frame.readByte();
// 获取header和body以外的字段
int fullLength = frame.readInt();
short headLength = frame.readShort();
byte messageType = frame.readByte();
byte codecType = frame.readByte();
byte compressorType = frame.readByte();
int requestId = frame.readInt();

ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
rpcMessage.setMessageType(messageType);

// 头部信息
int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
if (headMapLength > 0) {
Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
rpcMessage.getHeadMap().putAll(map);
}

// 如果是心跳信息不需要对body进行序列化
if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
rpcMessage.setBody(HeartbeatMessage.PING);
} else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
rpcMessage.setBody(HeartbeatMessage.PONG);
} else {
int bodyLength = fullLength - headLength;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
frame.readBytes(bs);
// 根据刚才得到的compressorType来按需做解压处理
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.supportDeSerializerTypes.contains(protocolType)) {
// 序列化器,由于这个是v1专用的ProtocolDecoderV1,所以可以直接传入version1
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
throw new IllegalArgumentException("SerializerType not match");
}
}
}
return rpcMessage.protocolMsg2RpcMsg();
}

由于encode操作正好和decode操作相反,这里不再重复介绍,我们继续看里面的serialize操作。上面的serialize类来自SerializerServiceLoader

    public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
// 先处理PROTOBUF方式的序列化,通过反射工具取得
if (type == SerializerType.PROTOBUF) {
try {
ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME);
} catch (ClassNotFoundException e) {
throw new EnhancedServiceNotFoundException("'ProtobufSerializer' not found. " +
"Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency ", e);
}
}

String key = serialzerKey(type, version);
// 这里是一个SERIALIZER_MAP,相当于序列化类的缓存。为什么需要缓存,因为SeataSerializer的scope = Scope.PROTOTYPE,防止多次创建类
Serializer serializer = SERIALIZER_MAP.get(key);
if (serializer == null) {
if (type == SerializerType.SEATA) {
// 这里是seata的SPI机制,本文不再往里深入加载类的逻辑,只需要知道去加载Serializer这个接口,并且把version给到了构造方法
serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version});
} else {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name());
}
SERIALIZER_MAP.put(key, serializer);
}
return serializer;
}

// 这里是SeataSerializer构造方法,里面是单例模式的构造器,因为现在是两个版本各一个类,也可以说是双例
public SeataSerializer(Byte version) {
if (version == ProtocolConstants.VERSION_0) {
versionSeataSerializer = SeataSerializerV0.getInstance();
} else if (version == ProtocolConstants.VERSION_1) {
versionSeataSerializer = SeataSerializerV1.getInstance();
}
if (versionSeataSerializer == null) {
throw new UnsupportedOperationException("version is not supported");
}
}

这样,decoder就得到了一个Serializer,程序运行到rpcMessage.setBody(serializer.deserialize(bs)),我们来看看deserialize是怎样处理的

    public <T> T deserialize(byte[] bytes) {
return deserializeByVersion(bytes, ProtocolConstants.VERSION_0);
}
private static <T> T deserializeByVersion(byte[] bytes, byte version) {
//前面是合法性判断,此处忽略
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
short typecode = byteBuffer.getShort();
ByteBuffer in = byteBuffer.slice();
//创建父类,并根据版本号创建Codec
AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typecode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode, version);
//codec的decode操作
messageCodec.decode(abstractMessage, in);
return (T) abstractMessage;
}

很遗憾,这个serialize并没有太多逻辑,关键还是在MessageCodecFactory和Codec,我们继续往里看。可以看到MessageCodecFactory内容不少,但形式单一,都是根据MessageType返回message和codec,所以这里不再展示factory的内容,我们直接看message和codec,也就是messageCodec.decode(abstractMessage, in),虽然codec类型还是很多,但我们可以看到他们的结构都是相似的,逐个字段解析:

    // BranchRegisterRequestCodec的decode,这个请求是注册一个事务分支
public <T> void decode(T t, ByteBuffer in) {
BranchRegisterRequest branchRegisterRequest = (BranchRegisterRequest)t;

// 解析xid
short xidLen = in.getShort();
if (xidLen > 0) {
byte[] bs = new byte[xidLen];
in.get(bs);
branchRegisterRequest.setXid(new String(bs, UTF8));
}
// 解析branchType
branchRegisterRequest.setBranchType(BranchType.get(in.get()));
short len = in.getShort();
if (len > 0) {
byte[] bs = new byte[len];
in.get(bs);
branchRegisterRequest.setResourceId(new String(bs, UTF8));
}
// 解析lockKey
int iLen = in.getInt();
if (iLen > 0) {
byte[] bs = new byte[iLen];
in.get(bs);
branchRegisterRequest.setLockKey(new String(bs, UTF8));
}
// 解析applicationData
int applicationDataLen = in.getInt();
if (applicationDataLen > 0) {
byte[] bs = new byte[applicationDataLen];
in.get(bs);
branchRegisterRequest.setApplicationData(new String(bs, UTF8));
}
}

好了,到这里,我们已经得到了branchRegisterRequest,可以愉快地交给TCInboundHandler处理了。

但是问题又来了,我们只看到client(RM/TM)有以下这种添加encoder/decoder的代码,也就是我们知道client都使用当前版本的encoder/decoder处理:

        bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolDecoderV1())
.addLast(new ProtocolEncoderV1());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});

但server如何处理?还有说好的多版本协议呢?

多版本协议(版本识别和绑定)

我们先来看encoder/decoder的一个类图:

ProtocolDecoderV1我们已经分析完了,ProtocolEncoderV1是反向操作,应该比较好理解,至于ProtocolDecoderV0和ProtocolEncoderV0,从图上也可以看到他们和v1是平行关系,除了v0的操作(虽然目前为止我们还没让他派上用场),他们都是netty里典型的encode和decode的子类,但MultiProtocolDecoder又是什么?他是多版本协议的主角,而且在启动的时候已经注册进server的bootstrap。

    protected boolean isV0(ByteBuf in) {
boolean isV0 = false;
in.markReaderIndex();
byte b0 = in.readByte();
byte b1 = in.readByte();
// 实际上,识别协议就靠第3个byte(b2),只要是正常的新版本,b2就是大于0的版本号,而对于0.7以下的版本来说,b2是FLAG的第一位,正好无论是哪种情况他都是0
// v1/v2/v3 : b2 = version
// v0 : b2 = 0 ,1st byte in FLAG(2byte:0x10/0x20/0x40/0x80)
byte b2 = in.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0 && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1 && 0 == b2) {
isV0 = true;
}
// 读完的字节还要吐回去,为了让各版本的decoder能从头解析
in.resetReaderIndex();
return isV0;
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame;
Object decoded;
byte version;
try {
// 识别版本号,获取当前版本号
if (isV0(in)) {
decoded = in;
version = ProtocolConstants.VERSION_0;
} else {
decoded = super.decode(ctx, in);
version = decideVersion(decoded);
}

if (decoded instanceof ByteBuf) {
frame = (ByteBuf) decoded;
ProtocolDecoder decoder = protocolDecoderMap.get(version);
ProtocolEncoder encoder = protocolEncoderMap.get(version);
try {
if (decoder == null || encoder == null) {
throw new UnsupportedOperationException("Unsupported version: " + version);
}
// 首次进来,使用判断好的decoder进行操作
return decoder.decodeFrame(frame);
} finally {
if (version != ProtocolConstants.VERSION_0) {
frame.release();
}
// 首次进来,绑定对应version的encoder和decoder,也就相当于绑定了channel
ctx.pipeline().addLast((ChannelHandler)decoder);
ctx.pipeline().addLast((ChannelHandler)encoder);
if (channelHandlers != null) {
ctx.pipeline().addLast(channelHandlers);
}
// 绑定好之后,将自身移除,后续不再判断
ctx.pipeline().remove(this);
}
}
} catch (Exception exx) {
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
throw new DecodeException(exx);
}
return decoded;
}

protected byte decideVersion(Object in) {
if (in instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) in;
frame.markReaderIndex();
byte b0 = frame.readByte();
byte b1 = frame.readByte();
// 和isV0方法相似,取第3个byte
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
}

byte version = frame.readByte();
frame.resetReaderIndex();
return version;
}
return -1;
}

通过上面的分析,v0终于派上用场(当有旧版本的client注册时,server就会为其分配低版本的encoder/decoder),我们也摸清了多版本协议如何识别、如何绑定。

· 阅读需 10 分钟

背景

随着 Seata 项目的不断发展和壮大,我们的贡献者群体也在持续扩大。项目的功能不断增强,对于代码质量的要求也在提高。在这个过程中,我们期望每一位贡献者在提交功能代码的同时,能够附带规范、完备的测试用例。

一个优秀的项目,其完备的单元测试是基本保障。Test-Driven Development(TDD)理念已经提出多年,它强调在编写功能代码之前先编写测试用例。通过编写单元测试,开发者可以更深入地理解代码中相关类和方法的作用,掌握核心逻辑,熟悉各种场景的运行情况。同时,单元测试也为开源项目提供了稳定安全的保障,使得项目在接受贡献者代码时,能够确保代码的质量和稳定性。 单元测试是质量保障的第一环,有效的单元测试能够提前发现90%以上的代码Bug问题,同时也能防止代码的腐化。在项目重构和演进过程中,单元测试起到了至关重要的作用,它能够确保重构后的代码仍然能够正常工作,不会引入新的Bug。

在社区看来,贡献合理的测试用例代码和贡献功能代码同样重要,为了帮助开发者编写出高质量的测试用例,本文给出一些基础的规范和建议。

推荐的框架

当前社区使用以下三个框架编写测试用例;

junit5

junit是Java中最常用的单元测试框架,用于编写和运行可重复的测试用例。

        <junit-jupiter.version>5.8.2</junit-jupiter.version>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>${junit-jupiter.version}</version>
</dependency>

mockito

mockito是一个mock框架,主要是用来做mock测试,他可以模拟任何 Spring管理的 bean、模拟方法的返回值、模拟抛出异常等,可以让我们在缺乏一些依赖的情况下,完成测试及验证。

        <mockito.version>4.11.0</mockito.version>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
</dependency>

assertj

assertj是一个断言库,提供了一组易于使用和可读性很强的断言方法,当junit的断言难以满足时,可以使用assertj进行断言;

请注意:我们在seata-dependencies的pom.xml中统一管理了这三个库的版本。

        <assertj-core.version>3.12.2</assertj-core.version>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
</dependency>

规范

我们参考阿里巴巴JAVA开发手册,整理了一些建议及规范,分为不同的级别,其中【【强制】部分,开发者需要严格遵守,社区在合并代码时会按照强制规则进行review,【【推荐】【参考】部分,方便大家更好的了解我们对于测试用例的考量和原则。

1.【强制】单元测试必须遵守 AIR 原则。

说明:好的单元测试宏观上来说,具有自动化、独立性、可重复执行的特点。

  • A:Automatic(自动化)
  • I:Independent(独立性)
  • R:Repeatable(可重复)
2.【强制】单元测试应该是全自动执行的,并且非交互式的。

测试用例通常是被定期执行的,执行过程必须完全自动化才有意义。输出结果需要人工检查的测试不是一个好的单元测试。单元测试中不准使用 System.out 来进行人肉验证,必须使用 assert 来验证。

3.【强制】保持单元测试的独立性。为了保证单元测试稳定可靠且便于维护,单元测试用例之间决不能互相调用,也不能依赖执行的先后次序。

反例:method2 需要依赖 method1 的执行,将执行结果作为 method2 的输入。

4.【强制】单元测试是可以重复执行的,不能受到外界环境的影响。

说明:单元测试通常会被放到持续集成中,每次有代码 check in 时单元测试都会被执行。如果单测对外部环境(网络、服务、中间件等)有依赖,容易导致持续集成机制的不可用。

正例:为了不受外界环境影响,要求设计代码时就把 SUT 的依赖改成注入,在测试时用 spring 这样的 DI框架注入一个本地(内存)实现或者 Mock 实现。

5.【强制】对于单元测试,要保证测试粒度足够小,有助于精确定位问题。单测粒度至多是类级别,一般是方法级别。

说明:只有测试粒度小才能在出错时尽快定位到出错位置。单测不负责检查跨类或者跨系统的交互逻辑,那是集成测试的领域。

6.【强制】核心业务、核心应用、核心模块的增量代码确保单元测试通过。

说明:新增代码及时补充单元测试,如果新增代码影响了原有单元测试,请及时修正。

7.【强制】单元测试代码必须写在如下工程目录:src/test/java,不允许写在业务代码目录下。

说明:源码编译时会跳过此目录,而单元测试框架默认是扫描此目录。

8.【强制】单元测试的基本目标:语句覆盖率达到 70%;核心模块的语句覆盖率和分支覆盖率都要达到 100%。

说明:在工程规约的应用分层中提到的 DAO 层,Manager 层,可重用度高的 Service,都应该进行单元测试。

9.【推荐】编写单元测试代码遵守 BCDE 原则,以保证被测试模块的交付质量。
  • B:Border,边界值测试,包括循环边界、特殊取值、特殊时间点、数据顺序等。
  • C:Correct,正确的输入,并得到预期的结果。
  • D:Design,与设计文档相结合,来编写单元测试。
  • E:Error,强制错误信息输入(如:非法数据、异常流程、业务允许外等),并得到预期的结果。
10.【推荐】对于数据库相关的查询,更新,删除等操作,不能假设数据库里的数据是存在的,或者直接操作数据库把数据插入进去,请使用程序插入或者导入数据的方式来准备数据。

反例:删除某一行数据的单元测试,在数据库中,先直接手动增加一行作为删除目标,但是这一行新增数据并不符合业务插入规则,导致测试结果异常。

11.【推荐】和数据库相关的单元测试,可以设定自动回滚机制,不给数据库造成脏数据。或者对单元测试产生的数据有明确的前后缀标识。
12.【推荐】对于不可测的代码在适当的时机做必要的重构,使代码变得可测,避免为了达到测试要求而书写不规范测试代码。
13.【推荐】单元测试作为一种质量保障手段,在提交pr前完成单元测试的编写及验证。
14.【参考】为了更方便地进行单元测试,业务代码应避免以下情况:
  • 构造方法中做的事情过多。
  • 存在过多的全局变量和静态方法。
  • 存在过多的外部依赖。
  • 存在过多的条件语句。 说明:多层条件语句建议使用卫语句、策略模式、状态模式等方式重构。

· 阅读需 20 分钟

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在今年的开源之夏活动中,我加入了 Apache Seata (Incubator) 社区,完成了开源之夏的课题,并从此一直积极参与社区。我有幸在云栖大会-开发者秀场上分享了我的开发者经验。在本文中,我将与大家分享我在 Seata 社区中的开发者之旅,以及在这个旅程中积累的经验和见解。希望通过我的故事,能够激励更多人踏上这充满挑战和激励的开源之路,为开源社区的繁荣做出自己的贡献。

相关背景

在正式介绍我的经历之前,我想先提供一些相关的背景信息,以解释为什么我要参与开源以及如何参与开源。关于参与开源的原因,我相信每个人都有不同的动机。以下是我认为一些主要的原因:

  • 学习:参与开源使我们有机会为不同组织开发的开源项目做出贡献,与行业专家互动,提供了学习的机会。
  • 技能提升:以我为例,我通常使用 Java 和 Python 进行后端开发。但在参与Seata项目时,我有机会学习Go语言,拓宽了我的后端技术栈。此外作为学生,我很难接触到生产级框架或应用,而开源社区为我提供了这个机会。
  • 兴趣:我身边的朋友都是热衷于开源的,他们享受编程,对开源充满热情。
  • 求职:参与开源可以丰富我们的作品集,为简历增加分量。
  • 工作需求:有时参与开源是为了解决工作中遇到的问题或满足工作需求。

这些都是参与开源的原因,对我来说,学习、技能提升和兴趣是我参与开源的主要动机。无论你是在校学生还是在职人员,如果你有参与开源的意愿,不要犹豫,任何人都可以为开源项目做出贡献。年龄、性别、工作和所在地都不重要,关键是你的热情和对开源项目的好奇心。

我参与开源的契机是参加了中科院软件所举办的开源之夏活动。

开源之夏是一个面向高校开发者的开源活动,社区发布开源项目,学生开发者在导师的指导下完成项目的开发,结项成果贡献给社区,合入社区仓库,获得项目奖金和证书。开源之夏是踏入开源社区的一个绝佳契机,也是我第一次比较正式地接触开源项目,而这个经历为我打开了一扇全新的大门。自此我深刻地认识到参与开源项目的建设,分享自己的技术成果,让更多的开发者能够使用你所贡献的东西,是一件极富乐趣和意义的事情。

下面我分享的这张图片是开源之夏官方公开的数据,从 2020 年开始参与的社区数量还有学生数量都在逐年增加,活动也是越办越好。可以看到今年的参与的社区项目共有 133 个,每个社区又提供了若干个课题,而每位学生只能选择一个课题。想要在这么多个社区中找到想要参与的社区和适合自己的课题是一个相对复杂的任务。

img

综合考虑社区的活跃程度、技术栈契合度、新人引导情况等,最终我选择加入 Seata 社区。

Seata 是一款开源的分布式事务框架,提供了完整的分布式事务解决方案,包括 AT、TCC、Saga 和 XA 事务模式,可支持多种编程语言和数据存储方案。从 19 年开源起到今年已经走过了 5 个年头,社区中有超过 300 多位贡献者,项目收获了 24k+ 星标,是一个非常成熟的社区。同时 Seata 兼容 10 余种主流 RPC 框架和 RDBMS,与 20 多个社区存在集成和被集成的关系,被几千家客户应用到业务系统中,可以说是分布式事务解决方案的事实标准。

img

2023 年 10 月 29 日,Seata 正式捐赠给了 Apache 软件基金会,成为孵化项目。经过孵化之后,Seata将有望成为首个 Apache 软件基金会的分布式事务框架顶级项目。这次捐赠也将推动 Seata 更广泛地发展,对生态系统的建设产生深远的影响,从而使更多的开发者受益。这个重要的里程碑也为 Seata 带来更广阔的发展空间。

开发之旅

介绍完了一些基本情况,后文中我将分享我在 Seata 社区的开发之旅。

在正式开始开发之前,我进行了许多准备工作。因为 Seata 已经经历了五年的发展,积累了数十万行代码,因此直接参与开发需要一定的上手成本。我分享了一些准备经验,希望能够为大家提供一些启发。

  1. 文档和博客是第一手材料 文档和博客这类的文本材料可以帮助社区新人迅速了解项目背景和代码结构。 首先,官方文档是最主要的参考资料,从这里可以了解到一切官方认为你需要了解的东西。 img 博客,仅次于官方文档的材料,一般是开发者或者是深度用户编写的,和文档不同的点在于博客可能会更深入到某个专项上去介绍,比如一些项目的理论模型、项目结构、某个模块的源码分析等等。 img 公众号,和博客类似,一般是偏技术性的文章,公众号还有个优点是可以订阅推送,利用碎片时间阅读一些技术。 img 此外,开源社区的一些在线分享或线下 Meetup 公开的幻灯片也是非常有意义的文本资料。 img 除了官方资料之外,还有许多第三方资料可供学习,比如可以通过用户分享的 use cases 了解项目的具体实施和实践;通过第三方社区的集成文档了解项目的生态;还有就是通过第三方的视频教程来学习。但在所有这些资料中,我认为官方文档和博客是最有帮助的。
  2. 熟悉使用框架 当然刚才说的这些文本资料肯定不需要面面俱到的看完,纸上得来终觉浅,看到感觉差不多明白了就可以去实践了。可以按照官方文档的"Get Started"章节逐步了解项目的基本流程。另一种方法是查找官方提供的示例或演示,构建并运行它们,理解代码和配置的含义,并通过使用项目了解项目的需求、目标以及现有功能和架构。 例如,Seata有一个名为 seata-samples 的仓库,其中包含20多种用例,比如 Seata 和 Dubbo 集成,和 SCA, Nacos 集成的案例,基本可以覆盖到支持的所有场景。
  3. 粗略阅读源代码把握主要逻辑 在准备阶段,粗略地阅读源代码以把握项目的主要逻辑也很重要。了解如何高效地把握项目的主要内容是一个需要长期积累的技能。首先,通过前述的准备步骤,了解项目的概念、交互和流程模型是很有帮助的。 以Seata为例,通过官方文档和实际操作,可以了解Seata事务领域的三个角色:TC(Transaction Coordinator)、TM(Transaction Manager)和 RM(Resource Manager)。TC 作为独立部署的 Server 用于维护全局和分支事务的状态,是 Seata 实现高可用的关键;TM 用于与 TC 交互,定义全局事务的开始、提交或回滚;RM 用于管理分支事务处理的资源,与 TC 交互以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。粗略地了解这些角色之间的交互后,可以更轻松地把握项目的主要逻辑。 img 脑海里刻下了这些模型的印象,对源码的主干提取就相对得心应手了一些。比如 Seata TC 事务协调者,作为 Server 端,是一个独立于业务部署的单独应用。那为了分析源码,就可以直接在本地把 server 起起来,通过启动类开始追踪。可以分析到一些初始化的逻辑比如服务注册、全局锁的初始化等等。还有可以通过 RPC 的调用来追踪到交互逻辑的代码,比如 TC 是如何对全局事务和分支事务进行持久化,如何驱动全局事务提交或者回滚的。 然而内嵌客户端的框架代码,没有一个启动类入口可以入手分析。那其实可以从一个 sample 入手,找到其对框架代码的引用从而进行阅读。比如 Seata 一个很重要的注解是 GlobalTransaction,用于标识一个全局事务。想要知道 TM 是如何对这个注解分析的,那我们通过 IDE 的搜索功能,找到 GlobalTransaction 的拦截器即可分析其中的逻辑。 还有一个小 tips 分享给大家,往往来说单测注重于单一模块的职能,可以通过阅读单测可以了解一个模块的输入输出、逻辑边界,也可以顺着单测的调用链去阅读代码,也是理解源码一个很重要的手段。

万事俱备只欠东风,做完充足的准备,下一步就是区积极参与到社区之中。

参与的方式也有很多种,最常见的参与方式是查看项目的 Issues 列表,社区通常会为新贡献者标记一些带有特殊标签的 Issue,如“good-first-issue”、“contributions-welcome”和“help-wanted”等。可以通过这些标签筛选感兴趣的任务。

img

除了 Issues,GitHub 还提供了讨论的功能,可以参与一些公开的讨论并获取新的想法。

img

此外,社区通常会定期举行会议,比如周会或双周会,可以通过参加这些会议来了解社区的最新进展,提出问题以及与其他社区成员交流。

总结与心得

我加入 Seata 社区最初是通过开源之夏活动。我完成了我的课题,为 Seata Saga 实现了一些新的功能,也做了一系列的优化。但我不止于此,因为在 Seata 的开源经历中我获得了学生生涯中最宝贵的一次开发者体验,在之后的时间我也持续通过上述参与方式持续活跃在社区中。这主要得益于以下几个方面:

  1. 沟通与社交:导师制度为我提供了重要的支持。在开发过程中,我与我的导师亦夏之间的密切合作对我适应社区文化和工作流程起到了关键作用。他不仅帮助我适应了社区,还为我提供了程序设计的思路,也与我分享了一些在工作中的经验和见解,这些都对我的发展非常有帮助。此外,Seata 社区创始人清铭也提供了很多帮助,包括建立了与其他同学的联系,帮助我进行 Code Review,也为我提供了许多机会。
  2. 正反馈:在 Seata 的开发过程中,我经历了一个良性的循环。许多细节为我提供了许多正反馈,例如我的贡献能被用户广泛使用和受益,比如开发得到了社区的认可。这些正反馈加强了我继续在 Seata 社区贡献的意愿。
  3. 技能提升:再就是参与 Seata 开发,对我能力的提升也是巨大的。在这里,我能学习到生产级别的代码,包括性能优化,接口设计,边界判断的技巧。可以直接参与一个开源项目的运作,包括项目计划,安排,沟通等。当然还了解一个分布式事务框架是如何设计并实现的。

除了这些宝贵的开发者体验,我也从这次经历中体悟到了一些关于参与开源的个人心得,为激励其他有兴趣参与开源社区的同学,我做了简单的总结:

  1. 了解和学习社区文化和价值观:每个开源社区都有不同的文化和价值观。了解社区的文化和价值观对于成功参与社区至关重要。观察和了解社区其他成员的日常开发和交流方式是学习社区文化的好方法。在社区中要尊重他人的意见和包容不同的观点。
  2. 敢于迈出第一步:不要害怕面对困难,迈出第一步是参与开源社区的关键。可以通过领取标有"good-first-issue"等标签的 Issue,编写文档、单元测试等方式来开始。重要的是要克服畏难情绪,积极尝试并学习。
  3. 对自己的工作要充满信心:不要怀疑自己的能力。每个人都是从零开始的,没有人天生就是专家。参与开源社区是一个学习和成长的过程,需要不断的实践和积累经验。
  4. 积极参与讨论,持续学习不同技术:不要害怕提出问题,无论是关于项目的具体技术还是开发过程中的挑战。同时也不要局限于一个领域。尝试学习和掌握不同编程语言、框架和工具,这可以拓宽技术视野,为项目提供有价值的洞见。

通过我的开源之旅,我积累了宝贵的经验和技能,这些不仅帮助我成长为一个更有价值的开发者,也让我深刻地了解了开源社区的力量。然而,我不仅仅是个别的参与者,我代表着 Seata 社区的一部分。Seata 作为一个正在不断成长和演变的开源项目,有着巨大的潜力,同时也面临着新的挑战。因此我要强调 Seata 社区的重要性和未来的潜力,它已经进入 Apache 软件基金会的孵化阶段,这个重要的里程碑将为 Seata 带来更广阔的发展空间。Seata 欢迎更多的开发者和贡献者的加入,让我们共同推动这个开源项目的发展,为分布式事务领域的进步贡献一份力量。

· 阅读需 24 分钟

Seata 是一款开源的分布式事务解决方案,star高达24000+,社区活跃度极高,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.

目前Seata的分布式事务数据存储模式有file,db,redis,而本篇文章将Seata-Server Raft模式的架构,部署使用,压测对比,及为什么Seata需要Raft,并领略从调研对比,设计,到具体实现,再到知识沉淀的过程.

分享人:陈健斌(funkye) github id: funky-eyes

2. 架构介绍

2.1 Raft 模式是什么?

首先需要明白什么是raft分布式一致性算法,这里直接摘抄sofa-jraft官网的相关介绍:

RAFT 是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的 Diego Ongaro 和 John Ousterhout 提出,作为 RAMCloud 项目中的中心协调组件。Raft 是一种 Leader-Based 的 Multi-Paxos 变种,相比 Paxos、Zab、View Stamped Replication 等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。 Raft 作为复制状态机,是分布式系统中最核心最基础的组件,提供命令在多个节点之间有序复制和执行,当多个节点初始状态一致的时候,保证节点之间状态一致。

简而言之Seata的Raft模式就是基于Sofa-Jraft组件实现可保证Seata-Server自身的数据一致性和服务高可用.

2.2 为什么需要raft模式

看完上述的Seata-Raft模式是什么的定义后,是否就有疑问,难道现在Seata-Server就无法保证一致性和高可用了吗?那么下面从一致性和高可用来看看目前Seata-Server是如何做的.

2.2.1 现有存储模式

在当前的 Seata 设计中,Server 端的作用是保证事务的二阶段被正确执行。然而,这取决于事务记录的正确存储。为确保事务记录不丢失,需要在保持状态正确的前提下,驱动所有的 Seata-RM 执行正确的二阶段行为。那么,Seata 目前是如何存储事务状态和记录的呢?

首先介绍一下 Seata 支持的三种事务存储模式:file、db 和 redis。根据一致性的排名,db 模式下的事务记录可以得到最好的保证,其次是 file 模式的异步刷盘,最后是 redis 模式下的 aof 和 rdb

顾名思义:

  • file 模式是 Seata 自实现的事务存储方式,它以顺序写的形式将事务信息存储到本地磁盘上。为了兼顾性能,默认采用异步方式,并将事务信息存储在内存中,确保内存和磁盘上的数据一致性。当 Seata-Server(TC)意外宕机时,在重新启动时会从磁盘读取事务信息并恢复到内存中,以便继续运行事务上下文。
  • db 是 Seata 的抽象事务存储管理器(AbstractTransactionStoreManager)的另一种实现方式。它依赖于数据库,如 PostgreSQL、MySQL、Oracle 等,在数据库中进行事务信息的增删改查操作。一致性由数据库的本地事务保证,数据也由数据库负责持久化到磁盘。
  • redis 和 db 类似,也是一种事务存储方式。它利用 Jedis 和 Lua 脚本来进行事务的增删改查操作,部分操作(如竞争锁)在 Seata 2.x 版本中全部采用了 Lua 脚本。数据的存储与 db 类似,依赖于存储方(Redis)来保证数据的一致性。与 db 类似,redis 在 Seata 中采用了计算和存储分离的架构设计.

2.2.2 高可用

高可用简单理解就是集群能够在主节点宕机后继续正常运行,常见的方式是通过部署多个提供相同服务的节点,并通过注册中心实时感知主节点的上下线情况,以便及时切换到可用的节点。

看起来似乎只需要加几台机器进行部署,但实际上背后存在一个问题,即如何确保多个节点像一个整体一样运作。如果其中一个节点宕机,另一个节点能够完美接替宕机节点的工作,包括处理宕机节点的数据。解决这个问题的答案其实很简单,在计算与存储分离的架构下,只需将数据存储在共享的存储中间件中,任何一个节点都可以通过访问该公共存储区域获取所有节点操作的事务信息,从而实现高可用的能力。

然而,前提条件是计算与存储必须分离。为什么计算与存储一体化设计不可行呢?这就要说到 File 模式的实现了。如之前描述的,File 模式将数据存储在本地磁盘和节点内存中,数据写操作没有任何同步,这意味着目前的 File 模式无法实现高可用,仅支持单机部署。作为初级的快速入门和简单使用而言,File 模式适用性较低,高性能的基于内存的 File 模式也基本上不再被生产环境使用。

2.3 Seata-Raft是如何设计的呢?

2.3.1 设计原理

Seata-Raft模式的设计思路是通过封装无法高可用的file模式,利用Raft算法实现多个TC之间数据的同步。该模式保证了使用file模式时多个TC的数据一致性,同时将异步刷盘操作改为使用Raft日志和快照进行数据恢复。 流程图

在Seata-Raft模式中,client端在启动时会从配置中心获取当前client的事务分组(例如default)以及相关Raft集群节点的IP地址。通过向Seata-Server的控制端口发送请求,client可以获取到default分组对应的Raft集群的元数据,包括leader、follower和learner成员节点。然后,client会监视(watch)非leader节点的任意成员节点。

假设TM开始一个事务,并且本地的metadata中的leader节点指向了TC1的地址,那么TM只会与TC1进行交互。当TC1添加一个全局事务信息时,通过Raft协议,即图中标注为步骤1的日志发送,TC1会将日志发送给其他节点,步骤2是follower节点响应日志接收情况。当超过半数的节点(如TC2)接受并响应成功时,TC1上的状态机(FSM)将执行添加全局事务的动作。

watch watch2

如果TC1宕机或发生重选举,会发生什么呢?由于首次启动时已经获取到了元数据,client会执行watch follower节点的接口来更新本地的metadata信息。因此,后续的事务请求将发送到新的leader(例如TC2)。同时,TC1的数据已经被同步到了TC2和TC3,因此数据一致性不会受到影响。只在选举发生的瞬间,如果某个事务正好发送给了旧的leader,该事务会被主动回滚,以确保数据的正确性。

需要注意的是,在该模式下,如果事务处于决议发送请求或一阶段流程还未走完的时刻,并且恰好在选举时发生,这些事务会被主动回滚。因为RPC节点已经宕机或发生了重选举,当前没有实现RPC重试。TM侧默认有5次重试机制,但由于选举需要大约1s-2s的时间,这些处于begin状态的事务可能无法成功决议,因此会优先回滚,释放锁,以避免影响其他业务的正确性。

2.3.2 故障恢复

在Seata中,当TC发生故障时,数据恢复的过程如下:

故障恢复

如上图所示

  • 检查是否存在最新的数据快照:首先,系统会检查是否存在最新的数据快照文件。数据快照是基于内存的数据状态的一次全量拷贝,如果有最新的数据快照,则系统将直接加载该快照到内存中。

  • 根据快照后的Raft日志进行回放:如果存在最新的快照或者没有快照文件,系统将根据之前记录的Raft日志进行数据回放。每个Seata-Server中的请求最终会经过ServerOnRequestProcessor进行处理,然后转移到具体的协调者类(DefaultCoordinator或RaftCoordinator)中,再转向具体的业务代码(DefaultCore)进行相应的事务处理(如begin、commit、rollback等)。

  • 当日志回放完成后,便会由leader发起日志的同步,并继续执行相关事务的增删改动作。f

通过以上步骤,Seata能够实现在故障发生后的数据恢复。首先尝试加载最新的快照,如果有的话可以减少回放的时间;然后根据Raft日志进行回放,保证数据操作的一致性;最后通过日志同步机制,确保数据在多节点之间的一致性。

2.3.3 业务处理同步过程

流程 对于client侧获取最新metadata时恰好有业务线程在执行begin、commit或registry等操作的情况,Seata采取了以下处理方式:

  • client侧:

    • 如果客户端正在执行begin、commit或registry等操作,并且此时需要获取最新metadata,由于此时的leader可能已经不存在或不是当前leader,因此客户端的RPC请求可能会失败。
    • 如果请求失败,客户端会收到异常响应,此时客户端需要根据请求的结果进行回滚操作。
  • TC侧对旧leader的检测:

    • 在TC侧,如果此时客户端的请求到达旧的leader节点,TC会进行当前是否是leader的检测,如果不是leader,则会拒绝该请求。
    • 如果是leader但在中途失败,比如在提交任务到状态机的过程中失败,由于当前已经不是leader,创建任务(createTask)的动作会失败。这样,客户端也会接收到响应异常。
    • 旧leader的提交任务也会失败,确保了事务信息的一致性。 通过上述处理方式,当客户端获取最新metadata时恰好遇到业务操作的情况,Seata能够保证数据的一致性和事务的正确性。如果客户端的RPC请求失败,将触发回滚操作;而在TC侧,对旧leader的检测和任务提交的失败可以防止事务信息不一致的问题。这样,客户端的数据也能保持一致性。

3.使用部署

在使用和部署上,社区秉持着最小侵入,最小改动的原则,所以整体的部署上手应该是非常简单的,接下来分开client与server两端的部署改动点进行介绍

3.1 client

首先,使用注册配置中心较多的同学应该知道Seata的配置项中有一个seata.registry.type的配置项,支持了nacos,zk,etcd,redis等等,而在2.0以后增加了一个raft的配置项

   registry:
type: raft
raft:
server-addr: 192.168.0.111:7091, 192.168.0.112:7091, 192.168.0.113:7091

registry.type 改为raft,并配置raft相关元数据的获取地址,该地址统一为seata-server的ip+http端口 然后必不可少的就是传统的事务分组的配置

seata:
tx-service-group: default_tx_group
service:
vgroup-mapping:
default_tx_group: default

如现在使用的事务分组为default_tx_group,那么对应的seata集群/分组就是default,这个是有对应关系的,后续再server部署环节上会介绍 至此client的改动已经完成了

3.2 server

对于server的改动可能会多一些,要熟悉一些调优参数和配置,当然也可以选择默认值不做任何修改

seata:
server:
raft:
group: default #此值代表该raft集群的group,client的事务分组对应的值要与之对应
server-addr: 192.168.0.111:9091,192.168.0.112:9091,192.168.0.113:9091 # 3台节点的ip和端口,端口为该节点的netty端口+1000,默认netty端口为8091
snapshot-interval: 600 # 600秒做一次数据的快照,以便raftlog的快速滚动,但是每次做快照如果内存中事务数据过多会导致每600秒产生一次业务rt的抖动,但是对于故障恢复比较友好,重启节点较快,可以调整为30分钟,1小时都行,具体按业务来,可以自行压测看看是否有抖动,在rt抖动和故障恢复中自行找个平衡点
apply-batch: 32 # 最多批量32次动作做一次提交raftlog
max-append-bufferSize: 262144 #日志存储缓冲区最大大小,默认256K
max-replicator-inflight-msgs: 256 #在启用 pipeline 请求情况下,最大 in-flight 请求数,默认256
disruptor-buffer-size: 16384 #内部 disruptor buffer 大小,如果是写入吞吐量较高场景,需要适当调高该值,默认 16384
election-timeout-ms: 1000 #超过多久没有leader的心跳开始重选举
reporter-enabled: false # raft自身的监控是否开启
reporter-initial-delay: 60 # 监控的区间间隔
serialization: jackson # 序列化方式,不要改动
compressor: none # raftlog的压缩方式,如gzip,zstd等
sync: true # raft日志的刷盘方式,默认是同步刷盘
config:
# support: nacos, consul, apollo, zk, etcd3
type: file # 该配置可以选择不同的配置中心
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: file # raft模式下不允许使用非file的其他注册中心
store:
# support: file 、 db 、 redis 、 raft
mode: raft # 使用raft存储模式
file:
dir: sessionStore # 该路径为raftlog及事务相关日志的存储位置,默认是相对路径,最好设置一个固定的位置

在3个或者大于3个节点的seata-server中配置完以上参数后,直接启动便可以看到类似以下的日志输出,就代表集群已经正常启动了

2023-10-13 17:20:06.392  WARN --- [Rpc-netty-server-worker-10-thread-1] [com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory] [ensurePipeline] []: JRaft SET bolt.rpc.dispatch-msg-list-in-default-executor to be false for replicator pipeline optimistic.
2023-10-13 17:20:06.439 INFO --- [default/PeerPair[10.58.16.231:9091 -> 10.58.12.217:9091]-AppendEntriesThread0] [com.alipay.sofa.jraft.storage.impl.LocalRaftMetaStorage] [save] []: Save raft meta, path=sessionStore/raft/9091/default/raft_meta, term=4, votedFor=0.0.0.0:0, cost time=25 ms
2023-10-13 17:20:06.441 WARN --- [default/PeerPair[10.58.16.231:9091 -> 10.58.12.217:9091]-AppendEntriesThread0] [com.alipay.sofa.jraft.core.NodeImpl] [handleAppendEntriesRequest] []: Node <default/10.58.16.231:9091> reject term_unmatched AppendEntriesRequest from 10.58.12.217:9091, term=4, prevLogIndex=4, prevLogTerm=4, localPrevLogTerm=0, lastLogIndex=0, entriesSize=0.
2023-10-13 17:20:06.442 INFO --- [JRaft-FSMCaller-Disruptor-0] [io.seata.server.cluster.raft.RaftStateMachine] [onStartFollowing] []: groupId: default, onStartFollowing: LeaderChangeContext [leaderId=10.58.12.217:9091, term=4, status=Status[ENEWLEADER<10011>: Raft node receives message from new leader with higher term.]].
2023-10-13 17:20:06.449 WARN --- [default/PeerPair[10.58.16.231:9091 -> 10.58.12.217:9091]-AppendEntriesThread0] [com.alipay.sofa.jraft.core.NodeImpl] [handleAppendEntriesRequest] []: Node <default/10.58.16.231:9091> reject term_unmatched AppendEntriesRequest from 10.58.12.217:9091, term=4, prevLogIndex=4, prevLogTerm=4, localPrevLogTerm=0, lastLogIndex=0, entriesSize=0.
2023-10-13 17:20:06.459 INFO --- [Bolt-default-executor-4-thread-1] [com.alipay.sofa.jraft.core.NodeImpl] [handleInstallSnapshot] []: Node <default/10.58.16.231:9091> received InstallSnapshotRequest from 10.58.12.217:9091, lastIncludedLogIndex=4, lastIncludedLogTerm=4, lastLogId=LogId [index=0, term=0].
2023-10-13 17:20:06.489 INFO --- [Bolt-conn-event-executor-13-thread-1] [com.alipay.sofa.jraft.rpc.impl.core.ClientServiceConnectionEventProcessor] [onEvent] []: Peer 10.58.12.217:9091 is connected
2023-10-13 17:20:06.519 INFO --- [JRaft-Group-Default-Executor-0] [com.alipay.sofa.jraft.util.Recyclers] [<clinit>] []: -Djraft.recyclers.maxCapacityPerThread: 4096.
2023-10-13 17:20:06.574 INFO --- [JRaft-Group-Default-Executor-0] [com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage] [destroySnapshot] []: Deleting snapshot sessionStore/raft/9091/default/snapshot/snapshot_4.
2023-10-13 17:20:06.574 INFO --- [JRaft-Group-Default-Executor-0] [com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage] [close] []: Renaming sessionStore/raft/9091/default/snapshot/temp to sessionStore/raft/9091/default/snapshot/snapshot_4.
2023-10-13 17:20:06.689 INFO --- [JRaft-FSMCaller-Disruptor-0] [io.seata.server.cluster.raft.snapshot.session.SessionSnapshotFile] [load] []: on snapshot load start index: 4
2023-10-13 17:20:06.694 INFO --- [JRaft-FSMCaller-Disruptor-0] [io.seata.server.cluster.raft.snapshot.session.SessionSnapshotFile] [load] []: on snapshot load end index: 4
2023-10-13 17:20:06.694 INFO --- [JRaft-FSMCaller-Disruptor-0] [io.seata.server.cluster.raft.RaftStateMachine] [onSnapshotLoad] []: groupId: default, onSnapshotLoad cost: 110 ms.
2023-10-13 17:20:06.694 INFO --- [JRaft-FSMCaller-Disruptor-0] [io.seata.server.cluster.raft.RaftStateMachine] [onConfigurationCommitted] []: groupId: default, onConfigurationCommitted: 10.58.12.165:9091,10.58.12.217:9091,10.58.16.231:9091.
2023-10-13 17:20:06.705 INFO --- [JRaft-FSMCaller-Disruptor-0] [com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl] [onSnapshotLoadDone] []: Node <default/10.58.16.231:9091> onSnapshotLoadDone, last_included_index: 4
last_included_term: 4
peers: "10.58.12.165:9091"
peers: "10.58.12.217:9091"
peers: "10.58.16.231:9091"

2023-10-13 17:20:06.722 INFO --- [JRaft-Group-Default-Executor-1] [com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage] [lambda$truncatePrefixInBackground$2] []: Truncated prefix logs in data path: sessionStore/raft/9091/default/log from log index 1 to 5, cost 0 ms.

3.3 faq

  • seata.raft.server-addr配置好后,必须通过server的openapi进行集群的扩缩容,直接改动该配置进行重启是不会生效的 接口为/metadata/v1/changeCluster?raftClusterStr=新的集群列表
  • 如果server-addr:中的地址都为本机,那么需要根据本机上不同的server的netty端口增加1000的偏移量,如server.port: 7092那么netty端口为8092,raft选举和通信端口便为9092,需要增加启动参数-Dserver.raftPort=9092. Linux下可以通过export JAVA_OPT="-Dserver.raftPort=9092"等方式指定。

4.压测对比

压测对比分为两种场景,并且为了避免数据热点冲突与线程调优等情况,将Client侧的数据初始化300W条商品,并直接使用jdk21虚拟线程+spring boot3+seata AT来测试,在gc方面全部采用分代ZGC进行,压测工具为阿里云PTS,Server侧统一使用jdk21(目前还未适配虚拟线程) 服务器配置如下 TC: 4c8g3 Client: 4c8G*1 数据库为阿里云rds 4c16g

  • 64并发压测只增加@Globaltransactional注解接口空提交的性能
  • 随机300W数据进行32并发10分钟的扣库存

4.1 1.7.1 db模式

raft压测模型

空提交 64C

db64-2

随机扣库存 32C

db32-2

4.2 2.0 raft模式

raft压测模型

空提交 64C

raft64-2

随机扣库存 32C

raft32c-2

4.3 压测结果对比

32并发对300W商品随机扣库存场景

tps avgtps maxcountrterror存储类型
1709(42%↑)2019(21%↑)1228803(42%↑)13.86ms(30%↓)0Raft
1201166886410519.86ms0DB

64并发空压@Globaltransactional接口(压测峰值上限为8000)

tps avgtps maxcountrterror存储类型
5704(20%↑)8062(30%↑)4101236(20%↑)7.79ms(19%↓)0Raft
4743617234102409.65ms0DB

除了以上数据上的直观对比,通过对压测的曲线图来观察,raft模式下tps与rt更加平稳,抖动更少,性能与吞吐量上更佳.

5.总结

在Seata未来的发展中,性能、入门门槛、部署运维成本,都是我们需要关注和不断优化的方向,在raft模式推出后有以下几个特点:

  1. 如存储方面,存算分离后Seata对其优化的上限被拔高,自主可控
  2. 部署成本更低,无需额外的注册中心,存储中间件
  3. 入门的门槛更低,无需学习其他的一些如注册中心的知识,一站式直接使用Seata Raft 即可上手

针对业界发展趋势,一些开源项目如ClickHouse和Kafka已经开始放弃使用ZooKeeper,并转而采用自研的解决方案,比如ClickKeeper和KRaft。这些方案将元数据等信息交由自身保证存储,以减少对第三方依赖的需求,从而降低运维成本和学习成本。这些特性是非常成熟和可借鉴的。

当然,目前来看,基于Raft模式的解决方案可能还不够成熟,可能无法完全达到上述描述的那样美好。然而,正是因为存在这样的理论基础,社区更应该朝着这个方向努力,让实践逐步接近理论的要求。在这里,欢迎所有对Seata感兴趣的同学加入社区,共同为Seata添砖加瓦!

· 阅读需 23 分钟

本文主要介绍分布式事务从内部到商业化和开源的演进历程,Seata社区当前进展和未来规划。

Seata是一款开源的分布式事务解决方案,旨在为现代化微服务架构下的分布式事务提供解决方案。Seata提供了完整的分布式事务解决方案,包括AT、TCC、Saga和XA事务模式,可支持多种编程语言和数据存储方案。Seata还提供了简便易用的API,以及丰富的文档和示例,方便企业在应用Seata时进行快速开发和部署。

Seata的优势在于具有高可用性、高性能、高扩展性等特点,同时在进行横向扩展时也无需做额外的复杂操作。 目前Seata已在阿里云上几千家客户业务系统中使用,其可靠性得到了业内各大厂商的认可和应用。

作为一个开源项目,Seata的社区也在不断扩大,现已成为开发者交流、分享和学习的重要平台,也得到了越来越多企业的支持和关注。

今天我主要针对以下三个小议题对Seata进行分享:

  • 从TXC/GTS 到 Seata
  • Seata 社区最新进展
  • Seata 社区未来规划

从TXC/GTS 到Seata

分布式事务的缘起

产品矩阵 Seata 在阿里内部的产品代号叫TXC(taobao transaction constructor),这个名字有非常浓厚的组织架构色彩。TXC 起源于阿里五彩石项目,五彩石是上古神话中女娲补天所用的石子,项目名喻意为打破关键技术壁垒,象征着阿里在从单体架构向分布式架构的演进过程中的重要里程碑。在这个项目的过程中演进出一批划时代的互联网中间件,包括我们常说的三大件:

  • HSF 服务调用框架
    解决单体应用到服务化后的服务通信调用问题。
  • TDDL 分库分表框架
    解决规模化后单库存储容量和连接数问题。
  • MetaQ 消息框架
    解决异步调用问题。

三大件的诞生满足了微服务化业务开发的基本需求,但是微服务化后的数据一致性问题并未得到妥善解决,缺少统一的解决方案。应用微服务化后出现数据一致性问题概率远大于单体应用,从进程内调用到网络调用这种复杂的环境加剧了异常场景的产生,服务跳数的增多使得在出现业务处理异常时无法协同上下游服务同时进行数据回滚。TXC的诞生正是为了解决应用架构层数据一致性的痛点问题,TXC 核心要解决的数据一致性场景包括:

  • 跨服务的一致性。 应对系统异常如调用超时和业务异常时协调上下游服务节点回滚。
  • 分库分表的数据一致性。 应对业务层逻辑SQL操作的数据在不同数据分片上,保证其分库分表操作的内部事务。
  • 消息发送的数据一致性。 应对数据操作和消息发送成功的不一致性问题。

为了克服以上通用场景遇到的问题,TXC与三大件做了无缝集成。业务使用三大件开发时,完全感知不到背后TXC的存在,业务不需要考虑数据一致性的设计问题,数据一致性保证交给了框架托管,业务更加聚焦于业务本身的开发,极大的提升了开发的效率。


GTS架构

TXC已在阿里集团内部广泛应用多年,经过双11等大型活动的洪荒流量洗礼,TXC极大提高了业务的开发效率,保证了数据的正确性,消除了数据不一致导致的资损和商誉问题。随着架构的不断演进,标准的三节点集群已可以承载接近10W TPS的峰值和毫秒级事务处理。在可用性和性能方面都达到了4个9的SLA保证,即使在无值守状态下也能保证全年无故障。


分布式事务的演进

新事物的诞生总是会伴随着质疑的声音。中间件层来保证数据一致性到底可靠吗?TXC最初的诞生只是一种模糊的理论,缺乏理论模型和工程实践。在我们进行MVP(最小可行产品)模型测试并推广业务上线后,经常出现故障,常常需要在深夜起床处理问题,睡觉时要佩戴手环来应对紧急响应,这也是我接管这个团队在技术上过的最痛苦的几年。

分布式事务演进

随后,我们进行了广泛的讨论和系统梳理。我们首先需要定义一致性问题,我们是要像RAFT一样实现多数共识一致性,还是要像Google Spanner一样解决数据库一致性问题,还是其他方式?从应用节点自上而下的分层结构来看,主要包括开发框架、服务调用框架、数据中间件、数据库Driver和数据库。我们需要决定在哪一层解决数据一致性问题。我们比较了解决不同层次数据一致性问题所面临的一致性要求、通用性、实现复杂度和业务接入成本。最后,我们权衡利弊,把实现复杂度留给我们,作为一个一致性组件,我们需要确保较高的一致性,但又不能锁定到具体数据库的实现上,确保场景的通用性和业务接入成本足够低以便更容易实现业务,这也是TXC最初采用AT模式的原因。

分布式事务它不仅仅是一个框架,它是一个体系。 我们在理论上定义了一致性问题,概念上抽象出了模式、角色、动作和隔离性等。从工程实践的角度,我们定义了编程模型,包括低侵入的注解、简单的方法模板和灵活的API ,定义了事务的基础能力和增强能力(例如如何以低成本支持大量活动),以及运维、安全、性能、可观测性和高可用等方面的能力。

事务逻辑模型 分布式事务解决了哪些问题呢?一个经典且具有体感的例子就是转账场景。转账过程包括减去余额和增加余额两个步骤,我们如何保证操作的原子性?在没有任何干预的情况下,这两个步骤可能会遇到各种问题,例如B账户已销户或出现服务调用超时等情况。

超时问题一直是分布式应用中比较难解决的问题,我们无法准确知晓B服务是否执行以及其执行顺序。从数据的角度来看,这意味着B 账户的钱未必会被成功加起来。在服务化改造之后,每个节点仅获知部分信息,而事务本身需要全局协调所有节点,因此需要一个拥有上帝视角、能够获取全部信息的中心化角色,这个角色就是TC(transaction coordinator),它用于全局协调事务的状态。TM(Transaction Manager) 则是驱动事务生成提议的角色。但是,即使上帝也有打瞌睡的时候,他的判断也并不总是正确的,因此需要一个RM(resource manager) 角色作为灵魂的代表来验证事务的真实性。这就是TXC 最基本的哲学模型。我们从方法论上验证了它的数据一致性是非常完备的,当然,我们的认知是有边界的。也许未来会证明我们是火鸡工程师,但在当前情况下,它的模型已经足以解决大部分现有问题。

分布式事务性能 经过多年的架构演进,从事务的单链路耗时角度来看,TXC在事务开始时的处理平均时间约为0.2毫秒,分支注册的平均时间约为0.4毫秒,整个事务额外的耗时在毫秒级别之内。这也是我们推算出的极限理论值。在吞吐量方面,单节点的TPS达到3万次/秒,标准集群的TPS接近10万次/秒。


Seata 开源

为什么要做开源?这是很多人问过我的问题。2017年我们做了商业化的 GTS(Global Transaction Service )产品产品在阿里云上售卖,有公有云和专有云两种形态。此时集团内发展的顺利,但是在我们商业化的过程中并不顺利,我们遇到了各种各样的问题,问题总结起来主要包括两类:一是开发者对于分布式事务的理论相当匮乏, 大多数人连本地事务都没搞明白是怎么回事更何况是分布式事务。 二是产品成熟度上存在问题, 经常遇到稀奇古怪的场景问题,导致了支持交付成本的急剧上升,研发变成了售后客服。

我们反思为什么遇到如此多的问题,这里主要的问题是在阿里集团内部是统一语言栈和统一技术栈的,我们对特定场景的打磨是非常成熟的,服务阿里一家公司和服务云上成千上万家企业有本质的区,这也启示我们产品的场景生态做的不够好。在GitHub 80%以上的开源软件是基础软件,基础软件首要解决的是场景通用性问题,因此它不能被有一家企业Lock In,比如像Linux,它有非常多的社区分发版本。因此,为了让我们的产品变得更好,我们选择了开源,与开发者们共建,普及更多的企业用户。

阿里开源 阿里的开源经历了三个主要阶段。第一个阶段是Dubbo所处的阶段,开发者用爱发电, Dubbo开源了有10几年的时间,时间充分证明了Dubbo是非常优秀的开源软件,它的微内核插件化的扩展性设计也是我最初开源Seata 的重要参考。做软件设计的时候我们要思考扩展性和性能权衡起来哪个会更重要一些,我们到底是要做一个三年的设计,五年的设计亦或是满足业务发展的十年设计。我们在做0-1服务调用问题的解决方案的同时,能否预测到1-100规模化后的治理问题。

第二个阶段是开源和商业化的闭环,商业化反哺于开源社区,促进了开源社区的发展。 我认为云厂商更容易做好开源的原因如下:

  • 首先,云是一个规模化的经济,必然要建立在稳定成熟的内核基础上,在上面去包装其产品化能力包括高可用、免运维和弹性能力。不稳定的内核必然导致过高的交付支持成本,研发团队的支持答疑穿透过高,过高的交付成本无法实现大规模的复制,穿透率过高无法使产品快速的演进迭代。
  • 其次,商业产品是更懂业务需求的。我们内部团队做技术的经常是站在研发的视角YY 需求,做出来的东西没有人使用,也就不会形成价值的转换。商业化收集到的都是真实的业务需求,因此,它的开源内核也必须会朝着这个方向演进。如果不朝着这个方向去演进必然导致两边架构上的分裂,增加团队的维护成本。
  • 最后,开源和商业化闭环,能促进双方更好的发展。如果开源内核经常出现各种问题,你是否愿意相信的它的商业化产品是足够优秀的。

第三个阶段是体系化和标准化。 首先,体系化是开源解决方案的基础。阿里的开源项目大多是基于内部电商场景的实践而诞生的。例如Higress,它用于打通蚂蚁集团的网关;Nacos承载着服务的百万实例和千万连接;Sentinel 提供大促时的降级和限流等高可用性能力;而Seata负责保障交易数据的一致性。这套体系化的开源解决方案是基于阿里电商生态的最佳实践而设计的。其次,标准化是另一个重要的特点。以OpenSergo为例,它既是一个标准,又是一个实现。在过去几年里,国内开源项目数量呈爆发式增长。然而,各个开源产品的能力差异很大,彼此集成时会遇到许多兼容性问题。因此,像OpenSergo这样的开源项目能够定义一些标准化的能力和接口,并提供一些实现,这将为整个开源生态系统的发展提供极大的帮助。


Seata 社区最新进展

Seata 社区简介

社区简介 目前,Seata已经开源了4种事务模式,包括AT、TCC、Saga和XA,并在积极探索其他可行的事务解决方案。 Seata已经与10多个主流的RPC框架和关系数据库进行了集成,同时与20 多个社区存在集成和被集成的关系。此外,我们还在多语言体系上探索除Java之外的语言,如Golang、PHP、Python和JS。

Seata已经被几千家客户应用到业务系统中。Seata的应用已经变得越来越成熟,在金融业务场景中信银行和光大银行与社区做了很好的合作,并成功将其纳入到核心账务系统中。在金融场景对微服务体系的落地是非常严苛的,这也标志着Seata的内核成熟度迈上了一个新台阶。


Seata 扩展生态

扩展生态 Seata采用了微内核和插件化的设计,它在API、注册配置中心、存储模式、锁控制、SQL解析器、负载均衡、传输、协议编解码、可观察性等方面暴露了丰富的扩展点。 这使得业务可以方便地进行灵活的扩展和技术组件的选择。


Seata 应用案例

应用案例 案例1:中航信航旅纵横项目
中航信航旅纵横项目在Seata 0.2版本中引入Seata解决机票和优惠券业务的数据一致性问题,大大提高了开发效率、减少了数据不一致造成的资损并提升了用户交互体验。

案例2:滴滴出行二轮车事业部
滴滴出行二轮车事业部在Seata 0.6.1版本中引入Seata,解决了小蓝单车、电动车、资产等业务流程的数据一致性问题,优化了用户使用体验并减少了资产的损失。

案例3:美团基础架构
美团基础架构团队基于开源的Seata项目开发了内部分布式事务解决方案Swan,被用于解决美团内部各业务的分布式事务问题。

场景4:盒马小镇
盒马小镇在游戏互动中使用Seata控制偷花的流程,开发周期大大缩短,从20天缩短到了5天,有效降低了开发成本。


Seata 事务模式的演进

模式演进


Seata 当前进展

  • 支持 Oracle和 Postgresql 多主键。
  • 支持 Dubbo3
  • 支持 Spring Boot3
  • 支持 JDK 17
  • 支持 ARM64 镜像
  • 支持多注册模型
  • 扩展了多种SQL语法
  • 支持 GraalVM Native Image
  • 支持 Redis lua 存储模式

Seata 2.x 发展规划

发展规划

主要包括下面几个方面:

  • 存储/协议/特性
    存储模式上探索存算不分离的Raft集群模式;更好的体验,统一当前4种事务模式的API;兼容GTS协议;支持Saga注解;支持分布式锁的控制;支持以数据视角的洞察和治理。
  • 生态
    融合支持更多的数据库,更多的服务框架,同时探索国产化信创生态的支持;支持MQ生态;进一步完善APM的支持。
  • 解决方案
    解决方案上除了支持微服务生态探索多云方案;更贴近云原生的解决方案;增加安全和流量防护能力;实现架构上核心组件的自闭环收敛。
  • 多语言生态
    多语言生态中Java最成熟,其他已支持的编程语言继续完善,同时探索与语言无关的Transaction Mesh方案。
  • 研发效能/体验
    提升测试的覆盖率,优先保证质量、兼容性和稳定性;重构官网文档结构,提升文档搜索的命中率;在体验上简化运维部署,实现一键安装和配置元数据简化;控制台支持事务控制和在线分析能力。

一句话总结2.x 的规划:更大的场景,更大的生态,从可用到好用。


Seata 社区联系方式

联系方式