跳到主要内容

· 阅读需 14 分钟

Seata阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

1.1 四种事务模式

Seata 目标打造一站式的分布事务的解决方案,最终会提供四种事务模式:

目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习 Seata 的时候,可以花更多精力在 AT 模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。

友情提示:具体的流行度,朋友可以选择看看 Wanted: who's using Seata 每个公司登记的使用方式。

1.2 三种角色

在 Seata 的架构中,一共有三个角色:

三个角色

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。

在 Seata 中,一个分布式事务的生命周期如下:

架构图

友情提示:看下艿艿添加的红色小勾。

  • TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。

    XID,会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。

  • RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。

  • TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。

  • TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

1.3 框架支持情况

Seata 目前提供了对主流的微服务框架的支持:

同时方便我们集成到 Java 项目当中,Seata 也提供了相应的 Starter 库:

因为 Seata 是基于 DataSource 数据源进行代理来拓展,所以天然对主流的 ORM 框架提供了非常好的支持:

  • MyBatis、MyBatis-Plus
  • JPA、Hibernate

1.4 案例情况

Wanted: who's using Seata 的登记情况,Seata 已经在国内很多团队开始落地,其中不乏有滴滴、韵达等大型公司。可汇总如下图:

汇总图

另外,在 awesome-seata 仓库中,艿艿看到了滴滴等等公司的落地时的技术分享,还是非常真实可靠的。如下图所示:awesome-seata 滴滴

从案例的情况来说,Seata 可能给是目前已知最可靠的分布式事务解决方案,至少对它进行技术投入是非常不错的选择。

2. 部署单机 TC Server

本小节,我们来学习部署单机 Seata TC Server,常用于学习或测试使用,不建议在生产环境中部署单机。

因为 TC 需要进行全局事务和分支事务的记录,所以需要对应的存储。目前,TC 有两种存储模式( store.mode ):

  • file 模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
  • db 模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。

显然,我们将采用 file 模式,最终我们部署单机 TC Server 如下图所示:单机 TC Server

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

2.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

2.2 启动 TC Server

执行 nohup sh bin/seata-server.sh & 命令,启动 TC Server 在后台。在 nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 File 存储器
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[FILE] extension by class[io.seata.server.store.file.FileTransactionStoreManager]
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[FILE] extension by class[io.seata.server.session.file.FileBasedSessionManager]
# 启动成功
2020-04-02 08:36:01.597 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
  • 默认配置下,Seata TC Server 启动在 8091 端点。

因为我们使用 file 模式,所以可以看到用于持久化的本地文件 root.data。操作命令如下:

$ ls -ls sessionStore/
total 0
0 -rw-r--r-- 1 yunai staff 0 Apr 2 08:36 root.data

后续,朋友可以阅读「4. 接入 Java 应用」小节,开始使用 Seata 实现分布式事务。

3. 部署集群 TC Server

本小节,我们来学习部署集群 Seata TC Server,实现高可用,生产环境下必备。在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

同时,每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。最终我们部署 集群 TC Server 如下图所示:集群 TC Server

Seata TC Server 对主流的注册中心都提供了集成,具体可见 discovery 目录。考虑到国内使用 Nacos 作为注册中心越来越流行,这里我们就采用它。

友情提示:如果对 Nacos 不了解的朋友,可以参考《Nacos 安装部署》文章。

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

3.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

3.2 初始化数据库

① 使用 mysql.sql 脚本,初始化 Seata TC Server 的 db 数据库。脚本内容如下:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

在 MySQL 中,创建 seata 数据库,并在该库下执行该脚本。最终结果如下图:seata 数据库 - MySQL 5.X

② 修改 conf/file 配置文件,修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。如下图所示:conf/file 配置文件

③ MySQL8 的支持

如果朋友使用的 MySQL 是 8.X 版本,则需要看该步骤。否则,可以直接跳过。

首先,需要下载 MySQL 8.X JDBC 驱动,命令行操作如下:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用该 MySQL 8.X JDBC 驱动。如下图所示:seata 数据库 - MySQL 8.X

3.3 设置使用 Nacos 注册中心

修改 conf/registry.conf 配置文件,设置使用 Nacos 注册中心。如下图所示:conf/registry.conf 配置文件

3.4 启动 TC Server

① 执行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,启动第一个 TC Server 在后台。

  • -p:Seata TC Server 监听的端口。
  • -n:Server node。在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突。

nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 DB 存储器
2020-04-05 16:54:12.793 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load DataSourceGenerator[dbcp] extension by class[io.seata.server.store.db.DbcpDataSourceGenerator]
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load LogStore[DB] extension by class[io.seata.core.store.db.LogStoreDataBaseDAO]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[DB] extension by class[io.seata.server.store.db.DatabaseTransactionStoreManager]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[DB] extension by class[io.seata.server.session.db.DataBaseSessionManager]
# 启动成功
2020-04-05 16:54:13.779 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
# 使用 Nacos 注册中心
2020-04-05 16:54:13.788 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

② 执行 nohup sh bin/seata-server.sh -p 28091 -n 2 & 命令,启动第二个 TC Server 在后台。

③ 打开 Nacos 注册中心的控制台,我们可以看到有两个 Seata TC Server 示例。如下图所示:Nacos 控制台

4. 接入 Java 应用

4.1 AT 模式

① Spring Boot

1、《芋道 Spring Boot 分布式事务 Seata 入门》「2. AT 模式 + 多数据源」小节,实现单体 Spring Boot 项目在多数据源下的分布式事务。

整体图

2、《芋道 Spring Boot 分布式事务 Seata 入门》「AT 模式 + HttpClient 远程调用」小节,实现多个 Spring Boot 项目的分布事务。

整体图

② Dubbo

《Dubbo 分布式事务 Seata 入门》「2. AT 模式」小节,实现多个 Dubbo 服务下的分布式事务。

整体图

③ Spring Cloud

《芋道 Spring Cloud Alibaba 分布式事务 Seata 入门》「3. AT 模式 + Feign」小节,实现多个 Spring Cloud 服务下的分布式事务。

整体图

4.2 TCC 模式

4.3 Saga 模式

4.4 XA 模式

Seata 正在开发中...

· 阅读需 7 分钟

使用配置中心和数据库来实现 Seata 的高可用,以 Nacos 和 MySQL 为例,将cloud-seata-nacos应用部署到 Kubernetes 集群中

