跳到主要内容

· 阅读需 10 分钟

Seata AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。

为什么要检查全局锁呢,这是由于 Seata AT 模式的事务隔离是建立在支事务的本地隔离级别基础之上的,在数据库本地隔离级别读已提交或以上的前提下,Seata 设计了由事务协调器维护的全局写排他锁,来保证事务间的写隔离,同时,将全局事务默认定义在读未提交的隔离级别上。

Seata 事务隔离级别解读

在讲 Seata 事务隔离级之前,我们先来回顾一下数据库事务的隔离级别,目前数据库事务的隔离级别一共有 4 种,由低到高分别为:

  1. Read uncommitted:读未提交
  2. Read committed:读已提交
  3. Repeatable read:可重复读
  4. Serializable:序列化

数据库一般默认的隔离级别为读已提交,比如 Oracle,也有一些数据的默认隔离级别为可重复读,比如 Mysql,一般而言,数据库的读已提交能够满足业务绝大部分场景了。

我们知道 Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任务措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。

由此可以看出,传统意义的脏读是读到了未提交的数据,Seata 脏读是读到了全局事务下未提交的数据,全局事务可能包含多个本地事务,某个本地事务提交了不代表全局事务提交了。

在绝大部分应用在读已提交的隔离级别下工作是没有问题的,而实际上,这当中又有绝大多数的应用场景,实际上工作在读未提交的隔离级别下同样没有问题。

在极端场景下,应用如果需要达到全局的读已提交,Seata 也提供了全局锁机制实现全局事务读已提交。但是默认情况下,Seata 的全局事务是工作在读未提交隔离级别的,保证绝大多数场景的高效性。

全局锁实现

AT 模式下,会使用 Seata 内部数据源代理 DataSourceProxy,全局锁的实现就是隐藏在这个代理中。我们分别在执行、提交的过程都做了什么。

1、执行过程

执行过程在 StatementProxy 类,在执行过程中,如果执行 SQL 是 select for update,则会使用 SelectForUpdateExecutor 类,如果执行方法中带有 @GlobalTransactional or @GlobalLock注解,则会检查是否有全局锁,如果当前存在全局锁,则会回滚本地事务,通过 while 循环不断地重新竞争获取本地锁和全局锁。

io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

public T doExecute(Object... args) throws Throwable {
Connection conn = statementProxy.getConnection();
// ... ...
try {
// ... ...
while (true) {
try {
// ... ...
if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
// Do the same thing under either @GlobalTransactional or @GlobalLock,
// that only check the global lock here.
statementProxy.getConnectionProxy().checkLock(lockKeys);
} else {
throw new RuntimeException("Unknown situation!");
}
break;
} catch (LockConflictException lce) {
if (sp != null) {
conn.rollback(sp);
} else {
conn.rollback();
}
// trigger retry
lockRetryController.sleep(lce);
}
}
} finally {
// ...
}

2、提交过程

提交过程在 ConnectionProxy#doCommit方法中。

1)如果执行方法中带有@GlobalTransactional注解,则会在注册分支时候获取全局锁:

  • 请求 TC 注册分支

io.seata.rm.datasource.ConnectionProxy#register

private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
  • TC 注册分支的时候,获取全局锁

io.seata.server.transaction.at.ATCore#branchSessionLock

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
if (!branchSession.lock()) {
throw new BranchTransactionException(LockKeyConflict, String
.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()));
}
}

2)如果执行方法中带有@GlobalLock注解,在提交前会查询全局锁是否存在,如果存在则抛异常:

io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks

private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}

GlobalLock 注解说明

从执行过程和提交过程可以看出,既然开启全局事务 @GlobalTransactional注解可以在事务提交前,查询全局锁是否存在,那为什么 Seata 还要设计多处一个 @GlobalLock注解呢?

因为并不是所有的数据库操作都需要开启全局事务,而开启全局事务是一个比较重的操作,需要向 TC 发起开启全局事务等 RPC 过程,而@GlobalLock注解只会在执行过程中查询全局锁是否存在,不会去开启全局事务,因此在不需要全局事务,而又需要检查全局锁避免脏读脏写时,使用@GlobalLock注解是一个更加轻量的操作。

如何防止脏写

先来看一下使用 Seata AT 模式是怎么产生脏写的:

注:分支事务执行过程省略其它过程。

业务一开启全局事务,其中包含分支事务A(修改 A)和分支事务 B(修改 B),业务二修改 A,其中业务一执行分支事务 A 先获取本地锁,业务二则等待业务一执行完分支事务 A 之后,获得本地锁修改 A 并入库,业务一在执行分支事务时发生异常了,由于分支事务 A 的数据被业务二修改,导致业务一的全局事务无法回滚。

如何防止脏写?

1、业务二执行时加 @GlobalTransactional注解:

注:分支事务执行过程省略其它过程。

业务二在执行全局事务过程中,分支事务 A 提交前注册分支事务获取全局锁时,发现业务业务一全局锁还没执行完,因此业务二提交不了,抛异常回滚,所以不会发生脏写。

2、业务二执行时加 @GlobalLock注解:

注:分支事务执行过程省略其它过程。

@GlobalTransactional注解效果类似,只不过不需要开启全局事务,只在本地事务提交前,检查全局锁是否存在。

2、业务二执行时加 @GlobalLock 注解 + select for update语句:

如果加了select for update语句,则会在 update 前检查全局锁是否存在,只有当全局锁释放之后,业务二才能开始执行 updateA 操作。

如果单单是 transactional,那么就有可能会出现脏写,根本原因是没有 Globallock 注解时,不会检查全局锁,这可能会导致另外一个全局事务回滚时,发现某个分支事务被脏写了。所以加 select for update 也有个好处,就是可以重试。

如何防止脏读

Seata AT 模式的脏读是指在全局事务未提交前,被其它业务读到已提交的分支事务的数据,本质上是Seata默认的全局事务是读未提交。

那么怎么避免脏读现象呢?

业务二查询 A 时加 @GlobalLock 注解 + select for update语句:

select for update语句会在执行 SQL 前检查全局锁是否存在,只有当全局锁完成之后,才能继续执行 SQL,这样就防止了脏读。

作者简介:

张乘辉,目前就职于蚂蚁集团,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 11 分钟

在上一篇关于新版雪花算法的解析中,我们提到新版算法所做出的2点改变:

  1. 时间戳不再时刻追随系统时钟。
  2. 节点ID和时间戳互换位置。由原版的: 原版位分配策略 改成: 改进版位分配策略

有细心的同学提出了一个问题:新版算法在单节点内部确实是单调递增的,但是在多实例部署时,它就不再是全局单调递增了啊!因为显而易见,节点ID排在高位,那么节点ID大的,生成的ID一定大于节点ID小的,不管时间上谁先谁后。而原版算法,时间戳在高位,并且始终追随系统时钟,可以保证早生成的ID小于晚生成的ID,只有当2个节点恰好在同一时间戳生成ID时,2个ID的大小才由节点ID决定。这样看来,新版算法是不是错的?

这是一个很好的问题!能提出这个问题的同学,说明已经深入思考了标准版雪花算法和新版雪花算法的本质区别,这点值得鼓励!在这里,我们先说结论:新版算法的确不具备全局的单调递增性,但这不影响我们的初衷(减少数据库的页分裂)。这个结论看起来有点违反直觉,但可以被证明。

在证明之前,我们先简单回顾一下数据库关于页分裂的知识。以经典的mysql innodb为例,innodb使用B+树索引,其中,主键索引的叶子节点还保存了数据行的完整记录,叶子节点之间以双向链表的形式串联起来。叶子节点的物理存储形式为数据页,一个数据页内最多可以存储N条行记录(N与行的大小成反比)。如图所示: 数据页
B+树的特性要求,左边的节点应小于右边的节点。如果此时要插入一条ID为25的记录,会怎样呢(假设每个数据页只够存放4条记录)?答案是会引起页分裂,如图: 页分裂
页分裂是IO不友好的,需要新建数据页,拷贝转移旧数据页的部分记录等,我们应尽量避免。

理想的情况下,主键ID最好是顺序递增的(例如把主键设置为auto_increment),这样就只会在当前数据页放满了的时候,才需要新建下一页,双向链表永远是顺序尾部增长的,不会有中间的节点发生分裂的情况。

最糟糕的情况下,主键ID是随机无序生成的(例如java中一个UUID字符串),这种情况下,新插入的记录会随机分配到任何一个数据页,如果该页已满,就会触发页分裂。

如果主键ID由标准版雪花算法生成,最好的情况下,是每个时间戳内只有一个节点在生成ID,这时候算法的效果等同于理想情况的顺序递增,即跟auto_increment无差。最坏的情况下,是每个时间戳内所有节点都在生成ID,这时候算法的效果接近于无序(但仍比UUID的完全无序要好得多,因为workerId只有10位决定了最多只有1024个节点)。实际生产中,算法的效果取决于业务流量,并发度越低,算法越接近理想情况。

那么,换成新版算法又会如何呢?
新版算法从全局角度来看,ID是无序的,但对于每一个workerId,它生成的ID都是严格单调递增的,又因为workerId是有限的,所以最多可划分出1024个子序列,每个子序列都是单调递增的。
对于数据库而言,也许它初期接收的ID都是无序的,来自各个子序列的ID都混在一起,就像这样: 初期
如果这时候来了个worker1-seq2,显然会造成页分裂: 首次分裂
但分裂之后,有趣的事情发生了,对于worker1而言,后续的seq3,seq4不会再造成页分裂(因为还装得下),seq5也只需要像顺序增长那样新建页进行链接(区别是这个新页不是在双向链表的尾部)。注意,worker1的后续ID,不会排到worker2及之后的任意节点(因而不会造成后边节点的页分裂),因为它们总比worker2的ID小;也不会排到worker1当前节点的前边(因而不会造成前边节点的页分裂),因为worker1的子序列总是单调递增的。在这里,我们称worker1这样的子序列达到了稳态,意为这条子序列已经"稳定"了,它的后续增长只会出现在子序列的尾部,而不会造成其它节点的页分裂。

