跳到主要内容

· 阅读需 12 分钟

运行所使用的 demo项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

直连方式的 Seata 配置博客

Seata 整合 Nacos 配置博客

我们接着前几篇篇的基础上去配置 nacos 做配置中心跟 dubbo 注册中心.

准备工作

​ 1.安装 docker

yum -y install docker

​ 2.创建 nacos 与 seata 的数据库

/******************************************/
/* 数据库全名 = nacos */
/* 表名称 = config_info */
/******************************************/
CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) NOT NULL COMMENT 'group_id',
`datum_id` varchar(255) NOT NULL COMMENT 'datum_id',
`content` longtext NOT NULL COMMENT '内容',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='增加租户字段';


/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`beta_ips` varchar(1024) DEFAULT NULL COMMENT 'betaIps',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_beta';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`tag_id` varchar(128) NOT NULL COMMENT 'tag_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_tag';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL COMMENT 'id',
`tag_name` varchar(128) NOT NULL COMMENT 'tag_name',
`tag_type` varchar(64) DEFAULT NULL COMMENT 'tag_type',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_tag_relation';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`group_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Group ID,空字符表示整个集群',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数,,0表示使用默认值',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='集群、各Group容量信息表';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='多租户改造';


/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`tenant_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Tenant ID',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='租户容量信息表';


CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`kp` varchar(128) NOT NULL COMMENT 'kp',
`tenant_id` varchar(128) default '' COMMENT 'tenant_id',
`tenant_name` varchar(128) default '' COMMENT 'tenant_name',
`tenant_desc` varchar(256) DEFAULT NULL COMMENT 'tenant_desc',
`create_source` varchar(32) DEFAULT NULL COMMENT 'create_source',
`gmt_create` bigint(20) NOT NULL COMMENT '创建时间',
`gmt_modified` bigint(20) NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='tenant_info';

CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL
);

CREATE TABLE roles (
username varchar(50) NOT NULL,
role varchar(50) NOT NULL
);

INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);

INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

​ 3.拉取 nacos 以及 seata 镜像并运行

docker run -d --name nacos -p 8848:8848 -e MODE=standalone -e MYSQL_MASTER_SERVICE_HOST=你的mysql所在ip -e MYSQL_MASTER_SERVICE_DB_NAME=nacos -e MYSQL_MASTER_SERVICE_USER=root -e MYSQL_MASTER_SERVICE_PASSWORD=mysql密码 -e MYSQL_SLAVE_SERVICE_HOST=你的mysql所在ip -e SPRING_DATASOURCE_PLATFORM=mysql -e MYSQL_DATABASE_NUM=1 nacos/nacos-server:latest
docker run -d --name seata -p 8091:8091 -e SEATA_IP=你想指定的ip -e SEATA_PORT=8091 seataio/seata-server:1.4.2

Seata 配置

​ 1.由于 seata 容器内没有内置 vim,我们可以直接将要文件夹 cp 到宿主机外来编辑好了,再 cp 回去

docker cp 容器id:seata-server/resources 你想放置的目录

​ 2.使用如下代码获取两个容器的 ip 地址

docker inspect --format='{{.NetworkSettings.IPAddress}}' ID/NAMES

​ 3.nacos-config.txt 编辑为如下内容

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.你的事务组名=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://你的mysql所在ip:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=mysql账号
store.db.password=mysql密码
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

详细参数配置请点此处

​ 4.registry.conf 编辑为如下内容

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "nacos容器ip:8848"
namespace = ""
cluster = "default"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "nacos容器ip:8848"
namespace = ""
}
}

​ 5.配置完成后使用如下命令,把修改完成的 registry.conf 复制到容器中,并重启查看日志运行

docker cp /home/seata/resources/registry.conf seata:seata-server/resources/
docker restart seata
docker logs -f seata

​ 6.运行 nacos-config.sh 导入 Nacos 配置

eg: sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password

具体参数释义参考:配置导入说明

​ 7.登录 nacos 控制中心查看

20191202205912

​ 如图所示便是成功了.

进行调试

​ 1.拉取博文中所示的项目,修改 test-service 的 application.yml 与 registry.conf

registry {
type = "nacos"
nacos {
serverAddr = "宿主机ip:8848"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "宿主机ip:8848"
namespace = ""
cluster = "default"
}
}

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
threadpool: cached
scan:
base-packages: com.example
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: nacos://宿主机ip:8848
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
auto-mapping-unknown-column-behavior: none

​ 2.把修改完成的 registry.conf 复制到 test-client-resources 中,并修改 application

spring:
application:
name: test
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://mysqlIp:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
username: root
password: 123456
mvc:
servlet:
load-on-startup: 1
http:
encoding:
force: true
charset: utf-8
enabled: true
multipart:
max-file-size: 10MB
max-request-size: 10MB
dubbo:
registry:
id: my-registry
address: nacos://宿主机ip:8848
application:
name: dubbo-demo-client
qos-enable: false
server:
port: 28888
max-http-header-size: 8192
address: 0.0.0.0
tomcat:
max-http-post-size: 104857600

​ 4. 在每个所涉及的 db 执行 undo_log 脚本.

CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

​ 5.依次运行 test-service,test-client.

​ 6.查看 nacos 中服务列表是否如下图所示

20191203132351

总结

关于 nacos 与 seata 的 docker 部署已经完成了,更详细的内容希望希望大家访问以下地址阅读详细文档

nacos 官网

dubbo 官网

seata 官网

docker 官网

· 阅读需 7 分钟

项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

上次发布了直连方式的seata配置,详细可以看这篇博客

我们接着上一篇的基础上去配置nacos做配置中心跟dubbo注册中心.

准备工作

​ 1.首先去nacos的github上下载最新版本

​ 2.下载好了后,很简单,解压后到bin目录下去启动就好了,看到如图所示就成了:

​ 3.启动完毕后访问:http://127.0.0.1:8848/nacos/#/login

是不是看到这样的界面了?输入nacos(账号密码相同),先进去看看吧.

这时候可以发现没有任何服务注册

20191202204147

别急我们马上让seata服务连接进来.

Seata配置

​ 1.进入seata的conf文件夹看到这个木有?

就是它,编辑它:

20191202204353

20191202204437

​ 2.然后记得保存哦!接着我们把registry.conf文件打开编辑:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

都编辑好了后,我们运行nacos-config.sh,这时候我们配置的nacos-config.txt的内容已经被发送到nacos中了详细如图:

20191202205743

出现以上类似的代码就是说明成功了,接着我们登录nacos配置中心,查看配置列表,出现如图列表说明配置成功了:

20191202205912

看到了吧,你的配置已经全部都提交上去了,如果再git工具内运行sh不行的话,试着把编辑sh文件,试试改成如下操作

for line in $(cat nacos-config.txt)

do

key=${line%%=*}
value=${line#*=}
echo "\r\n set "${key}" = "${value}

result=`curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=$key&group=SEATA_GROUP&content=$value"`

if [ "$result"x == "true"x ]; then

echo "\033[42;37m $result \033[0m"

else

echo "\033[41;37 $result \033[0m"
let error++

fi

done


if [ $error -eq 0 ]; then

echo "\r\n\033[42;37m init nacos config finished, please start seata-server. \033[0m"

else

echo "\r\n\033[41;33m init nacos config fail. \033[0m"

fi

​ 3.目前我们的准备工作全部完成,我们去seata-service/bin去运行seata服务吧,如图所示就成功啦!

20191202210112

进行调试

​ 1.首先把springboot-dubbo-mybatsiplus-seata项目的pom的依赖更改,去除掉zk这些配置,因为我们使用nacos做注册中心了.

	<properties>
<webVersion>3.1</webVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<HikariCP.version>3.2.0</HikariCP.version>
<mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
<version>7.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!-- mybatis-plus begin -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- mybatis-plus end -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version> </dependency> -->

<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
<version>3.1.0</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- 加上这个才能辨认到log4j2.yml文件 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency> <!-- 引入log4j2依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
<version>2.11.0</version> </dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

​ 2.然后更改test-service的目录结构,删除zk的配置并更改application.yml文件,目录结构与代码:

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
loadbalance: leastactive
threadpool: cached
scan:
base-packages: org。test.service
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: nacos://127.0.0.1:8848
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
auto-mapping-unknown-column-behavior: none
20191202211833

​ 3.再更改registry.conf文件,如果你的nacos是其它服务器,请改成对应都ip跟端口

registry {
type = "nacos"
file {
name = "file.conf"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}

​ 4.接着我们运行provideApplication

20191202212000

启动成功啦,我们再去看seata的日志:

20191202212028

成功了,这下我们一样,去修改test-client的内容,首先一样application.yml,把zk换成nacos,这里就不详细描述了,把test-service内的registry.conf,复制到client项目的resources中覆盖原来的registry.conf.

然后我们可以运行clientApplication:

20191202212114

​ 5.确认服务已经被发布并测试事务运行是否正常

20191202212203

服务成功发布出来,也被成功消费了.这下我们再去swagger中去测试回滚是否一切正常,访问http://127.0.0.1:28888/swagger-ui.html

20191202212240

恭喜你,看到这一定跟我一样成功了!

总结

关于nacos的使用跟seata的简单搭建已经完成了,更详细的内容希望希望大家访问以下地址阅读详细文档

nacos官网

dubbo官网

seata官网

· 阅读需 2 分钟

活动介绍

亮点解读

分享嘉宾

  • 季敏(清铭) 《Seata 的过去、现在和未来》 slides

  • 吴江坷《我与SEATA的开源之路以及SEATA在互联网医疗系统中的应用》 slides

    1577282651

  • 申海强(煊檍)《带你读透 Seata AT 模式的精髓》 slides

    1577282652

  • 张森 《分布式事务Seata之TCC模式详解》

    1577282653

  • 陈龙(屹远)《Seata 长事务解决方案 Saga 模式》

    1577282654

  • 陈鹏志《Seata%20在滴滴两轮车业务的实践》 slides

    1577282655

特别嘉奖

· 阅读需 11 分钟

项目地址:https://gitee.com/itCjb/springboot-dubbo-mybatisplus-seata

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

介绍

Mybatis-Plus:MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。

MP配置:

<bean id="sqlSessionFactory" class="com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
</bean>

Seata:Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

AT模式机制:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:
    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

分析原因

​ 1.首先我们通过介绍,可以看到,mp是需要注册sqlSessionFactory,注入数据源,而Seata是通过代理数据源来保证事务的正常回滚跟提交。

​ 2.我们来看基于seata的官方demo提供的SeataAutoConfig的代码

package org.test.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "dataSource") // 声明其为Bean实例
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}",dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
return druidDataSource;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
logger.info("代理dataSource........");
return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*.xml"));
return factory.getObject();
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

首先看到我们的seata配置数据源的类里,我们配置了一个数据源,然后又配置了一个seata代理datasource的bean,这时候.

然后我们如果直接启动mp整合seata的项目会发现,分页之类的插件会直接失效,连扫描mapper都得从代码上写,这是为什么呢?

通过阅读以上代码,是因为我们另外的配置了一个sqlSessionFactory,导致mp的sqlSessionFactory失效了,这时候我们发现了问题的所在了,即使我们不配置sqlSessionFactoryl,也会因为mp所使用的数据源不是被seata代理过后的数据源,导致分布式事务失效.但是如何解决这个问题呢?

这时候我们需要去阅读mp的源码,找到他的启动类,一看便知