该应用使用 Nacos 作为配置和注册中心,总共有三个服务: order-service, pay-service, storage-service, 其中 order-service 对外提供下单接口,当余额和库存充足时,下单成功,会提交事务,当不足时会抛出异常,下单失败,回滚事务

准备工作

需要准备可用的注册中心、配置中心 Nacos 和 MySQL,通常情况下,注册中心、配置中心和数据库都是已有的,不需要特别配置,在这个实践中,为了简单,只部署单机的注册中心、配置中心和数据库,假设他们是可靠的

  • 部署 Nacos

在服务器部署 Nacos,开放 8848 端口,用于 seata-server 注册,服务器地址为 192.168.199.2

docker run --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server
  • 部署 MySQL

部署一台MySQL 数据库,用于保存事务数据,服务器地址为 192.168.199.2

docker run --name mysql -p 30060:3306-e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

部署 seata-server

  • 创建seata-server需要的表

具体的 SQL 参考 script/server/db,这里使用的是 MySQL 的脚本,数据库名称为 seata

同时,也需要创建 undo_log 表, 可以参考 script/client/at/db/

  • 修改seata-server配置

将以下配置添加到 Nacos 配置中心,具体添加方法可以参考 script/config-center

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true
store.db.user=root
store.db.password=123456

部署 seata-server 到 Kubernetes

  • seata-server.yaml

需要将 ConfigMap 的注册中心和配置中心地址改成相应的地址

apiVersion: v1
kind: Service
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
type: ClusterIP
ports:
- port: 8091
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-server

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
serviceName: seata-ha-server
replicas: 3
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-server
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-server
spec:
containers:
- name: seata-ha-server
image: docker.io/seataio/seata-server:latest
imagePullPolicy: IfNotPresent
env:
- name: SEATA_CONFIG_NAME
value: file:/root/seata-config/registry
ports:
- name: http
containerPort: 8091
protocol: TCP
volumeMounts:
- name: seata-config
mountPath: /root/seata-config
volumes:
- name: seata-config
configMap:
name: seata-ha-server-config


---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-server-config
data:
registry.conf: |
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.199.2"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.199.2"
group = "SEATA_GROUP"
}
}
  • 部署
kubectl apply -f seata-server.yaml

部署完成后,会有三个 pod

kubectl get pod | grep seata-ha-server

seata-ha-server-645844b8b6-9qh5j 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-pzczs 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-wkpw8 1/1 Running 0 3m14s

待启动完成后,可以在 Nacos 的服务列表中发现三个 seata-server 的实例,至此,已经完成 seata-server 的高可用部署

  • 查看服务日志
kubelet logs -f seata-ha-server-645844b8b6-9qh5j
[0.012s][info   ][gc] Using Serial
2020-04-15 00:55:09.880 INFO [main]io.seata.server.ParameterParser.init:90 -The server is running in container.
2020-04-15 00:55:10.013 INFO [main]io.seata.config.FileConfiguration.<init>:110 -The configuration file used is file:/root/seata-config/registry.conf
2020-04-15 00:55:12.426 INFO [main]com.alibaba.druid.pool.DruidDataSource.init:947 -{dataSource-1} inited
2020-04-15 00:55:13.127 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started

其中{dataSource-1} 说明使用了数据库,并正常初始化完成

  • 查看注册中心,此时seata-serve 这个服务会有三个实例

seata-ha-nacos-list.png

部署业务服务

  • 创建业务表并初始化数据

具体的业务表可以参考 cloud-seata-nacos/README.md

  • 添加 Nacos 配置

在 public 的命名空间下,分别创建 data-id 为 order-service.properties, pay-service.properties, storage-service.properties 的配置,内容相同,需要修改数据库的地址、用户名和密码

# MySQL
spring.datasource.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Seata
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
  • 部署服务

通过 application.yaml 配置文件部署服务,需要注意的是修改 ConfigMap 的 NACOS_ADDR为自己的 Nacos 地址

apiVersion: v1
kind: Service
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
type: NodePort
ports:
- port: 8081
nodePort: 30081
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-service

---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-service-config
data:
NACOS_ADDR: 192.168.199.2:8848

---
apiVersion: v1
kind: ServiceAccount
metadata:
name: seata-ha-account
namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: seata-ha-account
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: seata-ha-account
namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-service
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-service
spec:
serviceAccountName: seata-ha-account
containers:
- name: seata-ha-order-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-order-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8081
protocol: TCP
- name: seata-ha-pay-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-pay-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8082
protocol: TCP
- name: seata-ha-storage-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-storage-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8083
protocol: TCP

通过以下命令,将应用部署到集群中

kubectl apply -f application.yaml

然后查看创建的 pod,seata-ha-service 这个服务下有三个 pod

kubectl get pod | grep seata-ha-service

seata-ha-service-7dbdc6894b-5r8q4 3/3 Running 0 12m

待应用启动后,在 Nacos 的服务列表中,会有相应的服务

seata-ha-service-list.png

此时查看服务的日志,会看到服务向每一个 TC 都注册了

kubectl logs -f seata-ha-service-7dbdc6894b-5r8q4 seata-ha-order-service

seata-ha-service-register.png

查看任意的 TC 日志,会发现每一个服务都向 TC 注册了

kubelet logs -f seata-ha-server-645844b8b6-9qh5j

seata-ha-tc-register.png

测试

测试成功场景

调用下单接口,将 price 设置为 1,因为初始化的余额为 10,可以下单成功

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 1
}'

此时返回结果为:

{"success":true,"message":null,"data":null}

查看TC 的日志,事务成功提交:

seata-ha-commit-tc-success.png

查看 order-service 服务日志 seata-ha-commit-success.png

测试失败场景

设置 price 为 100,此时余额不足,会下单失败抛出异常,事务会回滚

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 100
}'

查看 TC 的日志: seata-ha-commit-tc-rollback.png

查看服务的日志 : seata-ha-commit-service-rollback.png

多次调用查看服务日志,发现会随机的向其中某台TC发起事务注册,当扩容或缩容后,有相应的 TC 参与或退出,证明高可用部署生效

· 阅读需 8 分钟

一 . 导读

根据大佬定义的分类,配置可以有三种:环境配置、描述配置、扩展配置。