同样的事情,可以推广到各个子序列上。无论前期数据库接收到的ID有多乱,经过有限次的页分裂后,双向链表总能达到这样一个稳定的终态: 终态
到达终态后,后续的ID只会在该ID所属的子序列上进行顺序增长,而不会造成页分裂。该状态下的顺序增长与auto_increment的顺序增长的区别是,前者有1024个增长位点(各个子序列的尾部),后者只有尾部一个。

到这里,我们可以回答开头所提出的问题了:新算法从全局来看的确不是全局递增的,但该算法是收敛的,达到稳态后,新算法同样能达成像全局顺序递增一样的效果。


扩展思考

以上只提到了序列不停增长的情况,而实践生产中,不光有新数据的插入,也有旧数据的删除。而数据的删除有可能会导致页合并(innodb若发现相邻2个数据页的空间利用率都不到50%,就会把它俩合并),这对新算法的影响如何呢?

经过上面的流程,我们可以发现,新算法的本质是利用前期的页分裂,把不同的子序列逐渐分离开来,让算法不断收敛到稳态。而页合并则恰好相反,它有可能会把不同的子序列又合并回同一个数据页里,妨碍算法的收敛。尤其是在收敛的前期,频繁的页合并甚至可以让算法永远无法收敛(你刚分离出来我就又把它们合并回去,一夜回到解放前~)!但在收敛之后,只有在各个子序列的尾节点进行的页合并,才有可能破坏稳态(一个子序列的尾节点和下一个子序列的头节点进行合并)。而在子序列其余节点上的页合并,不影响稳态,因为子序列仍然是有序的,只不过长度变短了而已。

以seata的服务端为例,服务端那3张表的数据的生命周期都是比较短的,一个全局事务结束之后,它们就会被清除了,这对于新算法是不友好的,没有给时间它进行收敛。不过已经有延迟删除的PR在review中,搭配这个PR,效果会好很多。比如定期每周清理一次,前期就有足够的时间给算法进行收敛,其余的大部分时间,数据库就能从中受益了。到期清理时,最坏的结果也不过是表被清空,算法从头再来。

如果您希望把新算法应用到业务系统当中,请务必确保算法有时间进行收敛。比如用户表之类的,数据本就打算长期保存的,算法可以自然收敛。或者也做了延迟删除的机制,给算法足够的时间进行收敛。

如果您有更好的意见和建议,也欢迎跟seata社区联系!

· 阅读需 9 分钟

Seata内置了一个分布式UUID生成器,用于辅助生成全局事务ID和分支事务ID。我们希望该生成器具有如下特点:

  • 高性能
  • 全局唯一
  • 趋势递增

高性能不必多言。全局唯一很重要,否则不同的全局事务/分支事务会混淆在一起。 此外,趋势递增对于使用数据库作为TC集群的存储工具的用户而言,能降低数据页分裂的频率,从而减少数据库的IO压力 (branch_table表以分支事务ID作为主键)。

在老版Seata(1.4以前),该生成器的实现基于标准版的雪花算法。标准版雪花算法网上已经有很多解读文章了,此处就不再赘述了。 尚未了解的同学可以先看看网上的相关资料,再来看此文章。 此处我们谈谈标准版雪花算法的几个缺点:

  1. 时钟敏感。因为ID生成总是和当前操作系统的时间戳绑定的(利用了时间的单调递增性),因此若操作系统的时钟出现回拨, 生成的ID就会重复(一般而言不会人为地去回拨时钟,但服务器会有偶发的"时钟漂移"现象)。 对于此问题,Seata的解决策略是记录上一次的时间戳,若发现当前时间戳小于记录值(意味着出现了时钟回拨),则拒绝服务, 等待时间戳追上记录值。 但这也意味着这段时间内该TC将处于不可用状态。
  2. 突发性能有上限。标准版雪花算法宣称的QPS很大,约400w/s,但严格来说这算耍了个文字游戏~ 因为算法的时间戳单位是毫秒,而分配给序列号的位长度为12,即每毫秒4096个序列空间。 所以更准确的描述应该是4096/ms。400w/s与4096/ms的区别在于前者不要求每一毫秒的并发都必须低于4096 (也许有些毫秒会高于4096,有些则低于)。Seata亦遵循此限制,若当前时间戳的序列空间已耗尽,会自旋等待下一个时间戳。

在较新的版本上(1.4之后),该生成器针对原算法进行了一定的优化改良,很好地解决了上述的2个问题。 改进的核心思想是解除与操作系统时间戳的时刻绑定,生成器只在初始化时获取了系统当前的时间戳,作为初始时间戳, 但之后就不再与系统时间戳保持同步了。它之后的递增,只由序列号的递增来驱动。比如序列号当前值是4095,下一个请求进来, 序列号+1溢出12位空间,序列号重新归零,而溢出的进位则加到时间戳上,从而让时间戳+1。 至此,时间戳和序列号实际可视为一个整体了。实际上我们也是这样做的,为了方便这种溢出进位,我们调整了64位ID的位分配策略, 由原版的: 原版位分配策略

改成(即时间戳和节点ID换个位置): 改进版位分配策略

这样时间戳和序列号在内存上是连在一块的,在实现上就很容易用一个AtomicLong来同时保存它俩:

/**
* timestamp and sequence mix in one Long
* highest 11 bit: not used
* middle 41 bit: timestamp
* lowest 12 bit: sequence
*/
private AtomicLong timestampAndSequence;

最高11位可以在初始化时就确定好,之后不再变化:

/**
* business meaning: machine ID (0 ~ 1023)
* actual layout in memory:
* highest 1 bit: 0
* middle 10 bit: workerId
* lowest 53 bit: all 0
*/
private long workerId;

那么在生产ID时就很简单了:

public long nextId() {
// 获得递增后的时间戳和序列号
long next = timestampAndSequence.incrementAndGet();
// 截取低53位
long timestampWithSequence = next & timestampAndSequenceMask;
// 跟先前保存好的高11位进行一个或的位运算
return workerId | timestampWithSequence;
}

至此,我们可以发现:

  1. 生成器不再有4096/ms的突发性能限制了。倘若某个时间戳的序列号空间耗尽,它会直接推进到下一个时间戳, "借用"下一个时间戳的序列号空间(不必担心这种"超前消费"会造成严重后果,下面会阐述理由)。
  2. 生成器弱依赖于操作系统时钟。在运行期间,生成器不受时钟回拨的影响(无论是人为回拨还是机器的时钟漂移), 因为生成器仅在启动时获取了一遍系统时钟,之后两者不再保持同步。 唯一可能产生重复ID的只有在重启时的大幅度时钟回拨(人为刻意回拨或者修改操作系统时区,如北京时间改为伦敦时间~ 机器时钟漂移基本是毫秒级的,不会有这么大的幅度)。
  3. 持续不断的"超前消费"会不会使得生成器内的时间戳大大超前于系统的时间戳, 从而在重启时造成ID重复? 理论上如此,但实际几乎不可能。要达到这种效果,意味该生成器接收的QPS得持续稳定在400w/s之上~ 说实话,TC也扛不住这么高的流量,所以说呢,天塌下来有个子高的先扛着,瓶颈一定不在生成器这里。

此外,我们还调整了下节点ID的生成策略。原版在用户未手动指定节点ID时,会截取本地IPv4地址的低10位作为节点ID。 在实践生产中,发现有零散的节点ID重复的现象(多为采用k8s部署的用户)。例如这样的IP就会重复:

  • 192.168.4.10
  • 192.168.8.10

即只要IP的第4个字节和第3个字节的低2位一样就会重复。 新版的策略改为优先从本机网卡的MAC地址截取低10位,若本机未配置有效的网卡,则在[0, 1023]中随机挑一个作为节点ID。 这样调整后似乎没有新版的用户再报同样的问题了(当然,有待时间的检验,不管怎样,不会比IP截取策略更糟糕)。

以上就是对Seata的分布式UUID生成器的简析,如果您喜欢这个生成器,也可以直接在您的项目里使用它, 它的类声明是public的,完整类名为: io.seata.common.util.IdWorker

当然,如果您有更好的点子,也欢迎跟Seata社区讨论。

· 阅读需 6 分钟

现状 & 痛点

对于Seata而言,是通过记录DML操作的前后的数据用于进行后续可能的回滚操作的,并且把这些数据保存到数据库的一个blob的字段里面。对于批量插入,更新,删除等操作,其影响的行数可能会比较多,拼接成一个大的字段插入到数据库,可能会带来以下问题:

  1. 超出数据库单次操作的最大写入限制(比如MySQL的max_allowed_package参数);
  2. 较大的数据量带来的网络IO和数据库磁盘IO开销比较大。

头脑风暴

对于第1点的问题,可以根据业务的实际情况,调大max_allowed_package参数的限制,从而避免出现query is too large的问题;对于第2点,可以通过提高带宽和选用高性能的SSD作为数据库的存储介质。

以上都是通过外部方案或者加钱方案去解决的。那么有没有框架层面解决方案以解决上面的痛点?

此时结合到以上的痛点出现的根源,在于生成的数据字段过大。为此,如果可以把对应的数据进行业务方压缩之后,再进行数据传输以及落库,理论上也可以解决上面的问题。

可行性分析

结合以上头脑风暴的内容,考虑在实际开发中,当需要进行大批量操作的时候,大多会选在较少用户操作,并发相对较低的时间点执行,此时CPU,内存等资源可以相对占用多一点以快速完成对应的操作。因此,可以通过消耗CPU资源和内存资源,来对对应的回滚的数据进行压缩,从而缩小数据传输和存储的大小。

此时,还需要证明以下两件事:

  1. 经过压缩之后,可以减少网络IO和数据库磁盘IO的压力,这里可以采用数据压缩+落库完成的总时间作为侧面参考指标。
  2. 经过压缩之后,数据大小跟原来比较的压缩效率有多高,这里使用压缩前后的数据大小来作为指标。

