在前几篇文章中,我们详细聊了聊 Seata 的 XA、AT 以及 TCC 模式,它们都是在 Seata 定义的全局框架下的不同的事务模式。
我们知道,在 Seata 中,有三类角色,TC、RM、TM,Seata Server 作为 TC 协调分支事务的提交和回滚,各个资源作为 RM 和 TM,那么这三者之间是如何通信的?
所以,这篇文章就来看看 Seata 底层是如何进行网络通信的。
整体类层次结构
我们先着眼大局,看一看 Seata 整个 RPC 的类层次结构。
从类结构层次可以看出来,AbstractNettyRemoting 是整个 Seata 网络通信的一个顶层抽象类。
在这个类中主要实现了一些 RPC 的基础通用方法,比如同步调用 sendSync、异步调用 sendAsync 等。
事实上,就网络调用来说,无非就是同步调用和异步调用,像其他的什么请求和响应都只是报文内容的区分。
所以,在 Seata 中,我个人认为还差一个顶层的接口 Remoting,类似于下面这样的:
import io.netty.channel.Channel;
import java.util.concurrent.TimeoutException;
public interface Remoting<Req, Resp> {
/**
* 同步调用
*/
Resp sendSync(Channel channel, Req request, long timeout) throws TimeoutException;
/**
* 异步调用
*/
void sendAsync(Channel channel, Req request);
}
在 AbstractNettyRemoting 实现了通用的网络调用方法,但是不同角色在这方面还是有一些区分的,比如对于 Server 来说,它的请求调用需要知道向哪个客户端发送,而对于 TM、RM 来说,它们发送请求直接发就行,不需要指定某个特定的 TC 服务,只需要在实现类通过负载均衡算法找到合适的 Server 节点就行。
所以就区分出了 RemotingServer 和 RemotingClient,但是底层还是要依赖 AbstractNettyRemoting 进行网络调用的,所以它们各自有子类实现了 AbstractNettyRemoting。
可以说 Seata 的这种设计在我看来是非常不错的,对于这种 CS 架构的远程通信,可以算一种通用的设计方案。
如何启动 Server 和 Client
聊完了 Seata 底层的类层次,我们再分别以 Server 和 Client 的视角来看它们是如何启动的,以及在启动的时候需要做些什么事情。
Server 是怎么启动的
Seata Server 作为一个独立的 SpringBoot 项目,要怎么样才能在 SpringBoot 启动的时候自动做点事呢?
Seata 的做法是实现了 CommandLineRunner 接口,至于这里面的原理就不是本篇文章讨论的内容了。
我们主要关注它的 run 方法:
// org.apache.seata.server.ServerRunner#run
public void run(String... args) {
try {
long start = System.currentTimeMillis();
seataServer.start(args);
started = true;
long cost = System.currentTimeMillis() - start;
LOGGER.info("\r\n you can visit seata console UI on http://127.0.0.1:{}. \r\n log path: {}.", this.port, this.logPath);
LOGGER.info("seata server started in {} millSeconds", cost);
} catch (Throwable e) {
started = Boolean.FALSE;
LOGGER.error("seata server start error: {} ", e.getMessage(), e);
System.exit(-1);
}
}
这其中核心的逻辑就在 seataServer.start() 方法中:
// org.apache.seata.server.Server#start
public void start(String[] args) {
// 参数解析器,用于解析 sh 的启动参数
ParameterParser parameterParser = new ParameterParser(args);
// initialize the metrics
MetricsManager.get().init();
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
/**
* 主要做这么几件事:
* 1. 设置 workingThreads 为 AbstractNettyRemoting 的 messageExecutor 处理器
* 2. 创建 ServerBootstrap,配置 Boss 和 Worker,并且设置 Seata Server 需要监听的端口
* 3. 设置出栈、入栈处理器 ServerHandler,它是一个 ChannelDuplexHandler 复合的处理器
*/
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
XID.setPort(nettyRemotingServer.getListenPort());
UUIDGenerator.init(parameterParser.getServerNode());
ConfigurableListableBeanFactory beanFactory = ((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).getBeanFactory();
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
if (coordinator instanceof ApplicationListener) {
beanFactory.registerSingleton(NettyRemotingServer.class.getName(), nettyRemotingServer);
beanFactory.registerSingleton(DefaultCoordinator.class.getName(), coordinator);
((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).addApplicationListener((ApplicationListener<?>) coordinator);
}
// log store mode: file, db, redis
SessionHolder.init();
LockerManagerFactory.init();
// 初始化一系列定时线程池,用于重试事务提交/回滚等
coordinator.init();
// 设置事务处理 Handler 为 DefaultCoordinator
nettyRemotingServer.setHandler(coordinator);
serverInstance.serverInstanceInit();
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
// Server 初始化
nettyRemotingServer.init();
}
最后的 nettyRemotingServer.init() 是整个 Seata Server 启动的重要逻辑,主要做了这么几件事:
- 注册一系列处理器
- 初始化一个定时线程池,用于清理过期的 MessageFuture
- 启动 ServerBootStrap 并将 TC 服务注册到注册中心,比如 Nacos
注册处理器
在 Seata 内部,用一个 Pair 对象关联了处理器和线程池,如下:
package org.apache.seata.core.rpc.processor;
public final class Pair<T1, T2> {
private final T1 first;
private final T2 second;
public Pair(T1 first, T2 second) {
this.first = first;
this.second = second;
}
public T1 getFirst() {
return first;
}
public T2 getSecond() {
return second;
}
}
而注册处理器本质就是将报文类型、处理该报文的处理器以及具体执行的线程池关联起来,存到一张哈希表中。
// AbstractNettyRemotingServer
protected final Map<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
// org.apache.seata.core.rpc.netty.NettyRemotingServer#registerProcessor
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor
ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 3. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
// org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer#registerProcessor
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}
你可能会注意到,在注册处理器时,有一些注册时传入的线程池是 null,那么对应的报文会由哪个线程执行呢?
后面我们会提到。
初始化定时线程池
// org.apache.seata.core.rpc.netty.AbstractNettyRemoting#init
public void init() {
timerExecutor.scheduleAtFixedRate(() -> {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String.format("msgId: %s, msgType: %s, msg: %s, request timeout",
rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
这个没啥好说的,就是初始化了一个定时线程池定时清理那些超时的 MessageFuture,这里 MessageFuture 是 Seata 将异步调用转为同步调用的关键,我们后面也会详细说到。
启动 ServerBootStrap
最后启动 ServerBootStrap,这差不多就是 Netty 的内容了。
// org.apache.seata.core.rpc.netty.NettyServerBootstrap#start
public void start() {
int port = getListenPort();
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 多版本协议解码器
MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers);
ch.pipeline()
.addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(multiProtocolDecoder);
}
});
try {
this.serverBootstrap.bind(port).sync();
LOGGER.info("Server started, service listen port: {}", getListenPort());
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
// 注册服务
registryService.register(address);
}
initialized.set(true);
} catch (SocketException se) {
throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
} catch (Exception exx) {
throw new RuntimeException("Server start failed", exx);
}
}
ServerBootstrap 启动时的 childOption 属于网络部分的内容,我们不过多解释。
这里你可能有一点疑问,在 pipeline 中仅仅只是添加了一个 MultiProtocolDecoder 解码器,那业务处理器呢?
事实上,MultiProtocolDecoder 的构造参数中的 channelHandlers 就是 ServerHandler,它是在创建 NettyRemotingServer 时就被设置的。
至于为什么要这样做,其实是和 Seata 的多版本协议相关。
当 Seata Server 启动后第一次进行解码时,会将 MultiProtocolDecoder 从 pipeline 中移除,根据版本选择具体的 Encoder 和 Decoder 并添加到 pipeline 中,此时,也会将 ServerHandler 添加到 pipeline。
Client 是怎么启动的
对于 Client 来说,由于我们一般是在 SpringBoot 中使用 Seata,所以我们需要关注的点在 SeataAutoConfiguration 类中。
在这个类里面创建了一个 GlobalTransactionScanner 对象,我们注意到它实现了 InitializingBean,所以将目光转移到 afterPropertiesSet 方法上。
果然在这个方法里面进行了 TM 和 RM 的初始化。
TM 的初始化
对于 TM 来说,初始化的逻辑如下:
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
/**
* 主要做这么几件事
* 1. 创建线程池作为 AbstractNettyRemotingClient 的 messageExecutor
* 2. 设置事务角色 transactionRole 为 TM_ROLE
* 3. 创建 Bootstrap 并设置出栈、入栈处理器 ClientHandler
* 4. 创建客户端 Channel 管理器 NettyClientChannelManager
*/
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
/**
* 主要做这么几件事:
* 1. 注册一系列处理器
* 2. 创建定时线程池定时对事务组内的 Server 发起连接,如果连接断开,则尝试重新建立连接
* 3. 如果客户端允许报文批量发送,则创建 mergeSendExecutorService 线程池,并提交 MergedSendRunnable 任务
* 4. 初始化一个定时线程池清理过期的 MessageFuture
* 5. 启动客户端 Bootstrap
* 6. 初始化连接 initConnection
*/
tmNettyRemotingClient.init();
}
启动客户端 Bootstrap 的逻辑如下:
@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker)
.channel(nettyClientConfig.getClientChannelClazz())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
}
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolDecoderV1())
.addLast(new ProtocolEncoderV1());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}
由于客户端的协议版本根据不同的 Seata 版本是可以确定的,所以这里直接添加了 V1 版本的编解码器,这里 channelHandlers 其实就是 ClientHandler,它也是 Netty 中的一个复合处理器。
RM 的初始化
RM 的初始化大致逻辑和 TM 是类似的,这里就不过多介绍了。
如何发送和处理报文
厘清了 Seata Server 和 Client 的大致启动流程之后,我们就可以深入的看一看 Seata 是如何进行报文发送和处理的。
前面我们也说过了,发送请求和处理报文的核心逻辑是在 AbstractNettyRemoting 中,接下来就看一看这个类。
同步和异步
先简单说一说什么是同步和异步。
同步 Synchronous 和异步 Asynchronous,本质上是描述了程序在处理多个事件或者任务时的不同行为模式。
同步是指一个过程必须等待另一个过程完成之后才能继续进行。换句话说,在同步操作中,调用方发出请求后会一直阻塞等待直到接收到响应结果、或者超时才会继续执行后续代码。
相比之下,异步则允许调用者在请求后不必等待响应就可以向下执行,但当请求完成时,会以某种方式将响应通知到调用者(如通过回调函数、Future),异步模型可以提高并发性和效率。
从另一个角度来说,同步调用需要发起调用的线程获取结果,而异步调用则是由异步线程将结果放到某个地方(Future)或者是异步线程去执行事先准备好的调用成功/失败的回调方法(回调函数)。
下面是一个简单的例子,展示了三种调用方式,同步、异步 Future、异步 Callback。
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class AsyncTest {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTest.class);
public static void main(String[] args) throws InterruptedException, ExecutionException {
Result syncResponse = testSync();
LOGGER.info("同步响应结果: {}", syncResponse.getString());
CompletableFuture<Result> result = testAsyncFuture();
testAsyncCallback();
LOGGER.info("主线程继续向下执行~~");
TimeUnit.SECONDS.sleep(1); // 保证所有结果处理完毕
LOGGER.info("主线程从异步 Future 中获取结果: {}", result.get().getString());
}
public static void testAsyncCallback() {
new AsyncTask().execute(new AsyncCallback() {
@Override
public void onComplete(Result result) {
try {
TimeUnit.MILLISECONDS.sleep(50); // 模拟异步耗时
} catch (InterruptedException e) {
}
LOGGER.info("异步 Callback 获取结果: {}", result.getString());
}
});
}
public static CompletableFuture<Result> testAsyncFuture() {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(50); // 模拟异步耗时
} catch (InterruptedException e) {
}
Result asyncResponse = getResult();
LOGGER.info("异步 Future 获取结果: {}", asyncResponse.getString());
return asyncResponse;
});
}
public static Result testSync() {
return getResult();
}
@Data
static class Result {
private String string;
}
interface AsyncCallback {
void onComplete(Result result);
}
static class AsyncTask {
void execute(AsyncCallback callback) {
new Thread(() -> {
Result asyncRes = getResult();
callback.onComplete(asyncRes);
}).start();
}
}
private static Result getResult() {
Result result = new Result();
result.setString("结果");
return result;
}
}
输出:
22:26:38.788 [main] INFO org.hein.netty.AsyncTest - 同步响应结果: 结果
22:26:38.849 [main] INFO org.hein.netty.AsyncTest - 主线程继续向下执行~~
22:26:38.911 [Thread-0] INFO org.hein.netty.AsyncTest - 异步 Callback 获取结果: 结果
22:26:38.911 [ForkJoinPool.commonPool-worker-1] INFO org.hein.netty.AsyncTest - 异步 Future 获取结果: 结果
22:26:39.857 [main] INFO org.hein.netty.AsyncTest - 主线程从异步 Future 中获取结果: 结果
从结果中,至少可以看出三点,
- 一是异步 Future 和异步 Callback 并不会阻塞主线程向下执行。
- 二是异步调用时处理结果的不是主线程。
- 最后,Future 和 Callback 的区别在于 Future 只是由异步线程将结果存储在了一个地方(CompletableFuture#result),但是后续获取结果还是需要主线程(或者其他线程)调用 get 方法,而 Callback 的话,其实就相当于预先设定了结果的处理方式,由异步线程去执行就好了。
当然,CompletableFuture 也是可以作回调的,比如调用 whenComplete 方法。
异步调用
Netty 作为一个高性能的异步 IO 框架,它的设计核心就是异步的,所以基于 Netty 进行异步调用是比较简单的。
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
channelWritableCheck(channel, rpcMessage.getBody());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message: {}, channel: {}, active? {}, writable? {}, isopen? {}", rpcMessage.getBody(), channel, channel.isActive(), channel.isWritable(), channel.isOpen());
}
doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
destroyChannel(future.channel());
}
});
}
只需要简单调用 channel 的 writeAndFlush 方法即可实现异步调用。
特别要注意的是,writeAndFlush 方法在调用线程是 EventLoop 线程的情况下会变成同步调用。
同步调用
在 Netty 中实现异步调用很简单,要实现同步调用就麻烦一点,需要将异步调用转换为同步调用。
从本质上来说,异步转同步就是让调用线程发起调用后,拿到响应前进入阻塞,拿到响应后再唤醒它,向下执行。
那么 Seata 的处理的核心就是 MessageFuture 类,如下:
package org.apache.seata.core.protocol;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class MessageFuture {
private RpcMessage requestMessage;
private long timeout;
private final long start = System.currentTimeMillis();
private final transient CompletableFuture<Object> origin = new CompletableFuture<>();
public boolean isTimeout() {
return System.currentTimeMillis() - start > timeout;
}
public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
Object result;
try {
result = origin.get(timeout, unit);
if (result instanceof TimeoutException) {
throw (TimeoutException) result;
}
} catch (ExecutionException e) {
throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
} catch (TimeoutException e) {
throw new TimeoutException(String.format("%s, cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
}
if (result instanceof RuntimeException) {
throw (RuntimeException) result;
} else if (result instanceof Throwable) {
throw new RuntimeException((Throwable) result);
}
return result;
}
public void setResultMessage(Object obj) {
origin.complete(obj);
}
public RpcMessage getRequestMessage() { return requestMessage; }
public void setRequestMessage(RpcMessage requestMessage) { this.requestMessage = requestMessage;}
public long getTimeout() { return timeout; }
public void setTimeout(long timeout) { this.timeout = timeout;}
}
有了这个类之后,同步调用的过程如下,我们以客户端请求、服务端响应为例:
- 首先客户端将请求构建为 MessageFuture,然后将请求 id 和这个 MessageFuture 对象存储到一个哈希表中。
- 接着客户端调用 channel.writeAndFlush 发起异步调用,是的,这里还是异步。
- 异步转同步的核心在于,此时线程需要调用 MessageFuture 对象的 get 方法进入阻塞,当然实际是调用了 CompletableFuture 的 get 方法进入同步阻塞。
- 当服务端处理完毕,它又会发出请求(服务端视角),在客户端来看,这就是响应。
- 当客户端收到响应之后,由 EventLoop 线程将响应结果设置到 MessageFuture 中,由于一次请求和响应的 id 是相同的,所以可以从上面的哈希表中拿到对应的 MessageFuture 对象。
- 当响应结果被设置之后,上面阻塞的线程就可以恢复运行,这样就实现了同步的效果。
所以,Seata 的解决方案本质上来说就是利用了 CompletableFuture 对象,将它作为一个存储结果的容器。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture); // 请求和响应的 id 是一样的
// 检查该 Channel 是否可写(Channel 中有写缓冲区,如果缓冲区达到阈值水位,则不可写)
channelWritableCheck(channel, rpcMessage.getBody());
// 获取目的 ip 地址
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
// 执行发送前钩子方法
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 发送结果,并设置回调,非阻塞
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 发送失败,移除 future,关闭 Channel
if (!future.isSuccess()) {
MessageFuture mf = futures.remove(rpcMessage.getId());
if (mf != null) {
mf.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {
// Netty 是异步发送,所以这里需要等待结果,将异步转为同步
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 执行发送后的钩子方法
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
// 超时异常
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}
报文处理
在 Netty 中,提到报文处理,我们 首先应该想到的就是入栈、出栈处理器。
在 Seata Server 端,除了常见的编解码处理器之外,就是 ServerHandler 处理器了,如下:
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// 前置了解码处理器,所以这里的消息是 RpcMessage
if (msg instanceof RpcMessage) {
processMessage(ctx, (RpcMessage) msg);
} else {
LOGGER.error("rpcMessage type error");
}
}
// ...
}
比较有业务含义的就是这个 channelRead 方法,所有发向 Server 的报文在经过解码之后都会来到这个方法。
这里的 processMessage 方法就是 AbstractNettyRemoting 中的业务处理方法,如下:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} msgId: {}, body: {}", this, rpcMessage.getId(), rpcMessage.getBody());
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 在 Server 启动的时候,向 processorTable 注册了一大堆处理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 拿到对应的线程池执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
// 找对应的处理器执行
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池满了,执行拒绝策略
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
// 导出线程栈信息
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to {}", jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
try {
// 如果没有为处理器配置线程池,则由当前线程执行,基本上就是 EventLoop 线程了
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
这个方法的逻辑很简单。
Seata 在 Server 启动的过程中,向 processorTable 注册了一大堆处理器,那么这里就可以根据消息类型 Code 拿到对应的处理器和线程池。
如果有线程池,就在线程池内执行处理器的方法,否则就交给 EventLoop 线程去执行。
当然,对于 Client 而言,也是这样的。
批量发送
在网络程序中,有时候也需要实现批量发送,我们来看 Seata 是怎么做的,这里主要看客户端向服务端发送。
还记得我们上面在 Client 启动的过程中提到过一个线程池 mergeSendExecutorService,如果允许批量发送,那么在 Client 启动的时候就会提交一个 MergedSendRunnable 任务,我们先来看这个任务在干啥?
private class MergedSendRunnable implements Runnable {
@Override
public void run() {
// 死循环
while (true) {
synchronized (mergeLock) {
try {
// 保证线程最多只会空闲 1ms
mergeLock.wait(MAX_MERGE_SEND_MILLS); // 1
} catch (InterruptedException ignore) {
// ignore
}
}
// 正在发送中的标识
isSending = true;
// basketMap: key 是 address,value 是发向该 address 的报文队列(阻塞队列)
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
// 将同一个阻塞队列中所有 RpcMessage 进行合并
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// 批量发送报文是一个同步请求,但是无需获取返回值
// 因为 messageFuture 在将报文放入 basketMap 之前就已经被创建
// 返回值将在 ClientOnResponseProcessor 中被设置
sendChannel = clientChannelManager.acquireChannel(address);
// 内部将 mergeMessage 封装为一个普通的 RpcMessage 发送
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrorCode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
}
那么,与之相关的批量发送代码如下:
public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
if (this.isEnableClientBatchSendRequest()) {
// 如果允许客户端批量消息发送
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// 拿到 serverAddress 对应的发送队列
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
// 将报文添加到队列中,等待 mergeSendExecutorService 进行实际的发送
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress: {}, rpcMessage: {}", serverAddress, rpcMessage);
return null;
}
if (!isSending) {
// 保证队列中一有数据,就唤醒线程,进行批量发送
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// 线程阻塞等待响应
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error: {}, ip: {}, request: {}", exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
// 普通发送,拿到 channel 调父类的同步调用方法即可
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
可以看到,这里面也用到了对象锁的同步-等待机制,那么实现的效果就是:
- 最多隔 1ms 会遍历 basketMap 进行报文发送。
- 在 mergeSendExecutorService 内部的线程阻塞期间(mainLock.wait),如果来了需要发送的报文,那么会唤醒 mainLock 上的线程,继续进行发送。
那 Server 是怎么处理的呢?主要看 MergedWarpMessage 报文的 TypeCode,实际上就是 TYPE_SEATA_MERGE,再看 Server 启动的时候对这个 Code 注册哪个处理器,实际上就是 ServerOnRequestProcessor。
这里其实就向你展示了,如何去找某个报文是怎么处理的,授人以鱼不如授人以渔!
在 ServerOnRequestProcessor 这边,实际上对应了两种处理 MergedWarpMessage 报文的方式:
- MergedWarpMessage 中的所有独立请求全部处理完毕之后,统一发送 MergeResultMessage。