环境配置:像一些组件启动时的参数等,通常是离散的简单值,多是 key-value 型数据。

描述配置:与业务逻辑相关,比如:事务发起方和参与方,通常会嵌到业务的生命周期管理中。描述配置信息较多,甚至有层次关系。

扩展配置:产品需要发现第三方实现,对配置的聚合要求比较高,比如:各种配置中心和注册中心,通常做法是在 jar 包的 META-INF/services 下放置接口类全名文件,内容为每行一个实现类类名。

二. 环境配置

seata server 在加载的时候,会使用 resources/registry.conf 来确定配置中心和注册中心的类型。而 seata client 在 1.0 版本后,不仅能使用 conf 文件进行配置的加载,也可以在 springboot 的 yml 配置文件中,使用 seata.config.{type} 来进行配置中心的选择,注册中心与之类似。通过 yml 加载配置的源码在 io.seata.spring.boot.autoconfigure.properties.registry 包下。

如果 seata 客户端的使用者既在 resources 下放了 conf 配置文件又在 yml 文件中配置,那么会优先使用 yml 中配置的。代码:

CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;

这里 extConfiguration 是外部配置实例,即 ExtConfigurationProvider#provide() 外部配置提供类提供的,而 configuration 是另一个配置提供类提供的 ConfigurationProvider#provide(),这两个配置提供类是在 config 模块 ConfigurationFactory 静态块中,通过 SPI 的方式加载。

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

上面说的是配置中心类型的选择,而配置环境的加载,是在确定了使用什么配置中心类型后,再通过相应的配置中心加载环境配置。File 即文本方式配置也是一种配置中心。

client 和 server 获取配置参数,是通过 ConfigurationFactory#getInstance() 获取配置类实例,再使用配置类实例获取配置参数,配置的 key 这些常量的定义,主要在 core 模块下 config 文件中。

一些重要的环境配置属性的意义,官网都有介绍

在实例化的时候通过 ConfigurationFactory 获取后注入构造函数中的,需要重启才能生效,而在使用时通过 ConfigurationFactory 实时获取的,配置改了就可以生效。

但是 config 模块提供了 ConfigurationChangeListener#onChangeEvent 接口方法来修改实例内部的属性。即在这个方法中,监听动态变化的属性,如果检测到自身使用的属性和刚开始注入时不一样了,就修改实例中保存的属性,和配置中心保持一致,这样就实现了动态配置。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener {
private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,false);
@Override public Object invoke(Param param) {
if(disable){//事务业务处理}
}
@Override public void onChangeEvent(Param param) {
disable = param;
}}

上面是 spring 模块下的 GlobalTransactionalInterceptor 与降级属性相关的伪代码。 GlobalTrarnsactionalScanner 在上面的 interceptor 类被实例化时,把 interceptor 注册到了配置变化监听列表中,当配置被改变的时候,会调用监听器:

ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);

降级的意思是,当服务某一项功能不可用的时候,通过动态配置的属性,把某一项功能给关了,这样就可以避免一直尝试失败的处理。interceptor#invoke() 只有当这个 disable 属性为 true 时,才会执行 seata 事务相关业务。

三. 描述配置

一般性框架描述性配置通常信息比较多,甚至有层次关系,用 xml 配置比较方便,因为树结构描述性更强。而现在的习惯都在提倡去繁琐的约束性配置,采用约定的方式。

seata AT 模式是通过代理数据源的方式来进行事务处理,对业务方入侵较小,只需让 seata 在启动时,识别哪些业务方需要开启全局事务,所以用注解就可以实现描述性配置。

@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
public String doBiz(String msg) {}

如果是 tcc 模式,事务参与方还需使用注解标识:

@TwoPhaseBusinessAction(name = "tccActionForSpringTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int i);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);

四 .扩展配置

扩展配置,通常对产品的聚合要求比较高,因为产品需要发现第三方实现,将其加入产品内部。

在这里插入图片描述 这是一个自定义配置中心提供类的例子,在 META-INF/services 下放置一个接口同名的文本文件,文件的内容为接口的实现类。这是标准的 spi 方式。然后修改配置文件 registry.conf 中的 config.type=test 。

但是如果你认为这样就可以被 seata 识别到,并且替换掉配置中心,那你就错了。seata 在加载配置中心的时候,使用 enum ConfigType 包裹了一下配置文件中配置的配置中心的类型的值:

private static Configuration buildConfiguration() {
configTypeName = "test";//registry.conf中配置的config.type
configType = ConfigType.getType(configTypeName);//ConfigType获取不到会抛异常
}

如果在 ConfigType 中没有定义 test 这种配置中心类型,那么会抛异常。所以单纯的修改配置文件而不改变源码是无法使用 ConfigType 中定义的配置中心提供类以外的配置中心提供类。

目前 1.0 版本在 ConfigType 中定义的配置中心类型有:File,ZK,Nacos,Apollo,Consul,Etcd3,SpringCloudConfig,Custom。如果用户想使用自定义的配置中心类型,可以使用 Custom 这种类型。

在这里插入图片描述 这里可以使用不优雅的方式,即提供一个指定名称 ZK 但是级别 order=3 更高的实现类(ZK 默认 order=1),就可以让 ConfigurationFactory 使用 TestConfigurationProvider 作为配置中心提供类。

通过上面的步骤,就可以让 seata 使用我们自己提供的代码。seata 中 codec、compressor、discovery、integration 等模块,都是使用 spi 机制加载功能类,符合微内核 + 插件化,平等对待第三方的设计思想。

五 . seata 源码分析系列地址

作者:赵润泽,系列地址

· 阅读需 4 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

​ 1.首先来看下包结构,在 seata-dubbo 和 seata-dubbo-alibaba 下有统一由 TransactionPropagationFilter 这个类,分别对应 apache-dubbo 跟 alibaba-dubbo.

20200101203229

分析源码

package io.seata.integration.dubbo;

import io.seata.core.context.RootContext;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//获取本地XID
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
//获取Dubbo隐式传参中的XID
String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
//传递XID
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
} else {
if (rpcXid != null) {
//绑定XID
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
//进行剔除已完成事务的XID
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
}
//如果发现解绑的XID并不是当前接收到的XID
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
if (unbindXid != null) {
//重新绑定XID
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
}
}
}
}
}