压缩网络用时指标测试:

image

压缩比测试:

image

通过以上的测试结果,可以明显的看出,使用gzip或zip进行压缩的情况下,可以较大程度的减少数据库的压力和网络传输的压力,同时也可以较大幅度的减少保存的数据的大小。

实现

实现思路

压缩

部分代码

properties配置:

# 是否开启undo_log压缩,默认为true
seata.client.undo.compress.enable=true
# 压缩器类型,默认为zip,一般建议都是zip
seata.client.undo.compress.type=zip
# 启动压缩的阈值,默认为64k
seata.client.undo.compress.threshold=64k

判断是否开启了undo_log压缩功能以及是否达到压缩的阈值:

protected boolean needCompress(byte[] undoLogContent) {
// 1. 判断是否开启了undo_log压缩功能(1.4.2默认开启)
// 2. 判断是否达到了压缩的阈值(默认64k)
// 如果都满足返回需要对对应的undoLogContent进行压缩
return ROLLBACK_INFO_COMPRESS_ENABLE
&& undoLogContent.length > ROLLBACK_INFO_COMPRESS_THRESHOLD;
}

确定需要压缩后,对undo_log进行压缩:

// 如果需要压缩,对undo_log进行压缩
if (needCompress(undoLogContent)) {
// 获取压缩类型,默认zip
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
// 获取对应的压缩器,并且进行压缩
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
// else 不需要压缩就不需要做任何操作

将压缩类型同步保存到数据库,供回滚时使用:

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
// 保存压缩类型到数据库
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
}

回滚时解压缩对应的信息:

protected byte[] getRollbackInfo(ResultSet rs) throws SQLException  {
// 获取保存到数据库的回滚信息的字节数组
byte[] rollbackInfo = rs.getBytes(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
// 获取压缩类型
// getOrDefault使用默认值CompressorType.NONE来兼容1.4.2之前的版本直接升级1.4.2+
String rollbackInfoContext = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = CollectionUtils.decodeMap(rollbackInfoContext);
CompressorType compressorType = CompressorType.getByName(context.getOrDefault(UndoLogConstants.COMPRESSOR_TYPE_KEY,
CompressorType.NONE.name()));
// 获取对应的压缩器,并且解压缩
return CompressorFactory.getCompressor(compressorType.getCode())
.decompress(rollbackInfo);
}

结语

通过对undo_log的压缩,在框架层面,进一步提高Seata在处理数据量较大的时候的性能。同时,也提供了对应的开关和相对合理的默认值,既方便用户进行开箱即用,也方便用户根据实际需求进行一定的调整,使得对应的功能更适合实际使用场景。

· 阅读需 16 分钟
  1. seata版本:1.4.0,但1.4以下的所有版本也都有这个问题
  2. 问题描述:在一个全局事务中,一个分支事务上的纯查询操作突然卡住了,没有任何反馈(日志/异常),直到消费端RPC超时

image.png

问题排查

  1. 整个流程在一个全局事务中,消费者和提供者可以看成是全局事务中的两个分支事务,消费者 --> 提供者
  2. 消费者先执行本地的一些逻辑,然后向提供者发送RPC请求,确定消费者发出了请求已经并且提供者接到了请求
  3. 提供者先打印一条日志,然后执行一条纯查询SQL,如果SQL正常执行会打印日志,但目前的现象是只打印了执行SQL前的那条日志,而没有打印任何SQL相关的日志。找DBA查SQL日志,发现该SQL没有执行
  4. 确定了该操作只是全局事务下的一个纯查询操作,在该操作之前,全局事务中的整体流程完全正常
  5. 其实到这里现象已经很明显了,不过当时想法没转变过来,一直关注那条查询SQL,总在想就算查询超时等原因也应该抛出异常啊,不应该什么都没有。DBA都找不到查询记录,那是不是说明SQL可能根本就没执行啊,而是在执行SQL前就出问题了,比如代理?
  6. 借助arthas的watch命令,一直没有东西输出。第一条日志的输出代表这个方法一定执行了,迟迟没有结果输出说明当前请求卡住了,为什么卡住了呢?
  7. 借助arthas的thread命令 thread -bthread -n,就是要找出当前最忙的线程。这个效果很好,有一个线程CPU使用率92%,并且因为该线程导致其它20多个Dubbo线程BLOCKED了。堆栈信息如下
  8. 分析堆栈信息,已经可以很明显的发现和seata相关的接口了,估计和seata的数据源代理有关;同时发现CPU占用最高的那个线程卡在了ConcurrentHashMap#computeIfAbsent方法中。难道ConcurrentHashMap#computeIfAbsent方法有bug?
  9. 到这里,问题的具体原因我们还不知道,但应该和seata的数据源代理有点关系,具体原因我们需要分析业务代码和seata代码

image.png

问题分析

ConcurrentHashMap#computeIfAbsent

这个方法确实有可能出问题:如果两个key的hascode相同,并且在对应的mappingFunction中又进行了computeIfAbsent操作,则会导致死循环,具体分析参考这篇文章:https://juejin.cn/post/6844904191077384200

Seata数据源自动代理

相关内容之前有分析过,我们重点来看看以下几个核心的类:

  1. SeataDataSourceBeanPostProcessor
  2. SeataAutoDataSourceProxyAdvice
  3. DataSourceProxyHolder
SeataDataSourceBeanPostProcessor

SeataDataSourceBeanPostProcessorBeanPostProcessor实现类,在postProcessAfterInitialization方法(即Bean初始化之后)中,会为业务方配置的数据源创建对应的seata代理数据源

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy.");
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}
SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice是一个MethodInterceptor,seata中的SeataAutoDataSourceProxyCreator会针对DataSource类型的Bean创建动态代理对象,代理逻辑就是SeataAutoDataSourceProxyAdvice#invoke逻辑。即:执行数据源AOP代理对象的相关方法时候,会经过其invoke方法,在invoke方法中再根据当原生数据源,找到对应的seata代理数据源,最终达到执行seata代理数据源逻辑的目的

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
......
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
return invocation.proceed();
}
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
}
DataSourceProxyHolder

流程上我们已经清楚了,现在还有一个问题,如何维护原生数据源seata代理数据源之间的关系?通过DataSourceProxyHolder维护,这是一个单例对象,该对象中通过一个ConcurrentHashMap维护两者的关系:原生数据源为key --> seata代理数据源 为value

public class DataSourceProxyHolder {
public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource = dataSource;
......
return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, originalDataSource,
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}
}


// CollectionUtils.java
public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
return map.computeIfAbsent(key, mappingFunction);
}

客户端数据源配置

  1. 配置了两个数据源:DynamicDataSourceP6DataSource
  2. P6DataSource可以看成是对DynamicDataSource的一层包装
  3. 我们暂时不去管这个配置合不合理,现在只是单纯的基于这个数据源配置分析问题
@Qualifier("dsMaster")
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(dsMaster());
return p6DataSource;
}

分析过程

假设现在大家都已经知道了 ConcurrentHashMap#computeIfAbsent 可能会产生的问题,已知现在产生了这个问题,结合堆栈信息,我们可以知道大概哪里产生了这个问题。

1、ConcurrentHashMap#computeIfAbsent会产生这个问题的前提条件是:两个key的hashcode相同mappingFunction中对应了一个put操作。结合我们seata的使用场景,mappingFunction对应的是DataSourceProxy::new,说明在DataSourceProxy的构造方法中可能会触发put操作

image.png

执行AOP代理数据源相关方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
AOP代理数据源的getConnection方法 =>
原生数据源的getConnection方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
DuridDataSource的getConnection方法

2、步骤1中说的AOP代理数据源原生数据源分别是什么?看下面这张图 image.png

3、上面还说到了产生这个问题还有一个条件两个key的hashcode相同,但我看这两个数据源对象都没有重写hashcode方法,所以按理来说,这两个对象的hashcode一定是不同的。后面又再看了一遍ConcurrentHashMap这个问题,感觉两个key的hashcode相同这个说法是不对的,两个key会产生hash冲突更合理一些,这样就能解释两个hashcode不同的对象为啥会遇上这个问题了。为了证明这个,下面我给了一个例子

public class Test {
public static void main(String[] args) {
ConcurrentHashMap map = new ConcurrentHashMap(8);
Num n1 = new Num(3);
Num n2 = new Num(19);
Num n3 = new Num(20);

// map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)); // 这行代码不会导致程序死循环
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)); // 这行代码会导致程序死循环
}

static class Num{
private int i;
public Num(int i){
this.i = i;
}

@Override
public int hashCode() {
return i;
}
}
}
  1. 为了方便重现问题,我们重写了Num#hashCode方法,保证构造函数入参就是hashcode的值
  2. 创建一个ConcurrentHashMap对象,initialCapacity为8,sizeCtl计算出来的值为16,即该map中数组长度默认为16
  3. 创建对象n1,入参为3,即hashcode为3,计算得出其对应的数组下标为3
  4. 创建对象n2,入参为19,即hashcode为19,计算得出其对应的数组下标为3,此时我们可以认为n1和n2产生了hash冲突
  5. 创建对象n3,入参为20,即hashcode为20,计算得出其对应的数组下标为4
  6. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)),程序正常退出:因为n1和n3没有hash冲突,所以正常结束
  7. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)),程序正常退出:因为n1和n2产生了hash冲突,所以陷入死循环

4、在对象初始化的时候,SeataDataSourceBeanPostProcessor不是已经将对象对应的数据源代理初始化好了吗?为什么在SeataAutoDataSourceProxyAdvice中还是会创建对应的数据源代理呢?

  1. 首先,SeataDataSourceBeanPostProcessor执行时期是晚于AOP代理对象创建的,所以在执行SeataDataSourceBeanPostProcessor相关方法的时候,SeataAutoDataSourceProxyAdvice其实应生效了
  2. SeataDataSourceBeanPostProcessor中向map中添加元素时,key为AOP代理数据源SeataAutoDataSourceProxyAdvice中的invocation.getThis()中拿到的是原生数据源,所以key不相同