/*
* Copyright (c) 2011-2020, baomidou (jobob@qq.com).
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.baomidou.mybatisplus.autoconfigure;


import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.incrementer.IKeyGenerator;
import com.baomidou.mybatisplus.core.injector.ISqlInjector;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.scripting.LanguageDriver;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.TypeHandler;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.mapper.MapperFactoryBean;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
* {@link EnableAutoConfiguration Auto-Configuration} for Mybatis. Contributes a
* {@link SqlSessionFactory} and a {@link SqlSessionTemplate}.
* <p>
* If {@link org.mybatis.spring.annotation.MapperScan} is used, or a
* configuration file is specified as a property, those will be considered,
* otherwise this auto-configuration will attempt to register mappers based on
* the interface definitions in or under the root auto-configuration package.
* </p>
* <p> copy from {@link org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration}</p>
*
* @author Eddú Meléndez
* @author Josh Long
* @author Kazuki Shimizu
* @author Eduardo Macarrón
*/
@Configuration
@ConditionalOnClass({SqlSessionFactory.class, SqlSessionFactoryBean.class})
@ConditionalOnSingleCandidate(DataSource.class)
@EnableConfigurationProperties(MybatisPlusProperties.class)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class MybatisPlusAutoConfiguration implements InitializingBean {

private static final Logger logger = LoggerFactory.getLogger(MybatisPlusAutoConfiguration.class);

private final MybatisPlusProperties properties;

private final Interceptor[] interceptors;

private final TypeHandler[] typeHandlers;

private final LanguageDriver[] languageDrivers;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

private final List<MybatisPlusPropertiesCustomizer> mybatisPlusPropertiesCustomizers;

private final ApplicationContext applicationContext;


public MybatisPlusAutoConfiguration(MybatisPlusProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ObjectProvider<TypeHandler[]> typeHandlersProvider,
ObjectProvider<LanguageDriver[]> languageDriversProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider,
ApplicationContext applicationContext) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.typeHandlers = typeHandlersProvider.getIfAvailable();
this.languageDrivers = languageDriversProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
this.mybatisPlusPropertiesCustomizers = mybatisPlusPropertiesCustomizerProvider.getIfAvailable();
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
if (!CollectionUtils.isEmpty(mybatisPlusPropertiesCustomizers)) {
mybatisPlusPropertiesCustomizers.forEach(i -> i.customize(properties));
}
checkConfigFileExists();
}

private void checkConfigFileExists() {
if (this.properties.isCheckConfigLocation() && StringUtils.hasText(this.properties.getConfigLocation())) {
Resource resource = this.resourceLoader.getResource(this.properties.getConfigLocation());
Assert.state(resource.exists(),
"Cannot find config location: " + resource + " (please add config file or check your Mybatis configuration)");
}
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// TODO 使用 MybatisSqlSessionFactoryBean 而不是 SqlSessionFactoryBean
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
applyConfiguration(factory);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (this.properties.getTypeAliasesSuperType() != null) {
factory.setTypeAliasesSuperType(this.properties.getTypeAliasesSuperType());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.typeHandlers)) {
factory.setTypeHandlers(this.typeHandlers);
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

// TODO 对源码做了一定的修改(因为源码适配了老旧的mybatis版本,但我们不需要适配)
Class<? extends LanguageDriver> defaultLanguageDriver = this.properties.getDefaultScriptingLanguageDriver();
if (!ObjectUtils.isEmpty(this.languageDrivers)) {
factory.setScriptingLanguageDrivers(this.languageDrivers);
}
Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);

// TODO 自定义枚举包
if (StringUtils.hasLength(this.properties.getTypeEnumsPackage())) {
factory.setTypeEnumsPackage(this.properties.getTypeEnumsPackage());
}
// TODO 此处必为非 NULL
GlobalConfig globalConfig = this.properties.getGlobalConfig();
// TODO 注入填充器
if (this.applicationContext.getBeanNamesForType(MetaObjectHandler.class,
false, false).length > 0) {
MetaObjectHandler metaObjectHandler = this.applicationContext.getBean(MetaObjectHandler.class);
globalConfig.setMetaObjectHandler(metaObjectHandler);
}
// TODO 注入主键生成器
if (this.applicationContext.getBeanNamesForType(IKeyGenerator.class, false,
false).length > 0) {
IKeyGenerator keyGenerator = this.applicationContext.getBean(IKeyGenerator.class);
globalConfig.getDbConfig().setKeyGenerator(keyGenerator);
}
// TODO 注入sql注入器
if (this.applicationContext.getBeanNamesForType(ISqlInjector.class, false,
false).length > 0) {
ISqlInjector iSqlInjector = this.applicationContext.getBean(ISqlInjector.class);
globalConfig.setSqlInjector(iSqlInjector);
}
// TODO 设置 GlobalConfig 到 MybatisSqlSessionFactoryBean
factory.setGlobalConfig(globalConfig);
return factory.getObject();
}

// TODO 入参使用 MybatisSqlSessionFactoryBean
private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
// TODO 使用 MybatisConfiguration
MybatisConfiguration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new MybatisConfiguration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}

@Bean
@ConditionalOnMissingBean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

/**
* This will just scan the same base package as Spring Boot does. If you want more power, you can explicitly use
* {@link org.mybatis.spring.annotation.MapperScan} but this will get typed mappers working correctly, out-of-the-box,
* similar to using Spring Data JPA repositories.
*/
public static class AutoConfiguredMapperScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

private BeanFactory beanFactory;

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

if (!AutoConfigurationPackages.has(this.beanFactory)) {
logger.debug("Could not determine auto-configuration package, automatic mapper scanning disabled.");
return;
}

logger.debug("Searching for mappers annotated with @Mapper");

List<String> packages = AutoConfigurationPackages.get(this.beanFactory);
if (logger.isDebugEnabled()) {
packages.forEach(pkg -> logger.debug("Using auto-configuration base package '{}'", pkg));
}

BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MapperScannerConfigurer.class);
builder.addPropertyValue("processPropertyPlaceHolders", true);
builder.addPropertyValue("annotationClass", Mapper.class);
builder.addPropertyValue("basePackage", StringUtils.collectionToCommaDelimitedString(packages));
BeanWrapper beanWrapper = new BeanWrapperImpl(MapperScannerConfigurer.class);
Stream.of(beanWrapper.getPropertyDescriptors())
// Need to mybatis-spring 2.0.2+
.filter(x -> x.getName().equals("lazyInitialization")).findAny()
.ifPresent(x -> builder.addPropertyValue("lazyInitialization", "${mybatis.lazy-initialization:false}"));
registry.registerBeanDefinition(MapperScannerConfigurer.class.getName(), builder.getBeanDefinition());
}

@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
}

/**
* If mapper registering configuration or mapper scanning configuration not present, this configuration allow to scan
* mappers based on the same component-scanning path as Spring Boot itself.
*/
@Configuration
@Import(AutoConfiguredMapperScannerRegistrar.class)
@ConditionalOnMissingBean({MapperFactoryBean.class, MapperScannerConfigurer.class})
public static class MapperScannerRegistrarNotFoundConfiguration implements InitializingBean {

@Override
public void afterPropertiesSet() {
logger.debug(
"Not found configuration for registering mapper bean using @MapperScan, MapperFactoryBean and MapperScannerConfigurer.");
}
}
}

看到mp启动类里的sqlSessionFactory方法了吗,他也是一样的注入一个数据源,这时候大家应该都知道解决方法了吧?

没错,就是把被代理过的数据源给放到mp的sqlSessionFactory中.

很简单,我们需要稍微改动一下我们的seata配置类就行了

package org.test.config;

import javax.sql.DataSource;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
@MapperScan("com.baomidou.springboot.mapper*")
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);
private DataSourceProxy dataSourceProxy;