/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}

}

​ 1.根据源码,我们可以推出相应的逻辑处理

20200101213336

要点知识

​ 1.Dubbo @Activate 注解:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};

/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p/>
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};

/**
* 排序信息,可以不提供。
*/
String[] before() default {};

/**
* 排序信息,可以不提供。
*/
String[] after() default {};

/**
* 排序信息,可以不提供。
*/
int order() default 0;
}

可以分析得知,Seata 的 dubbo 过滤器上的注解@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100),表示 dubbo 的服务提供方跟消费方都会触发到这个过滤器,所以我们的 Seata 发起者会产生一个 XID 的传递,上述流程图跟代码已经很清晰的表示了.

​ 2.Dubbo 隐式传参可以通过 RpcContext 上的 setAttachmentgetAttachment 在服务消费方和提供方之间进行参数的隐式传递。

获取:RpcContext.getContext().getAttachment(RootContext.KEY_XID);

传递:RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);

总结

更多源码阅读请访问Seata 官网

· 阅读需 11 分钟

一 .导读

spring 模块分析中讲到,Seata 的 spring 模块会对涉及到分布式业务的 bean 进行处理。项目启动时,当 GlobalTransactionalScanner 扫描到 TCC 服务的 reference 时(即tcc事务参与方),会对其进行动态代理,即给 bean 织入 TCC 模式下的 MethodInterceptor 的实现类。tcc 事务发起方依然使用 @GlobalTransactional 注解开启,织入的是通用的 MethodInterceptor 的实现类。

TCC 模式下的 MethodInterceptor 实现类即 TccActionInterceptor(spring模块) ,这个类中调用了 ActionInterceptorHandler(tcc模块) 进行 TCC 模式下事务流程的处理。

TCC 动态代理的主要功能是:生成TCC运行时上下文、透传业务参数、注册分支事务记录。

二 .TCC模式介绍

在2PC(两阶段提交)协议中,事务管理器分两阶段协调资源管理,资源管理器对外提供三个操作,分别是一阶段的准备操作,和二阶段的提交操作和回滚操作。

public interface TccAction {

@TwoPhaseBusinessAction(name = "tccActionForTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "a") int a,
@BusinessActionContextParameter(paramName = "b", index = 0) List b,
@BusinessActionContextParameter(isParamInProperty = true) TccParam tccParam);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);
}

这是 TCC 参与者实例,参与者需要实现三个方法,第一个参数必须是 BusinessActionContext ,方法返回类型固定,对外发布成微服务,供事务管理器调用。

prepare:资源的检查和预留。例:扣减账户的余额,并增加相同的冻结余额。

commit:使用预留的资源,完成真正的业务操作。例:减少冻结余额,扣减资金业务完成。

cancel:释放预留资源。例:冻结余额加回账户的余额。

其中 BusinessActionContext 封装了本次事务的上下文环境:xid、branchId、actionName 和被 @BusinessActionContextParam 注解的参数等。

参与方业务有几个需要注意的地方: 1.控制业务幂等性,需要支持同一笔事务的重复提交和重复回滚。 2.防悬挂,即二阶段的回滚,比一阶段的 try 先执行。 3.放宽一致性协议,最终一致,所以是读已修改

三 . remoting 包解析

在这里插入图片描述

包中所有的类都是为包中的 DefaultRemotingParser 服务,Dubbo、LocalTCC、SofaRpc 分别负责解析各自RPC协议下的类。

DefaultRemotingParser 的主要方法: 1.判断 bean 是否是 remoting bean,代码:

    @Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

2.远程 bean 解析,把 rpc类 解析成 RemotingDesc,,代码:

@Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

利用 allRemotingParsers 来解析远程 bean 。allRemotingParsers是在:initRemotingParser() 中调用EnhancedServiceLoader.loadAll(RemotingParser.class) 动态进行 RemotingParser 子类的加载,即 SPI 加载机制。

如果想扩展,比如实现一个feign远程调用的解析类,只要把RemotingParser相关实现类写在 SPI 的配置中就可以了,扩展性很强。

RemotingDesc 事务流程需要的远程 bean 的一些具体信息,比如 targetBean、interfaceClass、interfaceClassName、protocol、isReference等等。

3.TCC资源注册

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName) {
RemotingDesc remotingBeanDesc = getServiceDesc(bean, beanName);
if (remotingBeanDesc == null) {
return null;
}
remotingServiceMap.put(beanName, remotingBeanDesc);

Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if (isService(bean, beanName)) {
try {
//service bean, registry resource
Object targetBean = remotingBeanDesc.getTargetBean();
for (Method m : methods) {
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
new Class[] {BusinessActionContext.class}));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
new Class[] {BusinessActionContext.class}));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}
} catch (Throwable t) {
throw new FrameworkException(t, "parser remoting service error");
}
}
if (isReference(bean, beanName)) {
//reference bean, TCC proxy
remotingBeanDesc.setReference(true);
}
return remotingBeanDesc;
}

首先判断是否是事务参与方,如果是,拿到 RemotingDesc 中的 interfaceClass,遍历接口中的方法,判断方法上是否有@TwoParserBusinessAction 注解,如果有,把参数封装成 TCCRecource,通过 DefaultResourceManager 进行 TCC 资源的注册。

这里 DefaultResourceManager 会根据 Resource 的 BranchType 来寻找对应的资源管理器,TCC 模式下资源管理类,在 tcc 模块中。

这个 rpc 解析类主要提供给 spring 模块进行使用。parserRemotingServiceInfo() 被封装到了 spring 模块的 TCCBeanParserUtils 工具类中。spring 模块的 GlobalTransactionScanner 在项目启动的时候,通过工具类解析 TCC bean,工具类 TCCBeanParserUtils 会调用 TCCResourceManager 进行资源的注册,并且如果是全局事务的服务提供者,会织入 TccActionInterceptor 代理。这些个流程是 spring 模块的功能,tcc 模块是提供功能类给 spring 模块使用。

三 .tcc 资源管理器

TCCResourceManager 负责管理 TCC 模式下资源的注册、分支的注册、提交、和回滚。

1.在项目启动时, spring 模块的 GlobalTransactionScanner 扫描到 bean 是 tcc bean 时,会本地缓存资源,并向 server 注册:

    @Override