image.png

5、还有一个问题,SeataAutoDataSourceProxyAdvic#invoke方法中并没有过滤toString、hashCode等方法,cglib创建的代理对象默认会重写这几个方法,如果在向map中put元素的时候触发了代理对象的这些方法,此时又会重新进入SeataAutoDataSourceProxyAdvic#invoke切面,直到线程堆栈益处

问题总结

  1. 在两个key会产生hash冲突的时候,会触发ConcurrentHashMap#computeIfAbsentBUG,该BUG的表现就是让当前线程陷入死循环
  2. 业务反馈,该问题是偶现的,偶现的原因有两种:首先,该应用是多节点部署,但线上只有一个节点触发了该BUG(hashcode冲突),所以只有当请求打到这个节点的时候才有可能会触发该BUG;其次,因为每次重启对象地址(hashcode)都是不确定的,所以并不是每次应用重启之后都会触发,但如果一旦触发,该节点就会一直存在这个问题。有一个线程一直在死循环,并将其它尝试从map中获取代理数据源的线程阻塞了,这种现象在业务上的反馈就是请求卡住了。如果连续请求都是这样,此时业务方可能会重启服务,然后因为重启后hash冲突不一定存在,可能重启后业务表现就正常了,但也有可能在下次重启的时候又会触发了这个BUG
  3. 当遇到这个问题时,从整个问题上来看,确实就是死锁了,因为那个死循环的线程占者锁一直不释放,导致其它操作该map的线程被BLOCK了
  4. 本质上还是因为ConcurrentHashMap#computeIfAbsent方法可能会触发BUG,而seata的使用场景刚好就触发了该BUG
  5. 下面这个demo其实就完整的模拟了线上出问题时的场景,如下:
public class Test {
public static void main(String[] args) {

ConcurrentHashMap map = new ConcurrentHashMap(8);

Num n1 = new Num(3);
Num n2 = new Num(19);

for(int i = 0; i< 20; i++){
new Thread(()-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

map.computeIfAbsent(n1, k-> 200);
}).start();
}
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200));
}


static class Num{
private int i;

public Num(int i){
this.i = i;
}
@Override
public int hashCode() {
return i;
}
}
}

image.png

解决问题

可以从两方面解决这个问题:

  1. 业务方改动:P6DataSource 和 DynamicDataSource 没必要都被代理,直接代理P6DataSource就可以了,DynamicDataSource没有必要声明成一个Bean;或者通过excluds属性排除P6DataSource,这样就不会存在重复代理问题
  2. Seata完善:完善数据源代理相关逻辑
业务方改动

1、数据源相关的配置改成如下即可:

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(new TuYaDynamicDataSource(masterDsRoute));
logger.warn("dsMaster={}, hashcode={}",p6DataSource, p6DataSource.hashCode());
return p6DataSource;
}

2、或者不改变目前的数据源配置,添加excluds属性

@EnableAutoDataSourceProxy(excludes={"P6DataSource"})
Seata完善

1、ConcurrentHashMap#computeIfAbsent方法改成双重检查,如下:

SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
synchronized (dataSourceProxyMap) {
dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
dataSourceProxyMap.put(originalDataSource, dsProxy);
}
}
}
return dsProxy;

之前我想直接改CollectionUtils#computeIfAbsent方法,群里大佬反馈这样可能会导致数据源多次创建,确实有这个问题:如下

public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
value = mappingFunction.apply(key);
return map.computeIfAbsent(key, value);
}

2、SeataAutoDataSourceProxyAdvice切面逻辑中添加一些过滤

Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}

遗留问题

SeataDataSourceBeanPostProcessorSeataAutoDataSourceProxyAdvice对应方法中,向map中初始化seata数据源代理时对应的key根本就不同,SeataDataSourceBeanPostProcessor中对应的key是AOP代理数据源SeataAutoDataSourceProxyAdvice中对应的key是原生对象,此时就造成了不必要的seata数据源代理对象的创建。

针对这个问题,大家有什么好的建议?能不能为SeataDataSourceBeanPostProcessor指定一个order,让其在创建AOP代理对象之前生效

原文链接

https://juejin.cn/post/6939041336964153352/

· 阅读需 19 分钟

“刚上手Seata,对其各个模块了解还不够深入?
想深入研究Seata源码,却还未付诸实践?
想探究下在集成Seata后,自己的应用在启动过程中“偷偷”干了些啥?
想学习Seata作为一款优秀开源框架蕴含的设计理念和最佳实践?
如果你有上述任何想法之一,那么今天这篇文章,就是为你量身打造的~

前言

在Seata的应用侧(RM、TM)启动过程中,首先要做的就是与协调器侧(TC)建立通信,这是Seata能够完成分布式事务协调的前提,那么Seata在完成应用侧初始化以及与TC建立连接的过程中,是如何找到TC事务协调器的集群和地址的?又是如何从配置模块中获取各种配置信息的呢?这正是本文要探究的重点。

给个限定

Seata作为一款中间件级的底层组件,是很谨慎引入第三方框架具体实现的,感兴趣的同学可以深入了解下Seata的SPI机制,看看Seata是如何通过大量扩展点(Extension),来将依赖组件的具体实现倒置出去,转而依赖抽象接口的,同时,Seata为了更好地融入微服务、云原生等流行架构所衍生出来的生态中,也基于SPI机制对多款主流的微服务框架、注册中心、配置中心以及Java开发框架界“扛把子”——SpringBoot等做了主动集成,在保证微内核架构、松耦合、可扩展的同时,又可以很好地与各类组件“打成一片”,使得采用了各种技术栈的环境都可以比较方便地引入Seata。

本文为了贴近大家刚引入Seata试用时的场景,在以下介绍中,选择应用侧的限定条件如下:使用File(文件)作为配置中心与注册中心,并基于SpringBoot启动。

有了这个限定条件,接下来就让我们深入Seata源码,一探究竟吧。

多模块交替协作的RM/TM初始化过程

Seata客户端启动过程剖析(一)中,我们分析了Seata应用侧TM与RM的初始化、以及应用侧如何创建Netty Channel并向TC Server发送注册请求的过程。除此之外,在RM初始化过程中,Seata的其他多个模块(注册中心、配置中心、负载均衡)也都纷纷登场,相互协作,共同完成了连接TC Server的过程。

