再前不久,我写了一篇关于分布式事务中间件 Fescar 的解析,没过几天 Fescar 团队对其进行了品牌升级,取名为 Seata(Simpe Extensible Autonomous Transaction Architecture),而以前的 Fescar 的英文全称为 Fast & EaSy Commit And Rollback。可以看见 Fescar 从名字上来看更加局限于 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分布式事务解决方案。更换名字之后,我对其未来的发展更有信心。
这里先大概回忆一下 Seata 的整个过程模型:
- TM:事务的发起者。用来告诉 TC,全局事务的开始,提交,回滚。
- RM:具体的事务资源,每一个 RM 都会作为一个分支事务注册在 TC。
- TC:事务的协调者。也可以看做是 Fescar-server,用于接收我们的事务的注册,提交和回滚。
在之前的文章中对整个角色有个大体的介绍,在这篇文章中我将重点介绍其中的核心角色 TC,也就是事务协调器。
2.Transcation Coordinator
为什么之前一直强调 TC 是核心呢?那因为 TC 这个角色就好像上帝一样,管控着云云众生的 RM 和 TM。如果 TC 一旦不好使,那么 RM 和 TM 一旦出现小问题,那必定会乱的一塌糊涂。所以要想了解 Seata,那么必须要了解它的 TC。
那么一个优秀的事务协调者应该具备哪些能力呢?我觉得应该有以下几个:
- 正确的协调:能正确的协调 RM 和 TM 接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
- 高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
- 高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的 RM 和 TM 那么会经常遇到超时,从而引起回滚频繁。
- 高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。
下面我也将逐步阐述 Seata 是如何做到上面四点。
2.1 Seata-Server 的设计
Seata-Server 整体的模块图如上所示:
- Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否 commit,rollback 等协调活动。
- Store:存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
- Discovery: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。
- Config: 用来存储和查找我们服务端的配置。
- Lock: 锁模块,用于给 Seata 提供全局锁的功能。
- RPC:用于和其它端通信。
- HA-Cluster:高可用集群,目前还没开源,为 Seata 提供可靠的高可用服务,预计将会在 0.6 版本开源。
2.2 Discovery
首先来讲讲比较基础的 Discovery 模块,又称服务注册/发现模块。我们将 Seata-Sever 启动之后,需要将自己的地址暴露给其它使用者,那么就需要我们这个模块帮忙。
这个模块有个核心接口 RegistryService,如上图所示:
- register:服务端使用,进行服务注册。
- unregister:服务端使用,一般在 JVM 关闭钩子,ShutdownHook 中调用。
- subscribe:客户端使用,注册监听事件,用来监听地址的变化。
- unsubscribe:客户端使用,取消注册监听事件。
- lookup:客户端使用,根据 key 查找服务地址列表。
- close:都可以使用,用于关闭 Registry 资源。
如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有五种服务注册/发现,分别是 redis、zk、nacos、eruka 和 consul。下面简单介绍下 Nacos 的实现:
2.2.1 register 接口:
step1:校验地址是否合法
step2:获取 Nacos 的 Naming 实例,然后将地址注册到服务名为 serverAddr(固定服务名) 的对应集群分组(registry.conf 文件配置)上面。
unregister 接口类似,这里不做详解。
2.2.2 lookup 接口:
step1:获取当前 clusterName 名字。
step2:判断当前集群名对应的服务是否已经订阅过了,如果是直接从 map 中取订阅返回的数据。
step3:如果没有订阅先主动查询一次服务实例列表,然后添加订阅并将订阅返回的数据存放到 map 中,之后直接从 map 获取最新数据。
2.2.3 subscribe 接口
这个接口比较简单,具体分两步:
step1:对将要订阅的 cluster-> listener 存放到 map 中,此处 nacos 未提交单机已订阅列表,所以需要自己实现。
step2:使用 Nacos api 订阅。
2.3 Config
配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty 的 select 线程数量,work 线程数量,session 允许最大为多少等等,当然这些参数再 Seata 中都有自己的默认设置。
同样的在 Seata 中也提供了一个接口 Configuration,用来自定义我们需要的获取配置的地方:
- getInt/Long/Boolean/getConfig():通过 dataId 来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
- putConfig:用于添加配置。
- removeConfig:删除一个配置。
- add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。
目前为止有四种方式获取 Config:File(文件获取)、Nacos、Apollo 和 ZK(不推荐)。在 Seata 中首先需要配置 registry.conf,来配置 config.type 。实现 conf 比较简单这里就不深入分析。
2.4 Store
存储层的实现对于 Seata 是否高性能,是否可靠非常关键。 如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。
在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的 TM 创造的全局事务操作数据叫 GlobalSession,RM 创造的分支事务操作数据叫 BranchSession,一个 GlobalSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。
在 FileTransactionStoreManager#writeSession 代码中:
上面的代码主要分为下面几步:
- step1:生成一个 TransactionWriteFuture。
- step2:将这个 futureRequest 丢进一个 LinkedBlockingQueue 中。为什么需要将所有数据都丢进队列中呢?当然这里其实也可以用锁来实现,再另外一个阿里开源的 RocketMQ 中,使用的锁。不论是队列还是锁它们的目的是为了保证单线程写,这又是为什么呢?有人会解释说,需要保证顺序写,这样速度就很快,这个理解是错误的,我们的 FileChannel 的写方法是线程安全的,已经能保证顺序写了。保证单线程写其实是为了让我们这个写逻辑都是单线程的,因为可能有些文件写满或者记录写数据位置等等逻辑,当然这些逻辑都可以主动加锁去做,但是为了实现简单方便,直接再整个写逻辑排队处理是最为合适的。
- step3:调用 future.get,等待我们该条数据写逻辑完成通知。
我们将数据提交到队列之后,我们接下来需要对其进行消费,代码如下:
这里将一个 WriteDataFileRunnable()提交进我们的线程池,这个 Runnable 的 run()方法如下:
分为下面几步:
step1: 判断是否停止,如果 stopping 为 true 则返回 null。
step2:从我们的队列中获取数据。
step3:判断 future 是否已经超时了,如果超时,则设置结果为 false,此时我们生产者 get()方法会接触阻塞。
step4:将我们的数据写进文件,此时数据还在 pageCahce 层并没有刷新到磁盘,如果写成功然后根据条件判断是否进行刷盘操作。
step5:当写入数量到达一定的时候,或者写入时间到达一定的时候,需要将我们当前的文件保存为历史文件,删除以前的历史文件,然后创建新的文件。这一步是为了防止我们文件无限增长,大量无效数据浪费磁盘资源。
在我们的 writeDataFile 中有如下代码:
step1:首先获取我们的 ByteBuffer,如果超出最大循环 BufferSize 就直接创建一个新的,否则就使用我们缓存的 Buffer。这一步可以很大的减少 GC。
step2:然后将数据添加进入 ByteBuffer。
step3:最后将 ByteBuffer 写入我们的 fileChannel,这里会重试三次。此时的数据还在 pageCache 层,受两方面的影响,OS 有自己的刷新策略,但是这个业务程序不能控制,为了防止宕机等事件出现造成大量数据丢失,所以就需要业务自己控制 flush。下面是 flush 的代码:
这里 flush 的条件写入一定数量或者写的时间超过一定时间,这样也会有个小问题如果是停电,那么 pageCache 中有可能还有数据并没有被刷盘,会导致少量的数据丢失。目前还不支持同步模式,也就是每条数据都需要做刷盘操作,这样可以保证每条消息都落盘,但是性能也会受到极大的影响,当然后续会不断的演进支持。
我们的 store 核心流程主要是上面几个方法,当然还有一些比如,session 重建等,这些比较简单,读者可以自行阅读。
2.5 Lock
大家知道数据库实现隔离级别主要是通过锁来实现的,同样的再分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。
Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:
其中有三个方法:
- acquireLock:用于对 我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务的资源加锁,成功返回 true。
- isLockable:根据事务 ID,资源 Id,锁住的 Key 来查询是否已经加锁。
- cleanAllLocks:清除所有的锁。
对于锁我们可以在本地实现,也可以通过 redis 或者 mysql 来帮助我们实现。官方默认提供了本地全局锁的实现:
在本地锁的实现中有两个常量需要关注:
- BUCKET_PER_TABLE:用来定义每个 table 有多少个 bucket,目的是为了后续对同一个表加锁的时候减少竞争。
- LOCK_MAP:这个 map 从定义上来看非常复杂,里里外外套了很多层 Map,这里用个表格具体说明一下:
层数 | key | value |
---|---|---|
1-LOCK_MAP | resourceId(jdbcUrl) | dbLockMap |
2- dbLockMap | tableName (表名) | tableLockMap |
3- tableLockMap | PK.hashcode%Bucket (主键值的 hashcode%bucket) | bucketLockMap |
4- bucketLockMap | PK | trascationId |
可以看见实际上的加锁在 bucketLockMap 这个 map 中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到 bucketLockMap,然后将当前 trascationId 塞进去,如果这个主键当前有 TranscationId,那么比较是否是自己,如果不是则加锁失败。
2.6 RPC
保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:
如果采用默认的基本配置那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,再这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。
Seata 目前允许配置的传输层配置如图所示,用户可根据需要进行 Netty 传输层面的调优,配置通过配置中心配置,首次加载时生效。
这里需要提一下的是 Seata 的心跳机制,这里是使用 Netty 的 IdleStateHandler 完成的,如下:
在 Sever 端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为 15s(客户端默认写空闲为 5s,发送 ping 消息),如果超过 15s 则会将链接断开,关闭资源。
step1:判断是否是读空闲的检测事件。
step2:如果是则断开链接,关闭资源。 另外 Seata 做了内存池、客户端做了批量小包合并发送、Netty 连接池(减少连接创建时的服务不可用时间)等功能,以下为批量小包合并功能。
客户端的消息发送并不是真正的消息发送通过 AbstractRpcRemoting#sendAsyncRequest 包装成 RpcMessage 存储至 basket 中并唤醒合并发送线程。合并发送线程通过 while true 的形式 最长等待 1ms 对 basket 的消息取出包装成 merge 消息进行真正发送,此时若 channel 出现异常则会通过 fail-fast 快速失败返回结果。merge 消息发送前在 map 中标识,收到结果后批量确认(AbstractRpcRemotingClient#channelRead),并通过 dispatch 分发至 messageListener 和 handler 去处理。同时,timerExecutor 定时对已发送消息进行超时检测,若超时置为失败。具体消息协议设计将会在后续的文章中给出,敬请关注。 Seata 的 Netty Client 由 TMClient 和 RMClient 组成,根据事务角色功能区分,都继承 AbstractRpcRemotingClient,AbstractRpcRemotingClient 实现了 RemotingService(服务启停), RegisterMsgListener(netty 连接池连接创建回调)和 ClientMessageSender(消息发送)继承了 AbstractRpcRemoting( Client 和 Server 顶层消息发送和处理的模板)。
RMClient 类关系图如下图所示:
TMClient 和 RMClient 又会根据自身的 poolConfig 配置与 NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> 进行 channel 连接的交互,channel 连接池根据角色 key+ip 作为连接池的 key 来定位各个连接池 ,连接池对 channel 进行统一的管理。TMClient 和 RMClient 在发送过程中对于每个 ip 只会使用一个长连接,但连接不可用时,会从连接池中快速取出已经创建好并可用的连接,减少服务的不可用时间。