public void registerResource(Resource resource) {
TCCResource tccResource = (TCCResource)resource;
tccResourceCache.put(tccResource.getResourceId(), tccResource);
super.registerResource(tccResource);
}

与server通信的逻辑被封装在了父类 AbstractResourceManage 中,这里根据 resourceId 对 TCCResource 进行缓存。父类 AbstractResourceManage 注册资源的时候,使用 resourceGroupId + actionName,actionName 就是 @TwoParseBusinessAction 注解中的 name,resourceGroupId 默认是 DEFAULT。

2.事务分支的注册在 rm-datasource 包下的 AbstractResourceManager 中,注册时参数 lockKeys 为 null,和 AT 模式下事务分支的注册还是有些不一样的。

3.分支的提交或者回滚:

    @Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
try {
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}

通过参数 xid、branchId、resourceId、applicationData 恢复业务的上下文 businessActionContext。

根据获取到的上下文通过反射执行 commit 方法,并返回执行结果。回滚方法类似。

这里 branchCommit() 和 branchRollback() 提供给 rm 模块资源处理的抽象类 AbstractRMHandler 调用,这个 handler 是 core 模块定义的模板方法的进一步实现类。和 registerResource() 不一样,后者是 spring 扫描时主动注册资源。

四 . tcc 模式事务处理

spring 模块中的 TccActionInterceptor 的 invoke() 方法在被代理的 rpc bean 被调用时执行。该方法先获取 rpc 拦截器透传过来的全局事务 xid ,然后 TCC 模式下全局事务参与者的事务流程还是交给 tcc 模块 ActionInterceptorHandler 处理。

也就是说,事务参与者,在项目启动的时候,被代理。真实的业务方法,在 ActionInterceptorHandler 中,通过回调执行。

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(4);

//TCC name
String actionName = businessAction.name();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName);

//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);

//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}

这里有两个重要操作:

1.doTccActionLogStore() 这个方法中,调用了两个比较重要的方法: fetchActionRequestContext(method, arguments),这个方法把被 @BusinessActionContextParam 注解的参数取出来,在下面的 init 方法中塞入 BusinessActionComtext ,同时塞入的还有事务相关参数。 DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null),这个方法执行 TCC 模式下事务参与者事务分支的注册。

2.回调执行 targetCallback.execute() ,被代理的 bean 具体的业务,即 prepare() 方法。

五 .总结

tcc模块,主要提供以下功能 :

  1. 定义两阶段协议注解,提供 tcc 模式下事务流程需要的属性。
  2. 提供解析不同 rpc 框架 remoting bean 的 ParserRemoting 实现,供 spring 模块调用。
  3. 提供 TCC 模式下资源管理器,进行资源注册、事务分支注册提交回滚等。
  4. 提供 TCC 模式下事务流程的处理类,让 MethodInterceptor 代理类不执行具体模式的事务流程,而是下放到 tcc 模块。

五 .相关

作者:赵润泽,系列地址

· 阅读需 1 分钟

活动介绍

亮点解读

  • Seata 开源项目发起人带来《Seata 过去、现在和未来》以及 Seata 1.0 的新特性。
  • Seata 核心贡献者详解 Seata AT, TCC, Saga 模式。
  • Seata 落地互联网医疗,滴滴出行实践剖析。

如您不能前来参会

现场福利

  • 讲师 PPT 打包下载
  • 精美茶歇,阿里公仔,天猫精灵等好礼等你来拿

议程

· 阅读需 9 分钟

一 . 导读

core 模块定义了事务的类型、状态,通用的行为,client 和 server 通信时的协议和消息模型,还有异常处理方式,编译、压缩类型方式,配置信息名称,环境 context 等,还基于 netty 封装了 rpc ,供客户端和服务端使用。

按包顺序来分析一下 core 模块主要功能类:

在这里插入图片描述

codec:定义了一个 codec 的工厂类,提供了一个方法,根据序列化类型来找对应的处理类。还提供了一个接口类 Codec ,有两个抽象方法:

<T> byte[] encode(T t);
<T> T decode(byte[] bytes);

目前 1.0 版本在 codec 模块,有三种序列化的实现:SEATA、PROTOBUF、KRYO。

compressor:和 codec 包下面类一样,都是三个类,一个压缩类型类,一个工厂类,一个压缩和解压缩操作的抽象类。1.0 版本就只有一种压缩方式:Gzip

constants:两个 ClientTableColumnsName、ServerTableColumnsName 类,分别是 client 端存储事务的表和 server 端存储事务表对应的 model 类。还有定义支持的数据库类型类和一些定义配置信息属性的前缀的类。

context:环境类 RootContext 持有一个 ThreadLocalContextCore 用来存储事务的标识信息。比如 TX_XID 用来唯一的表示一个事务。TX_LOCK 如果存在,则表示本地事务对于 update/delete/insert/selectForUpdate SQL 需要用全局锁控制。

event:这里用到了 guava 中 EventBus 事件总线来进行注册和通知,监听器模式。在 server 模块的 metrics 包中,MetricsManager 在初始化的时候,对 GlobalStatus 即 server 模块处理事务的几个状态变化时,注册了监挺事件,当 server 处理事务时,会回调监听的方法,主要是为了进行统计各种状态事务的数量。

lock: server 在收到 registerBranch 消息进行分支注册的时候,会加锁。1.0 版本有两种锁的实现,DataBaseLocker 和 MemoryLocker,分别是数据库锁和内存锁,数据库锁根据 rowKey = resourceId + tableName + pk 进行加锁,内存锁直接就是根据 primary key。

model:BranchStatus、GlobalStatus、BranchType 用来定义事务的类型和全局、分支状态。还有 TransactionManager 和 ResourceManager,是 rm 和 tm 的抽象类。具体的 rm 和 tm 的实现,因为各种事务类型都不同,所以这里没有具体的实现类。

protocol:定义了 rpc 模块传输用的实体类,即每个事务状态场景下 request 和 response 的 model。

store:定了与数据库打交道的数据模型,和与数据库交互的语句。

二 . exception 包中 handler 类分析