当执行Client重连TC Server的方法:NettyClientChannelManager.Channreconnect()时,首先需要根据当前的事务分组获取可用的TC Server地址列表:

    /**
* NettyClientChannelManager.reconnect()
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
//从注册中心中获取可用的TC Server地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
//以下代码略
}

关于事务分组的详细概念介绍,大家可以参考官方文档事务分组介绍。这里简单介绍一下:

  • 每个Seata应用侧的RM、TM,都具有一个事务分组
  • 每个Seata协调器侧的TC,都具有一个集群名地址 应用侧连接协调器侧时,经历如下两步:
  • 通过事务分组的名称,从配置中获取到该应用侧对应的TC集群名
  • 通过集群名称,可以从注册中心中获取TC集群的地址列表 以上概念、关系与过程,如下图所示: Seata事务分组与建立连接的关系

注册中心获取TC Server集群地址

了解RM/TC连接TC时涉及的主要概念与步骤后,我们继续探究getAvailServerList方法:

    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
//① 使用注册中心工厂,获取注册中心实例
//② 调用注册中心的查找方法lookUp(),根据事务分组名称获取TC集群中可用Server的地址列表
List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
return Collections.emptyList();
}

return availInetSocketAddressList.stream()
.map(NetUtil::toStringAddress)
.collect(Collectors.toList());
}

用哪个注册中心?Seata元配置文件给出答案

上面已提到,Seata支持多种注册中心的实现,那么,Seata首先需要从一个地方先获取到“注册中心的类型”这个信息。

从哪里获取呢?Seata设计了一个“配置文件”用于存放其框架内所用组件的一些基本信息,我更愿意称这个配置文件为 『元配置文件』,这是因为它包含的信息,其实是“配置的配置”,也即“元”的概念,大家可以对比数据库表中的信息,和数据库表本身结构的信息(表数据和表元数据)来理解。

我们可以把注册中心、配置中心中的信息,都看做是配置信息本身,而这些配置信息的配置是什么?这些信息,就包含在Seata的元配置文件中。实际上,『元配置文件』中只包含两类信息

  • 一是注册中心的类型:registry.type,以及该类型注册中心的一些基本信息,比如当注册中心类型为文件时,元配置文件中存放了文件的名字信息;当注册中心类型是Nacos时,元配置文件中则存放着Nacos的地址、命名空间、集群名等信息
  • 二是配置中心的类型:config.type,以及该类型配置中心的一些基本信息,比如当配置中心为文件时,元配置文件中存放了文件的名字信息;当注册中心类型为Consul时,元配置文件中存放了Consul的地址信息

Seata的元配置文件支持Yaml、Properties等多种格式,而且可以集成到SpringBoot的application.yaml文件中(使用seata-spring-boot-starter即可),方便与SpringBoot集成。

Seata中自带的默认元配置文件是registry.conf,当我们采用文件作为注册与配置中心时,registry.conf中的内容设置如下:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}

在如下源码中,我们可以发现,Seata使用的注册中心的类型,是从ConfigurationFactory.CURRENT_FILE_INSTANCE中获取的,而这个CURRENT_FILE_INSTANCE,就是我们所说的,Seata元配置文件的实例

    //在getInstance()中,调用buildRegistryService,构建具体的注册中心实例
public static RegistryService getInstance() {
if (instance == null) {
synchronized (RegistryFactory.class) {
if (instance == null) {
instance = buildRegistryService();
}
}
}
return instance;
}

private static RegistryService buildRegistryService() {
RegistryType registryType;
//获取注册中心类型
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
if (RegistryType.File == registryType) {
return FileRegistryServiceImpl.getInstance();
} else {
//根据注册中心类型,使用SPI的方式加载注册中心的实例
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
}

我们来看一下元配置文件的初始化过程,当首次获取静态字段CURRENT_FILE_INSTANCE时,触发ConfigurationFactory类的初始化:

    //ConfigurationFactory类的静态块
static {
load();
}

/**
* load()方法中,加载Seata的元配置文件
*/
private static void load() {
//元配置文件的名称,支持通过系统变量、环境变量扩展
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_DEFAULT;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}
//根据元配置文件名称,创建一个实现了Configuration接口的文件配置实例
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = null;
//通过SPI加载,来判断是否存在扩展配置提供者
//当应用侧使用seata-spring-boot-starer时,将通过SpringBootConfigurationProvider作为扩展配置提供者,这时当获取元配置项时,将不再从file.conf(默认)中获取,而是从application.properties/application.yaml中获取
try {
//通过ExtConfigurationProvider的provide方法,将原有的Configuration实例替换为扩展配置的实例
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
//存在扩展配置,则返回扩展配置实例,否则返回文件配置实例
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

load()方法的调用序列图如下: Seata元配置文件的加载过程

上面的序列图中,大家可以关注以下几点:

  • Seata元配置文件名称支持扩展
  • Seata元配置文件后缀支持3种后缀,分别为yaml/properties/conf,在创建元配置文件实例时,会依次尝试匹配
  • Seata中配置能力相关的顶级接口为Configuration,各种配置中心均需实现此接口,Seata的元配置文件就是使用FileConfiguration(文件类型的配置中心)实现了此接口
/**
* Seata配置能力接口
* package:io.seata.config
*/

public interface Configuration {
/**
* Gets short.
*
* @param dataId the data id
* @param defaultValue the default value
* @param timeoutMills the timeout mills
* @return the short
*/
short getShort(String dataId, int defaultValue, long timeoutMills);

//以下内容略,主要能力为配置的增删改查
}
  • Seata提供了一个类型为ExtConfigurationProvider的扩展点,开放了对配置具体实现的扩展能力,它具有一个provide()方法,接收原有的Configuration,返回一个全新的Configuration,此接口方法的形式决定了,一般可以采用静态代理、动态代理、装饰器等设计模式来实现此方法,实现对原有Configuration的增强
/**
* Seata扩展配置提供者接口
* package:io.seata.config
*/
public interface ExtConfigurationProvider {
/**
* provide a AbstractConfiguration implementation instance
* @param originalConfiguration
* @return configuration
*/
Configuration provide(Configuration originalConfiguration);
}
  • 当应用侧基于seata-seata-spring-boot-starter启动时,将采用『SpringBootConfigurationProvider』作为扩展配置提供者,在其provide方法中,使用动态字节码生成(CGLIB)的方式为『FileConfiguration』实例创建了一个动态代理类,拦截了所有以"get"开头的方法,来从application.properties/application.yaml中获取元配置项。

关于SpringBootConfigurationProvider类,本文只说明下实现思路,不再展开分析源码,这也仅是ExtConfigurationProvider接口的一种实现方式,从Configuration可扩展、可替换的角度来看,Seata正是通过ExtConfigurationProvider这样一个扩展点,为多种配置的实现提供了一个广阔的舞台,允许配置的多种实现与接入方案。

经历过上述加载流程后,如果我们没有扩展配置提供者,我们将从Seata元配置文件中获取到注册中心的类型为file,同时创建了一个文件注册中心实例:FileRegistryServiceImpl

从注册中心获取TC Server地址

获取注册中心的实例后,需要执行lookup()方法(RegistryFactory.getInstance().lookup(transactionServiceGroup)),FileRegistryServiceImpl.lookup()的实现如下:

    /**
* 根据事务分组名称,获取TC Server可用地址列表
* package:io.seata.discovery.registry
* class:FileRegistryServiceImpl
*/
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
//获取TC Server集群名称
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
//从配置中心中获取TC集群中所有可用的Server地址
String endpointStr = CONFIG.getConfig(
PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
if (StringUtils.isNullOrEmpty(endpointStr)) {
throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
}
//将地址封装为InetSocketAddress并返回
String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
for (String endpoint : endpoints) {
String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
if (ipAndPort.length != 2) {
throw new IllegalArgumentException("endpoint format should like ip:port");
}
inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
}
return inetSocketAddresses;
}

/**
* 注册中心接口中的default方法
* package:io.seata.discovery.registry
* class:RegistryService
*/
default String getServiceGroup(String key) {
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
//在配置缓存中,添加事务分组名称变化监听事件
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
//从配置中心中获取事务分组对应的TC集群名称
return ConfigurationFactory.getInstance().getConfig(key);
}

可以看到,代码逻辑与第一节中图Seata事务分组与建立连接的关系中的流程相符合, 这时,注册中心将需要配置中心的协助,来获取事务分组对应的集群名称、并查找集群中可用的服务地址。

配置中心获取TC集群名称

配置中心的初始化

配置中心的初始化(在ConfigurationFactory.buildConfiguration()),与注册中心的初始化流程类似,都是先从元配置文件中获取配置中心的类型等信息,然后初始化一个具体的配置中心实例,有了之前的分析基础,这里不再赘述。

获取配置项的值

上方代码段的两个方法:*FileRegistryServiceImpl.lookup()以及RegistryService.getServiceGroup()*中,都从配置中心中获取的配置项的值:

  • lookup()需要由具体的注册中心实现,使用文件作为注册中心,其实是一种直连TC Server的情况,其特殊点在于TC Server的地址是写死在配置中的的(正常应存于注册中心中),因此FileRegistryServiceImpl.lookup()方法,是通过配置中心获取的TC集群中Server的地址信息
  • getServiceGroup()是RegistryServer接口中的default方法,即所有注册中心的公共实现,Seata中任何一种注册中心,都需要通过配置中心来根据事务分组名称来获取TC集群名称

负载均衡

经过上述环节配置中心、注册中心的协作,现在我们已经获取到了当前应用侧所有可用的TC Server地址,那么在发送真正的请求之前,还需要通过特定的负载均衡策略,选择一个TC Server地址,这部分源码比较简单,就不带着大家分析了。

关于负载均衡的源码,大家可以阅读AbstractNettyRemotingClient.doSelect(),因本文分析的代码是RMClient/TMClient的重连方法,此方法中,所有获取到的Server地址,都会通过遍历依次连接(重连),因此这里不需要再做负载均衡。

以上就是Seata应用侧在启动过程中,注册中心与配置中心这两个关键模块之间的协作关系与工作流程,欢迎共同探讨、学习!

后记:本文及其上篇 Seata客户端启动过程剖析(一),是本人撰写的首批技术博客,将上手Seata时,个人认为Seata中较为复杂、需要研究和弄通的部分源码进行了分析和记录。 在此欢迎各位读者提出各种改进建议,谢谢!

· 阅读需 12 分钟

“刚上手 Seata,对其各个模块了解还不够深入?
想深入研究 Seata 源码,却还未付诸实践?
想探究下在集成 Seata 后,自己的应用在启动过程中“偷偷”干了些啥?
想学习 Seata 作为一款优秀开源框架蕴含的设计理念和最佳实践?
如果你有上述任何想法之一,那么今天这篇文章,就是为你量身打造的~

前言

看过官网 README 的第一张图片的同学都应该清楚,Seata 协调分布式事务的原理便在于通过其协调器侧的 TC,来与应用侧的 TM、RM 进行各种通信与交互,来保证分布式事务中,多个事务参与者的数据一致性。那么 Seata 的协调器侧与应用侧之间,是如何建立连接并进行通信的呢?

没错,答案就是 Netty,Netty 作为一款高性能的 RPC 通信框架,保证了 TC 与 RM 之间的高效通信,关于 Netty 的详细介绍,本文不再展开,今天我们探究的重点,在于应用侧在启动过程中,如何通过一系列 Seata 关键模块之间的协作(如 RPC、Config/Registry Center 等),来建立与协调器侧之间的通信

从 GlobalTransactionScanner 说起

我们知道 Seata 提供了多个开发期注解,比如用于开启分布式事务的@GlobalTransactional、用于声明 TCC 两阶段服务的@TwoPhraseBusinessAction 等,它们都是基于 Spring AOP 机制,对使用了注解的 Bean 方法分配对应的拦截器进行增强,来完成对应的处理逻辑。而 GlobalTransactionScanner 这个 Spring Bean,就承载着为各个注解分配对应的拦截器的职责,从其 Scanner 的命名,我们也不难推断出,它是为了在 Spring 应用启动过程中,对与全局事务(GlobalTransactionScanner)相关的 Bean 进行扫描、处理的。

除此之外,应用侧 RPC 客户端(TMClient、RMClient)初始化、与 TC 建立连接的流程,也是在 GlobalTransactionScanner#afterPropertiesSet()中发起的:

    /**
* package:io.seata.spring.annotation
* class:GlobalTransactionScanner
*/
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//在Bean属性初始化之后,执行TM、RM的初始化
initClient();

}

RM & TM 的初始化与连接过程

这里,我们以 RMClient.init()为例说明,TMClient 的初始化过程亦同理。

类关系的设计

查看 RMClient#init()的源码,我们发现,RMClient 先构造了一个 RmNettyRemotingClient,然后执行其初始化init()方法。而 RmNettyRemotingClient 的构造器初始化方法,都会逐层调用父类的构造器与初始化方法

    /**
* RMClient的初始化逻辑
* package:io.seata.rm
* class:RMClient
*/
public static void init(String applicationId, String transactionServiceGroup) {
//① 首先从RmNettyRemotingClient类开始,依次调用父类的构造器
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//② 然后从RmNettyRemotingClient类开始,依次调用父类的init()
rmNettyRemotingClient.init();
}

上述 RMClient 系列各类之间的关系以及调用构造器和 init()初始化方法的过程如下图示意: RMClient.init简化版流程与主要类之间的关系

那么为何要将 RMClient 设计成这样较为复杂的继承关系呢?其实是为了将各层的职责、边界划分清楚,使得各层可以专注于特定逻辑处理,实现更好的扩展性,这部分的详细设计思路,可参考 Seata RPC 模块重构 PR 的操刀者乘辉兄的文章Seata-RPC 重构之路