@Bean(name = "dataSource") // 声明其为Bean实例
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
dataSourceProxy = new DataSourceProxy(druidDataSource);
return dataSourceProxy;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy() {
logger.info("代理dataSource........");
return dataSourceProxy;
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

看代码,我们去掉了自己配置的sqlSessionFactory,直接让DataSource bean返回的是一个被代理过的bean,并且我们加入了@Primary,导致mp优先使用我们配置的数据源,这样就解决了mp因为seata代理了数据源跟创建了新的sqlSessionFactory,导致mp的插件,组件失效的bug了!

总结

踩到坑不可怕,主要又耐心的顺着每个组件实现的原理,再去思考,查找对应冲突的代码块,你一定能找到个兼容二者的方法。

· 阅读需 23 分钟

项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

事务:事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。 ​ 分布式事务:当一个操作牵涉到多个服务,多台数据库协力完成时(比如分表分库后,业务拆分),多个服务中,本地的Transaction已经无法应对这个情况了,为了保证数据一致性,就需要用到分布式事务。 ​ Seata :是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 ​ 本文目的:现如今微服务越来越流行,而市面上的分布式事务的方案可谓不少,参差不齐,比较流行的以MQ代表的保证的是消息最终一致性的解决方案(消费确认,消息回查,消息补偿机制等),以及TX-LCN的LCN模式协调本地事务来保证事务统一提交或回滚(已经停止更新,对Dubbo2.7不兼容)。而MQ的分布式事务太过复杂,TX-LCN断更,这时候需要一个高效可靠及易上手的分布式事务解决方案,Seata脱颖而出,本文要介绍的就是如何快速搭建一个整合Seata的Demo项目,一起来吧!

准备工作

1.首先安装mysql,eclipse之类常用的工具,这不展开了.

2.访问seata下载中心地址我们使用的0.9.0版本

3.下载并解压seata-server

建库建表

1.首先我们链接mysql创建一个名为seata的数据库,然后运行一下建表sql,这个在seata-server的conf文件夹内的db_store.sql就是我的所需要使用的sql了.

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : seata
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:18
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for branch_table

-- ----------------------------

DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of branch_table

-- ----------------------------

-- ----------------------------

-- Table structure for global_table

-- ----------------------------

DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of global_table

-- ----------------------------

-- ----------------------------

-- Table structure for lock_table

-- ----------------------------

DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of lock_table

-- ----------------------------

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

2.运行完上面的seata所需要的数据库后,我们进行搭建我们所需要写的demo的库,创建一个名为test的数据库,然后执行以下sql代码:

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : test
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:24
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for test

-- ----------------------------

DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`one` varchar(255) DEFAULT NULL,
`two` varchar(255) DEFAULT NULL,
`createTime` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of test

-- ----------------------------

INSERT INTO `test` VALUES ('1', '1', '2', '2019-11-23 16:07:34');

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

3.我们找到seata-server/conf 文件夹内的file编辑它:20191129132933

4.再次找到其中的db配置方法块,更改方法如下图:

好了,可以到bin目录去./seata-server.bat 运行看看了

创建项目

​ 首先我们使用的是eclipse,当然你也可以用idea之类的工具,详细请按下面步骤来运行

​ 1.创建一个新的maven项目,并删除多余文件夹:2019112913335420191129133441

​ 2.打开项目的pom.xml,加入以下依赖:

	<properties>
<webVersion>3.1</webVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<HikariCP.version>3.2.0</HikariCP.version>
<mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
<version>7.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!-- mybatis-plus begin -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- mybatis-plus end -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- Zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version> </dependency> -->

<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
<version>3.1.0</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- 加上这个才能辨认到log4j2.yml文件 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency> <!-- 引入log4j2依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
<version>2.11.0</version> </dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

​ 3.再切换父项目为pom模式,还是pom文件,切换为 overview ,做如图操作:20191129134127

4.创建我们的demo子项目,test-service:20191129135935

​ 目录如下:

20191129140048

创建EmbeddedZooKeeper.java文件,跟 ProviderApplication.java,代码如下:

package org.test;

import java.io.File;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.UUID;

import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ErrorHandler;
import org.springframework.util.SocketUtils;

/**
* from:
* https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java
*
* Helper class to start an embedded instance of standalone (non clustered) ZooKeeper.
*
* NOTE: at least an external standalone server (if not an ensemble) are recommended, even for
* {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication}
*
* @author Patrick Peralta
* @author Mark Fisher
* @author David Turanski
*/
public class EmbeddedZooKeeper implements SmartLifecycle {

/**
* Logger.
*/
private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);

/**
* ZooKeeper client port. This will be determined dynamically upon startup.
*/
private final int clientPort;

/**
* Whether to auto-start. Default is true.
*/
private boolean autoStartup = true;

/**
* Lifecycle phase. Default is 0.
*/
private int phase = 0;

/**
* Thread for running the ZooKeeper server.
*/
private volatile Thread zkServerThread;

/**
* ZooKeeper server.
*/
private volatile ZooKeeperServerMain zkServer;

/**
* {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread.
*/
private ErrorHandler errorHandler;

private boolean daemon = true;

/**
* Construct an EmbeddedZooKeeper with a random port.
*/
public EmbeddedZooKeeper() {
clientPort = SocketUtils.findAvailableTcpPort();
}

/**
* Construct an EmbeddedZooKeeper with the provided port.
*
* @param clientPort
* port for ZooKeeper server to bind to
*/
public EmbeddedZooKeeper(int clientPort, boolean daemon) {
this.clientPort = clientPort;
this.daemon = daemon;
}

/**
* Returns the port that clients should use to connect to this embedded server.
*
* @return dynamically determined client port
*/
public int getClientPort() {
return this.clientPort;
}

/**
* Specify whether to start automatically. Default is true.
*
* @param autoStartup
* whether to start automatically
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* {@inheritDoc}
*/
public boolean isAutoStartup() {
return this.autoStartup;
}

/**
* Specify the lifecycle phase for the embedded server.
*
* @param phase
* the lifecycle phase
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* {@inheritDoc}
*/
public int getPhase() {
return this.phase;
}

/**
* {@inheritDoc}
*/
public boolean isRunning() {
return (zkServerThread != null);
}

/**
* Start the ZooKeeper server in a background thread.
* <p>
* Register an error handler via {@link #setErrorHandler} in order to handle any exceptions thrown during startup or
* execution.
*/
public synchronized void start() {
if (zkServerThread == null) {
zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter");
zkServerThread.setDaemon(daemon);
zkServerThread.start();
}
}

/**
* Shutdown the ZooKeeper server.
*/
public synchronized void stop() {
if (zkServerThread != null) {
// The shutdown method is protected...thus this hack to invoke it.
// This will log an exception on shutdown; see
// https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details.
try {
Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown");
shutdown.setAccessible(true);
shutdown.invoke(zkServer);
}

catch (Exception e) {
throw new RuntimeException(e);
}

// It is expected that the thread will exit after
// the server is shutdown; this will block until
// the shutdown is complete.
try {
zkServerThread.join(5000);
zkServerThread = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for embedded ZooKeeper to exit");
// abandoning zk thread
zkServerThread = null;
}
}
}

/**
* Stop the server if running and invoke the callback when complete.
*/
public void stop(Runnable callback) {
stop();
callback.run();
}

/**
* Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none
* is provided, only error-level logging will occur.
*
* @param errorHandler
* the {@link ErrorHandler} to be invoked
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Runnable implementation that starts the ZooKeeper server.
*/
private class ServerRunnable implements Runnable {

public void run() {
try {
Properties properties = new Properties();
File file = new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID());
file.deleteOnExit();
properties.setProperty("dataDir", file.getAbsolutePath());
properties.setProperty("clientPort", String.valueOf(clientPort));

QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(properties);

zkServer = new ZooKeeperServerMain();
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumPeerConfig);

zkServer.runFromConfig(configuration);
} catch (Exception e) {
if (errorHandler != null) {
errorHandler.handleError(e);
} else {
logger.error("Exception running embedded ZooKeeper", e);
}
}
}
}

}

package org.test;

import org.apache.dubbo.config.spring.context.annotation.DubboComponentScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
*
* @author cjb
* @date 2019/10/24
*/
@EnableTransactionManagement
@ComponentScan(basePackages = {"org.test.config", "org.test.service.impl"})
@DubboComponentScan(basePackages = "org.test.service.impl")
@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
new EmbeddedZooKeeper(2181, false).start();
SpringApplication app = new SpringApplication(ProviderApplication.class);
app.run(args);
}

}

创建实体包 org.test.entity以及创建实体类Test 用到了lombok,详细百度一下,eclipse装lombok插件

package org.test.entity;

import java.io.Serializable;
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
* <p>
* 功能
* </p>
*
* @author Funkye
* @since 2019-04-23
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "test对象", description = "功能")
public class Test implements Serializable {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "主键")
@TableId(value = "id", type = IdType.AUTO)
private Integer id;

@ApiModelProperty(value = "one")
@TableField("one")
private String one;

@ApiModelProperty(value = "two")
@TableField("two")
private String two;

@ApiModelProperty(value = "createTime")
@TableField("createTime")
private LocalDateTime createTime;

}

​ 创建service,service.impl,mapper等包,依次创建ITestservice,以及实现类,mapper

package org.test.service;

import org.test.entity.Test;

import com.baomidou.mybatisplus.extension.service.IService;

/**
* <p>
* 功能 服务类
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface ITestService extends IService<Test> {

}

package org.test.service.impl;




import org.apache.dubbo.config.annotation.Service;
import org.test.entity.Test;
import org.test.mapper.TestMapper;
import org.test.service.ITestService;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

@Service(version = "1.0.0",interfaceClass =ITestService.class )
public class TestServiceImpl extends ServiceImpl<TestMapper, Test> implements ITestService {

}

package org.test.mapper;

import org.test.entity.Test;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* <p>
* 功能 Mapper 接口
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface TestMapper extends BaseMapper<Test> {

}

创建org.test.config包,创建SeataAutoConfig.java,配置信息都在此处,主要作用为代理数据,连接事务服务分组

package org.test.config;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "druidDataSource") // 声明其为Bean实例
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
return druidDataSource;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean(name = "dataSource")
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSourceProxy dataSourceProxy(@Qualifier(value = "druidDataSource") DruidDataSource druidDataSource) {
logger.info("代理dataSource........");
return new DataSourceProxy(druidDataSource);
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

再创建mybatisplus所需的配置文件MybatisPlusConfig

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.baomidou.mybatisplus.core.parser.ISqlParser;
import com.baomidou.mybatisplus.extension.parsers.BlockAttackSqlParser;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;

@Configuration
// @MapperScan("com.baomidou.springboot.mapper*")//这个注解,作用相当于下面的@Bean
// MapperScannerConfigurer,2者配置1份即可
public class MybatisPlusConfig {

/**
* mybatis-plus分页插件<br>
* 文档:http://mp.baomidou.com<br>
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
List<ISqlParser> sqlParserList = new ArrayList<ISqlParser>();
// 攻击 SQL 阻断解析器、加入解析链
sqlParserList.add(new BlockAttackSqlParser());
paginationInterceptor.setSqlParserList(sqlParserList);
return paginationInterceptor;
}

/**
* 相当于顶部的: {@code @MapperScan("com.baomidou.springboot.mapper*")} 这里可以扩展,比如使用配置文件来配置扫描Mapper的路径
*/

@Bean
public MapperScannerConfigurer mapperScannerConfigurer() {
MapperScannerConfigurer scannerConfigurer = new MapperScannerConfigurer();
scannerConfigurer.setBasePackage("org.test.mapper");
return scannerConfigurer;
}

}

​ 再创建resources目录,创建mapper文件夹,application.yml等文件

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
loadbalance: leastactive
threadpool: cached
scan:
base-packages: org。test.service
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: zookeeper://127.0.0.1:2181?client=curator
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
auto-mapping-unknown-column-behavior: none

​ 创建file.conf,此处的service 内的vgroup_mapping.你的事务分组,比如上面SeataAutoConfig内配置了test-group,那么这里也要改为test-group,然后下面ip端口都是seata运行的ip跟端口就行了

transport {
type = "TCP"
server = "NIO"
heartbeat = true
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
boss-thread-size = 1
worker-thread-size = 8
}
shutdown {
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroup_mapping.test-group = "default"
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
undo.log.table = "undo_log"
}

recovery {
committing-retry-period = 1000
asyn-committing-retry-period = 1000
rollbacking-retry-period = 1000
timeout-retry-period = 1000
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}

metrics {
enabled = false
registry-type = "compact"
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

support {
spring {
datasource.autoproxy = false
}
}

创建registry.conf,来指定file,zk的ip端口之类的配置

registry {
type = "file"
file {
name = "file.conf"
}
}
config {
type = "file"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
}

​ 大功告成,可以直接运行啦,这时候观察seata-server20191129142115

​ 接下来我们创建test-client项目项目,这里就不赘述了,跟上面的test-service一样的创建方式

​ 接下来我们复制test-service内的service跟实体过去,当然你嫌麻烦,可以单独搞个子项目放通用的service跟实体,一些工具类等等,我这边为了快速搭建这个demo,就选择复制黏贴的方式了.

目录结构:

然后我们创建ClientApplication:

package org.test;

import java.util.TimeZone;
import java.util.concurrent.Executor;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, MybatisPlusAutoConfiguration.class})
@EnableScheduling
@EnableAsync
@Configuration
@EnableDubbo(scanBasePackages = {"org.test.service"})
@ComponentScan(basePackages = {"org.test.service", "org.test.controller", "org.test.config"})
public class ClientApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication app = new SpringApplication(ClientApplication.class);
app.run(args);
}

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
return new ThreadPoolTaskExecutor();
}
}

再到config包内创建SwaggerConfig :

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
// swagger2的配置文件,这里可以配置swagger2的一些基本的内容,比如扫描的包等等
@Bean
public Docket createRestApi() {
List<Parameter> pars = new ArrayList<Parameter>();
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
// 为当前包路径
.apis(RequestHandlerSelectors.basePackage("org.test.controller")).paths(PathSelectors.any()).build()
.globalOperationParameters(pars);
}

// 构建 api文档的详细信息函数,注意这里的注解引用的是哪个
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
// 页面标题
.title("项目接口")
// 创建人
.contact(new Contact("FUNKYE", "", ""))
// 版本号
.version("1.0")
// 描述
.description("API 描述").build();
}
}

​ 再创建SpringMvcConfigure,再里面放入seata的配置,我为了偷懒直接集成在mvc配置的类里了,大家规范点可以另外创建个配置seata的类,大家可以发现下面还是有个组名称,我把两个项目都分配到一个组去,貌似另外取一个也没事的.

package org.test.config;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.dubbo.config.annotation.Reference;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import com.google.common.collect.Maps;

import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SpringMvcConfigure implements WebMvcConfigurer {

@Bean
public FilterRegistrationBean corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader(CorsConfiguration.ALL);
config.addAllowedMethod(CorsConfiguration.ALL);
source.registerCorsConfiguration("/**", config);
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new CorsFilter(source));
filterRegistrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE);
filterRegistrationBean.setOrder(1);
filterRegistrationBean.setEnabled(true);
filterRegistrationBean.addUrlPatterns("/**");
Map<String, String> initParameters = Maps.newHashMap();
initParameters.put("excludes", "/favicon.ico,/img/*,/js/*,/css/*");
initParameters.put("isIncludeRichText", "true");
filterRegistrationBean.setInitParameters(initParameters);
return filterRegistrationBean;
}