这是 AbstractExceptionHandler 的 UML 图,Callback 、AbstractCallback 是 AbstractExceptionHandler 的内部接口和内部类,AbstractCallback 抽象类实现了接口 Callback 的三个方法,还有一个 execute() 未实现。AbstractExceptionHandler 使用了 AbstractCallback 作为模板方法的参数,并使用了其实现的三个方法,但是 execute() 方法仍留给子类实现。 在这里插入图片描述 从对外暴露的角度看 AbstractExceptionHandler 定义了一个带有异常处理的模板方法,模板中有四个行为,在不同的情况下执行,其中三种行为已经实现,执行的行为交由子类自行实现,详解:

1.使用模板方法模式,在 exceptionHandlerTemplate() 中,定义好了执行的模板

    public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request,
AbstractTransactionResponse response) {
try {
callback.execute(request, response); //执行事务业务的方法
callback.onSuccess(request, response); //设置response返回码
} catch (TransactionException tex) {
LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
callback.onTransactionException(request, response, tex); //设置response返回码并设置msg
} catch (RuntimeException rex) {
LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
callback.onException(request, response, rex); //设置response返回码并设置msg
}
}

onSuccess、onTransactionException、onException 在 AbstarctCallback 中已经被实现,execute 则由 AbstractExceptionHandler 子类即负责不同事务模式的 handler 类进行实现。 AbstractExceptionHandler 目前有两个子类:AbstractTCInboundHandler 负责处理全局事务的业务,AbstractRMHandler 负责处理分支事务的业务。

2.使用回调机制,优点是:允许 AbstractExceptionHandler 把需要调用的类 Callback 作为参数传递进来,handler 不需要知道 callback 的具体执行逻辑,只要知道 callback 的特性原型和限制条件(参数、返回值),就可以使用了。

先使用模板方法,把事务业务流程定下来,再通过回调,把具体执行事务业务的方法,留给子类实现。设计的非常巧妙。

这个 exceptionHandlerTemplate() 应该翻译成带有异常处理的模板方法。异常处理已经被抽象类实现,具体的不同模式下 commit 、rollback 的业务处理则交给子类实现。

三 . rpc 包分析

seata 对于 rpc 的封装,细节不需要纠结,可以研究一下一下对于事务业务的处理。

client 端的 rpc 类是 AbstractRpcRemotingClient: 在这里插入图片描述

重要的属性和方法都在类图中,消息发送和初始化方法没画在类图中,详细分析一下类图:

clientBootstrap:是 netty 启动类 Bootstrap 的封装类,持有了 Bootstrap 的实例,并自定义自己想要的属性。

clientChannelManager:使用 ConcurrentHashMap<serverAddress,channel> 容器维护地址和 channel 的对应关系。

clientMessageListener: 消息的处理类,根据消息的类型的不同有三种具体的处理方法

public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
Object msg = request.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("onMessage:" + msg);
}
if (msg instanceof BranchCommitRequest) {
handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
} else if (msg instanceof UndoLogDeleteRequest) {
handleUndoLogDelete((UndoLogDeleteRequest)msg);
}
}

消息类中,持有 TransactionMessageHandler 对不同类型消息进行处理,最终会根据事务类型的不同(AT、TCC、SAGE)调用具体的处理类,即第二部分说的 exceptionHandleTemplate() 的实现类。

mergeSendExecutorService:是一个线程池,只有一个线程,负责对不同地址下的消息进行和并发送。在 sendAsyncRequest() 中,会给线程池的队列 LinkedBlockingQueue<> offer 消息,然后这个线程负责 poll 和处理消息。

channelRead():处理服务端的 HeartbeatMessage.PONG 心跳消息。还有消息类型是 MergeResultMessage 即异步消息的响应消息,根据 msgId 找到对应 MessageFuture ,并设置异步消息的 result 结果。

dispatch():调用 clientMessageListener 处理 server 发送过来的消息,不同类型 request 有不同的处理类。

简单点看 netty,只需要关注序列化方式和消息处理 handler 类。seata 的 rpc 序列化方式通过工厂类找 Codec 实现类进行处理,handler 即上文说的 TransactionMessageHandler 。

四 . 总结

core 模块涉及的功能很多,其中的类大多都是其他模块的抽象类。抽象出业务模型,具体的实现分布在不同的模块。core 模块的代码非常的优秀,很多设计都是经典,比如上文分析的基于模板模式改造的,非常实用也非常美,值得仔细研究。

五 . seata 源码分析系列地址

系列地址

· 阅读需 5 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

通过GA大会上滴滴出行的高级研发工程陈鹏志的在滴滴两轮车业务中的实践,发现动态降级的必要性是非常的高,所以这边简单利用spring boot aop来简单的处理降级相关的处理,这边非常感谢陈鹏志的分享!

可利用此demo项目地址

通过以下代码改造实践.

准备工作

​ 1.创建测试用的TestAspect:

package org.test.config;

import java.lang.reflect.Method;

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

@Aspect
@Component
public class TestAspect {
private final static Logger logger = LoggerFactory.getLogger(TestAspect.class);

@Before("execution(* org.test.service.*.*(..))")
public void before(JoinPoint joinPoint) throws TransactionException {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
logger.info("拦截到需要分布式事务的方法," + method.getName());
// 此处可用redis或者定时任务来获取一个key判断是否需要关闭分布式事务
// 模拟动态关闭分布式事务
if ((int)(Math.random() * 100) % 2 == 0) {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
tx.begin(300000, "test-client");
} else {
logger.info("关闭分布式事务");
}
}

@AfterThrowing(throwing = "e", pointcut = "execution(* org.test.service.*.*(..))")
public void doRecoveryActions(Throwable e) throws TransactionException {
logger.info("方法执行异常:{}", e.getMessage());
if (!StringUtils.isBlank(RootContext.getXID()))
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
}

@AfterReturning(value = "execution(* org.test.service.*.*(..))", returning = "result")
public void afterReturning(JoinPoint point, Object result) throws TransactionException {
logger.info("方法执行结束:{}", result);
if ((Boolean)result) {
if (!StringUtils.isBlank(RootContext.getXID())) {
logger.info("分布式事务Id:{}", RootContext.getXID());
GlobalTransactionContext.reload(RootContext.getXID()).commit();
}
}
}

}

请注意上面的包名可改为你自己的service包名:

​ 2.改动service代码:

    public Object seataCommit() {
testService.Commit();
return true;
}