初始化的完整流程

各类的构造器与初始化方法中的主要逻辑,大家可以借助下面这个能表意的序列图来梳理下,此图大家也可先跳过不看,在下面我们分析过几个重点类后,再回头来看这些类是何时登场、如何交互的协作的。 RMClient的初始化流程

抓住核心——Channel 的创建

首先我们需要知道,应用侧与协调器侧的通信是借助 Netty 的 Channel(网络通道)来完成的,因此通信过程的关键在于 Channel 的创建,在 Seata 中,通过池化的方式(借助了 common-pool 中的对象池)方式来创建、管理 Channel。

这里我们有必要简要介绍下对象池的简单概念及其在 Seata 中的实现: 涉及到的 common-pool 中的主要类:

  • GenericKeydObjectPool<K, V>:KV 泛型对象池,提供对所有对象的存取管理,而对象的创建由其内部的工厂类来完成
  • KeyedPoolableObjectFactory<K, V>:KV 泛型对象工厂,负责池化对象的创建,被对象池持有

涉及到的 Seata 中对象池实现相关的主要类:

  • 首先,被池化管理的对象就是Channel,对应 common-pool 中的泛型 V
  • NettyPoolKey:Channel 对应的 Key,对应 common-pool 中的泛型 K,NettyPoolKey 主要包含两个信息:
    • address:创建 Channel 时,对应的 TC Server 地址
    • message:创建 Channel 时,向 TC Server 发送的 RPC 消息体
  • GenericKeydObjectPool<NettyPoolKey,Channel>:Channel 对象池
  • NettyPoolableFactory:创建 Channel 的工厂类

认识了上述对象池相关的主要类之后,我们再来看看 Seata 中涉及 Channel 管理以及与 RPC 相关的几个主要类:

  • NettyClientChannelManager:
    • 持有 Channel 对象池
    • 与 Channel 对象池交互,对应用侧 Channel 进行管理(获取、释放、销毁、缓存等)
  • RpcClientBootstrap:RPC 客户端核心引导类,持有 Netty 框架的 Bootstrap 对象,具备启停能力;具有根据连接地址来获取新 Channel 的能力,供 Channel 工厂类调用
  • AbstractNettyRemotingClient:
    • 初始化并持有 RpcClientBootstrap
    • 应用侧 Netty 客户端的顶层抽象,抽象了应用侧 RM/TM 取得各自 Channel 对应的 NettyPoolKey 的能力,供 NettyClientChannelManager 调用
    • 初始化 NettyPoolableFactory

了解上述概念后,我们可以把 Seata 中创建 Channel 的过程简化如下: 创建Channel对象过程

看到这里,大家可以回过头再看看上面的RMClient 的初始化序列图,应该会对图中各类的职责、关系,以及整个初始化过程的意图有一个比较清晰的理解了。

建立连接的时机与流程

那么,RMClient 是何时与 Server 建立连接的呢?

