跳到主要内容

· 阅读需 11 分钟

欢迎大家报名Seata 开源之夏2023课题

开源之夏 2023 学生报名期为 4 月 29 日-6月4日,欢迎报名Seata 2023 课题!在这里,您将有机会深入探讨分布式事务的理论和应用,并与来自不同背景的同学一起合作完成实践项目。我们期待着您的积极参与和贡献,共同推动分布式事务领域的发展。

summer2023-1

开源之夏2023

开源之夏是由中科院软件所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。

参与学生通过远程线上协作方式,配有资深导师指导,参与到开源社区各组织项目开发中并收获奖金、礼品与证书。这些收获,不仅仅是未来毕业简历上浓墨重彩的一笔,更是迈向顶尖开发者的闪亮起点,可以说非常值得一试。 每个项目难度分为基础和进阶两档,对应学生结项奖金分别为税前人民币 8000 元和税前人民币 12000 元。

Seata社区介绍

Seata 是一款开源的分布式事务解决方案,GitHub获得超过23K+ Starts致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 在阿里内部一直扮演着分布式数据一致性的中间件角色,几乎每笔交易都要使用Seata,历经双11洪荒流量的洗礼,对业务进行了有力的技术支撑。

Seata社区开源之夏2023项目课题汇总

Seata社区为开源之夏2023组委会推荐6项精选项目课题,您可以访问以下链接进行选报:
https://summer-ospp.ac.cn/org/orgdetail/064c15df-705c-483a-8fc8-02831370db14?lang=zh
请及时与各导师沟通并准备项目申请材料,并登录官方注册申报(以下课题顺序不分先后): seata2023-2

项目一: 实现用于服务发现和注册的NamingServer

难度: 进阶/Advanced

项目社区导师: 陈健斌

导师联系邮箱: 364176773@qq.com

项目简述:
目前seata的服务暴露及发现主要依赖于第三方注册中心,随着项目的演进发展,带来了额外的学习使用成本,而业内主流具有服务端的中间件大多都开始演进自身的服务自闭环和控制及提供于服务端更高契合度和可靠性的组件或功能如kafka 的KRaft,rocketmq的NameServer,clickhouse的ClickHouse Keeper等.故为了解决如上问题和架构演进要求,seata需要构建自身的nameserver来保证更加稳定可靠。

项目链接: https://summer-ospp.ac.cn/org/prodetail/230640380?list=org&navpage=org



项目二: 在seata-go中实现saga事务模式

难度: 进阶/Advanced

项目社区导师: 刘月财

导师联系邮箱: luky116@apache.org

项目简述: Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。Seata-go 中当前没有支持saga模式,希望后面参考Java版本的实现能将该功能支持上。

项目链接: https://summer-ospp.ac.cn/org/prodetail/230640382?list=org&navpage=org



项目三: seata saga模式产品化能力提升

难度: 进阶/Advanced

项目社区导师: 李宗杰

导师联系邮箱: leezongjie@163.com

项目简述: saga作为分布式事务的解决方案之一,在长事务上应用尤其广泛,seata也提供了对应的状态机实现。随着业务的不断发展和接入,对seata提出了更高的要求,我们需要支持流式saga 编排,对当前状态机的实现进行优化和扩展,进一步服务业务。

项目链接: https://summer-ospp.ac.cn/org/prodetail/230640415?list=org&navpage=org



项目四: 增加控制台事务控制能力

难度: 进阶/Advanced

项目社区导师: 王良

导师联系邮箱: 841369634@qq.com

项目简述: 在分布式事务中,经常会存在非常多的异常情况,这些异常情况往往会导致系统无法继续运行。而这些异常往往需要人工介入排查原因并排除故障,才能够使系统继续正常运行。虽然seata 提供了控制台查询事务数据,但还未提供任何事务控制能力,帮助排除故障。所以,本课题主要是在seata服务端控制台上,添加事务控制能力。

项目链接: https://summer-ospp.ac.cn/org/prodetail/230640423?list=org&navpage=org



项目五: 提高单测覆盖率和建立集成测试

难度: 基础/Basic

项目社区导师: 张嘉伟

导师联系邮箱: 349071347@qq.com

项目简述: 为了进一步提高项目的质量以及稳定性, 需要进一步提升项目单元测试覆盖率以及加入集成测试来模拟生产中不同的场景. 集成测试的目的是为了模拟client与server的交互过程, 而非单一的对某个接口进行测试。

项目链接: https://summer-ospp.ac.cn/org/prodetail/230640424?list=org&navpage=org



项目六: 实现Seata运维ctl工具

难度: 进阶/Advanced

项目社区导师: 季敏

导师联系邮箱: jimin.jm@alibaba-inc.com

项目简述: 运维ctl命令在Seata中非常重要,它是Seata的命令行工具,可以帮助我们管理和操作Seata的各种组件。运维ctl命令可以让我们快速地启动、停止和管理Seata服务,定位和解决问题。此外,运维ctl 命令还提供了丰富的指令,可以让我们方便地检查Seata的健康状态、模拟事务和打印导出配置信息等,大大提高了我们的工作效率和运维体验。

以下是对实现定制ctl运维命令行的一些建议:

  • 借鉴其他开源项目的实现方式,比如kubectl,helm等,并根据Seata的特点和需求进行定制。
  • 将常用的运维操作直接封装进命令行,减少用户的手动操作。
  • 考虑使用友好的命令和参数名称,将命令行设计得易于理解和记忆。
  • 提供详细的帮助文档和示例,帮助用户快速上手和了解如何使用各种参数和选项。
  • 考虑命令行的跨平台支持,例如支持Windows、Linux和MacOS等操作系统。 一款好的ctl命令行应该是易用、灵活、可定制、健壮和易维护的。

项目链接:https://summer-ospp.ac.cn/org/prodetail/230640431?list=org&navpage=org



如何参与开源之夏2023并快速选定项目?

欢迎通过上方联系方式,与各导师沟通并准备项目申请材料。

课题参与期间,学生可以在世界任何地方线上工作,Seata相关项目结项需要在9月30日前以 PR 的形式提交到Seata社区仓库中并完成合并,请务必尽早准备。 seata2023-3

需要在课题期间第一时间获取导师及其他信息,可扫码进入钉钉群交流 ——了解Seata社区各领域项目、结识Seata社区开源导师,以助力后续申请。



参考资料:

Seata网站 : https://seata.apache.org/

Seata GitHub : https://github.com/seata

开源之夏官网: https://summer-ospp.ac.cn/org/orgdetail/ab188e59-fab8-468f-bc89-bdc2bd8b5e64?lang=zh