@Bean
public InternalResourceViewResolver viewResolver() {
InternalResourceViewResolver viewResolver = new InternalResourceViewResolver();
viewResolver.setPrefix("/WEB-INF/jsp/");
viewResolver.setSuffix(".jsp");
// viewResolver.setViewClass(JstlView.class);
// 这个属性通常并不需要手动配置,高版本的Spring会自动检测
return viewResolver;
}



/**
* 替换框架json为fastjson
*/
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter();
FastJsonConfig fastJsonConfig = new FastJsonConfig();
fastJsonConfig.setSerializerFeatures(SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.DisableCircularReferenceDetect);
// 处理中文乱码问题
List<MediaType> fastMediaTypes = new ArrayList<>();
fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
fastConverter.setSupportedMediaTypes(fastMediaTypes);
fastConverter.setFastJsonConfig(fastJsonConfig);
// 处理字符串, 避免直接返回字符串的时候被添加了引号
StringHttpMessageConverter smc = new StringHttpMessageConverter(Charset.forName("UTF-8"));
converters.add(smc);
converters.add(fastConverter);
}

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("test-client", "test-group");
}

}

再创建controller包,再包下创建TestController :

package org.test.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.test.service.DemoService;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
* <p>
* 文件表 前端控制器
* </p>
*
* @author funkye
* @since 2019-03-20
*/
@RestController
@RequestMapping("/test")
@Api(tags = "测试接口")
public class TestController {

private final static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
@Lazy
DemoService demoService;

@GetMapping(value = "testSeataOne")
@ApiOperation(value = "测试手动回滚分布式事务接口")
public Object testSeataOne() {
return demoService.One();
}

@GetMapping(value = "testSeataTwo")
@ApiOperation(value = "测试异常回滚分布式事务接口")
public Object testSeataTwo() {
return demoService.Two();
}

}

再到service去创建需要依赖的DemoService

package org.test.service;

import java.time.LocalDateTime;

import org.apache.dubbo.config.annotation.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.test.controller.TestController;
import org.test.entity.Test;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
public class DemoService {
@Reference(version = "1.0.0", timeout = 60000)
private ITestService testService;
private final static Logger logger = LoggerFactory.getLogger(DemoService.class);

/**
* 手动回滚示例
*
* @return
*/
@GlobalTransactional
public Object One() {
logger.info("seata分布式事务Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
try {
logger.info("载入事务id进行回滚");
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
return false;
}

/**
* 抛出异常进行回滚示例
*
* @return
*/
@GlobalTransactional
public Object Two() {
logger.info("seata分布式事务Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}
}

一样创建resources文件夹,先创建常用的application.yml

spring:
application:
name: test
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
username: root
password: 123456
mvc:
servlet:
load-on-startup: 1
http:
encoding:
force: true
charset: utf-8
enabled: true
multipart:
max-file-size: 10MB
max-request-size: 10MB
dubbo:
registry:
id: my-registry
address: zookeeper://127.0.0.1:2181?client=curator
# address: zookeeper://127.0.0.1:2181?client=curator
application:
name: dubbo-demo-client
qos-enable: false
server:
port: 28888
max-http-header-size: 8192
address: 0.0.0.0
tomcat:
max-http-post-size: 104857600

再把之前service配置好的file跟registry文件复制来,如果你的client组名称再配置类里修改了,那么这里的file文件内的组名称一样需要更改.

完整的目录结构如上,这时候可以启动test-service后,再启动test-client,到swagger里测试咯

​ 4.访问127.0.0.1:28888/swagger-ui.html做最后的收尾

20191129143124

这里数据我已经存了一条记录了,我们看看会不会成功回滚:

20191129143252

刷新数据库,发现还是只有一条数据:

20191129143124

再查看日志:

20191129143407

显示已经回滚,我们再看看seata-server的日志:

显示回滚成功,事务id也是一致的,这下我们的分布式事务就跑通咯,通过打断点方式,大家可以查看undo_log,会发现再事务提交前,会存入一条事务信息的数据,如果回滚成功,该信息就会被删除.

总结

seata的整合还是比较简单易入手,稍微用心一些你肯定写的比我更好!

欢迎大家也多去阅读seata,dubbo之类的源代码,能解决业务中遇到的大量的坑哦!

· 阅读需 4 分钟

在分析启动部分源码时,我发现 GlobalTransactionScanner 会同时启动 RM 和 TM client,但根据 Seata 的设计来看,TM 负责全局事务的操作,如果一个服务中不需要开启全局事务,此时是不需要启动 TM client的,也就是说项目中如果没有全局事务注解,此时是不是就不需要初始化 TM client 了,因为不是每个微服务,都需要 GlobalTransactional,它此时仅仅作为一个 RM client 而已。

于是我着手将 GlobalTransactionScanner 稍微更改了初始化的规则,由于之前 GlobalTransactionScanner 调用 初始化方法是在 InitializingBean 中的 afterPropertiesSet() 方法中进行,afterPropertySet() 仅仅是当前 bean 初始化后被调用,此时无法得知当前 Spring 容器是否有全局事务注解。

因此我去掉了 InitializingBean,改成了是实现 ApplicationListener,在实例化 bean 的过程中检查是否有 GlobalTransactional 注解的存在,最后在 Spring 容器初始化完成之后再调用 RM 和 TM client 初始化方法,这时候就可以根据项目是否有用到全局事务注解来决定是否启动 TM client 了。

这里附上 PR 地址:https://github.com/apache/incubator-seata/pull/1936

随后在 pr 中讨论中得知,目前 Seata 的设计是只有在发起方的 TM 才可以发起 GlobalRollbackRequest,RM 只能发送 BranchReport(false) 上报分支状态到 TC 服务端,无法直接发送 GlobalRollbackRequest 进行全局回滚操作。具体的交互逻辑如下:

那么根据上面的设计模型,自然可以按需启动 TM client 了。

但是 Seata 后面的优化迭代中,还需要考虑的一点是:

当参与方出现异常时,是否可以直接由参与方的 TM client 发起全局回滚?这也就意味着可以缩短分布式事务的周期时间,尽快释放全局锁让其他数据冲突的事务尽早的获取到锁执行。

也就是说在一个全局事务当中,只要有一个 RM client 执行本地事务失败了,直接当前服务的 TM client 发起全局事务回滚,不必要等待发起方的 TM 发起的决议回滚通知了。如果要实现这个优化,那么就需要每个服务都需要同时启动 TM client 和 RM client。

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 14 分钟

从上一篇文章「分布式事务中间件Seata的设计原理」讲了下 Seata AT 模式的一些设计原理,从中也知道了 AT 模式的三个角色(RM、TM、TC),接下来我会更新 Seata 源码分析系列文章。今天就来分析 Seata AT 模式在启动的时候都做了哪些操作。

客户端启动逻辑

TM 是负责整个全局事务的管理器,因此一个全局事务是由 TM 开启的,TM 有个全局管理类 GlobalTransaction,结构如下:

io.seata.tm.api.GlobalTransaction

public interface GlobalTransaction {

void begin() throws TransactionException;

void begin(int timeout) throws TransactionException;

void begin(int timeout, String name) throws TransactionException;

void commit() throws TransactionException;

void rollback() throws TransactionException;

GlobalStatus getStatus() throws TransactionException;

// ...
}

可以通过 GlobalTransactionContext 创建一个 GlobalTransaction,然后用 GlobalTransaction 进行全局事务的开启、提交、回滚等操作,因此我们直接用 API 方式使用 Seata AT 模式:

//init seata;
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
//trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
// 事务处理
// ...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}

如果每次使用全局事务都这样写,难免会造成代码冗余,我们的项目都是基于 Spring 容器,这时我们可以利用 Spring AOP 的特性,用模板模式把这些冗余代码封装模版里,参考 Mybatis-spring 也是做了这么一件事情,那么接下来我们来分析一下基于 Spring 的项目启动 Seata 并注册全局事务时都做了哪些工作。

我们开启一个全局事务是在方法上加上 @GlobalTransactional注解,Seata 的 Spring 模块中,有个 GlobalTransactionScanner,它的继承关系如下:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean {
// ...
}

在基于 Spring 项目的启动过程中,对该类会有如下初始化流程:

image-20191124155455309

InitializingBean 的 afterPropertiesSet() 方法调用了 initClient() 方法:

io.seata.spring.annotation.GlobalTransactionScanner#initClient

TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);

对 TM 和 RM 做了初始化操作。

  • TM 初始化

io.seata.tm.TMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
// 获取 TmRpcClient 实例
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 初始化 TM Client
tmRpcClient.init();
}

调用 TmRpcClient.getInstance() 方法会获取一个 TM 客户端实例,在获取过程中,会创建 Netty 客户端配置文件对象,以及创建 messageExecutor 线程池,该线程池用于在处理各种与服务端的消息交互,在创建 TmRpcClient 实例时,创建 ClientBootstrap,用于管理 Netty 服务的启停,以及 ClientChannelManager,它是专门用于管理 Netty 客户端对象池,Seata 的 Netty 部分配合使用了对象池,后面在分析网络模块会讲到。

io.seata.core.rpc.netty.AbstractRpcRemotingClient#init

public void init() {
clientBootstrap.start();
// 定时尝试连接服务端
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}

调用 TM 客户端 init() 方法,最终会启动 netty 客户端(此时还未真正启动,在对象池被调用时才会被真正启动);开启一个定时任务,定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端,具体逻辑是在 NettyClientChannelManager 中的 channels 中缓存了客户端 channel,如果此时 channels 不存在获取已过期,那么就会尝试连接服务端以重新获取 channel 并将其缓存到 channels 中;开启一条单独线程,用于处理异步请求发送,这里用得很巧妙,之后在分析网络模块在具体对其进行分析。

io.seata.core.rpc.netty.AbstractRpcRemoting#init

public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

在 AbstractRpcRemoting 的 init 方法中,又是开启了一个定时任务,该定时任务主要是用于定时清除 futures 已过期的 futrue,futures 是保存发送请求需要返回结果的 future 对象,该对象有个超时时间,过了超时时间就会自动抛异常,因此需要定时清除已过期的 future 对象。

  • RM 初始化

io.seata.rm.RMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}

RmRpcClient.getInstance 处理逻辑与 TM 大致相同;ResourceManager 是 RM 资源管理器,负责分支事务的注册、提交、上报、以及回滚操作,以及全局锁的查询操作,DefaultResourceManager 会持有当前所有的 RM 资源管理器,进行统一调用处理,而 get() 方法主要是加载当前的资源管理器,主要用了类似 SPI 的机制,进行灵活加载,如下图,Seata 会扫描 META-INF/services/ 目录下的配置类并进行动态加载。

ClientMessageListener 是 RM 消息处理监听器,用于负责处理从 TC 发送过来的指令,并对分支进行分支提交、分支回滚,以及 undo log 删除操作;最后 init 方法跟 TM 逻辑也大体一致;DefaultRMHandler 封装了 RM 分支事务的一些具体操作逻辑。

接下来再看看 wrapIfNecessary 方法究竟做了哪些操作。

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判断是否有开启全局事务
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// 判断 bean 中是否有 GlobalTransactional 和 GlobalLock 注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

if (interceptor == null) {
// 创建代理类
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}

LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 执行包装目标对象到代理对象
Advisor[] advisor = super.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

GlobalTransactionScanner 继承了 AbstractAutoProxyCreator,用于对 Spring AOP 支持,从代码中可看出,用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法。

GlobalTransactionalInterceptor 实现了 MethodInterceptor:

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
// 全局事务注解
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
// 全局锁注解
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}