因为异常跟返回结果我们都会拦截,所以这边可以trycatch或者直接让他抛异常来拦截也行,或者直接判断返回结果,比如你的业务代码code=200为成功,那么就commit,反之在拦截返回值那段代码加上rollback;

进行调试

​ 1.更改代码主动抛出异常

    public Object seataCommit() {
try {
testService.Commit();
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}

​ 查看日志:

2019-12-23 11:57:55.386  INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765910]
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765910
2019-12-23 11:57:55.709 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 方法执行异常:null
2019-12-23 11:57:55.885 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765910] rollback status: Rollbacked
2019-12-23 11:57:55.888 ERROR 23952 --- [.0-28888-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException] with root cause

​ 可以看到已被拦截也触发了rollback了.

​ 2.恢复代码调试正常情况:

    public Object seataCommit() {
testService.Commit();
return true;
}

​ 查看日志:

2019-12-23 12:00:20.876  INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 12:00:20.919 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765926]
2019-12-23 12:00:20.920 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765926
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 方法执行结束:true
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 分布式事务Id:192.168.14.67:8092:2030765926
2019-12-23 12:00:21.213 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765926] commit status: Committed

​ 可以看到事务已经被提交了.

总结

更详细的内容希望希望大家访问以下地址阅读详细文档

nacos官网

dubbo官网

seata官网

docker官网

· 阅读需 8 分钟

Seata 的动态降级需要结合配置中心的动态配置订阅功能。动态配置订阅,即通过配置中心监听订阅,根据需要读取已更新的缓存值,ZK、Apollo、Nacos 等第三方配置中心都有现成的监听器可实现动态刷新配置;动态降级,即通过动态更新指定配置参数值,使得 Seata 能够在运行过程中动态控制全局事务失效(目前只有 AT 模式有这个功能)。

那么 Seata 支持的多个配置中心是如何适配不同的动态配置订阅以及如何实现降级的呢?下面从源码的层面详细给大家讲解一番。

动态配置订阅

Seata 配置中心有一个监听器基准接口,它主要有一个抽象方法和 default 方法,如下:

io.seata.config.ConfigurationChangeListener

该监听器基准接口主要有两个实现类型:

  1. 实现注册配置订阅事件监听器:用于实现各种功能的动态配置订阅,比如 GlobalTransactionalInterceptor 实现了 ConfigurationChangeListener,根据动态配置订阅实现的动态降级功能;
  2. 实现配置中心动态订阅功能与适配:对于目前还没有动态订阅功能的 file 类型默认配置中心,可以实现该基准接口来实现动态配置订阅功能;对于阻塞订阅需要另起一个线程去执行,这时候可以实现该基准接口进行适配,还可以复用该基准接口的线程池;以及还有异步订阅,有订阅单个 key,有订阅多个 key 等等,我们都可以实现该基准接口以适配各个配置中心。

Nacos 动态订阅实现

Nacos 有自己内部实现的监听器,因此直接直接继承它内部抽象监听器 AbstractSharedListener,实现如下:

如上,

  • dataId:为订阅的配置属性;
  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑。

值得一提的是,nacos 并没有使用 ConfigurationChangeListener 实现自己的监听配置,一方面是因为 Nacos 本身已有监听订阅功能,不需要自己再去实现;另一方面因为 nacos 属于非阻塞式订阅,不需要复用 ConfigurationChangeListener 的线程池,即无需进行适配。

添加订阅:

Nacos 配置中心为某个 dataId 添加订阅的逻辑很简单,用 dataId 和 listener 创建一个 NacosListener 调用 configService#addListener 方法,把 NacosListener 作为 dataId 的监听器,dataId 就实现了动态配置订阅功能。

file 动态订阅实现

以它的实现类 FileListener 举例子,它的实现逻辑如下:

如上,

  • dataId:为订阅的配置属性;

  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑,这里特别需要注意的是,该监听器与 FileListener 同样实现了 ConfigurationChangeListener 接口,只不过 FileListener 是用于给 file 提供动态配置订阅功能,而 listener 用于执行配置订阅事件

  • executor:用于处理配置变更逻辑的线程池,在 ConfigurationChangeListener#onProcessEvent 方法中用到。

FileListener#onChangeEvent 方法的实现让 file 具备了动态配置订阅的功能,它的逻辑如下:

无限循环获取订阅的配置属性当前的值,从缓存中获取旧的值,判断是否有变更,如果有变更就执行外部传入 listener 的逻辑。

ConfigurationChangeEvent 用于保存配置变更的事件类,它的成员属性如下:

这里的 getConfig 方法是如何感知 file 配置的变更呢?我们点进去,发现它最终的逻辑如下:

发现它是创建一个 future 类,然后包装成一个 Runnable 放入线程池中异步执行,最后调用 get 方法阻塞获取值,那么我们继续往下看:

allowDynamicRefresh:动态刷新配置开关;

targetFileLastModified:file 最后更改的时间缓存。

以上逻辑:

获取 file 最后更新的时间值 tempLastModified,然后对比对比缓存值 targetFileLastModified,如果 tempLastModified > targetFileLastModified,说明期间配置有更改过,这时就重新加载 file 实例,替换掉旧的 fileConfig,使得后面的操作能够获取到最新的配置值。

添加一个配置属性监听器的逻辑如下:

configListenersMap 为 FileConfiguration 的配置监听器缓存,它的数据结构如下:

ConcurrentMap<String/*dataId*/, Set<ConfigurationChangeListener>> configListenersMap

从数据结构上可看出,每个配置属性可关联多个事件监听器。

最终执行 onProcessEvent 方法,这个是监听器基准接口里面的 default 方法,它会调用 onChangeEvent 方法,即最终会调用 FileListener 中的实现。

动态降级

有了以上的动态配置订阅功能,我们只需要实现 ConfigurationChangeListener 监听器,就可以做各种各种的功能,目前 Seata 只有动态降级有用到动态配置订阅的功能。

在「Seata AT 模式启动源码分析」这篇文章中讲到,Spring 集成 Seata 的项目中,在 AT 模式启动时,会用 用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法,GlobalTransactionalInterceptor 实现了 MethodInterceptor,最终会执行 invoker 方法,那么想要实现动态降级,就可以在这里做手脚。

  • 在 GlobalTransactionalInterceptor 中加入一个成员变量:
private volatile boolean disable; 