如果同学们对微服务其他领域项目感兴趣,也可以尝试申请,例如:

  • 对于微服务配置注册中心有兴趣的同学,可以尝试填报Nacos 开源之夏
  • 对于微服务框架和RPC框架有兴趣的同学,可以尝试填报Spring Cloud Alibaba 开源之夏Dubbo 开源之夏
  • 对于云原生网关有兴趣的同学,可以尝试填报Higress 开源之夏
  • 对于分布式高可用防护有兴趣的同学,可以尝试填报 [Sentinel 开源之夏](https://summer-ospp.ac. cn/org/orgdetail/5e879522-bd90-4a8b-bf8b-b11aea48626b?lang=zh) ;
  • 对于微服务治理有兴趣的同学,可以尝试填报 [OpenSergo 开源之夏](https://summer-ospp.ac. cn/org/orgdetail/aaff4eec-11b1-4375-997d-5eea8f51762b?lang=zh)。

· 阅读需 7 分钟

Seata 1.6.0 重磅发布,大幅提升性能

Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务。

seata-server 下载链接:

source | binary

此版本更新如下:

feature:

  • [#4863] 支持 oracle 和 postgresql 多主键
  • [#4649] seata-server支持多注册中心
  • [#4779] 支持 Apache Dubbo3
  • [#4479] TCC注解支持添加在接口和实现类上
  • [#4877] client sdk 支持jdk17
  • [#4914] 支持 mysql 的update join联表更新语法
  • [#4542] 支持 oracle timestamp 类型
  • [#5111] 支持Nacos contextPath 配置
  • [#4802] dockerfile 支持 arm64

bugfix:

  • [#4780] 修复超时回滚成功后无法发送TimeoutRollbacked事件
  • [#4954] 修复output表达式错误时,保存执行结果空指针异常
  • [#4817] 修复高版本springboot配置不标准的问题
  • [#4838] 修复使用 Statement.executeBatch() 时无法生成undo log 的问题
  • [#4533] 修复handleRetryRollbacking的event重复导致的指标数据不准确
  • [#4912] 修复mysql InsertOnDuplicateUpdate 列名大小写不一致无法正确匹配
  • [#4543] 修复对 Oracle 数据类型nclob的支持
  • [#4915] 修复获取不到ServerRecoveryProperties属性的问题
  • [#4919] 修复XID的port和address出现null:0的情况
  • [#4928] 修复 rpcContext.getClientRMHolderMap NPE 问题
  • [#4953] 修复InsertOnDuplicateUpdate可绕过修改主键的问题
  • [#4978] 修复 kryo 支持循环依赖
  • [#4985] 修复 undo_log id重复的问题
  • [#4874] 修复OpenJDK 11 启动失败
  • [#5018] 修复启动脚本中 loader path 使用相对路径导致 server 启动失败问题
  • [#5004] 修复mysql update join行数据重复的问题
  • [#5032] 修复mysql InsertOnDuplicateUpdate中条件参数填充位置计算错误导致的镜像查询SQL语句异常问题
  • [#5033] 修复InsertOnDuplicateUpdate的SQL语句中无插入列字段导致的空指针问题
  • [#5038] 修复SagaAsyncThreadPoolProperties冲突问题
  • [#5050] 修复Saga模式下全局状态未正确更改成Committed问题
  • [#5052] 修复update join条件中占位符参数问题
  • [#5031] 修复InsertOnDuplicateUpdate中不应该使用null值索引作为查询条件
  • [#5075] 修复InsertOnDuplicateUpdate无法拦截无主键和唯一索引的SQL
  • [#5093] 修复seata server重启后accessKey丢失问题
  • [#5092] 修复当seata and jpa共同使用时, AutoConfiguration的顺序不正确的问题
  • [#5109] 修复当RM侧没有加@GlobalTransactional报NPE的问题
  • [#5098] Druid 禁用 oracle implicit cache
  • [#4860] 修复metrics tag覆盖问题
  • [#5028] 修复 insert on duplicate SQL中 null 值问题
  • [#5078] 修复SQL语句中无主键和唯一键拦截问题
  • [#5097] 修复当Server重启时 accessKey 丢失问题
  • [#5131] 修复XAConn处于active状态时无法回滚的问题
  • [#5134] 修复hikariDataSource 自动代理在某些情况下失效的问题
  • [#5163] 修复高版本JDK编译失败的问题

optimize:

  • [#4681] 优化竞争锁过程
  • [#4774] 优化 seataio/seata-server 镜像中的 mysql8 依赖
  • [#4750] 优化AT分支释放全局锁不使用xid
  • [#4790] 添加自动发布 OSSRH github action
  • [#4765] mysql8.0.29版本及以上XA模式不持connection至二阶段
  • [#4797] 优化所有github actions脚本
  • [#4800] 添加 NOTICE 文件
  • [#4761] 使用 hget 代替 RedisLocker 中的 hmget
  • [#4414] 移除log4j依赖
  • [#4836] 优化 BaseTransactionalExecutor#buildLockKey(TableRecords rowsIncludingPK) 方法可读性
  • [#4865] 修复 Saga 可视化设计器 GGEditor 安全漏洞
  • [#4590] 自动降级支持开关支持动态配置
  • [#4490] tccfence 记录表优化成按索引删除
  • [#4911] 添加 header 和license 检测
  • [#4917] 升级 package-lock.json 修复漏洞
  • [#4924] 优化 pom 依赖
  • [#4932] 抽取部分配置的默认值
  • [#4925] 优化 javadoc 注释
  • [#4921] 修复控制台模块安全漏洞和升级 skywalking-eyes 版本
  • [#4936] 优化存储配置的读取
  • [#4946] 将获取锁时遇到的sql异常传递给客户端
  • [#4962] 优化构建配置,并修正docker镜像的基础镜像
  • [#4974] 取消redis模式下,查询globalStatus数量的限制
  • [#4981] 优化当tcc fence记录查不到时的错误提示
  • [#4995] 修复mysql InsertOnDuplicateUpdate后置镜像查询SQL中重复的主键查询条件
  • [#5047] 移除无用代码
  • [#5051] 回滚时undolog产生脏写需要抛出不再重试(BranchRollbackFailed_Unretriable)的异常
  • [#5075] 拦截没有主键及唯一索引值的insert on duplicate update语句
  • [#5104] ConnectionProxy脱离对druid的依赖
  • [#5124] 支持oracle删除TCC fence记录表
  • [#4468] 支持kryo 5.3.0
  • [#4807] 优化镜像和OSS仓库发布流水线
  • [#4445] 优化事务超时判断
  • [#4958] 优化超时事务 triggerAfterCommit() 的执行
  • [#4582] 优化redis存储模式的事务排序
  • [#4963] 增加 ARM64 流水线 CI 测试
  • [#4434] 移除 seata-server CMS GC 参数

test:

  • [#4411] 测试Oracle数据库AT模式下类型支持
  • [#4794] 重构代码,尝试修复单元测试 DataSourceProxyTest.getResourceIdTest()
  • [#5101] 修复zk注册和配置中心报ClassNotFoundException的问题 DataSourceProxyTest.getResourceIdTest()

非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

· 阅读需 3 分钟

Seata 1.5.2 重磅发布,支持xid负载均衡

Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务。

seata-server 下载链接:

source | binary

此版本更新如下:

feature:

  • [#4661] 支持根据xid负载均衡算法
  • [#4676] 支持Nacos作为注册中心时,server通过挂载SLB暴露服务
  • [#4642] 支持client批量请求并行处理
  • [#4567] 支持where条件中find_in_set函数

bugfix:

  • [#4515] 修复develop分支SeataTCCFenceAutoConfiguration在客户端未使用DB时,启动抛出ClassNotFoundException的问题。
  • [#4661] 修复控制台中使用PostgreSQL出现的SQL异常
  • [#4667] 修复develop分支RedisTransactionStoreManager迭代时更新map的异常
  • [#4678] 修复属性transport.enableRmClientBatchSendRequest没有配置的情况下缓存穿透的问题
  • [#4701] 修复命令行参数丢失问题
  • [#4607] 修复跳过全局锁校验的缺陷
  • [#4696] 修复 oracle 存储模式时的插入问题
  • [#4726] 修复批量发送消息时可能的NPE问题
  • [#4729] 修复AspectTransactional.rollbackForClassName设置错误
  • [#4653] 修复 INSERT_ON_DUPLICATE 主键为非数值异常

optimize:

  • [#4650] 修复安全漏洞
  • [#4670] 优化branchResultMessageExecutor线程池的线程数
  • [#4662] 优化回滚事务监控指标
  • [#4693] 优化控制台导航栏
  • [#4700] 修复 maven-compiler-plugin 和 maven-resources-plugin 执行失败
  • [#4711] 分离部署时 lib 依赖
  • [#4720] 优化pom描述
  • [#4728] 将logback版本依赖升级至1.2.9
  • [#4745] 发行包中支持 mysql8 driver
  • [#4626] 使用 easyj-maven-plugin 插件代替 flatten-maven-plugin插件,以修复shade 插件与 flatten 插件不兼容的问题
  • [#4629] 更新globalSession状态时检查更改前后的约束关系
  • [#4662] 优化 EnhancedServiceLoader 可读性

test:

  • [#4544] 优化TransactionContextFilterTest中jackson包依赖问题
  • [#4731] 修复 AsyncWorkerTest 和 LockManagerTest 的单测问题。

非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

· 阅读需 14 分钟

今天来聊一聊阿里巴巴 Seata 新版本(1.5.1)是怎么解决 TCC 模式下的幂等、悬挂和空回滚问题的。

1 TCC 回顾

TCC 模式是最经典的分布式事务解决方案,它将分布式事务分为两个阶段来执行,try 阶段对每个分支事务进行预留资源,如果所有分支事务都预留资源成功,则进入 commit 阶段提交全局事务,如果有一个节点预留资源失败则进入 cancel 阶段回滚全局事务。

以传统的订单、库存、账户服务为例,在 try 阶段尝试预留资源,插入订单、扣减库存、扣减金额,这三个服务都是要提交本地事务的,这里可以把资源转入中间表。在 commit 阶段,再把 try 阶段预留的资源转入最终表。而在 cancel 阶段,把 try 阶段预留的资源进行释放,比如把账户金额返回给客户的账户。

注意:try 阶段必须是要提交本地事务的,比如扣减订单金额,必须把钱从客户账户扣掉,如果不扣掉,在 commit 阶段客户账户钱不够了,就会出问题。

1.1 try-commit

try 阶段首先进行预留资源,然后在 commit 阶段扣除资源。如下图:

fence-try-commit

1.2 try-cancel

try 阶段首先进行预留资源,预留资源时扣减库存失败导致全局事务回滚,在 cancel 阶段释放资源。如下图:

fence-try-cancel

2 TCC 优势

TCC 模式最大的优势是效率高。TCC 模式在 try 阶段的锁定资源并不是真正意义上的锁定,而是真实提交了本地事务,将资源预留到中间态,并不需要阻塞等待,因此效率比其他模式要高。

同时 TCC 模式还可以进行如下优化:

2.1 异步提交

try 阶段成功后,不立即进入 confirm/cancel 阶段,而是认为全局事务已经结束了,启动定时任务来异步执行 confirm/cancel,扣减或释放资源,这样会有很大的性能提升。

2.2 同库模式

TCC 模式中有三个角色:

  • TM:管理全局事务,包括开启全局事务,提交/回滚全局事务;
  • RM:管理分支事务;
  • TC: 管理全局事务和分支事务的状态。

下图来自 Seata 官网:

fence-fiffrent-db

TM 开启全局事务时,RM 需要向 TC 发送注册消息,TC 保存分支事务的状态。TM 请求提交或回滚时,TC 需要向 RM 发送提交或回滚消息。这样包含两个个分支事务的分布式事务中,TC 和 RM 之间有四次 RPC。

优化后的流程如下图:

fence-same-db

TC 保存全局事务的状态。TM 开启全局事务时,RM 不再需要向 TC 发送注册消息,而是把分支事务状态保存在了本地。TM 向 TC 发送提交或回滚消息后,RM 异步线程首先查出本地保存的未提交分支事务,然后向 TC 发送消息获取(本地分支事务)所在的全局事务状态,以决定是提交还是回滚本地事务。

这样优化后,RPC 次数减少了 50%,性能大幅提升。

3 RM 代码示例

以库存服务为例,RM 库存服务接口代码如下:

@LocalTCC
public interface StorageService {

/**
* 扣减库存
* @param xid 全局xid
* @param productId 产品id
* @param count 数量
* @return
*/
@TwoPhaseBusinessAction(name = "storageApi", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
boolean decrease(String xid, Long productId, Integer count);

/**
* 提交事务
* @param actionContext
* @return
*/
boolean commit(BusinessActionContext actionContext);

/**
* 回滚事务
* @param actionContext
* @return
*/
boolean rollback(BusinessActionContext actionContext);
}

通过 @LocalTCC 这个注解,RM 初始化的时候会向 TC 注册一个分支事务。在 try 阶段的方法(decrease方法)上有一个 @TwoPhaseBusinessAction 注解,这里定义了分支事务的 resourceId,commit 方法和 cancel 方法,useTCCFence 这个属性下一节再讲。

4 TCC 存在问题

TCC 模式中存在的三大问题是幂等、悬挂和空回滚。在 Seata1.5.1 版本中,增加了一张事务控制表,表名是 tcc_fence_log 来解决这个问题。而在上一节 @TwoPhaseBusinessAction 注解中提到的属性 useTCCFence 就是来指定是否开启这个机制,这个属性值默认是 false。

tcc_fence_log 建表语句如下(MySQL 语法):

CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

4.1 幂等

在 commit/cancel 阶段,因为 TC 没有收到分支事务的响应,需要进行重试,这就要分支事务支持幂等。

我们看一下新版本是怎么解决的。下面的代码在 TCCResourceManager 类:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
//省略判断
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
//省略判断
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
//注解 useTCCFence 属性是否设置为 true
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
//省略逻辑
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
//省略
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}

上面的代码可以看到,执行分支事务提交方法时,首先判断 useTCCFence 属性是否为 true,如果为 true,则走 TCCFenceHandler 类中的 commitFence 逻辑,否则走普通提交逻辑。

TCCFenceHandler 类中的 commitFence 方法调用了 TCCFenceHandler 类的 commitFence 方法,代码如下:

public static boolean commitFence(Method commitMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
if (tccFenceDO == null) {
throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
FrameworkErrorCode.RecordAlreadyExists);
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
}
return false;
}
return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}

从代码中可以看到,提交事务时首先会判断 tcc_fence_log 表中是否已经有记录,如果有记录,则判断事务执行状态并返回。这样如果判断到事务的状态已经是 STATUS_COMMITTED,就不会再次提交,保证了幂等。如果 tcc_fence_log 表中没有记录,则插入一条记录,供后面重试时判断。

Rollback 的逻辑跟 commit 类似,逻辑在类 TCCFenceHandler 的 rollbackFence 方法。

4.2 空回滚

如下图,账户服务是两个节点的集群,在 try 阶段账户服务 1 这个节点发生了故障,try 阶段在不考虑重试的情况下,全局事务必须要走向结束状态,这样就需要在账户服务上执行一次 cancel 操作。

fence-empty-rollback

Seata 的解决方案是在 try 阶段 往 tcc_fence_log 表插入一条记录,status 字段值是 STATUS_TRIED,在 Rollback 阶段判断记录是否存在,如果不存在,则不执行回滚操作。代码如下:

//TCCFenceHandler 类
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
if (result) {
return targetCallback.execute();
} else {
throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
FrameworkErrorCode.InsertRecordError);
}
} catch (TCCFenceException e) {
//省略
} catch (Throwable t) {
//省略
}
});
}

在 Rollback 阶段的处理逻辑如下:

//TCCFenceHandler 类
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args, String actionName) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
// non_rollback
if (tccFenceDO == null) {
//不执行回滚逻辑
return true;
} else {
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
}
return false;
}
}
return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}

updateStatusAndInvokeTargetMethod 方法执行的 sql 如下:

update tcc_fence_log set status = ?, gmt_modified = ?
where xid = ? and branch_id = ? and status = ? ;

可见就是把 tcc_fence_log 表记录的 status 字段值从 STATUS_TRIED 改为 STATUS_ROLLBACKED,如果更新成功,就执行回滚逻辑。

4.3 悬挂

悬挂是指因为网络问题,RM 开始没有收到 try 指令,但是执行了 Rollback 后 RM 又收到了 try 指令并且预留资源成功,这时全局事务已经结束,最终导致预留的资源不能释放。如下图:

fence-suspend

Seata 解决这个问题的方法是执行 Rollback 方法时先判断 tcc_fence_log 是否存在当前 xid 的记录,如果没有则向 tcc_fence_log 表插入一条记录,状态是 STATUS_SUSPENDED,并且不再执行回滚操作。代码如下:

public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args, String actionName) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
// non_rollback
if (tccFenceDO == null) {
//插入防悬挂记录
boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
//省略逻辑
return true;
} else {
//省略逻辑
}
return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
} catch (Throwable t) {
//省略逻辑
}
});
}

而后面执行 try 阶段方法时首先会向 tcc_fence_log 表插入一条当前 xid 的记录,这样就造成了主键冲突。代码如下:

//TCCFenceHandler 类
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
//省略逻辑
} catch (TCCFenceException e) {
if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) {
LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId);
addToLogCleanQueue(xid, branchId);
}
status.setRollbackOnly();
throw new SkipCallbackWrapperException(e);
} catch (Throwable t) {
//省略
}
});
}

注意:queryTCCFenceDO 方法 sql 中使用了 for update,这样就不用担心 Rollback 方法中获取不到 tcc_fence_log 表记录而无法判断 try 阶段本地事务的执行结果了。

5 总结

TCC 模式是分布式事务中非常重要的事务模式,但是幂等、悬挂和空回滚一直是 TCC 模式需要考虑的问题,Seata 框架在 1.5.1 版本完美解决了这些问题。

对 tcc_fence_log 表的操作也需要考虑事务的控制,Seata 使用了代理数据源,使 tcc_fence_log 表操作和 RM 业务操作在同一个本地事务中执行,这样就能保证本地操作和对 tcc_fence_log 的操作同时成功或失败。

· 阅读需 16 分钟

Seata 目前支持 AT 模式、XA 模式、TCC 模式和 SAGA 模式,之前文章更多谈及的是非侵入式的 AT 模式,今天带大家认识一下同样是二阶段提交的 TCC 模式。

什么是 TCC

TCC 是分布式事务中的二阶段提交协议,它的全称为 Try-Confirm-Cancel,即资源预留(Try)、确认操作(Confirm)、取消操作(Cancel),他们的具体含义如下:

  1. Try:对业务资源的检查并预留;
  2. Confirm:对业务处理进行提交,即 commit 操作,只要 Try 成功,那么该步骤一定成功;
  3. Cancel:对业务处理进行取消,即回滚操作,该步骤回对 Try 预留的资源进行释放。

TCC 是一种侵入式的分布式事务解决方案,以上三个操作都需要业务系统自行实现,对业务系统有着非常大的入侵性,设计相对复杂,但优点是 TCC 完全不依赖数据库,能够实现跨数据库、跨应用资源管理,对这些不同数据访问通过侵入式的编码方式实现一个原子操作,更好地解决了在各种复杂业务场景下的分布式事务问题。

img

Seata TCC 模式

Seata TCC 模式跟通用型 TCC 模式原理一致,我们先来使用 Seata TCC 模式实现一个分布式事务:

假设现有一个业务需要同时使用服务 A 和服务 B 完成一个事务操作,我们在服务 A 定义该服务的一个 TCC 接口:

public interface TccActionOne {
@TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);
}

同样,在服务 B 定义该服务的一个 TCC 接口:

public interface TccActionTwo {
@TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
public void prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "b") String b);

public void commit(BusinessActionContext actionContext);

public void rollback(BusinessActionContext actionContext);
}

在业务所在系统中开启全局事务并执行服务 A 和服务 B 的 TCC 预留资源方法:

@GlobalTransactional
public String doTransactionCommit(){
//服务A事务参与者
tccActionOne.prepare(null,"one");
//服务B事务参与者
tccActionTwo.prepare(null,"two");
}

以上就是使用 Seata TCC 模式实现一个全局事务的例子,可以看出,TCC 模式同样使用 @GlobalTransactional 注解开启全局事务,而服务 A 和服务 B 的 TCC 接口为事务参与者,Seata 会把一个 TCC 接口当成一个 Resource,也叫 TCC Resource。

TCC 接口可以是 RPC,也可以是 JVM 内部调用,意味着一个 TCC 接口,会有发起方和调用方两个身份,以上例子,TCC 接口在服务 A 和服务 B 中是发起方,在业务所在系统中是调用方。如果该 TCC 接口为 Dubbo RPC,那么调用方就是一个 dubbo:reference,发起方则是一个 dubbo:service

img

Seata 启动时会对 TCC 接口进行扫描并解析,如果 TCC 接口是一个发布方,则在 Seata 启动时会向 TC 注册 TCC Resource,每个 TCC Resource 都有一个资源 ID;如果 TCC 接口时一个调用方,Seata 代理调用方,与 AT 模式一样,代理会拦截 TCC 接口的调用,即每次调用 Try 方法,会向 TC 注册一个分支事务,接着才执行原来的 RPC 调用。

当全局事务决议提交/回滚时,TC 会通过分支注册的的资源 ID 回调到对应参与者服务中执行 TCC Resource 的 Confirm/Cancel 方法。

Seata 如何实现 TCC 模式

从上面的 Seata TCC 模型可以看出,TCC 模式在 Seata 中也是遵循 TC、TM、RM 三种角色模型的,如何在这三种角色模型中实现 TCC 模式呢?我将其主要实现归纳为资源解析、资源管理、事务处理。

资源解析

资源解析即是把 TCC 接口进行解析并注册,前面说过,TCC 接口可以是 RPC,也可以是 JVM 内部调用,在 Seata TCC 模块有中一个 remoting 模块,该模块专门用于解析具有 TwoPhaseBusinessAction 注解的 TCC 接口资源:

img

RemotingParser 接口主要有 isRemotingisReferenceisServicegetServiceDesc 等方法,默认的实现为 DefaultRemotingParser,其余各自的 RPC 协议解析类都在 DefaultRemotingParser 中执行,Seata 目前已经实现了对 Dubbo、HSF、SofaRpc、LocalTCC 的 RPC 协议的解析,同时具备 SPI 可扩展性,未来欢迎大家为 Seata 提供更多的 RPC 协议解析类。

在 Seata 启动过程中,有个 GlobalTransactionScanner 注解进行扫描,会执行以下方法:

io.seata.spring.util.TCCBeanParserUtils#isTccAutoProxy

该方法目的是判断 bean 是否已被 TCC 代理,在过程中会先判断 bean 是否是一个 Remoting bean,如果是则调用 getServiceDesc 方法对 remoting bean 进行解析,同时判断如果是一个发起方,则对其进行资源注册:

io.seata.rm.tcc.remoting.parser.DefaultRemotingParser#parserRemotingServiceInfo

public RemotingDesc parserRemotingServiceInfo(Object bean,String beanName,RemotingParser remotingParser){
RemotingDesc remotingBeanDesc=remotingParser.getServiceDesc(bean,beanName);
if(remotingBeanDesc==null){
return null;
}
remotingServiceMap.put(beanName,remotingBeanDesc);

Class<?> interfaceClass=remotingBeanDesc.getInterfaceClass();
Method[]methods=interfaceClass.getMethods();
if(remotingParser.isService(bean,beanName)){
try{
//service bean, registry resource
Object targetBean=remotingBeanDesc.getTargetBean();
for(Method m:methods){
TwoPhaseBusinessAction twoPhaseBusinessAction=m.getAnnotation(TwoPhaseBusinessAction.class);
if(twoPhaseBusinessAction!=null){
TCCResource tccResource=new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
twoPhaseBusinessAction.commitArgsClasses()));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
twoPhaseBusinessAction.rollbackArgsClasses()));
// set argsClasses
tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
// set phase two method's keys
tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
twoPhaseBusinessAction.commitArgsClasses()));
tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
twoPhaseBusinessAction.rollbackArgsClasses()));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}
}catch(Throwable t){
throw new FrameworkException(t,"parser remoting service error");
}
}
if(remotingParser.isReference(bean,beanName)){
//reference bean, TCC proxy
remotingBeanDesc.setReference(true);
}
return remotingBeanDesc;
}

以上方法,先调用解析类 getServiceDesc 方法对 remoting bean 进行解析,并将解析后的 remotingBeanDesc 放入 本地缓存 remotingServiceMap 中,同时调用解析类 isService 方法判断是否为发起方,如果是发起方,则解析 TwoPhaseBusinessAction 注解内容生成一个 TCCResource,并对其进行资源注册。

资源管理

1、资源注册

Seata TCC 模式的资源叫 TCCResource,其资源管理器叫 TCCResourceManager,前面讲过,当解析完 TCC 接口 RPC 资源后,如果是发起方,则会对其进行资源注册:

io.seata.rm.tcc.TCCResourceManager#registerResource

public void registerResource(Resource resource){
TCCResource tccResource=(TCCResource)resource;
tccResourceCache.put(tccResource.getResourceId(),tccResource);
super.registerResource(tccResource);
}

TCCResource 包含了 TCC 接口的相关信息,同时会在本地进行缓存。继续调用父类 registerResource 方法(封装了通信方法)向 TC 注册,TCC 资源的 resourceId 是 actionName,actionName 就是 @TwoParseBusinessAction 注解中的 name。

2、资源提交/回滚

io.seata.rm.tcc.TCCResourceManager#branchCommit

public BranchStatus branchCommit(BranchType branchType,String xid,long branchId,String resourceId,
String applicationData)throws TransactionException{
TCCResource tccResource=(TCCResource)tccResourceCache.get(resourceId);
if(tccResource==null){
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s",resourceId));
}
Object targetTCCBean=tccResource.getTargetBean();
Method commitMethod=tccResource.getCommitMethod();
if(targetTCCBean==null||commitMethod==null){
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s",resourceId));
}
try{
//BusinessActionContext
BusinessActionContext businessActionContext=getBusinessActionContext(xid,branchId,resourceId,
applicationData);
// ... ...
ret=commitMethod.invoke(targetTCCBean,args);
// ... ...
return result?BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;
}catch(Throwable t){
String msg=String.format("commit TCC resource error, resourceId: %s, xid: %s.",resourceId,xid);
LOGGER.error(msg,t);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}

当 TM 决议二阶段提交,TC 会通过分支注册的的资源 ID 回调到对应参与者(即 TCC 接口发起方)服务中执行 TCC Resource 的 Confirm/Cancel 方法。

资源管理器中会根据 resourceId 在本地缓存找到对应的 TCCResource,同时根据 xid、branchId、resourceId、applicationData 找到对应的 BusinessActionContext 上下文,执行的参数就在上下文中。最后,执行 TCCResource 中获取 commit 的方法进行二阶段提交。

二阶段回滚同理类似。

事务处理

前面讲过,如果 TCC 接口时一个调用方,则会使用 Seata TCC 代理对调用方进行拦截处理,并在处理调用真正的 RPC 方法前对分支进行注册。

执行方法io.seata.spring.util.TCCBeanParserUtils#isTccAutoProxy除了对 TCC 接口资源进行解析,还会判断 TCC 接口是否为调用方,如果是调用方则返回 true:

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

img

如图,当 GlobalTransactionalScanner 扫描到 TCC 接口调用方(Reference)时,会使 TccActionInterceptor 对其进行代理拦截处理,TccActionInterceptor 实现 MethodInterceptor

TccActionInterceptor 中还会调用 ActionInterceptorHandler 类型执行拦截处理逻辑,事务相关处理就在 ActionInterceptorHandler#proceed 方法中:

public Object proceed(Method method,Object[]arguments,String xid,TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback)throws Throwable{
//Get action context from arguments, or create a new one and then reset to arguments
BusinessActionContext actionContext=getOrCreateActionContextAndResetToArguments(method.getParameterTypes(),arguments);
//Creating Branch Record
String branchId=doTccActionLogStore(method,arguments,businessAction,actionContext);
// ... ...
try{
// ... ...
return targetCallback.execute();
}finally{
try{
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
}finally{
// ... ...
}
}
}

以上,在执行 TCC 接口一阶段之前,会调用 doTccActionLogStore 方法分支注册,同时还会将 TCC 相关信息比如参数放置在上下文,上面讲的资源提交/回滚就会用到这个上下文。

如何控制异常

在 TCC 模型执行的过程中,还可能会出现各种异常,其中最为常见的有空回滚、幂等、悬挂等。下面我讲下 Seata 是如何处理这三种异常的。

如何处理空回滚

什么是空回滚?

空回滚指的是在一个分布式事务中,在没有调用参与方的 Try 方法的情况下,TM 驱动二阶段回滚调用了参与方的 Cancel 方法。

那么空回滚是如何产生的呢?

img

如上图所示,全局事务开启后,参与者 A 分支注册完成之后会执行参与者一阶段 RPC 方法,如果此时参与者 A 所在的机器发生宕机,网络异常,都会造成 RPC 调用失败,即参与者 A 一阶段方法未成功执行,但是此时全局事务已经开启,Seata 必须要推进到终态,在全局事务回滚时会调用参与者 A 的 Cancel 方法,从而造成空回滚。

要想防止空回滚,那么必须在 Cancel 方法中识别这是一个空回滚,Seata 是如何做的呢?

Seata 的做法是新增一个 TCC 事务控制表,包含事务的 XID 和 BranchID 信息,在 Try 方法执行时插入一条记录,表示一阶段执行了,执行 Cancel 方法时读取这条记录,如果记录不存在,说明 Try 方法没有执行。

如何处理幂等

幂等问题指的是 TC 重复进行二阶段提交,因此 Confirm/Cancel 接口需要支持幂等处理,即不会产生资源重复提交或者重复释放。

那么幂等问题是如何产生的呢?

img

如上图所示,参与者 A 执行完二阶段之后,由于网络抖动或者宕机问题,会造成 TC 收不到参与者 A 执行二阶段的返回结果,TC 会重复发起调用,直到二阶段执行结果成功。

Seata 是如何处理幂等问题的呢?

同样的也是在 TCC 事务控制表中增加一个记录状态的字段 status,该字段有 3 个值,分别为:

  1. tried:1
  2. committed:2
  3. rollbacked:3

二阶段 Confirm/Cancel 方法执行后,将状态改为 committed 或 rollbacked 状态。当重复调用二阶段 Confirm/Cancel 方法时,判断事务状态即可解决幂等问题。

如何处理悬挂

悬挂指的是二阶段 Cancel 方法比 一阶段 Try 方法优先执行,由于允许空回滚的原因,在执行完二阶段 Cancel 方法之后直接空回滚返回成功,此时全局事务已结束,但是由于 Try 方法随后执行,这就会造成一阶段 Try 方法预留的资源永远无法提交和释放了。

那么悬挂是如何产生的呢?

img

如上图所示,在执行参与者 A 的一阶段 Try 方法时,出现网路拥堵,由于 Seata 全局事务有超时限制,执行 Try 方法超时后,TM 决议全局回滚,回滚完成后如果此时 RPC 请求才到达参与者 A,执行 Try 方法进行资源预留,从而造成悬挂。

Seata 是怎么处理悬挂的呢?

在 TCC 事务控制表记录状态的字段 status 中增加一个状态:

  1. suspended:4

当执行二阶段 Cancel 方法时,如果发现 TCC 事务控制表没有相关记录,说明二阶段 Cancel 方法优先一阶段 Try 方法执行,因此插入一条 status=4 状态的记录,当一阶段 Try 方法后面执行时,判断 status=4 ,则说明有二阶段 Cancel 已执行,并返回 false 以阻止一阶段 Try 方法执行成功。

作者简介

张乘辉,目前就职于蚂蚁集团,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,GitHub ID:objcoding。

· 阅读需 10 分钟

Seata AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。

为什么要检查全局锁呢,这是由于 Seata AT 模式的事务隔离是建立在支事务的本地隔离级别基础之上的,在数据库本地隔离级别读已提交或以上的前提下,Seata 设计了由事务协调器维护的全局写排他锁,来保证事务间的写隔离,同时,将全局事务默认定义在读未提交的隔离级别上。

Seata 事务隔离级别解读

在讲 Seata 事务隔离级之前,我们先来回顾一下数据库事务的隔离级别,目前数据库事务的隔离级别一共有 4 种,由低到高分别为:

  1. Read uncommitted:读未提交
  2. Read committed:读已提交
  3. Repeatable read:可重复读
  4. Serializable:序列化

数据库一般默认的隔离级别为读已提交,比如 Oracle,也有一些数据的默认隔离级别为可重复读,比如 Mysql,一般而言,数据库的读已提交能够满足业务绝大部分场景了。

我们知道 Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任务措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。

由此可以看出,传统意义的脏读是读到了未提交的数据,Seata 脏读是读到了全局事务下未提交的数据,全局事务可能包含多个本地事务,某个本地事务提交了不代表全局事务提交了。

在绝大部分应用在读已提交的隔离级别下工作是没有问题的,而实际上,这当中又有绝大多数的应用场景,实际上工作在读未提交的隔离级别下同样没有问题。

在极端场景下,应用如果需要达到全局的读已提交,Seata 也提供了全局锁机制实现全局事务读已提交。但是默认情况下,Seata 的全局事务是工作在读未提交隔离级别的,保证绝大多数场景的高效性。

全局锁实现

AT 模式下,会使用 Seata 内部数据源代理 DataSourceProxy,全局锁的实现就是隐藏在这个代理中。我们分别在执行、提交的过程都做了什么。

1、执行过程

执行过程在 StatementProxy 类,在执行过程中,如果执行 SQL 是 select for update,则会使用 SelectForUpdateExecutor 类,如果执行方法中带有 @GlobalTransactional or @GlobalLock注解,则会检查是否有全局锁,如果当前存在全局锁,则会回滚本地事务,通过 while 循环不断地重新竞争获取本地锁和全局锁。

io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

public T doExecute(Object... args) throws Throwable {
Connection conn = statementProxy.getConnection();
// ... ...
try {
// ... ...
while (true) {
try {
// ... ...
if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
// Do the same thing under either @GlobalTransactional or @GlobalLock,
// that only check the global lock here.
statementProxy.getConnectionProxy().checkLock(lockKeys);
} else {
throw new RuntimeException("Unknown situation!");
}
break;
} catch (LockConflictException lce) {
if (sp != null) {
conn.rollback(sp);
} else {
conn.rollback();
}
// trigger retry
lockRetryController.sleep(lce);
}
}
} finally {
// ...
}

2、提交过程

提交过程在 ConnectionProxy#doCommit方法中。

1)如果执行方法中带有@GlobalTransactional注解,则会在注册分支时候获取全局锁:

  • 请求 TC 注册分支

io.seata.rm.datasource.ConnectionProxy#register

private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
  • TC 注册分支的时候,获取全局锁

io.seata.server.transaction.at.ATCore#branchSessionLock

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
if (!branchSession.lock()) {
throw new BranchTransactionException(LockKeyConflict, String
.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()));
}
}

2)如果执行方法中带有@GlobalLock注解,在提交前会查询全局锁是否存在,如果存在则抛异常:

io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks

private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}

GlobalLock 注解说明

从执行过程和提交过程可以看出,既然开启全局事务 @GlobalTransactional注解可以在事务提交前,查询全局锁是否存在,那为什么 Seata 还要设计多处一个 @GlobalLock注解呢?

因为并不是所有的数据库操作都需要开启全局事务,而开启全局事务是一个比较重的操作,需要向 TC 发起开启全局事务等 RPC 过程,而@GlobalLock注解只会在执行过程中查询全局锁是否存在,不会去开启全局事务,因此在不需要全局事务,而又需要检查全局锁避免脏读脏写时,使用@GlobalLock注解是一个更加轻量的操作。

如何防止脏写

先来看一下使用 Seata AT 模式是怎么产生脏写的:

注:分支事务执行过程省略其它过程。

业务一开启全局事务,其中包含分支事务A(修改 A)和分支事务 B(修改 B),业务二修改 A,其中业务一执行分支事务 A 先获取本地锁,业务二则等待业务一执行完分支事务 A 之后,获得本地锁修改 A 并入库,业务一在执行分支事务时发生异常了,由于分支事务 A 的数据被业务二修改,导致业务一的全局事务无法回滚。

如何防止脏写?

1、业务二执行时加 @GlobalTransactional注解:

注:分支事务执行过程省略其它过程。

业务二在执行全局事务过程中,分支事务 A 提交前注册分支事务获取全局锁时,发现业务业务一全局锁还没执行完,因此业务二提交不了,抛异常回滚,所以不会发生脏写。

2、业务二执行时加 @GlobalLock注解:

注:分支事务执行过程省略其它过程。

@GlobalTransactional注解效果类似,只不过不需要开启全局事务,只在本地事务提交前,检查全局锁是否存在。

2、业务二执行时加 @GlobalLock 注解 + select for update语句:

如果加了select for update语句,则会在 update 前检查全局锁是否存在,只有当全局锁释放之后,业务二才能开始执行 updateA 操作。

如果单单是 transactional,那么就有可能会出现脏写,根本原因是没有 Globallock 注解时,不会检查全局锁,这可能会导致另外一个全局事务回滚时,发现某个分支事务被脏写了。所以加 select for update 也有个好处,就是可以重试。

如何防止脏读

Seata AT 模式的脏读是指在全局事务未提交前,被其它业务读到已提交的分支事务的数据,本质上是Seata默认的全局事务是读未提交。

那么怎么避免脏读现象呢?

业务二查询 A 时加 @GlobalLock 注解 + select for update语句:

select for update语句会在执行 SQL 前检查全局锁是否存在,只有当全局锁完成之后,才能继续执行 SQL,这样就防止了脏读。

作者简介:

张乘辉,目前就职于蚂蚁集团,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 11 分钟

在上一篇关于新版雪花算法的解析中,我们提到新版算法所做出的2点改变:

  1. 时间戳不再时刻追随系统时钟。
  2. 节点ID和时间戳互换位置。由原版的: 原版位分配策略 改成: 改进版位分配策略

有细心的同学提出了一个问题:新版算法在单节点内部确实是单调递增的,但是在多实例部署时,它就不再是全局单调递增了啊!因为显而易见,节点ID排在高位,那么节点ID大的,生成的ID一定大于节点ID小的,不管时间上谁先谁后。而原版算法,时间戳在高位,并且始终追随系统时钟,可以保证早生成的ID小于晚生成的ID,只有当2个节点恰好在同一时间戳生成ID时,2个ID的大小才由节点ID决定。这样看来,新版算法是不是错的?

这是一个很好的问题!能提出这个问题的同学,说明已经深入思考了标准版雪花算法和新版雪花算法的本质区别,这点值得鼓励!在这里,我们先说结论:新版算法的确不具备全局的单调递增性,但这不影响我们的初衷(减少数据库的页分裂)。这个结论看起来有点违反直觉,但可以被证明。

在证明之前,我们先简单回顾一下数据库关于页分裂的知识。以经典的mysql innodb为例,innodb使用B+树索引,其中,主键索引的叶子节点还保存了数据行的完整记录,叶子节点之间以双向链表的形式串联起来。叶子节点的物理存储形式为数据页,一个数据页内最多可以存储N条行记录(N与行的大小成反比)。如图所示: 数据��页
B+树的特性要求,左边的节点应小于右边的节点。如果此时要插入一条ID为25的记录,会怎样呢(假设每个数据页只够存放4条记录)?答案是会引起页分裂,如图: 页分裂
页分裂是IO不友好的,需要新建数据页,拷贝转移旧数据页的部分记录等,我们应尽量避免。

理想的情况下,主键ID最好是顺序递增的(例如把主键设置为auto_increment),这样就只会在当前数据页放满了的时候,才需要新建下一页,双向链表永远是顺序尾部增长的,不会有中间的节点发生分裂的情况。

最糟糕的情况下,主键ID是随机无序生成的(例如java中一个UUID字符串),这种情况下,新插入的记录会随机分配到任何一个数据页,如果该页已满,就会触发页分裂。

如果主键ID由标准版雪花算法生成,最好的情况下,是每个时间戳内只有一个节点在生成ID,这时候算法的效果等同于理想情况的顺序递增,即跟auto_increment无差。最坏的情况下,是每个时间戳内所有节点都在生成ID,这时候算法的效果接近于无序(但仍比UUID的完全无序要好得多,因为workerId只有10位决定了最多只有1024个节点)。实际生产中,算法的效果取决于业务流量,并发度越低,算法越接近理想情况。

那么,换成新版算法又会如何呢?
新版算法从全局角度来看,ID是无序的,但对于每一个workerId,它生成的ID都是严格单调递增的,又因为workerId是有限的,所以最多可划分出1024个子序列,每个子序列都是单调递增的。
对于数据库而言,也许它初期接收的ID都是无序的,来自各个子序列的ID都混在一起,就像这样: 初期
如果这时候来了个worker1-seq2,显然会造成页分裂: 首次分裂
但分裂之后,有趣的事情发生了,对于worker1而言,后续的seq3,seq4不会再造成页分裂(因为还装得下),seq5也只需要像顺序增长那样新建页进行链接(区别是这个新页不是在双向链表的尾部)。注意,worker1的后续ID,不会排到worker2及之后的任意节点(因而不会造成后边节点的页分裂),因为它们总比worker2的ID小;也不会排到worker1当前节点的前边(因而不会造成前边节点的页分裂),因为worker1的子序列总是单调递增的。在这里,我们称worker1这样的子序列达到了稳态,意为这条子序列已经"稳定"了,它的后续增长只会出现在子序列的尾部,而不会造成其它节点的页分裂。

同样的事情,可以推广到各个子序列上。无论前期数据库接收到的ID有多乱,经过有限次的页分裂后,双向链表总能达到这样一个稳定的终态: 终态
到达终态后,后续的ID只会在该ID所属的子序列上进行顺序增长,而不会造成页分裂。该状态下的顺序增长与auto_increment的顺序增长的区别是,前者有1024个增长位点(各个子序列的尾部),后者只有尾部一个。

到这里,我们可以回答开头所提出的问题了:新算法从全局来看的确不是全局递增的,但该算法是收敛的,达到稳态后,新算法同样能达成像全局顺序递增一样的效果。


扩展思考

以上只提到了序列不停增长的情况,而实践生产中,不光有新数据的插入,也有旧数据的删除。而数据的删除有可能会导致页合并(innodb若发现相邻2个数据页的空间利用率都不到50%,就会把它俩合并),这对新算法的影响如何呢?

经过上面的流程,我们可以发现,新算法的本质是利用前期的页分裂,把不同的子序列逐渐分离开来,让算法不断收敛到稳态。而页合并则恰好相反,它有可能会把不同的子序列又合并回同一个数据页里,妨碍算法的收敛。尤其是在收敛的前期,频繁的页合并甚至可以让算法永远无法收敛(你刚分离出来我就又把它们合并回去,一夜回到解放前~)!但在收敛之后,只有在各个子序列的尾节点进行的页合并,才有可能破坏稳态(一个子序列的尾节点和下一个子序列的头节点进行合并)。而在子序列其余节点上的页合并,不影响稳态,因为子序列仍然是有序的,只不过长度变短了而已。

以seata的服务端为例,服务端那3张表的数据的生命周期都是比较短的,一个全局事务结束之后,它们就会被清除了,这对于新算法是不友好的,没有给时间它进行收敛。不过已经有延迟删除的PR在review中,搭配这个PR,效果会好很多。比如定期每周清理一次,前期就有足够的时间给算法进行收敛,其余的大部分时间,数据库就能从中受益了。到期清理时,最坏的结果也不过是表被清空,算法从头再来。

如果您希望把新算法应用到业务系统当中,请务必确保算法有时间进行收敛。比如用户表之类的,数据本就打算长期保存的,算法可以自然收敛。或者也做了延迟删除的机制,给算法足够的时间进行收敛。

如果您有更好的意见和建议,也欢迎跟seata社区联系!

· 阅读需 9 分钟

Seata内置了一个分布式UUID生成器,用于辅助生成全局事务ID和分支事务ID。我们希望该生成器具有如下特点:

  • 高性能
  • 全局唯一
  • 趋势递增

高性能不必多言。全局唯一很重要,否则不同的全局事务/分支事务会混淆在一起。 此外,趋势递增对于使用数据库作为TC集群的存储工具的用户而言,能降低数据页分裂的频率,从而减少数据库的IO压力 (branch_table表以分支事务ID作为主键)。

在老版Seata(1.4以前),该生成器的实现基于标准版的雪花算法。标准版雪花算法网上已经有很多解读文章了,此处就不再赘述了。 尚未了解的同学可以先看看网上的相关资料,再来看此文章。 此处我们谈谈标准版雪花算法的几个缺点:

  1. 时钟敏感。因为ID生成总是和当前操作系统的时间戳绑定的(利用了时间的单调递增性),因此若操作系统的时钟出现回拨, 生成的ID就会重复(一般而言不会人为地去回拨时钟,但服务器会有偶发的"时钟漂移"现象)。 对于此问题,Seata的解决策略是记录上一次的时间戳,若发现当前时间戳小于记录值(意味着出现了时钟回拨),则拒绝服务, 等待时间戳追上记录值。 但这也意味着这段时间内该TC将处于不可用状态。
  2. 突发性能有上限。标准版雪花算法宣称的QPS很大,约400w/s,但严格来说这算耍了个文字游戏~ 因为算法的时间戳单位是毫秒,而分配给序列号的位长度为12,即每毫秒4096个序列空间。 所以更准确的描述应该是4096/ms。400w/s与4096/ms的区别在于前者不要求每一毫秒的并发都必须低于4096 (也许有些毫秒会高于4096,有些则低于)。Seata亦遵循此限制,若当前时间戳的序列空间已耗尽,会自旋等待下一个时间戳。

在较新的版本上(1.4之后),该生成器针对原算法进行了一定的优化改良,很好地解决了上述的2个问题。 改进的核心思想是解除与操作系统时间戳的时刻绑定,生成器只在初始化时获取了系统当前的时间戳,作为初始时间戳, 但之后就不再与系统时间戳保持同步了。它之后的递增,只由序列号的递增来驱动。比如序列号当前值是4095,下一个请求进来, 序列号+1溢出12位空间,序列号重新归零,而溢出的进位则加到时间戳上,从而让时间戳+1。 至此,时间戳和序列号实际可视为一个整体了。实际上我们也是这样做的,为了方便这种溢出进位,我们调整了64位ID的位分配策略, 由原版的: 原版位分配策略

改成(即时间戳和节点ID换个位置): 改进版位分配策略

这样时间戳和序列号在内存上是连在一块的,在实现上就很容易用一个AtomicLong来同时保存它俩:

/**
* timestamp and sequence mix in one Long
* highest 11 bit: not used
* middle 41 bit: timestamp
* lowest 12 bit: sequence
*/
private AtomicLong timestampAndSequence;

最高11位可以在初始化时就确定好,之后不再变化:

/**
* business meaning: machine ID (0 ~ 1023)
* actual layout in memory:
* highest 1 bit: 0
* middle 10 bit: workerId
* lowest 53 bit: all 0
*/
private long workerId;

那么在生产ID时就很简单了:

public long nextId() {
// 获得递增后的时间戳和序列号
long next = timestampAndSequence.incrementAndGet();
// 截取低53位
long timestampWithSequence = next & timestampAndSequenceMask;
// 跟先前保存好的高11位进行一个或的位运算
return workerId | timestampWithSequence;
}

至此,我们可以发现:

  1. 生成器不再有4096/ms的突发性能限制了。倘若某个时间戳的序列号空间耗尽,它会直接推进到下一个时间戳, "借用"下一个时间戳的序列号空间(不必担心这种"超前消费"会造成严重后果,下面会阐述理由)。
  2. 生成器弱依赖于操作系统时钟。在运行期间,生成器不受时钟回拨的影响(无论是人为回拨还是机器的时钟漂移), 因为生成器仅在启动时获取了一遍系统时钟,之后两者不再保持同步。 唯一可能产生重复ID的只有在重启时的大幅度时钟回拨(人为刻意回拨或者修改操作系统时区,如北京时间改为伦敦时间~ 机器时钟漂移基本是毫秒级的,不会有这么大的幅度)。
  3. 持续不断的"超前消费"会不会使得生成器内的时间戳大大超前于系统的时间戳, 从而在重启时造成ID重复? 理论上如此,但实际几乎不可能。要达到这种效果,意味该生成器接收的QPS得持续稳定在400w/s之上~ 说实话,TC也扛不住这么高的流量,所以说呢,天塌下来有个子高的先扛着,瓶颈一定不在生成器这里。

此外,我们还调整了下节点ID的生成策略。原版在用户未手动指定节点ID时,会截取本地IPv4地址的低10位作为节点ID。 在实践生产中,发现有零散的节点ID重复的现象(多为采用k8s部署的用户)。例如这样的IP就会重复:

  • 192.168.4.10
  • 192.168.8.10

即只要IP的第4个字节和第3个字节的低2位一样就会重复。 新版的策略改为优先从本机网卡的MAC地址截取低10位,若本机未配置有效的网卡,则在[0, 1023]中随机挑一个作为节点ID。 这样调整后似乎没有新版的用户再报同样的问题了(当然,有待时间的检验,不管怎样,不会比IP截取策略更糟糕)。

以上就是对Seata的分布式UUID生成器的简析,如果您喜欢这个生成器,也可以直接在您的项目里使用它, 它的类声明是public的,完整类名为: io.seata.common.util.IdWorker

当然,如果您有更好的点子,也欢迎跟Seata社区讨论。

· 阅读需 6 分钟

现状 & 痛点

对于Seata而言,是通过记录DML操作的前后的数据用于进行后续可能的回滚操作的,并且把这些数据保存到数据库的一个blob的字段里面。对于批量插入,更新,删除等操作,其影响的行数可能会比较多,拼接成一个大的字段插入到数据库,可能会带来以下问题:

  1. 超出数据库单次操作的最大写入限制(比如MySQL的max_allowed_package参数);
  2. 较大的数据量带来的网络IO和数据库磁盘IO开销比较大。

头脑风暴

对于第1点的问题,可以根据业务的实际情况,调大max_allowed_package参数的限制,从而避免出现query is too large的问题;对于第2点,可以通过提高带宽和选用高性能的SSD作为数据库的存储介质。

以上都是通过外部方案或者加钱方案去解决的。那么有没有框架层面解决方案以解决上面的痛点?

此时结合到以上的痛点出现的根源,在于生成的数据字段过大。为此,如果可以把对应的数据进行业务方压缩之后,再进行数据传输以及落库,理论上也可以解决上面的问题。

可行性分析

结合以上头脑风暴的内容,考虑在实际开发中,当需要进行大批量操作的时候,大多会选在较少用户操作,并发相对较低的时间点执行,此时CPU,内存等资源可以相对占用多一点以快速完成对应的操作。因此,可以通过消耗CPU资源和内存资源,来对对应的回滚的数据进行压缩,从而缩小数据传输和存储的大小。

此时,还需要证明以下两件事:

  1. 经过压缩之后,可以减少网络IO和数据库磁盘IO的压力,这里可以采用数据压缩+落库完成的总时间作为侧面参考指标。
  2. 经过压缩之后,数据大小跟原来比较的压缩效率有多高,这里使用压缩前后的数据大小来作为指标。

压缩网络用时指标测试:

image

压缩比测试:

image

通过以上的测试结果,可以明显的看出,使用gzip或zip进行压缩的情况下,可以较大程度的减少数据库的压力和网络传输的压力,同时也可以较大幅度的减少保存的数据的大小。

实现

实现思路

压缩

部分代码

properties配置:

# 是否开启undo_log压缩,默认为true
seata.client.undo.compress.enable=true
# 压缩器类型,默认为zip,一般建议都是zip
seata.client.undo.compress.type=zip
# 启动压缩的阈值,默认为64k
seata.client.undo.compress.threshold=64k

判断是否开启了undo_log压缩功能以及是否达到压缩的阈值:

protected boolean needCompress(byte[] undoLogContent) {
// 1. 判断是否开启了undo_log压缩功能(1.4.2默认开启)
// 2. 判断是否达到了压缩的阈值(默认64k)
// 如果都满足返回需要对对应的undoLogContent进行压缩
return ROLLBACK_INFO_COMPRESS_ENABLE
&& undoLogContent.length > ROLLBACK_INFO_COMPRESS_THRESHOLD;
}

确定需要压缩后,对undo_log进行压缩:

// 如果需要压缩,对undo_log进行压缩
if (needCompress(undoLogContent)) {
// 获取压缩类型,默认zip
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
// 获取对应的压缩器,并且进行压缩
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
// else 不需要压缩就不需要做任何操作

将压缩类型同步保存到数据库,供回滚时使用:

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
// 保存压缩类型到数据库
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
}

回滚时解压缩对应的信息:

protected byte[] getRollbackInfo(ResultSet rs) throws SQLException  {
// 获取保存到数据库的回滚信息的字节数组
byte[] rollbackInfo = rs.getBytes(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
// 获取压缩类型
// getOrDefault使用默认值CompressorType.NONE来兼容1.4.2之前的版本直接升级1.4.2+
String rollbackInfoContext = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = CollectionUtils.decodeMap(rollbackInfoContext);
CompressorType compressorType = CompressorType.getByName(context.getOrDefault(UndoLogConstants.COMPRESSOR_TYPE_KEY,
CompressorType.NONE.name()));
// 获取对应的压缩器,并且解压缩
return CompressorFactory.getCompressor(compressorType.getCode())
.decompress(rollbackInfo);
}

结语

通过对undo_log的压缩,在框架层面,进一步提高Seata在处理数据量较大的时候的性能。同时,也提供了对应的开关和相对合理的默认值,既方便用户进行开箱即用,也方便用户根据实际需求进行一定的调整,使得对应的功能更适合实际使用场景。

· 阅读需 16 分钟
  1. seata版本:1.4.0,但1.4以下的所有版本也都有这个问题
  2. 问题描述:在一个全局事务中,一个分支事务上的纯查询操作突然卡住了,没有任何反馈(日志/异常),直到消费端RPC超时

image.png

问题排查

  1. 整个流程在一个全局事务中,消费者和提供者可以看成是全局事务中的两个分支事务,消费者 --> 提供者
  2. 消费者先执行本地的一些逻辑,然后向提供者发送RPC请求,确定消费者发出了请求已经并且提供者接到了请求
  3. 提供者先打印一条日志,然后执行一条纯查询SQL,如果SQL正常执行会打印日志,但目前的现象是只打印了执行SQL前的那条日志,而没有打印任何SQL相关的日志。找DBA查SQL日志,发现该SQL没有执行
  4. 确定了该操作只是全局事务下的一个纯查询操作,在该操作之前,全局事务中的整体流程完全正常
  5. 其实到这里现象已经很明显了,不过当时想法没转变过来,一直关注那条查询SQL,总在想就算查询超时等原因也应该抛出异常啊,不应该什么都没有。DBA都找不到查询记录,那是不是说明SQL可能根本就没执行啊,而是在执行SQL前就出问题了,比如代理?
  6. 借助arthas的watch命令,一直没有东西输出。第一条日志的输出代表这个方法一定执行了,迟迟没有结果输出说明当前请求卡住了,为什么卡住了呢?
  7. 借助arthas的thread命令 thread -bthread -n,就是要找出当前最忙的线程。这个效果很好,有一个线程CPU使用率92%,并且因为该线程导致其它20多个Dubbo线程BLOCKED了。堆栈信息如下
  8. 分析堆栈信息,已经可以很明显的发现和seata相关的接口了,估计和seata的数据源代理有关;同时发现CPU占用最高的那个线程卡在了ConcurrentHashMap#computeIfAbsent方法中。难道ConcurrentHashMap#computeIfAbsent方法有bug?
  9. 到这里,问题的具体原因我们还不知道,但应该和seata的数据源代理有点关系,具体原因我们需要分析业务代码和seata代码

image.png

问题分析

ConcurrentHashMap#computeIfAbsent

这个方法确实有可能出问题:如果两个key的hascode相同,并且在对应的mappingFunction中又进行了computeIfAbsent操作,则会导致死循环,具体分析参考这篇文章:https://juejin.cn/post/6844904191077384200

Seata数据源自动代理

相关内容之前有分析过,我们重点来看看以下几个核心的类:

  1. SeataDataSourceBeanPostProcessor
  2. SeataAutoDataSourceProxyAdvice
  3. DataSourceProxyHolder
SeataDataSourceBeanPostProcessor

SeataDataSourceBeanPostProcessorBeanPostProcessor实现类,在postProcessAfterInitialization方法(即Bean初始化之后)中,会为业务方配置的数据源创建对应的seata代理数据源

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy.");
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}
SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice是一个MethodInterceptor,seata中的SeataAutoDataSourceProxyCreator会针对DataSource类型的Bean创建动态代理对象,代理逻辑就是SeataAutoDataSourceProxyAdvice#invoke逻辑。即:执行数据源AOP代理对象的相关方法时候,会经过其invoke方法,在invoke方法中再根据当原生数据源,找到对应的seata代理数据源,最终达到执行seata代理数据源逻辑的目的

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
......
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
return invocation.proceed();
}
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
}
DataSourceProxyHolder

流程上我们已经清楚了,现在还有一个问题,如何维护原生数据源seata代理数据源之间的关系?通过DataSourceProxyHolder维护,这是一个单例对象,该对象中通过一个ConcurrentHashMap维护两者的关系:原生数据源为key --> seata代理数据源 为value

public class DataSourceProxyHolder {
public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource = dataSource;
......
return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, originalDataSource,
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}
}


// CollectionUtils.java
public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
return map.computeIfAbsent(key, mappingFunction);
}

客户端数据源配置

  1. 配置了两个数据源:DynamicDataSourceP6DataSource
  2. P6DataSource可以看成是对DynamicDataSource的一层包装
  3. 我们暂时不去管这个配置合不合理,现在只是单纯的基于这个数据源配置分析问题
@Qualifier("dsMaster")
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(dsMaster());
return p6DataSource;
}

分析过程

假设现在大家都已经知道了 ConcurrentHashMap#computeIfAbsent 可能会产生的问题,已知现在产生了这个问题,结合堆栈信息,我们可以知道大概哪里产生了这个问题。

1、ConcurrentHashMap#computeIfAbsent会产生这个问题的前提条件是:两个key的hashcode相同mappingFunction中对应了一个put操作。结合我们seata的使用场景,mappingFunction对应的是DataSourceProxy::new,说明在DataSourceProxy的构造方法中可能会触发put操作

image.png

执行AOP代理数据源相关方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
AOP代理数据源的getConnection方法 =>
原生数据源的getConnection方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
DuridDataSource的getConnection方法

2、步骤1中说的AOP代理数据源原生数据源分别是什么?看下面这张图 image.png

3、上面还说到了产生这个问题还有一个条件两个key的hashcode相同,但我看这两个数据源对象都没有重写hashcode方法,所以按理来说,这两个对象的hashcode一定是不同的。后面又再看了一遍ConcurrentHashMap这个问题,感觉两个key的hashcode相同这个说法是不对的,两个key会产生hash冲突更合理一些,这样就能解释两个hashcode不同的对象为啥会遇上这个问题了。为了证明这个,下面我给了一个例子

public class Test {
public static void main(String[] args) {
ConcurrentHashMap map = new ConcurrentHashMap(8);
Num n1 = new Num(3);
Num n2 = new Num(19);
Num n3 = new Num(20);

// map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)); // 这行代码不会导致程序死循环
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)); // 这行代码会导致程序死循环
}

static class Num{
private int i;
public Num(int i){
this.i = i;
}

@Override
public int hashCode() {
return i;
}
}
}
  1. 为了方便重现问题,我们重写了Num#hashCode方法,保证构造函数入参就是hashcode的值
  2. 创建一个ConcurrentHashMap对象,initialCapacity为8,sizeCtl计算出来的值为16,即该map中数组长度默认为16
  3. 创建对象n1,入参为3,即hashcode为3,计算得出其对应的数组下标为3
  4. 创建对象n2,入参为19,即hashcode为19,计算得出其对应的数组下标为3,此时我们可以认为n1和n2产生了hash冲突
  5. 创建对象n3,入参为20,即hashcode为20,计算得出其对应的数组下标为4
  6. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)),程序正常退出:因为n1和n3没有hash冲突,所以正常结束
  7. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)),程序正常退出:因为n1和n2产生了hash冲突,所以陷入死循环

4、在对象初始化的时候,SeataDataSourceBeanPostProcessor不是已经将对象对应的数据源代理初始化好了吗?为什么在SeataAutoDataSourceProxyAdvice中还是会创建对应的数据源代理呢?

  1. 首先,SeataDataSourceBeanPostProcessor执行时期是晚于AOP代理对象创建的,所以在执行SeataDataSourceBeanPostProcessor相关方法的时候,SeataAutoDataSourceProxyAdvice其实应生效了
  2. SeataDataSourceBeanPostProcessor中向map中添加元素时,key为AOP代理数据源SeataAutoDataSourceProxyAdvice中的invocation.getThis()中拿到的是原生数据源,所以key不相同

image.png

5、还有一个问题,SeataAutoDataSourceProxyAdvic#invoke方法中并没有过滤toString、hashCode等方法,cglib创建的代理对象默认会重写这几个方法,如果在向map中put元素的时候触发了代理对象的这些方法,此时又会重新进入SeataAutoDataSourceProxyAdvic#invoke切面,直到线程堆栈益处

问题总结

  1. 在两个key会产生hash冲突的时候,会触发ConcurrentHashMap#computeIfAbsentBUG,该BUG的表现就是让当前线程陷入死循环
  2. 业务反馈,该问题是偶现的,偶现的原因有两种:首先,该应用是多节点部署,但线上只有一个节点触发了该BUG(hashcode冲突),所以只有当请求打到这个节点的时候才有可能会触发该BUG;其次,因为每次重启对象地址(hashcode)都是不确定的,所以并不是每次应用重启之后都会触发,但如果一旦触发,该节点就会一直存在这个问题。有一个线程一直在死循环,并将其它尝试从map中获取代理数据源的线程阻塞了,这种现象在业务上的反馈就是请求卡住了。如果连续请求都是这样,此时业务方可能会重启服务,然后因为重启后hash冲突不一定存在,可能重启后业务表现就正常了,但也有可能在下次重启的时候又会触发了这个BUG
  3. 当遇到这个问题时,从整个问题上来看,确实就是死锁了,因为那个死循环的线程占者锁一直不释放,导致其它操作该map的线程被BLOCK了
  4. 本质上还是因为ConcurrentHashMap#computeIfAbsent方法可能会触发BUG,而seata的使用场景刚好就触发了该BUG
  5. 下面这个demo其实就完整的模拟了线上出问题时的场景,如下:
public class Test {
public static void main(String[] args) {

ConcurrentHashMap map = new ConcurrentHashMap(8);

Num n1 = new Num(3);
Num n2 = new Num(19);

for(int i = 0; i< 20; i++){
new Thread(()-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

map.computeIfAbsent(n1, k-> 200);
}).start();
}
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200));
}


static class Num{
private int i;

public Num(int i){
this.i = i;
}
@Override
public int hashCode() {
return i;
}
}
}

image.png

解决问题

可以从两方面解决这个问题:

  1. 业务方改动:P6DataSource 和 DynamicDataSource 没必要都被代理,直接代理P6DataSource就可以了,DynamicDataSource没有必要声明成一个Bean;或者通过excluds属性排除P6DataSource,这样就不会存在重复代理问题
  2. Seata完善:完善数据源代理相关逻辑
业务方改动

1、数据源相关的配置改成如下即可:

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(new TuYaDynamicDataSource(masterDsRoute));
logger.warn("dsMaster={}, hashcode={}",p6DataSource, p6DataSource.hashCode());
return p6DataSource;
}

2、或者不改变目前的数据源配置,添加excluds属性

@EnableAutoDataSourceProxy(excludes={"P6DataSource"})
Seata完善

1、ConcurrentHashMap#computeIfAbsent方法改成双重检查,如下:

SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
synchronized (dataSourceProxyMap) {
dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
dataSourceProxyMap.put(originalDataSource, dsProxy);
}
}
}
return dsProxy;

之前我想直接改CollectionUtils#computeIfAbsent方法,群里大佬反馈这样可能会导致数据源多次创建,确实有这个问题:如下

public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
value = mappingFunction.apply(key);
return map.computeIfAbsent(key, value);
}

2、SeataAutoDataSourceProxyAdvice切面逻辑中添加一些过滤

Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}

遗留问题

SeataDataSourceBeanPostProcessorSeataAutoDataSourceProxyAdvice对应方法中,向map中初始化seata数据源代理时对应的key根本就不同,SeataDataSourceBeanPostProcessor中对应的key是AOP代理数据源SeataAutoDataSourceProxyAdvice中对应的key是原生对象,此时就造成了不必要的seata数据源代理对象的创建。

针对这个问题,大家有什么好的建议?能不能为SeataDataSourceBeanPostProcessor指定一个order,让其在创建AOP代理对象之前生效

原文链接

https://juejin.cn/post/6939041336964153352/