以上是代理方法执行的逻辑逻辑,其中 handleGlobalTransaction() 方法里面调用了 TransactionalTemplate 模版:

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public TransactionInfo getTransactionInfo() {
// ...
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// ...
}
}

handleGlobalTransaction() 方法执行了就是 TransactionalTemplate 模版类的 execute 方法:

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {

// 2. begin transaction
beginTransaction(txInfo, tx);

Object rs = null;
try {

// Do Your Business
rs = business.execute();

} catch (Throwable ex) {

// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);

return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}

以上是不是有一种似曾相识的感觉?没错,以上就是我们使用 API 时经常写的冗余代码,现在 Spring 通过代理模式,把这些冗余代码都封装带模版里面了,它将那些冗余代码统统封装起来统一流程处理,并不需要你显示写出来了,有兴趣的也可以去看看 Mybatis-spring 的源码,也是写得非常精彩。

服务端处理逻辑

服务端收到客户端的连接,那当然是将其 channel 也缓存起来,前面也说到客户端会发送 RegisterRMRequest/RegisterTMRequest 请求给服务端,服务端收到后会调用 ServerMessageListener 监听器处理:

io.seata.core.rpc.ServerMessageListener

public interface ServerMessageListener {
// 处理各种事务,如分支注册、分支提交、分支上报、分支回滚等等
void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
// 处理 RM 客户端的注册连接
void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 处理 TM 客户端的注册连接
void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 服务端与客户端保持心跳
void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender)

}

ChannelManager 是服务端 channel 的管理器,服务端每次和客户端通信,都需要从 ChannelManager 中获取客户端对应的 channel,它用于保存 TM 和 RM 客户端 channel 的缓存结构如下:

/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>();