在 RMClient 初始化的过程中,大家会发现,很多 init()方法都设定了一些定时任务,而 Seata 应用侧与协调器的重连(连接)机制,就是通过定时任务来实现的:

    /**
* package:io.seata.core.rpcn.netty
* class:AbstractNettyRemotingClient
*/
public void init() {
//设置定时器,定时重连TC Server
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

我们通过跟踪一次 reconnect 的执行,看看上面探究的几个类之间是如何协作,完成 RMClient 与 TC 的连接的(实际上首次连接可能发生在 registerResource 的过程中,但流程一致) RMClient与TC Server连接过程

这个图中,大家可以重点关注这几个点:

  • NettyClientChannelManager 执行具体 AbstractNettyRemotingClient 中,获取 NettyPoolKey 的回调函数(getPoolKeyFunction()):应用侧的不同 Client(RMClient 与 TMClient),在创建 Channel 时使用的 Key 不同,使两者在重连 TC Server 时,发送的注册消息不同,这也是由两者在 Seata 中扮演的角色不同而决定的:
    • TMClient:扮演事务管理器角色,创建 Channel 时,仅向 TC 发送 TM 注册请求(RegisterTMRequest)即可
    • RMClient:扮演资源管理器角色,需要管理应用侧所有的事务资源,因此在创建 Channel 时,需要在发送 RM 注册请求(RegesterRMRequest)前,获取应用侧所有事务资源(Resource)信息,注册至 TC Server
  • 在 Channel 对象工厂 NettyPoolableFactory 的 makeObject(制造 Channel)方法中,使用 NettyPoolKey 中的两项信息,完成了两项任务:
    • 使用 NettyPoolKey 的 address 创建新的 Channel
    • 使用 NettyPoolKey 的 message 以及新的 Channel 向 TC Server 发送注册请求,这就是 Client 向 TC Server 的连接(首次执行)或重连(非首次,由定时任务驱动执行)请求

以上内容,就是关于 Seata 应用侧的初始化及其与 TC Server 协调器侧建立连接的全过程分析。

更深层次的细节,建议大家再根据本文梳理的脉络和提到的几个重点,细致地阅读下源码,相信定会有更深层次的理解和全新的收获!

后记:考虑到篇幅以及保持一篇源码分析文章较为合适的信息量,本文前言中所说的配置、注册等模块协作配合并没有在文章中展开和体现。
在下篇源码剖析中,我会以配置中心注册中心为重点,为大家分析,在 RMClient/TM Client 与 TC Server 建立连接之前,Seata 应用侧是如何通过服务发现找到 TC Server、如何从配置模块获取各种信息的。

· 阅读需 8 分钟

本文将介绍基于Spring Cloud + feign 如何集成 Seata(1.4.0)的TCC模式。实际上,Seata的AT模式基本上能满足我们使用分布式事务80%的需求,但涉及不支持事务的数据库与中间件(如redis)等的操作,或AT模式暂未支持的数据库(目前AT支持Mysql、Oracle与PostgreSQL)、跨公司服务的调用、跨语言的应用调用或有手动控制整个二阶段提交过程的需求,则需要结合TCC模式。不仅如此,TCC模式还支持与AT模式混合使用。

本文作者:弓行(谭志坚)

一、TCC模式的概念

一个分布式的全局事务,整体是两阶段提交Try-[Comfirm/Cancel] 的模型。在Seata中,AT模式与TCC模式事实上都是两阶段提交的具体实现。他们的区别在于:

AT 模式基于支持本地 ACID 事务关系型数据库(目前支持Mysql、Oracle与PostgreSQL):

一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志。 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。 二阶段 commit 行为:调用 自定义的 commit 逻辑。 二阶段 rollback 行为:调用 自定义的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

简单点概括,SEATA的TCC模式就是手工的AT模式,它允许你自定义两阶段的处理逻辑而不依赖AT模式的undo_log。

二、前提准备

三、TM与TCC-RM的搭建

本章着重讲基于Spring Cloud + Feign的TCC的实现,项目的搭建直接看源码(本工程提供了AT模式与TCC模式的DEMO)

DEMO工程源码

3.1 seata服务端的搭建

服务端搭建文档

3.2 TM的搭建

service-tm

3.3 RM-TCC的搭建

3.3.1 定义TCC接口

由于我们使用的是 SpringCloud + Feign,Feign的调用基于http,因此此处我们使用@LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可,TCC相关注解如下:

  • @LocalTCC 适用于SpringCloud+Feign模式下的TCC
  • @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
  • @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
  • BusinessActionContext 便是指TCC事务上下文

实例如下:

/**
* 这里定义tcc的接口
* 一定要定义在接口上
* 我们使用springCloud的远程调用
* 那么这里使用LocalTCC便可
*
* @author tanzj
*/
@LocalTCC
public interface TccService {

/**
* 定义两阶段提交
* name = 该tcc的bean名称,全局唯一
* commitMethod = commit 为二阶段确认方法
* rollbackMethod = rollback 为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @param params -入参
* @return String
*/
@TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
String insert(
@BusinessActionContextParameter(paramName = "params") Map<String, String> params
);

/**
* 确认方法、可以另命名,但要保证与commitMethod一致
* context可以传递try方法的参数
*
* @param context 上下文
* @return boolean
*/
boolean commitTcc(BusinessActionContext context);

/**
* 二阶段取消方法
*
* @param context 上下文
* @return boolean
*/
boolean cancel(BusinessActionContext context);
}

3.3.2 TCC接口的业务实现

为了保证代码的简洁,此处将路由层与业务层结合讲解,实际项目则不然。

  • 在try方法中使用@Transational可以直接通过spring事务回滚关系型数据库中的操作,而非关系型数据库等中间件的回滚操作可以交给rollbackMethod方法处理。
  • 使用context.getActionContext("params")便可以得到一阶段try中定义的参数,在二阶段对此参数进行业务回滚操作。
  • **注意1:**此处亦不可以捕获异常(同理切面处理异常),否则TCC将识别该操作为成功,二阶段直接执行commitMethod。
  • 注意2:TCC模式要开发者自行保证幂等和事务防悬挂
@Slf4j
@RestController
public class TccServiceImpl implements TccService {

@Autowired
TccDAO tccDAO;

/**
* tcc服务t(try)方法
* 根据实际业务场景选择实际业务执行逻辑或者资源预留逻辑
*
* @param params - name
* @return String
*/
@Override
@PostMapping("/tcc-insert")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public String insert(@RequestBody Map<String, String> params) {
log.info("xid = " + RootContext.getXID());
//todo 实际的操作,或操作MQ、redis等
tccDAO.insert(params);
//放开以下注解抛出异常
//throw new RuntimeException("服务tcc测试回滚");
return "success";
}

/**
* tcc服务 confirm方法
* 若一阶段采用资源预留,在二阶段确认时要提交预留的资源
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean commitTcc(BusinessActionContext context) {
log.info("xid = " + context.getXid() + "提交成功");
//todo 若一阶段资源预留,这里则要提交资源
return true;
}

/**
* tcc 服务 cancel方法
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean cancel(BusinessActionContext context) {
//todo 这里写中间件、非关系型数据库的回滚操作
System.out.println("please manually rollback this data:" + context.getActionContext("params"));
return true;
}
}

3.3.3 在TM中开启全局事务,调用RM-TCC接口

工程源码见3.2


至此,Spring Cloud整合TCC模式完成

· 阅读需 13 分钟

说到Seata中的配置管理,大家可能会想到Seata中适配的各种配置中心,其实今天要说的不是这个,虽然也会简单分析Seata和各配置中心的适配过程,但主要还是讲解Seata配置管理的核心实现

在讲配置中心之前,先简单介绍一下Server端的启动流程,因为这一块就涉及到配置管理的初始化,核心流程如下:

  1. 程序入口在Server#main方法中
  2. 获取port的几种形式:从容器中获取;从命令行获取;默认端口
  3. 解析命令行参数:host、port、storeMode等参数,这个过程可能涉及到配置管理的初始化
  4. Metric相关:无关紧要,跳过
  5. NettyServer初始化
  6. 核心控制器初始化:Server端的核心,还包括几个定时任务
  7. NettyServer启动

代码如下,删除了非核心代码

public static void main(String[] args) throws IOException {
// 获取port的几种形式:从容器中获取;从命令行获取;默认端口, use to logback.xml
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 解析启动参数,分容器和非容器两种情况
ParameterParser parameterParser = new ParameterParser(args);

// Metric相关
MetricsManager.get().init();

// NettyServer初始化
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

// 核心控制器初始化
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();

// NettyServer启动
nettyRemotingServer.init();
}

为社么说步骤3中肯能涉及到配置管理的初始化呢?核心代码如下:

if (StringUtils.isBlank(storeMode)) {
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}

如果在启动参数中没有特别指定storeMode,就会通过ConfigurationFactory相关API去获取该配置,在ConfigurationFactory.getInstance()这行代码中,涉及到两部分内容:ConfigurationFactory静态代码初始化和Configuration初始化。接下来我们重点分析这部分内容

配置管理初始化

ConfigurationFactory初始化

我们知道在Seata中有两个关键配置文件:一个是registry.conf,这是核心配置文件,必须要有;另一个是file.conf,只有在配置中心是File的情况下才需要用到。ConfigurationFactory静态代码块中,其实主要就是加载registry.conf文件,大概如下:

1、在没有手动配置的情况,默认读取registry.conf文件,封装成一个FileConfiguration对象,核心代码如下:

Configuration configuration = new FileConfiguration(seataConfigName,false);
FileConfigFactory.load("registry.conf", "registry");
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, "CONF", argsType, args);

2、配置增强:类似代理模式,获取配置时,在代理对象里面做一些其他处理,下面详细介绍

Configuration extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

3、将步骤2中的代理对象赋值给CURRENT_FILE_INSTANCE引用,在很多地方都直接用到了CURRENT_FILE_INSTANCE这个静态引用

CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;

可以简单的认为:CURRENT_FILE_INSTANCE对应的就是registry.conf里面的内容。我认为registry.conf这个文件名取的不太好,歧义太大,叫做bootstrap.conf是不是更好一些?

Configuration初始化

Configuration其实就是对应配置中心,Seata目前支持的配置中心很多,几乎主流的配置中心都支持,如:apollo、consul、etcd、nacos、zk、springCloud、本地文件。当使用本地文件作为配置中心的时候,涉及到file.conf的加载,当然这是默认的名字,可以自己配置。到这里,大家也基本上知道了registry.conffile.conf的关系了。

Configuration作为单例放在ConfigurationFactory中,所以Configuration的初始化逻辑也是在ConfigurationFactory中,大概流程如下: 1、先从registry.conf文件中读取config.type属性,默认就是file

configTypeName = CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR+ ConfigurationKeys.FILE_ROOT_TYPE);

2、基于config.type属性值加载配置中心,比如默认是:file,则先从registry.conf文件中读取config.file.name读取本地文件配置中心对应的文件名,然后基于该文件名创建FileConfiguration对象,这样就将file.conf中的配置加载到内存中了。同理,如果配置的是其他配置中心,则会通过SPI初始化其他配置中心。还有一点需要注意的是,在这阶段,如果配置中心是本地文件,则会为其创建代理对象;如果不是本地文件,则通过SPI加载对应的配置中心

if (ConfigType.File == configType) {
String pathDataId = String.join("config.file.name");
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
configuration = new FileConfiguration(name);
try {
// 配置增强 代理
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}

3、基于步骤2创建的Configuration对象,为其再创建一层代理,这个代理对象有两个作用:一个是本地缓存,不需要每次获取配置的时候都从配置中获取;另一个是监听,当配置发生变更会更新它维护的缓存。如下:

if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}

到这里,配置管理的初始化就完成了。Seata通过先先加载registry.conf文件获取对应的配置中心信息、注册中心,然后再根据获取到的的对应信息初始化配置中心。在使用本地文件作为配置中心的情况下,默认是加载file.conf文件。然后再为对应的配置中心创建对应的代理对象,使其支持本地缓存和配置监听

整理流程还是比较简单,在这里我要抛出几个问题:

  1. 什么是配置增强?Seata中的配置增强是怎么做的?
  2. 如果使用本地文件作为配置中心,就必须要将配置定义在file.conf文件中。如果是Spring应用,能不能直接将对应的配置项定义在application.yaml中?
  3. 在上面说的步骤2中,为什么在使用本地文件配置中心的情况下,要先为Configuration创建对应配置增强代理对象,而其他配置中心不用?

这3个问题都是紧密联系的,都和Seata的配置增加相关。下面详细介绍

配置管理增强

配置增强,简单来说就是为其创建一个代理对象,在执行目标独对象的目标方法时候,执行代理逻辑,从配置中心的角度来讲,就是在通过配置中心获取对应配置的时候,执行代理逻辑。

  1. 通过ConfigurationFactory.CURRENT_FILE_INSTANCE.getgetConfig(key)获取配置
  2. 加载registry.conf文件创建FileConfiguration对象configuration
  3. 基于SpringBootConfigurationProviderconfiguration创建代理对象configurationProxy
  4. configurationProxy中获取配置中心的连接信息file zk nacos 等
  5. 基于连接信息创建配中心Configuration对象configuration2
  6. 基于SpringBootConfigurationProviderconfiguration2创建代理对象configurationProxy2
  7. 执行configurationProxy2的代理逻辑
  8. 基于key找到对应的SpringBean
  9. 执行SpringBean的getXxx方法

配置增强实现

上面也简单提到过配置增强,相关代码如下:

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  1. 首先通过SPI机获取一个ExtConfigurationProvider对象,在Seata中默认只有一个实现:SpringBootConfigurationProvider
  2. 通过ExtConfigurationProvider#provider方法为Configuration创建代理对象

核心代码如下:

public Configuration provide(Configuration originalConfiguration) {
return (Configuration) Enhancer.create(originalConfiguration.getClass(), new MethodInterceptor() {
@Override
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable {
if (method.getName().startsWith("get") && args.length > 0) {
Object result = null;
String rawDataId = (String) args[0];
if (args.length == 1) {
result = get(convertDataId(rawDataId));
} else if (args.length == 2) {
result = get(convertDataId(rawDataId), args[1]);
} else if (args.length == 3) {
result = get(convertDataId(rawDataId), args[1], (Long) args[2]);
}
if (result != null) {
//If the return type is String,need to convert the object to string
if (method.getReturnType().equals(String.class)) {
return String.valueOf(result);
}
return result;
}
}

return method.invoke(originalConfiguration, args);
}
});
}

private Object get(String dataId) throws IllegalAccessException, InstantiationException {
String propertyPrefix = getPropertyPrefix(dataId);
String propertySuffix = getPropertySuffix(dataId);
ApplicationContext applicationContext = (ApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT);
Class<?> propertyClass = PROPERTY_BEAN_MAP.get(propertyPrefix);
Object valueObject = null;
if (propertyClass != null) {
try {
Object propertyBean = applicationContext.getBean(propertyClass);
valueObject = getFieldValue(propertyBean, propertySuffix, dataId);
} catch (NoSuchBeanDefinitionException ignore) {

}
} else {
throw new ShouldNeverHappenException("PropertyClass for prefix: [" + propertyPrefix + "] should not be null.");
}
if (valueObject == null) {
valueObject = getFieldValue(propertyClass.newInstance(), propertySuffix, dataId);
}

return valueObject;
}

1、如果方法是以get开头,并且参数个数为1/2/3,则执行其他的获取配置的逻辑,否则执行原生Configuration对象的逻辑 2、我们没必要纠结为啥是这样的规则,这就是Seata的一个约定 3、其他获取配置的逻辑,就是指通过Spring的方式获取对应配置值

到这里已经清楚了配置增强的原理,同时,也可以猜测得出唯一的ExtConfigurationProvider实现SpringBootConfigurationProvider,肯定是和Spring相关

配置增强与Spring

在介绍这块内容之前,我们先简单介绍一下Seata的使用方式:

  1. 非Starter方式:引入依赖 seata-all, 然后手动配置几个核心的Bean
  2. Starter方式: 引入依赖seata-spring-boot-starter,全自动准配,不需要自动注入核心Bean

SpringBootConfigurationProvider就在seata-spring-boot-starter模块中,也就是说,当我们通过引入seata-all的方式来使用Seata时,配置增强其实没有什么作用,因为此时根本找不到ExtConfigurationProvider实现类,自然就不会增强。

seata-spring-boot-starter是如何将这些东西串联起来的?

1、首先,在seata-spring-boot-starter模块的resources/META-INF/services目录下,存在一个spring.factories文件,内容分如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\

# 暂时不管
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

2、在SeataAutoConfiguration文件中,会创建以下Bean: GlobalTransactionScanner 、SeataDataSourceBeanPostProcessor、SeataAutoDataSourceProxyCreator、SpringApplicationContextProvider。前3个和我们本文要讲的内容不相关,主要关注SpringApplicationContextProvider,核心代码非常简单,就是将ApplicationContext保存下来:

public class SpringApplicationContextProvider implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
}
}

3、然后,在SeataAutoConfiguration文件中,还会将一些xxxProperties.Class和对应的Key前缀缓存到PROPERTY_BEAN_MAP中。``xxxProperties就简单理解成application.yaml`中的各种配置项:

static {
PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);
PROPERTY_BEAN_MAP.put(CLIENT_RM_PREFIX, RmProperties.class);
PROPERTY_BEAN_MAP.put(SHUTDOWN_PREFIX, ShutdownProperties.class);
...省略...
}

至此,整个流程其实已经很清晰,在有SpringBootConfigurationProvider配置增强的时候,我们获取一个配置项的流程如下:

  1. 先根据p配置项Key获取对应的xxxProperties对象
  2. 通过ObjectHolder中的ApplicationContext获取对应xxxProperties的SpringBean
  3. 基于xxxProperties的SpringBean获取对应配置的值
  4. 至此,通过配置增强,我们成功的获取到application.yaml中的值

· 阅读需 19 分钟

作者 | 刘晓敏 于雨

一、简介

Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发,作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个大家庭,并改名 dubbo-getty

18 年的时候,我在公司里实践微服务,当时遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个项目,起初还叫 fescar,后来更名 seata。由于我对开源技术很感兴趣,加了很多社区群,当时也很关注 dubbo-go 这个项目,在里面默默潜水。随着对 seata 的了解,逐渐萌生了做一个 go 版本的分布式事务框架的想法。

要做一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前,遂开始研究 dubbo-go 的底层 getty。

二、如何基于 getty 实现 RPC 通信

getty 框架的整体模型图如下:

image.png

下面结合相关代码,详述 seata-golang 的 RPC 通信过程。

1. 建立连接

实现 RPC 通信,首先要建立网络连接吧,我们从 client.go 开始看起。

func (c *client) connect() {
var (
err error
ss Session
)

for {
// 建立一个 session 连接
ss = c.dial()
if ss == nil {
// client has been closed
break
}
err = c.newSession(ss)
if err == nil {
// 收发报文
ss.(*session).run()
// 此处省略部分代码

break
}
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
}
}

connect() 方法通过 dial() 方法得到了一个 session 连接,进入 dial() 方法:

func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
case UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}

return nil
}

我们关注的是 TCP 连接,所以继续进入 c.dialTCP() 方法:

func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
)

for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
// 建立加密连接
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// 建立 tcp 连接
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// 返回一个 TCPSession
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}

至此,我们知道了 getty 如何建立 TCP 连接,并返回 TCPSession。

2. 收发报文

那它是怎么收发报文的呢,我们回到 connection 方法接着往下看,有这样一行 ss.(*session).run(),在这行代码之后代码都是很简单的操作,我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑,接着进入 run() 方法:

func (s *session) run() {
// 省略部分代码

go s.handleLoop()
go s.handlePackage()
}

这里起了两个 goroutine,handleLoophandlePackage,看字面意思符合我们的猜想,进入 handleLoop() 方法:

func (s *session) handleLoop() {
// 省略部分代码

for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
// 省略部分代码

case outPkg, ok = <-s.wQ:
// 省略部分代码

iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// 通过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特
pkgBytes, err = s.writer.Write(s, outPkg)
// 省略部分代码

iovec = append(iovec, pkgBytes)

//省略部分代码
}
// 将这些二进制比特发送出去
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}

case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// 定时执行的逻辑,心跳等
s.listener.OnCron(s)
}
}
}
}

通过上面的代码,我们不难发现,handleLoop() 方法处理的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer 编码成二进制比特,然后通过建立的 TCP 连接发送出去。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。

继续看 handlePackage() 方法:

func (s *session) handlePackage() {
// 省略部分代码

if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}

err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}

进入 handleTCPPackage() 方法:

func (s *session) handleTCPPackage() error {
// 省略部分代码

conn = s.Connection.(*gettyTCPConn)
for {
// 省略部分代码

bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// 从 TCP 连接中收到报文
bufLen, err = conn.recv(buf)
// 省略部分代码

break
}
// 省略部分代码

// 将收到的报文二进制比特写入 pkgBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
break
}
// 通过 s.reader 将收到的报文解码成 RPC 消息
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// 省略部分代码

s.UpdateActive()
// 将收到的消息放入 TaskQueue 供 RPC 消费端消费
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
if exit {
break
}
}

return perrors.WithStack(err)
}

从上面的代码逻辑我们分析出,RPC 消费端需要将从 TCP 连接收到的二进制比特报文解码成 RPC 能消费的消息,这个工作由 s.reader 实现,所以,我们要构建 RPC 通信层也需要实现 s.reader 对应的 Reader 接口。

3. 底层处理网络报文的逻辑如何与业务逻辑解耦

我们都知道,netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?

handlePackage() 方法最后,我们看到,收到的消息被放入了 s.addTask(pkg) 这个方法,接着往下分析:

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}

pkg 参数传递到了一个匿名方法,这个方法最终放入了 taskPool。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。

接着我们看一下 taskPool 的定义:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}

构建了一个缓冲大小为 size (默认为  runtime.NumCPU() * 100) 的 channel sem。再看方法 AddTaskAlways(t task)

func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}

select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}

加入的任务,会先由 len(p.sem) 个 goroutine 去消费,如果没有 goroutine 空闲,则会启动一个临时的 goroutine 去运行 t()。相当于有  len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去处理业务逻辑,而不是由处理网络报文的 goroutine 去运行业务逻辑,从而实现了解耦。写 seata-golang 时遇到的一个坑,就是忘记设置 taskPool 造成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞等待一个任务完成时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。

4. 具体实现

下面的代码见 getty.go

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}

// ReadWriter interface use for handle application packages
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error

// invoked when session closed.
OnClose(Session)

// invoked when got error.
OnError(Session, error)

// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)

// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}

通过对整个 getty 代码的分析,我们只要实现  ReadWriter 来对 RPC  消息编解码,再实现 EventListener 来处理 RPC 消息的对应的具体逻辑,将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。

4.1 编解码协议实现

下面是 seata 协议的定义: image-20201205214556457.png

在 ReadWriter 接口的实现 RpcPackageHandler 中,调用 Codec 方法对消息体按照上面的格式编解码:

// 消息编码为二进制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil
}
}

// 二进制比特解码为消息体
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}

4.2 Client 端实现

再来看 client 端 EventListener 的实现 RpcRemotingClient

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func()
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
ApplicationId: client.conf.ApplicationId,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
// 建立连接后向 Transaction Coordinator 发起注册 TransactionManager 的请求
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// 将与 Transaction Coordinator 建立的连接保存在连接池供后续使用
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Info("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)

// 处理事务消息,提交 or 回滚
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// 发送心跳
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

clientSessionManager.RegisterGettySession(session) 的逻辑 4.4 小节分析。

4.3 Server 端 Transaction Coordinator 实现

代码见 DefaultCoordinator

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
// 释放 TCP 连接
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Info("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// 将 TransactionManager 信息和 TCP 连接建立映射关系
coordinator.OnRegTmMessage(rpcMessage, session)
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
return
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// 将 ResourceManager 信息和 TCP 连接建立映射关系
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
}
}()
// 处理事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("close a unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}

coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见 4.4 小节。 消息进入 coordinator.OnTrxMessage(rpcMessage, session) 方法,将按照消息的类型码路由到具体的逻辑当中:

	switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus:
req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport:
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit:
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback:
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister:
req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport:
req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default:
return nil
}

4.4 session manager 分析

Client 端同 Transaction Coordinator 建立连接起连接后,通过 clientSessionManager.RegisterGettySession(session) 将连接保存在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个连接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。我们通过 RpcContext 来标识一个连接信息:

type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.Set
Session getty.Session
}

当收到事务消息时,我们需要构造这样一个 RpcContext 供后续事务处理逻辑使用。所以,我们会构造下列 map 来缓存映射关系:

var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered,
// it will be the RM registered
session_transactionroles = sync.Map{}

// session -> applicationId
identified_sessions = sync.Map{}

// applicationId -> ip -> port -> session
client_sessions = sync.Map{}

// applicationId -> resourceIds
client_resources = sync.Map{}
)

这样,Transaction Manager 和 Resource Manager 分别通过 coordinator.OnRegTmMessage(rpcMessage, session)coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager) 的关系。在需要时,我们就可以通过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同,感兴趣的可以深入了解一下。具体代码见 getty_session_manager.go 至此,我们就分析完了 seata-golang 整个 RPC 通信模型的机制。

三、seata-golang 的未来

seata-golang  从今年 4 月份开始开发,到 8 月份基本实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。下图展示了 AT 模式的原理:image20201205-232516.png

后续,还有许多工作可以做,比如:对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其他数据库的支持、raft transaction coordinator 的实现等,希望对分布式事务问题感兴趣的开发者可以加入进来一起来打造一个完善的 golang 的分布式事务框架。

如果你有任何疑问,欢迎钉钉扫码加入交流群【钉钉群号 33069364】:

作者简介

刘晓敏 (GitHubID dk-lockdown),目前就职于 h3c 成都分公司,擅长使用 Java/Go 语言,在云原生和微服务相关技术方向均有涉猎,目前专攻分布式事务。 于雨(github @AlexStocks),dubbo-go 项目和社区负责人,一个有十多年服务端基础架构研发一线工作经验的程序员,陆续参与改进过 Muduo/Pika/Dubbo/Sentinel-go 等知名项目,目前在蚂蚁金服可信原生部从事容器编排和 service mesh 工作。

参考资料

seata 官方:https://seata.apache.org

java 版 seata:https://github.com/apache/incubator-seata

seata-golang 项目地址:https://github.com/apache/incubator-seata-go

seata-golang go 夜读 b站分享:https://www.bilibili.com/video/BV1oz411e72T