在构造函数中进行初始化赋值:

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION(service.disableGlobalTransaction)这个参数目前有两个功能:

  1. 在启动时决定是否开启全局事务;
  2. 在开启全局事务后,决定是否降级。
  • 实现 ConfigurationChangeListener:

这里的逻辑简单,就是判断监听事件是否属于 ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION 配置属性,如果是,直接更新 disable 值。

  • 接下来在 GlobalTransactionalInterceptor#invoke 中做点手脚

如上,disable = true 时,不执行全局事务与全局锁。

  • 配置中心订阅降级监听器

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

在 Spring AOP 进行 wrap 逻辑过程中,当前配置中心将订阅降级事件监听器。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 8 分钟

Seata 可以支持多个第三方配置中心,那么 Seata 是如何同时兼容那么多个配置中心的呢?下面我给大家详细介绍下 Seata 配置中心的实现原理。

配置中心属性加载

在 Seata 配置中心,有两个默认的配置文件:

file.conf 是默认的配置属性,registry.conf 主要存储第三方注册中心与配置中心的信息,主要有两大块:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# ...
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
}
file {
name = "file.conf"
}
# ...
}

其中 registry 为注册中心的配置属性,这里先不讲,config 为配置中心的属性值,默认为 file 类型,即会加载本地的 file.conf 里面的属性,如果 type 为其它类型,那么会从第三方配置中心加载配置属性值。

在 config 模块的 core 目录中,有个配置工厂类 ConfigurationFactory,它的结构如下:

可以看到都是一些配置的静态常量:

REGISTRY_CONF_PREFIX、REGISTRY_CONF_SUFFIX:配置文件名、默认配置文件类型;

SYSTEM_PROPERTY_SEATA_CONFIG_NAME、ENV_SEATA_CONFIG_NAME、ENV_SYSTEM_KEY、ENV_PROPERTY_KEY:自定义文件名配置变量,也说明我们可以自定义配置中心的属性文件。

ConfigurationFactory 里面有一处静态代码块,如下:

io.seata.config.ConfigurationFactory

根据自定义文件名配置变量找出配置文件名称与类型,如果没有配置,默认使用 registry.conf,FileConfiguration 是 Seata 默认的配置实现类,如果为默认值,则会更具 registry.conf 配置文件生成 FileConfiguration 默认配置对象,这里也可以利用 SPI 机制支持第三方扩展配置实现,具体实现是继承 ExtConfigurationProvider 接口,在META-INF/services/创建一个文件并填写实现类的全路径名,如下所示:

第三方配置中心实现类加载

在静态代码块逻辑加载完配置中心属性之后,Seata 是如何选择配置中心并获取配置中心的属性值的呢?

我们刚刚也说了 FileConfiguration 是 Seata 的默认配置实现类,它继承了 AbstractConfiguration,它的基类为 Configuration,提供了获取参数值的方法:

short getShort(String dataId, int defaultValue, long timeoutMills);
int getInt(String dataId, int defaultValue, long timeoutMills);
long getLong(String dataId, long defaultValue, long timeoutMills);
// ....

那么意味着只需要第三方配置中心实现该接口,就可以整合到 Seata 配置中心了,下面我拿 zk 来做例子:

首先,第三方配置中心需要实现一个 Provider 类:

实现的 provider 方法如其名,主要是输出具体的 Configuration 实现类。

那么我们是如何获取根据配置去获取对应的第三方配置中心实现类呢?

在 Seata 项目中,获取一个第三方配置中心实现类通常是这么做的:

Configuration CONFIG = ConfigurationFactory.getInstance();

在 getInstance() 方法中主要是使用了单例模式构造配置实现类,它的构造具体实现如下:

io.seata.config.ConfigurationFactory#buildConfiguration:

首先从 ConfigurationFactory 中的静态代码块根据 registry.conf 创建的 CURRENT_FILE_INSTANCE 中获取当前环境使用的配置中心,默认为为 File 类型,我们也可以在 registry.conf 配置其它第三方配置中心,这里也是利用了 SPI 机制去加载第三方配置中心的实现类,具体实现如下:

如上,即是刚刚我所说的 ZookeeperConfigurationProvider 配置实现输出类,我们再来看看这行代码:

EnhancedServiceLoader.load(ConfigurationProvider.class,Objects.requireNonNull(configType).name()).provide();

EnhancedServiceLoader 是 Seata SPI 实现核心类,这行代码会加载 META-INF/services/META-INF/seata/目录中文件填写的类名,那么如果其中有多个配置中心实现类都被加载了怎么办呢?

我们注意到 ZookeeperConfigurationProvider 类的上面有一个注解:

@LoadLevel(name = "ZK", order = 1)

在加载多个配置中心实现类时,会根据 order 进行排序:

io.seata.common.loader.EnhancedServiceLoader#findAllExtensionClass:

io.seata.common.loader.EnhancedServiceLoader#loadFile:

这样,就不会产生冲突了。

但是我们发现 Seata 还可以用这个方法进行选择,Seata 在调用 load 方法时,还传了一个参数:

Objects.requireNonNull(configType).name()

ConfigType 为配置中心类型,是个枚举类:

public enum ConfigType {
File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, Custom;
}

我们注意到,LoadLevel 注解上还有一个 name 属性,在进行筛选实现类时,Seata 还做了这个操作:

根据当前 configType 来判断是否等于 LoadLevel 的 name 属性,如果相等,那么就是当前配置的第三方配置中心实现类。

第三方配置中心实现类

ZookeeperConfiguration 继承了 AbstractConfiguration,它的构造方法如下:

构造方法创建了一个 zkClient 对象,这里的 FILE_CONFIG 是什么呢?

private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;

原来就是刚刚静态代码块中创建的 registry.conf 配置实现类,从该配置实现类拿到第三方配置中心的相关属性,构造第三方配置中心客户端,然后实现 Configuration 接口时:

就可以利用客户端相关方法去第三方配置获取对应的参数值了。

第三方配置中心配置同步脚本

上周末才写好,已经提交 PR 上去了,还处于 review 中,预估会在 Seata 1.0 版本提供给大家使用,敬请期待。

具体位置在 Seata 项目的 script 目录中:

config.txt 为本地配置好的值,搭建好第三方配置中心之后,运行脚本会将 config.txt 的配置同步到第三方配置中心。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。