/**
* ip+appname,port
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

以上的 Map 结构有点复杂:

RM_CHANNELS:

  1. resourceId 指的是 RM client 的数据库地址;
  2. applicationId 指的是 RM client 的服务 Id,比如 springboot 的配置 spring.application.name=account-service 中的 account-service 即是 applicationId;
  3. ip 指的是 RM client 服务地址;
  4. port 指的是 RM client 服务地址;
  5. RpcContext 保存了本次注册请求的信息。

TM_CHANNELS:

  1. ip+appname:这里的注释应该是写错了,应该是 appname+ip,即 TM_CHANNELS 的 Map 结构第一个 key 为 appname+ip;
  2. port:客户端的端口号。

以下是 RM Client 注册逻辑:

io.seata.core.rpc.ChannelManager#registerRMChannel

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(resourceManagerRequest.getVersion());
// 将 ResourceIds 数据库连接连接信息放入一个set中
Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
RpcContext rpcContext;
// 从缓存中判断是否有该channel信息
if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
// 根据请求注册信息,构建 rpcContext
rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
resourceManagerRequest.getResourceIds(), channel);
// 将 rpcContext 放入缓存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
} else {
rpcContext = IDENTIFIED_CHANNELS.get(channel);
rpcContext.addResources(dbkeySet);
}
if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
for (String resourceId : dbkeySet) {
String clientIp;
// 将请求信息存入 RM_CHANNELS 中,这里用了 java8 的 computeIfAbsent 方法操作
ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
.computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
// 将当前 rpcContext 放入 portMap 中
rpcContext.holdInResourceManagerChannels(resourceId, portMap);
updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
}
}

从以上代码逻辑能够看出,注册 RM client 主要是将注册请求信息,放入 RM_CHANNELS 缓存中,同时还会从 IDENTIFIED_CHANNELS 中判断本次请求的 channel 是否已验证过,IDENTIFIED_CHANNELS 的结构如下:

private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS
= new ConcurrentHashMap<>();

IDENTIFIED_CHANNELS 包含了所有 TM 和 RM 已注册的 channel。

以下是 TM 注册逻辑:

io.seata.core.rpc.ChannelManager#registerTMChannel

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(request.getVersion());
// 根据请求注册信息,构建 RpcContext
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
// 将 RpcContext 放入 IDENTIFIED_CHANNELS 缓存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
// account-service:127.0.0.1:63353
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
+ getClientIpFromChannel(channel);
// 将请求信息存入 TM_CHANNELS 缓存中
TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
// 将上一步创建好的get出来,之后再将rpcContext放入这个map的value中
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
rpcContext.holdInClientChannels(clientIdentifiedMap);
}

TM client 的注册大体类似,把本次注册的信息放入对应的缓存中保存,但比 RM client 的注册逻辑简单一些,主要是 RM client 会涉及分支事务资源的信息,需要注册的信息也会比 TM client 多。

以上源码分析基于 0.9.0 版本。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 29 分钟

Seata 意为:Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事务解决方案,提供了 AT、TCC、Saga 和 XA 事务模式,本文详解其中的 Saga 模式。
项目地址:https://github.com/apache/incubator-seata

本文作者:屹远(陈龙),蚂蚁金服分布式事务核心研发。

金融分布式应用开发的痛点

分布式系统有一个比较明显的问题就是,一个业务流程需要组合一组服务。这样的事情在微服务下就更为明显了,因为这需要业务上的一致性的保证。也就是说,如果一个步骤失败了,那么要么回滚到以前的服务调用,要么不断重试保证所有的步骤都成功。---《左耳听风-弹力设计之“补偿事务”》

而在金融领域微服务架构下的业务流程往往会更复杂,流程很长,比如一个互联网微贷业务流程调十几个服务很正常,再加上异常处理的流程那就更复杂了,做过金融业务开发的同学会很有体感。

所以在金融分布式应用开发过程中我们面临一些痛点:

  • 业务一致性难以保障

我们接触到的大多数业务(比如在渠道层、产品层、集成层的系统),为了保障业务最终一致性,往往会采用“补偿”的方式来做,如果没有一个协调器来支持,开发难度是比较大的,每一步都要在 catch 里去处理前面所有的“回滚”操作,这将会形成“箭头形”的代码,可读性及维护性差。或者重试异常的操作,如果重试不成功可能要转异步重试,甚至最后转人工处理。这些都给开发人员带来极大的负担,开发效率低,且容易出错。

  • 业务状态难以管理

业务实体很多、实体的状态也很多,往往做完一个业务活动后就将实体的状态更新到了数据库里,没有一个状态机来管理整个状态的变迁过程,不直观,容易出错,造成业务进入一个不正确的状态。

  • 幂等性难以保障

服务的幂等性是分布式环境下的基本要求,为了保证服务的幂等性往往需要服务开发者逐个去设计,有用数据库唯一键实现的,有用分布式缓存实现的,没有一个统一的方案,开发人员负担大,也容易遗漏,从而造成资损。

  • 业务监控运维难,缺乏统一的差错守护能力

业务的执行情况监控一般通过打印日志,再基于日志监控平台查看,大多数情况是没有问题的,但是如果业务出错,这些监控缺乏当时的业务上下文,对排查问题不友好,往往需要再去数据库里查。同时日志的打印也依赖于开发,容易遗漏。对于补偿事务往往需要有“差错守护触发补偿”、“工人触发补偿”操作,没有统一的差错守护和处理规范,这些都要开发者逐个开发,负担沉重。

理论基础

一些场景下,我们对数据有强一致性的需求时,会采用在业务层上需要使用“两阶段提交”这样的分布式事务方案。而在另外一些场景下,我们并不需要这么强的一致性,那就只需要保证最终一致性就可以了。

例如蚂蚁金服目前在金融核心系统使用的就是 TCC 模式,金融核心系统的特点是一致性要求高(业务上的隔离性)、短流程、并发高。

而在很多金融核心以上的业务(比如在渠道层、产品层、集成层的系统),这些系统的特点是最终一致即可、流程多、流程长、还可能要调用其它公司的服务(如金融网络)。这是如果每个服务都开发 Try、Confirm、Cancel 三个方法成本高。如果事务中有其它公司的服务,也无法要求其它公司的服务也遵循 TCC 这种开发模式。同时流程长,事务边界太长会影响性能。

对于事务我们都知道 ACID,也很熟悉 CAP 理论最多只能满足其中两个,所以,为了提高性能,出现了 ACID 的一个变种 BASE。ACID 强调的是一致性(CAP 中的 C),而 BASE 强调的是可用性(CAP 中的 A)。我们知道,在很多情况下,我们是无法做到强一致性的 ACID 的。特别是我们需要跨多个系统的时候,而且这些系统还不是由一个公司所提供的。BASE 的系统倾向于设计出更加有弹力的系统,在短时间内,就算是有数据不同步的风险,我们也应该允许新的交易可以发生,而后面我们在业务上将可能出现问题的事务通过补偿的方式处理掉,以保证最终的一致性。

所以我们在实际开发中会进行取舍,对于更多的金融核心以上的业务系统可以采用补偿事务,补偿事务处理方面在 30 年前就提出了  Saga 理论,随着微服务的发展,近些年才逐步受到大家的关注。目前业界比较也公认 Saga 是作为长事务的解决方案。

https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf[1] > http://microservices.io/patterns/data/saga.html[2]

社区和业界的方案

Apache Camel Saga

Camel 是实现 EIP(Enterprise Integration Patterns)企业集成模式的一款开源产品,它基于事件驱动的架构,有着良好的性能和吞吐量,它在 2.21 版本新增加了 Saga EIP。

Saga EIP 提供了一种方式可以通过 camel route 定义一系列有关联关系的 Action,这些 Action 要么都执行成功,要么都回滚,Saga 可以协调任何通讯协议的分布式服务或本地服务,并达到全局的最终一致性。Saga 不要求整个处理在短时间内完成,因为它不占用任何数据库锁,它可以支持需要长时间处理的请求,从几秒到几天,Camel 的 Saga EIP 是基于  Microprofile 的 LRA[3](Long Running Action),同样也是支持协调任何通讯协议任何语言实现的分布式服务。

Saga 的实现不会对数据进行加锁,而是在给操作定义它的“补偿操作”,当正常流程执行出错的时候触发那些已经执行过的操作的“补偿操作”,将流程回滚掉。“补偿操作”可以在 Camel route 上用 Java 或 XML DSL(Definition Specific Language)来定义。

下面是一个 Java DSL 示例:

// action
from("direct:reserveCredit")
.bean(idService, "generateCustomId") // generate a custom Id and set it in the body
.to("direct:creditReservation")

// delegate action
from("direct:creditReservation")
.saga()
.propagation(SagaPropagation.SUPPORTS)
.option("CreditId", body()) // mark the current body as needed in the compensating action
.compensation("direct:creditRefund")
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved. Custom Id used is ${body}");

// called only if the saga is cancelled
from("direct:creditRefund")
.transform(header("CreditId")) // retrieve the CreditId option from headers
.bean(creditService, "refundCredit")
.log("Credit for Custom Id ${body} refunded");

XML DSL 示例:

<route>
<from uri="direct:start"/>
<saga>
<compensation uri="direct:compensation" />
<completion uri="direct:completion" />
<option optionName="myOptionKey">
<constant>myOptionValue</constant>
</option>
<option optionName="myOptionKey2">
<constant>myOptionValue2</constant>
</option>
</saga>
<to uri="direct:action1" />
<to uri="direct:action2" />
</route>

Eventuate Tram Saga

Eventuate Tram Saga[4]  框架是使用 JDBC / JPA 的 Java 微服务的一个 Saga 框架。它也和 Camel Saga 一样采用了  Java DSL 来定义补偿操作:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.withCompensation(this::reject)
.step()
.invokeParticipant(this::reserveCredit)
.step()
.invokeParticipant(this::approve)
.build();


@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}


private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
.to("customerService")
.build();

...

Apache ServiceComb Saga

ServiceComb Saga[5]  也是一个微服务应用的数据最终一致性解决方案。相对于  TCC  而言,在 try 阶段,Saga 会直接提交事务,后续 rollback 阶段则通过反向的补偿操作来完成。与前面两种不同是它是采用 Java 注解+拦截器的方式来进行“补偿”服务的定义。

架构:

Saga 是由  alpha  和  **omega **组成,其中:

  • alpha 充当协调者的角色,主要负责对事务进行管理和协调;
  • omega 是微服务中内嵌的一个 agent,负责对网络请求进行拦截并向 alpha 上报事务事件;

下图展示了 alpha,omega 以及微服务三者的关系:
ServiceComb Saga

使用示例:

public class ServiceA extends AbsService implements IServiceA {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Autowired
private IServiceB serviceB;

@Autowired
private IServiceC serviceC;

@Override
public String getServiceName() {
return "servicea";
}

@Override
public String getTableName() {
return "testa";
}

@Override
@SagaStart
@Compensable(compensationMethod = "cancelRun")
@Transactional(rollbackFor = Exception.class)
public Object run(InvokeContext invokeContext) throws Exception {
LOG.info("A.run called");
doRunBusi();
if (invokeContext.isInvokeB(getServiceName())) {
serviceB.run(invokeContext);
}
if (invokeContext.isInvokeC(getServiceName())) {
serviceC.run(invokeContext);
}
if (invokeContext.isException(getServiceName())) {
LOG.info("A.run exception");
throw new Exception("A.run exception");
}
return null;
}

public void cancelRun(InvokeContext invokeContext) {
LOG.info("A.cancel called");
doCancelBusi();
}

蚂蚁金服的实践

蚂蚁金服内部大规模在使用 TCC 模式分布式事务,主要用于金融核心等对一致性要求高、性能要求高的场景。在更上层的业务系统因为流程多流程长,开发 TCC 成本比较高,大都会权衡采用 Saga 模式来到达业务最终一致性,由于历史的原因不同的 BU 有自己的一套“补偿”事务的方案,基本上是两种:

  • 一种是当一个服务在失败时需要“重试”或“补偿”时,在执行服务前在数据库插入一条记录,记录状态,当异常时通过定时任务去查询数据库记录并进行“重试”或“补偿”,当业务流程执行成功则删除记录;
  • 另一种是设计一个状态机引擎和简单的 DSL,编排业务流程和记录业务状态,状态机引擎可以定义“补偿服务”,当异常时由状态机引擎反向调用“补偿服务”进行回滚,同时还会有一个“差错守护”平台,监控那些执行失败或补偿失败的业务流水,并不断进行“补偿”或“重试”;

方案对比

社区和业界的解决方案一般是两种,一种基本状态机或流程引擎通过 DSL 方式编排流程程和补偿定义,一种是基于 Java 注解+拦截器实现补偿,那么这两种方案有什么优缺点呢?

方式优点缺点
状态机+DSL
- 可以用可视化工具来定义业务流程,标准化,可读性高,可实现服务编排的功能
- 提高业务分析人员与程序开发人员的沟通效率
- 业务状态管理:流程本质就是一个状态机,可以很好的反映业务状态的流转
- 提高异常处理灵活性:可以实现宕机恢复后的“向前重试”或“向后补偿”
- 天然可以使用 Actor 模型或 SEDA 架构等异步处理引擎来执行,提高整体吞吐量

- 业务流程实际是由 JAVA 程序与 DSL 配置组成,程序与配置分离,开发起来比较繁琐
- 如果是改造现有业务,对业务侵入性高
- 引擎实现成本高
拦截器+java 注解
- 程序与注解是在一起的,开发简单,学习成本低
- 方便接入现有业务
- 基于动态代理拦截器,框架实现成本低

- 框架无法提供 Actor 模型或 SEDA 架构等异步处理模式来提高系统吞吐量
- 框架无法提供业务状态管理
- 难以实现宕机恢复后的“向前重试”,因为无法恢复线程上下文

Seata Saga 的方案

Seata Saga 的简介可以看一下《Seata Saga 官网文档》[6]。

Seata Saga 采用了状态机+DSL 方案来实现,原因有以下几个:

  • 状态机+DSL 方案在实际生产中应用更广泛;
  • 可以使用 Actor 模型或 SEDA 架构等异步处理引擎来执行,提高整体吞吐量;
  • 通常在核心系统以上层的业务系统会伴随有“服务编排”的需求,而服务编排又有事务最终一致性要求,两者很难分割开,状态机+DSL 方案可以同时满足这两个需求;
  • 由于 Saga 模式在理论上是不保证隔离性的,在极端情况下可能由于脏写无法完成回滚操作,比如举一个极端的例子, 分布式事务内先给用户 A 充值,然后给用户 B 扣减余额,如果在给 A 用户充值成功,在事务提交以前,A 用户把线消费掉了,如果事务发生回滚,这时则没有办法进行补偿了,有些业务场景可以允许让业务最终成功,在回滚不了的情况下可以继续重试完成后面的流程,状态机+DSL 的方案可以实现“向前”恢复上下文继续执行的能力, 让业务最终执行成功,达到最终一致性的目的。

在不保证隔离性的情况下:业务流程设计时要遵循“宁可长款, 不可短款”的原则,长款意思是客户少了线机构多了钱,以机构信誉可以给客户退款,反之则是短款,少的线可能追不回来了。所以在业务流程设计上一定是先扣款。

状态定义语言(Seata State Language)

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件;

  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点;

  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚;

    注意: 异常发生时是否进行补偿也可由用户自定义决定

  4. 可以实现服务编排需求,支持单项选择、并发、异步、子状态机、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

假设有一个业务流程要调两个服务,先调库存扣减(InventoryService),再调余额扣减(BalanceService),保证在一个分布式内要么同时成功,要么同时回滚。两个参与者服务都有一个 reduce 方法,表示库存扣减或余额扣减,还有一个 compensateReduce 方法,表示补偿扣减操作。以  InventoryService 为例看一下它的接口定义:

public interface InventoryService {

/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}

这个业务流程对应的状态图:

示例状态图
对应的 JSON

{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]", "$.[count]"],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": ["java.lang.Throwable"],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}

状态语言在一定程度上参考了  AWS Step Functions[7]。

"状态机" 属性简介:

  • Name: 表示状态机的名称,必须唯一;
  • Comment: 状态机的描述;
  • Version: 状态机定义版本;
  • StartState: 启动时运行的第一个"状态";
  • States: 状态列表,是一个 map 结构,key 是"状态"的名称,在状态机内必须唯一;

"状态" 属性简介:

  • Type:"状态" 的类型,比如有:
    • ServiceTask: 执行调用服务任务;
    • Choice: 单条件选择路由;
    • CompensationTrigger: 触发补偿流程;
    • Succeed: 状态机正常结束;
    • Fail: 状态机异常结束;
    • SubStateMachine: 调用子状态机;
  • ServiceName: 服务名称,通常是服务的 beanId;
  • ServiceMethod: 服务方法名称;
  • CompensateState: 该"状态"的补偿"状态";
  • Input: 调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的  SpringEL[8], 如果是常量直接写值即可;
  • Output: 将服务返回的参数赋值到状态机上下文中,是一个 map 结构,key 为放入到状态机上文时的 key(状态机上下文也是一个 map),value 中 $. 是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数;
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知,我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构,key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式判断服务返回参数,带 $Exception{开头表示判断异常类型,value 是当这个条件表达式成立时则将服务执行状态映射成这个值;
  • Catch: 捕获到异常后的路由;
  • Next: 服务执行完成后下一个执行的"状态";
  • Choices: Choice 类型的"状态"里, 可选的分支列表, 分支中的 Expression 为 SpringEL 表达式,Next 为当表达式成立时执行的下一个"状态";
  • ErrorCode: Fail 类型"状态"的错误码;
  • Message: Fail 类型"状态"的错误信息;

更多详细的状态语言解释请看《Seata Saga 官网文档》[6http://seata.io/zh-cn/docs/user/saga.html]。

状态机引擎原理:

状态机引擎原理

  • 图中的状态图是先执行 stateA, 再执行 stataB,然后执行 stateC;
  • "状态"的执行是基于事件驱动的模型,stataA 执行完成后,会产生路由消息放入 EventQueue,事件消费端从 EventQueue 取出消息,执行 stateB;
  • 在整个状态机启动时会调用 Seata Server 开启分布式事务,并生产 xid, 然后记录"状态机实例"启动事件到本地数据库;
  • 当执行到一个"状态"时会调用 Seata Server 注册分支事务,并生产 branchId, 然后记录"状态实例"开始执行事件到本地数据库;
  • 当一个"状态"执行完成后会记录"状态实例"执行结束事件到本地数据库, 然后调用 Seata Server 上报分支事务的状态;
  • 当整个状态机执行完成,会记录"状态机实例"执行完成事件到本地数据库, 然后调用 Seata Server 提交或回滚分布式事务;

状态机引擎设计:

状态机引擎设计

状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:

  • Eventing 层:

    • 实现事件驱动架构, 可以压入事件, 并由消费端消费事件, 本层不关心事件是什么消费端执行什么,由上层实现;
  • ProcessController 层:

    • 由于上层的 Eventing 驱动一个“空”流程执行的执行,"state"的行为和路由都未实现,由上层实现;

      基于以上两层理论上可以自定义扩展任何"流程"引擎。这两层的设计是参考了内部金融网络平台的设计。

  • StateMachineEngine 层:

    • 实现状态机引擎每种 state 的行为和路由逻辑;
    • 提供 API、状态机语言仓库;

Saga 模式下服务设计的实践经验

下面是实践中总结的在 Saga 模式下微服务设计的一些经验,当然这是推荐做法,并不是说一定要 100% 遵循,没有遵循也有“绕过”方案。

好消息:Seata Saga 模式对微服务的接口参数没有任务要求,这使得 Saga 模式可用于集成遗留系统或外部机构的服务。

允许空补偿

  • 空补偿:原服务未执行,补偿服务执行了;
  • 出现原因:
    • 原服务 超时(丢包);
    • Saga 事务触发 回滚;
    • 未收到原服务请求,先收到补偿请求;

所以服务设计时需要允许空补偿,即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来。

防悬挂控制

  • 悬挂:补偿服务 比 原服务 先执行;
  • 出现原因:
    • 原服务 超时(拥堵);
    • Saga 事务回滚,触发 回滚;
    • 拥堵的原服务到达;

所以要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行。

幂等控制

  • 原服务与补偿服务都需要保证幂等性, 由于网络可能超时,可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新。

总结

很多时候我们不需要强调强一性,我们基于 BASE 和 Saga 理论去设计更有弹性的系统,在分布式架构下获得更好的性能和容错能力。分布式架构没有银弹,只有适合特定场景的方案,事实上 Seata Saga 是一个具备“服务编排”和“Saga 分布式事务”能力的产品,总结下来它的适用场景是:

  • 适用于微服务架构下的“长事务”处理;
  • 适用于微服务架构下的“服务编排”需求;
  • 适用于金融核心系统以上的有大量组合服务的业务系统(比如在渠道层、产品层、集成层的系统);
  • 适用于业务流程中需要集成遗留系统或外部机构提供的服务的场景(这些服务不可变不能对其提出改造要求)。

文中涉及相关链接

[1]https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf
[2]http://microservices.io/patterns/data/saga.html
[3]Microprofile 的 LRAhttps://github.com/eclipse/microprofile-sandbox/tree/master/proposals/0009-LRA
[4]Eventuate Tram Sagahttps://github.com/eventuate-tram/eventuate-tram-sagas
[5]ServiceComb Sagahttps://github.com/apache/servicecomb-pack
[6]Seata Saga 官网文档http://seata.io/zh-cn/docs/user/saga.html
[7]AWS Step Functionshttps://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/tutorial-creating-lambda-state-machine.html
[8]SpringELhttps://docs.spring.io/spring/docs/4.3.10.RELEASE/spring-framework-reference/html/expressions.html

· 阅读需 26 分钟

作者:屹远(陈龙),蚂蚁金服分布式事务框架核心研发。
本文根据 8 月 11 日 SOFA Meetup#3 广州站 《分布式事务 Seata 及其三种模式详解》主题分享整理,着重分享分布式事务产生的背景、理论基础,以及 Seata 分布式事务的原理以及三种模式(AT、TCC、Saga)的分布式事务实现。

现场回顾视频以及 PPT 见文末链接。

3 分布式事务 Seata 三种模式详解-屹远.jpg

一、分布式事务产生的背景

1.1 分布式架构演进之 - 数据库的水平拆分

蚂蚁金服的业务数据库起初是单库单表,但随着业务数据规模的快速发展,数据量越来越大,单库单表逐渐成为瓶颈。所以我们对数据库进行了水平拆分,将原单库单表拆分成数据库分片。

如下图所示,分库分表之后,原来在一个数据库上就能完成的写操作,可能就会跨多个数据库,这就产生了跨数据库事务问题。

image.png

1.2 分布式架构演进之 - 业务服务化拆分

在业务发展初期,“一块大饼”的单业务系统架构,能满足基本的业务需求。但是随着业务的快速发展,系统的访问量和业务复杂程度都在快速增长,单系统架构逐渐成为业务发展瓶颈,解决业务系统的高耦合、可伸缩问题的需求越来越强烈。

如下图所示,蚂蚁金服按照面向服务架构(SOA)的设计原则,将单业务系统拆分成多个业务系统,降低了各系统之间的耦合度,使不同的业务系统专注于自身业务,更有利于业务的发展和系统容量的伸缩。

image.png

业务系统按照服务拆分之后,一个完整的业务往往需要调用多个服务,如何保证多个服务间的数据一致性成为一个难题。

二、分布式事务理论基础

2.1 两阶段提交协议

16_16_18__08_13_2019.jpg

两阶段提交协议:事务管理器分两个阶段来协调资源管理器,第一阶段准备资源,也就是预留事务所需的资源,如果每个资源管理器都资源预留成功,则进行第二阶段资源提交,否则协调资源管理器回滚资源。

2.2 TCC

16_16_51__08_13_2019.jpg

TCC(Try-Confirm-Cancel) 实际上是服务化的两阶段提交协议,业务开发者需要实现这三个服务接口,第一阶段服务由业务代码编排来调用 Try 接口进行资源预留,所有参与者的 Try 接口都成功了,事务管理器会提交事务,并调用每个参与者的 Confirm 接口真正提交业务操作,否则调用每个参与者的 Cancel 接口回滚事务。

2.3 Saga

3 分布式事务 Seata 三种模式详解-屹远-9.jpg

Saga 是一种补偿协议,在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

Saga 理论出自 Hector & Kenneth 1987发表的论文 Sagas。

Saga 正向服务与补偿服务也需要业务开发者实现。

三、Seata 及其三种模式详解

3.1 分布式事务 Seata 介绍

Seata(Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架)是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。Seata 开源半年左右,目前已经有超过 1.1 万 star,社区非常活跃。我们热忱欢迎大家参与到 Seata 社区建设中,一同将 Seata 打造成开源分布式事务标杆产品。

Seata:https://github.com/apache/incubator-seata

image.png

3.2 分布式事务 Seata 产品模块

如下图所示,Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

image.png

在 Seata 中,分布式事务的执行流程:

  • TM 开启分布式事务(TM 向 TC 注册全局事务记录);
  • 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
  • TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
  • TC 汇总事务信息,决定分布式事务是提交还是回滚;
  • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

3.3 分布式事务 Seata 解决方案

Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。

15_49_23__08_13_2019.jpg

2.3.1 AT 模式

今年 1 月份,Seata 开源了 AT 模式。AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。

image.png

AT 模式如何做到对业务的无侵入 :
  • 一阶段:

在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

图片3.png

  • 二阶段提交:

二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

图片4.png

  • 二阶段回滚:

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

图片5.png

AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务 SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。

2.3.2 TCC 模式

2019 年 3 月份,Seata 开源了 TCC 模式,该模式由蚂蚁金服贡献。TCC 模式需要用户根据自己的业务场景实现 Try、Confirm 和 Cancel 三个操作;事务发起方在一阶段执行 Try 方式,在二阶段提交执行 Confirm 方法,二阶段回滚执行 Cancel 方法。

图片6.png

TCC 三个方法描述:

  • Try:资源的检测和预留;
  • Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功;
  • Cancel:预留资源释放;

蚂蚁金服在 TCC 的实践经验
**
16_48_02__08_13_2019.jpg

1 TCC 设计 - 业务模型分 2 阶段设计:

用户接入 TCC ,最重要的是考虑如何将自己的业务模型拆成两阶段来实现。

以“扣钱”场景为例,在接入 TCC 前,对 A 账户的扣钱,只需一条更新账户余额的 SQL 便能完成;但是在接入 TCC 之后,用户就需要考虑如何将原来一步就能完成的扣钱操作,拆成两阶段,实现成三个方法,并且保证一阶段 Try  成功的话 二阶段 Confirm 一定能成功。

图片7.png

如上图所示,

Try 方法作为一阶段准备方法,需要做资源的检查和预留。在扣钱场景下,Try 要做的事情是就是检查账户余额是否充足,预留转账资金,预留的方式就是冻结 A 账户的 转账资金。Try 方法执行之后,账号 A 余额虽然还是 100,但是其中 30 元已经被冻结了,不能被其他事务使用。

二阶段 Confirm 方法执行真正的扣钱操作。Confirm 会使用 Try 阶段冻结的资金,执行账号扣款。Confirm 方法执行之后,账号 A 在一阶段中冻结的 30 元已经被扣除,账号 A 余额变成 70 元 。

如果二阶段是回滚的话,就需要在 Cancel 方法内释放一阶段 Try 冻结的 30 元,使账号 A 的回到初始状态,100 元全部可用。

用户接入 TCC 模式,最重要的事情就是考虑如何将业务模型拆成 2 阶段,实现成 TCC 的 3 个方法,并且保证 Try 成功 Confirm 一定能成功。相对于 AT 模式,TCC 模式对业务代码有一定的侵入性,但是 TCC 模式无 AT 模式的全局行锁,TCC 性能会比 AT 模式高很多。

2 TCC 设计 - 允许空回滚:
**
16_51_44__08_13_2019.jpg

Cancel 接口设计时需要允许空回滚。在 Try 接口因为丢包时没有收到,事务管理器会触发回滚,这时会触发 Cancel 接口,这时 Cancel 执行时发现没有对应的事务 xid 或主键时,需要返回回滚成功。让事务服务管理器认为已回滚,否则会不断重试,而 Cancel 又没有对应的业务数据可以进行回滚。

3 TCC 设计 - 防悬挂控制:
**
16_51_56__08_13_2019.jpg

悬挂的意思是:Cancel 比 Try 接口先执行,出现的原因是 Try 由于网络拥堵而超时,事务管理器生成回滚,触发 Cancel 接口,而最终又收到了 Try 接口调用,但是 Cancel 比 Try 先到。按照前面允许空回滚的逻辑,回滚会返回成功,事务管理器认为事务已回滚成功,则此时的 Try 接口不应该执行,否则会产生数据不一致,所以我们在 Cancel 空回滚返回成功之前先记录该条事务 xid 或业务主键,标识这条记录已经回滚过,Try 接口先检查这条事务xid或业务主键如果已经标记为回滚成功过,则不执行 Try 的业务操作。

4 TCC 设计 - 幂等控制:
**
16_52_07__08_13_2019.jpg

幂等性的意思是:对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。因为网络抖动或拥堵可能会超时,事务管理器会对资源进行重试操作,所以很可能一个业务操作会被重复调用,为了不因为重复调用而多次占用资源,需要对服务设计时进行幂等控制,通常我们可以用事务 xid 或业务主键判重来控制。

2.3.3 Saga 模式

Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

图片8.png

Saga 模式下分布式事务通常是由事件驱动的,各个参与者之间是异步执行的,Saga 模式是一种长事务解决方案。

1 Saga 模式使用场景
**
16_44_58__08_13_2019.jpg

Saga 模式适用于业务流程长且需要保证事务最终一致性的业务系统,Saga 模式一阶段就会提交本地事务,无锁、长流程情况下可以保证性能。

事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造和提供 TCC 要求的接口,可以使用 Saga 模式。

Saga模式的优势是:

  • 一阶段提交本地数据库事务,无锁,高性能;
  • 参与者可以采用事务驱动异步执行,高吞吐;
  • 补偿服务即正向服务的“反向”,易于理解,易于实现;

缺点:Saga 模式由于一阶段已经提交本地数据库事务,且没有进行“预留”动作,所以不能保证隔离性。后续会讲到对于缺乏隔离性的应对措施。
2 基于状态机引擎的 Saga 实现

17_13_19__08_13_2019.jpg

目前 Saga 的实现一般有两种,一种是通过事件驱动架构实现,一种是基于注解加拦截器拦截业务的正向服务实现。Seata 目前是采用事件驱动的机制来实现的,Seata 实现了一个状态机,可以编排服务的调用流程及正向服务的补偿服务,生成一个 json 文件定义的状态图,状态机引擎驱动到这个图的运行,当发生异常的时候状态机触发回滚,逐个执行补偿服务。当然在什么情况下触发回滚用户是可以自定义决定的。该状态机可以实现服务编排的需求,它支持单项选择、并发、异步、子状态机调用、参数转换、参数映射、服务执行状态判断、异常捕获等功能。

3 状态机引擎原理

16_45_32__08_13_2019.jpg

该状态机引擎的基本原理是,它基于事件驱动架构,每个步骤都是异步执行的,步骤与步骤之间通过事件队列流转,
极大的提高系统吞吐量。每个步骤执行时会记录事务日志,用于出现异常时回滚时使用,事务日志会记录在与业务表所在的数据库内,提高性能。

4 状态机引擎设计

16_45_46__08_13_2019.jpg

该状态机引擎分成了三层架构的设计,最底层是“事件驱动”层,实现了 EventBus 和消费事件的线程池,是一个 Pub-Sub 的架构。第二层是“流程控制器”层,它实现了一个极简的流程引擎框架,它驱动一个“空”的流程执行,“空”的意思是指它不关心流程节点做什么事情,它只执行每个节点的 process 方法,然后执行 route 方法流转到下一个节点。这是一个通用框架,基于这两层,开发者可以实现任何流程引擎。最上层是“状态机引擎”层,它实现了每种状态节点的“行为”及“路由”逻辑代码,提供 API 和状态图仓库,同时还有一些其它组件,比如表达式语言、逻辑计算器、流水生成器、拦截器、配置管理、事务日志记录等。

5 Saga 服务设计经验

和TCC类似,Saga的正向服务与反向服务也需求遵循以下设计原则:

1)Saga 服务设计 - 允许空补偿
**
16_52_22__08_13_2019.jpg

2)Saga 服务设计 - 防悬挂控制
**
16_52_52__08_13_2019.jpg

3)Saga 服务设计 - 幂等控制
**
3 分布式事务 Seata 三种模式详解-屹远-31.jpg

4)Saga 设计 - 自定义事务恢复策略
**
16_53_07__08_13_2019.jpg

前面讲到 Saga 模式不保证事务的隔离性,在极端情况下可能出现脏写。比如在分布式事务未提交的情况下,前一个服务的数据被修改了,而后面的服务发生了异常需要进行回滚,可能由于前面服务的数据被修改后无法进行补偿操作。这时的一种处理办法可以是“重试”继续往前完成这个分布式事务。由于整个业务流程是由状态机编排的,即使是事后恢复也可以继续往前重试。所以用户可以根据业务特点配置该流程的事务处理策略是优先“回滚”还是“重试”,当事务超时的时候,Server 端会根据这个策略不断进行重试。

由于 Saga 不保证隔离性,所以我们在业务设计的时候需要做到“宁可长款,不可短款”的原则,长款是指在出现差错的时候站在我方的角度钱多了的情况,钱少了则是短款,因为如果长款可以给客户退款,而短款则可能钱追不回来了,也就是说在业务设计的时候,一定是先扣客户帐再入帐,如果因为隔离性问题造成覆盖更新,也不会出现钱少了的情况。

6 基于注解和拦截器的 Saga 实现
**
17_13_37__08_13_2019.jpg

还有一种 Saga 的实现是基于注解+拦截器的实现,Seata 目前没有实现,可以看上面的伪代码来理解一下,one 方法上定义了 @SagaCompensable 的注解,用于定义 one 方法的补偿方法是 compensateOne 方法。然后在业务流程代码 processA 方法上定义 @SagaTransactional 注解,启动 Saga 分布式事务,通过拦截器拦截每个正向方法当出现异常的时候触发回滚操作,调用正向方法的补偿方法。

7 两种 Saga 实现优劣对比

两种 Saga 的实现各有又缺点,下面表格是一个对比:

17_13_49__08_13_2019.jpg

状态机引擎的最大优势是可以通过事件驱动的方法异步执行提高系统吞吐,可以实现服务编排需求,在 Saga 模式缺乏隔离性的情况下,可以多一种“向前重试”的事情恢复策略。注解加拦截器的的最大优势是,开发简单、学习成本低。

总结

本文先回顾了分布式事务产生的背景及理论基础,然后重点讲解了 Seata 分布式事务的原理以及三种模式(AT、TCC、Saga)的分布式事务实现。

Seata 的定位是分布式事全场景解决方案,未来还会有 XA 模式的分布式事务实现,每种模式都有它的适用场景,AT 模式是无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本。TCC 模式是高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景。Saga 模式是长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统,Saga 模式一阶段就会提交本地事务,无锁,长流程情况下可以保证性能,多用于渠道层、集成层业务系统。事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造和提供 TCC 要求的接口,也可以使用 Saga 模式。

本次分享的视频回顾以及PPT 查看地址:https://tech.antfin.com/community/activities/779/review

· 阅读需 15 分钟

在微服务架构体系下,我们可以按照业务模块分层设计,单独部署,减轻了服务部署压力,也解耦了业务的耦合,避免了应用逐渐变成一个庞然怪物,从而可以轻松扩展,在某些服务出现故障时也不会影响其它服务的正常运行。总之,微服务在业务的高速发展中带给我们越来越多的优势,但是微服务并不是十全十美,因此不能盲目过度滥用,它有很多不足,而且会给系统带来一定的复杂度,其中伴随而来的分布式事务问题,是微服务架构体系下必然需要处理的一个痛点,也是业界一直关注的一个领域,因此也出现了诸如 CAP 和 BASE 等理论。

在今年年初,阿里开源了一个分布式事务中间件,起初起名为 Fescar,后改名为 Seata,在它开源之初,我就知道它肯定要火,因为这是一个解决痛点的开源项目,Seata 一开始就是冲着对业务无侵入与高性能方向走,这正是我们对解决分布式事务问题迫切的需求。因为待过的几家公司,用的都是微服务架构,但是在解决分布式事务的问题上都不太优雅,所以我也在一直关注 Seata 的发展,今天就简要说说它的一些设计上的原理,后续我将会对它的各个模块进行深入源码分析,感兴趣的可以持续关注我的公众号或者博客,不要跟丢。

分布式事务解决的方案有哪些?

目前分布式事务解决的方案主要有对业务无入侵和有入侵的方案,无入侵方案主要有基于数据库 XA 协议的两段式提交(2PC)方案,它的优点是对业务代码无入侵,但是它的缺点也是很明显:必须要求数据库对 XA 协议的支持,且由于 XA 协议自身的特点,它会造成事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,因此它性能很差,它的存在相当于七伤拳那样“伤人七分,损己三分”,因此在互联网项目中并不是很流行这种解决方案。

为了这个弥补这种方案带来性能低的问题,大佬们又想出了很多种方案来解决,但这无一例外都需要通过在应用层做手脚,即入侵业务的方式,比如很出名的 TCC 方案,基于 TCC 也有很多成熟的框架,如 ByteTCC、tcc-transaction 等。以及基于可靠消息的最终一致性来实现,如 RocketMQ 的事务消息。

入侵代码的方案是基于现有情形“迫不得已”才推出的解决方案,实际上它们实现起来非常不优雅,一个事务的调用通常伴随而来的是对该事务接口增加一系列的反向操作,比如 TCC 三段式提交,提交逻辑必然伴随着回滚的逻辑,这样的代码会使得项目非常臃肿,维护成本高。

Seata 各模块之间的关系

针对上面所说的分布式事务解决方案的痛点,那很显然,我们理想的分布式事务解决方案肯定是性能要好而且要对业务无入侵,业务层上无需关心分布式事务机制的约束,Seata 正是往这个方向发展的,因此它非常值得期待,它将给我们的微服务架构带来质的提升。

那 Seata 是怎么做到的呢?下面说说它的各个模块之间的关系。

Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。

Seata 内部定义了 3个模块来处理全局事务和分支事务的关系和处理过程,这三个组件分别是:

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

简要说说整个全局事务的执行步骤:

  1. TM 向 TC 申请开启一个全局事务,TC 创建全局事务后返回全局唯一的 XID,XID 会在全局事务的上下文中传播;
  2. RM 向 TC 注册分支事务,该分支事务归属于拥有相同 XID 的全局事务;
  3. TM 向 TC 发起全局提交或回滚;
  4. TC 调度 XID 下的分支事务完成提交或者回滚。

与 XA 方案有什么不同?

Seata 的事务提交方式跟 XA 协议的两段式提交在总体上来说基本是一致的,那它们之间有什么不同呢?

我们都知道 XA 协议它依赖的是数据库层面来保障事务的一致性,也即是说 XA 的各个分支事务是在数据库层面上驱动的,由于 XA 的各个分支事务需要有 XA 的驱动程序,一方面会导致数据库与 XA 驱动耦合,另一方面它会导致各个分支的事务资源锁定周期长,这也是它没有在互联网公司流行的重要因素。

基于 XA 协议以上的问题,Seata 另辟蹊径,既然在依赖数据库层会导致这么多问题,那我就从应用层做手脚,这还得从 Seata 的 RM 模块说起,前面也说过 RM 的主要作用了,其实 RM 在内部做了对数据库操作的代理层,如下:

Seata 在数据源做了一层代理层,所以我们使用 Seata 时,我们使用的数据源实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,主要是解析 SQL,把业务数据在更新前后的数据镜像组织成回滚日志,并将 undo log 日志插入 undo_log 表中,保证每条更新数据的业务 sql 都有对应的回滚日志存在。

这样做的好处就是,本地事务执行完可以立即释放本地事务锁定的资源,然后向 TC 上报分支状态。当 TM 决议全局提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成;当 TM 决议全局回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后执行回滚日志完成回滚操作。

如上图所示,XA 方案的 RM 是放在数据库层的,它依赖了数据库的 XA 驱动程序。

如上图所示,Seata 的 RM 实际上是已中间件的形式放在应用层,不用依赖数据库对协议的支持,完全剥离了分布式事务方案对数据库在协议支持上的要求。

分支事务如何提交和回滚?

下面详细说说分支事务是如何提交和回滚的:

  • 第一阶段:

分支事务利用 RM 模块中对 JDBC 数据源代理,加入了若干流程,对业务 SQL 进行解释,把业务数据在更新前后的数据镜像组织成回滚日志,并生成 undo log 日志,对全局事务锁的检查以及分支事务的注册等,利用本地事务 ACID 特性,将业务 SQL 和 undo log 写入同一个事物中一同提交到数据库中,保证业务 SQL 必定存在相应的回滚日志,最后对分支事务状态向 TC 进行上报。

  • 第二阶段:

TM决议全局提交:

当 TM 决议提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成。这个机制对于性能提升非常关键,我们知道正常的业务运行过程中,事务执行的成功率是非常高的,因此可以直接在本地事务中提交,这步对于提升性能非常显著。

TM决议全局回滚:

当 TM 决议回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后利用本地事务 ACID 特性,执行回滚日志完成回滚操作并删除 undo log 日志,最后向 TC 进行回滚结果上报。

业务对以上所有的流程都无感知,业务完全不关心全局事务的具体提交和回滚,而且最重要的一点是 Seata 将两段式提交的同步协调分解到各个分支事务中了,分支事务与普通的本地事务无任何差异,这意味着我们使用 Seata 后,分布式事务就像使用本地事务一样,完全将数据库层的事务协调机制交给了中间件层 Seata 去做了,这样虽然事务协调搬到应用层了,但是依然可以做到对业务的零侵入,从而剥离了分布式事务方案对数据库在协议支持上的要求,且 Seata 在分支事务完成之后直接释放资源,极大减少了分支事务对资源的锁定时间,完美避免了 XA 协议需要同步协调导致资源锁定时间过长的问题。

其它方案的补充

上面说的其实是 Seata 的默认模式,也叫 AT 模式,它是类似于 XA 方案的两段式提交方案,并且是对业务无侵入,但是这种机制依然是需要依赖数据库本地事务的 ACID 特性,有没有发现,我在上面的图中都强调了必须是支持 ACID 特性的关系型数据库,那么问题就来了,非关系型或者不支持 ACID 的数据库就无法使用 Seata 了,别慌,Seata 现阶段为我们准备了另外一种模式,叫 MT 模式,它是一种对业务有入侵的方案,提交回滚等操作需要我们自行定义,业务逻辑需要被分解为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务,它存在的意义是为 Seata 触达更多的场景。

只不过,它不是 Seata “主打”的模式,它的存在仅仅作为补充的方案,从以上官方的发展远景就可以看出来,Seata 的目标是始终是对业务无入侵的方案。

注:本文图片设计参考Seata官方图

作者简介:

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。