跳到主要内容

· 阅读需 5 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

通过GA大会上滴滴出行的高级研发工程陈鹏志的在滴滴两轮车业务中的实践,发现动态降级的必要性是非常的高,所以这边简单利用spring boot aop来简单的处理降级相关的处理,这边非常感谢陈鹏志的分享!

可利用此demo项目地址

通过以下代码改造实践.

准备工作

​ 1.创建测试用的TestAspect:

package org.test.config;

import java.lang.reflect.Method;

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

@Aspect
@Component
public class TestAspect {
private final static Logger logger = LoggerFactory.getLogger(TestAspect.class);

@Before("execution(* org.test.service.*.*(..))")
public void before(JoinPoint joinPoint) throws TransactionException {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
logger.info("拦截到需要分布式事务的方法," + method.getName());
// 此处可用redis或者定时任务来获取一个key判断是否需要关闭分布式事务
// 模拟动态关闭分布式事务
if ((int)(Math.random() * 100) % 2 == 0) {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
tx.begin(300000, "test-client");
} else {
logger.info("关闭分布式事务");
}
}

@AfterThrowing(throwing = "e", pointcut = "execution(* org.test.service.*.*(..))")
public void doRecoveryActions(Throwable e) throws TransactionException {
logger.info("方法执行异常:{}", e.getMessage());
if (!StringUtils.isBlank(RootContext.getXID()))
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
}

@AfterReturning(value = "execution(* org.test.service.*.*(..))", returning = "result")
public void afterReturning(JoinPoint point, Object result) throws TransactionException {
logger.info("方法执行结束:{}", result);
if ((Boolean)result) {
if (!StringUtils.isBlank(RootContext.getXID())) {
logger.info("分布式事务Id:{}", RootContext.getXID());
GlobalTransactionContext.reload(RootContext.getXID()).commit();
}
}
}

}

请注意上面的包名可改为你自己的service包名:

​ 2.改动service代码:

    public Object seataCommit() {
testService.Commit();
return true;
}

因为异常跟返回结果我们都会拦截,所以这边可以trycatch或者直接让他抛异常来拦截也行,或者直接判断返回结果,比如你的业务代码code=200为成功,那么就commit,反之在拦截返回值那段代码加上rollback;

进行调试

​ 1.更改代码主动抛出异常

    public Object seataCommit() {
try {
testService.Commit();
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}

​ 查看日志:

2019-12-23 11:57:55.386  INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765910]
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765910
2019-12-23 11:57:55.709 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 方法执行异常:null
2019-12-23 11:57:55.885 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765910] rollback status: Rollbacked
2019-12-23 11:57:55.888 ERROR 23952 --- [.0-28888-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException] with root cause

​ 可以看到已被拦截也触发了rollback了.

​ 2.恢复代码调试正常情况:

    public Object seataCommit() {
testService.Commit();
return true;
}

​ 查看日志:

2019-12-23 12:00:20.876  INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 12:00:20.919 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765926]
2019-12-23 12:00:20.920 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765926
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 方法执行结束:true
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 分布式事务Id:192.168.14.67:8092:2030765926
2019-12-23 12:00:21.213 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765926] commit status: Committed

​ 可以看到事务已经被提交了.

总结

更详细的内容希望希望大家访问以下地址阅读详细文档

nacos官网

dubbo官网

seata官网

docker官网

· 阅读需 8 分钟

Seata 的动态降级需要结合配置中心的动态配置订阅功能。动态配置订阅,即通过配置中心监听订阅,根据需要读取已更新的缓存值,ZK、Apollo、Nacos 等第三方配置中心都有现成的监听器可实现动态刷新配置;动态降级,即通过动态更新指定配置参数值,使得 Seata 能够在运行过程中动态控制全局事务失效(目前只有 AT 模式有这个功能)。

那么 Seata 支持的多个配置中心是如何适配不同的动态配置订阅以及如何实现降级的呢?下面从源码的层面详细给大家讲解一番。

动态配置订阅

Seata 配置中心有一个监听器基准接口,它主要有一个抽象方法和 default 方法,如下:

io.seata.config.ConfigurationChangeListener

该监听器基准接口主要有两个实现类型:

  1. 实现注册配置订阅事件监听器:用于实现各种功能的动态配置订阅,比如 GlobalTransactionalInterceptor 实现了 ConfigurationChangeListener,根据动态配置订阅实现的动态降级功能;
  2. 实现配置中心动态订阅功能与适配:对于目前还没有动态订阅功能的 file 类型默认配置中心,可以实现该基准接口来实现动态配置订阅功能;对于阻塞订阅需要另起一个线程去执行,这时候可以实现该基准接口进行适配,还可以复用该基准接口的线程池;以及还有异步订阅,有订阅单个 key,有订阅多个 key 等等,我们都可以实现该基准接口以适配各个配置中心。

Nacos 动态订阅实现

Nacos 有自己内部实现的监听器,因此直接直接继承它内部抽象监听器 AbstractSharedListener,实现如下:

如上,

  • dataId:为订阅的配置属性;
  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑。

值得一提的是,nacos 并没有使用 ConfigurationChangeListener 实现自己的监听配置,一方面是因为 Nacos 本身已有监听订阅功能,不需要自己再去实现;另一方面因为 nacos 属于非阻塞式订阅,不需要复用 ConfigurationChangeListener 的线程池,即无需进行适配。

添加订阅:

Nacos 配置中心为某个 dataId 添加订阅的逻辑很简单,用 dataId 和 listener 创建一个 NacosListener 调用 configService#addListener 方法,把 NacosListener 作为 dataId 的监听器,dataId 就实现了动态配置订阅功能。

file 动态订阅实现

以它的实现类 FileListener 举例子,它的实现逻辑如下:

如上,

  • dataId:为订阅的配置属性;

  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑,这里特别需要注意的是,该监听器与 FileListener 同样实现了 ConfigurationChangeListener 接口,只不过 FileListener 是用于给 file 提供动态配置订阅功能,而 listener 用于执行配置订阅事件

  • executor:用于处理配置变更逻辑的线程池,在 ConfigurationChangeListener#onProcessEvent 方法中用到。

FileListener#onChangeEvent 方法的实现让 file 具备了动态配置订阅的功能,它的逻辑如下:

无限循环获取订阅的配置属性当前的值,从缓存中获取旧的值,判断是否有变更,如果有变更就执行外部传入 listener 的逻辑。

ConfigurationChangeEvent 用于保存配置变更的事件类,它的成员属性如下:

这里的 getConfig 方法是如何感知 file 配置的变更呢?我们点进去,发现它最终的逻辑如下:

发现它是创建一个 future 类,然后包装成一个 Runnable 放入线程池中异步执行,最后调用 get 方法阻塞获取值,那么我们继续往下看:

allowDynamicRefresh:动态刷新配置开关;

targetFileLastModified:file 最后更改的时间缓存。

以上逻辑:

获取 file 最后更新的时间值 tempLastModified,然后对比对比缓存值 targetFileLastModified,如果 tempLastModified > targetFileLastModified,说明期间配置有更改过,这时就重新加载 file 实例,替换掉旧的 fileConfig,使得后面的操作能够获取到最新的配置值。

添加一个配置属性监听器的逻辑如下:

configListenersMap 为 FileConfiguration 的配置监听器缓存,它的数据结构如下:

ConcurrentMap<String/*dataId*/, Set<ConfigurationChangeListener>> configListenersMap

从数据结构上可看出,每个配置属性可关联多个事件监听器。

最终执行 onProcessEvent 方法,这个是监听器基准接口里面的 default 方法,它会调用 onChangeEvent 方法,即最终会调用 FileListener 中的实现。

动态降级

有了以上的动态配置订阅功能,我们只需要实现 ConfigurationChangeListener 监听器,就可以做各种各种的功能,目前 Seata 只有动态降级有用到动态配置订阅的功能。

在「Seata AT 模式启动源码分析」这篇文章中讲到,Spring 集成 Seata 的项目中,在 AT 模式启动时,会用 用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法,GlobalTransactionalInterceptor 实现了 MethodInterceptor,最终会执行 invoker 方法,那么想要实现动态降级,就可以在这里做手脚。

  • 在 GlobalTransactionalInterceptor 中加入一个成员变量:
private volatile boolean disable; 

在构造函数中进行初始化赋值:

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION(service.disableGlobalTransaction)这个参数目前有两个功能:

  1. 在启动时决定是否开启全局事务;
  2. 在开启全局事务后,决定是否降级。
  • 实现 ConfigurationChangeListener:

这里的逻辑简单,就是判断监听事件是否属于 ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION 配置属性,如果是,直接更新 disable 值。

  • 接下来在 GlobalTransactionalInterceptor#invoke 中做点手脚

如上,disable = true 时,不执行全局事务与全局锁。

  • 配置中心订阅降级监听器

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

在 Spring AOP 进行 wrap 逻辑过程中,当前配置中心将订阅降级事件监听器。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 8 分钟

Seata 可以支持多个第三方配置中心,那么 Seata 是如何同时兼容那么多个配置中心的呢?下面我给大家详细介绍下 Seata 配置中心的实现原理。

配置中心属性加载

在 Seata 配置中心,有两个默认的配置文件:

file.conf 是默认的配置属性,registry.conf 主要存储第三方注册中心与配置中心的信息,主要有两大块:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# ...
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
}
file {
name = "file.conf"
}
# ...
}

其中 registry 为注册中心的配置属性,这里先不讲,config 为配置中心的属性值,默认为 file 类型,即会加载本地的 file.conf 里面的属性,如果 type 为其它类型,那么会从第三方配置中心加载配置属性值。

在 config 模块的 core 目录中,有个配置工厂类 ConfigurationFactory,它的结构如下:

可以看到都是一些配置的静态常量:

REGISTRY_CONF_PREFIX、REGISTRY_CONF_SUFFIX:配置文件名、默认配置文件类型;

SYSTEM_PROPERTY_SEATA_CONFIG_NAME、ENV_SEATA_CONFIG_NAME、ENV_SYSTEM_KEY、ENV_PROPERTY_KEY:自定义文件名配置变量,也说明我们可以自定义配置中心的属性文件。

ConfigurationFactory 里面有一处静态代码块,如下:

io.seata.config.ConfigurationFactory

根据自定义文件名配置变量找出配置文件名称与类型,如果没有配置,默认使用 registry.conf,FileConfiguration 是 Seata 默认的配置实现类,如果为默认值,则会更具 registry.conf 配置文件生成 FileConfiguration 默认配置对象,这里也可以利用 SPI 机制支持第三方扩展配置实现,具体实现是继承 ExtConfigurationProvider 接口,在META-INF/services/创建一个文件并填写实现类的全路径名,如下所示:

第三方配置中心实现类加载

在静态代码块逻辑加载完配置中心属性之后,Seata 是如何选择配置中心并获取配置中心的属性值的呢?

我们刚刚也说了 FileConfiguration 是 Seata 的默认配置实现类,它继承了 AbstractConfiguration,它的基类为 Configuration,提供了获取参数值的方法:

short getShort(String dataId, int defaultValue, long timeoutMills);
int getInt(String dataId, int defaultValue, long timeoutMills);
long getLong(String dataId, long defaultValue, long timeoutMills);
// ....

那么意味着只需要第三方配置中心实现该接口,就可以整合到 Seata 配置中心了,下面我拿 zk 来做例子:

首先,第三方配置中心需要实现一个 Provider 类:

实现的 provider 方法如其名,主要是输出具体的 Configuration 实现类。

那么我们是如何获取根据配置去获取对应的第三方配置中心实现类呢?

在 Seata 项目中,获取一个第三方配置中心实现类通常是这么做的:

Configuration CONFIG = ConfigurationFactory.getInstance();

在 getInstance() 方法中主要是使用了单例模式构造配置实现类,它的构造具体实现如下:

io.seata.config.ConfigurationFactory#buildConfiguration:

首先从 ConfigurationFactory 中的静态代码块根据 registry.conf 创建的 CURRENT_FILE_INSTANCE 中获取当前环境使用的配置中心,默认为为 File 类型,我们也可以在 registry.conf 配置其它第三方配置中心,这里也是利用了 SPI 机制去加载第三方配置中心的实现类,具体实现如下:

如上,即是刚刚我所说的 ZookeeperConfigurationProvider 配置实现输出类,我们再来看看这行代码:

EnhancedServiceLoader.load(ConfigurationProvider.class,Objects.requireNonNull(configType).name()).provide();

EnhancedServiceLoader 是 Seata SPI 实现核心类,这行代码会加载 META-INF/services/META-INF/seata/目录中文件填写的类名,那么如果其中有多个配置中心实现类都被加载了怎么办呢?

我们注意到 ZookeeperConfigurationProvider 类的上面有一个注解:

@LoadLevel(name = "ZK", order = 1)

在加载多个配置中心实现类时,会根据 order 进行排序:

io.seata.common.loader.EnhancedServiceLoader#findAllExtensionClass:

io.seata.common.loader.EnhancedServiceLoader#loadFile:

这样,就不会产生冲突了。

但是我们发现 Seata 还可以用这个方法进行选择,Seata 在调用 load 方法时,还传了一个参数:

Objects.requireNonNull(configType).name()

ConfigType 为配置中心类型,是个枚举类:

public enum ConfigType {
File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, Custom;
}

我们注意到,LoadLevel 注解上还有一个 name 属性,在进行筛选实现类时,Seata 还做了这个操作:

根据当前 configType 来判断是否等于 LoadLevel 的 name 属性,如果相等,那么就是当前配置的第三方配置中心实现类。

第三方配置中心实现类

ZookeeperConfiguration 继承了 AbstractConfiguration,它的构造方法如下:

构造方法创建了一个 zkClient 对象,这里的 FILE_CONFIG 是什么呢?

private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;

原来就是刚刚静态代码块中创建的 registry.conf 配置实现类,从该配置实现类拿到第三方配置中心的相关属性,构造第三方配置中心客户端,然后实现 Configuration 接口时:

就可以利用客户端相关方法去第三方配置获取对应的参数值了。

第三方配置中心配置同步脚本

上周末才写好,已经提交 PR 上去了,还处于 review 中,预估会在 Seata 1.0 版本提供给大家使用,敬请期待。

具体位置在 Seata 项目的 script 目录中:

config.txt 为本地配置好的值,搭建好第三方配置中心之后,运行脚本会将 config.txt 的配置同步到第三方配置中心。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 12 分钟

运行所使用的 demo项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

直连方式的 Seata 配置博客

Seata 整合 Nacos 配置博客

我们接着前几篇篇的基础上去配置 nacos 做配置中心跟 dubbo 注册中心.

准备工作

​ 1.安装 docker

yum -y install docker

​ 2.创建 nacos 与 seata 的数据库

/******************************************/
/* 数据库全名 = nacos */
/* 表名称 = config_info */
/******************************************/
CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) NOT NULL COMMENT 'group_id',
`datum_id` varchar(255) NOT NULL COMMENT 'datum_id',
`content` longtext NOT NULL COMMENT '内容',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='增加租户字段';


/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`beta_ips` varchar(1024) DEFAULT NULL COMMENT 'betaIps',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_beta';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`tag_id` varchar(128) NOT NULL COMMENT 'tag_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_tag';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL COMMENT 'id',
`tag_name` varchar(128) NOT NULL COMMENT 'tag_name',
`tag_type` varchar(64) DEFAULT NULL COMMENT 'tag_type',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_tag_relation';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`group_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Group ID,空字符表示整个集群',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数,,0表示使用默认值',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='集群、各Group容量信息表';

/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='多租户改造';


/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`tenant_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Tenant ID',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='租户容量信息表';


CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`kp` varchar(128) NOT NULL COMMENT 'kp',
`tenant_id` varchar(128) default '' COMMENT 'tenant_id',
`tenant_name` varchar(128) default '' COMMENT 'tenant_name',
`tenant_desc` varchar(256) DEFAULT NULL COMMENT 'tenant_desc',
`create_source` varchar(32) DEFAULT NULL COMMENT 'create_source',
`gmt_create` bigint(20) NOT NULL COMMENT '创建时间',
`gmt_modified` bigint(20) NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='tenant_info';

CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL
);

CREATE TABLE roles (
username varchar(50) NOT NULL,
role varchar(50) NOT NULL
);

INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);

INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

​ 3.拉取 nacos 以及 seata 镜像并运行

docker run -d --name nacos -p 8848:8848 -e MODE=standalone -e MYSQL_MASTER_SERVICE_HOST=你的mysql所在ip -e MYSQL_MASTER_SERVICE_DB_NAME=nacos -e MYSQL_MASTER_SERVICE_USER=root -e MYSQL_MASTER_SERVICE_PASSWORD=mysql密码 -e MYSQL_SLAVE_SERVICE_HOST=你的mysql所在ip -e SPRING_DATASOURCE_PLATFORM=mysql -e MYSQL_DATABASE_NUM=1 nacos/nacos-server:latest
docker run -d --name seata -p 8091:8091 -e SEATA_IP=你想指定的ip -e SEATA_PORT=8091 seataio/seata-server:1.4.2

Seata 配置

​ 1.由于 seata 容器内没有内置 vim,我们可以直接将要文件夹 cp 到宿主机外来编辑好了,再 cp 回去

docker cp 容器id:seata-server/resources 你想放置的目录

​ 2.使用如下代码获取两个容器的 ip 地址

docker inspect --format='{{.NetworkSettings.IPAddress}}' ID/NAMES

​ 3.nacos-config.txt 编辑为如下内容

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.你的事务组名=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://你的mysql所在ip:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=mysql账号
store.db.password=mysql密码
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

详细参数配置请点此处

​ 4.registry.conf 编辑为如下内容

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "nacos容器ip:8848"
namespace = ""
cluster = "default"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "nacos容器ip:8848"
namespace = ""
}
}

​ 5.配置完成后使用如下命令,把修改完成的 registry.conf 复制到容器中,并重启查看日志运行

docker cp /home/seata/resources/registry.conf seata:seata-server/resources/
docker restart seata
docker logs -f seata

​ 6.运行 nacos-config.sh 导入 Nacos 配置

eg: sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password

具体参数释义参考:配置导入说明

​ 7.登录 nacos 控制中心查看

20191202205912

​ 如图所示便是成功了.

进行调试

​ 1.拉取博文中所示的项目,修改 test-service 的 application.yml 与 registry.conf

registry {
type = "nacos"
nacos {
serverAddr = "宿主机ip:8848"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "宿主机ip:8848"
namespace = ""
cluster = "default"
}
}

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
threadpool: cached
scan:
base-packages: com.example
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: nacos://宿主机ip:8848
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
auto-mapping-unknown-column-behavior: none

​ 2.把修改完成的 registry.conf 复制到 test-client-resources 中,并修改 application

spring:
application:
name: test
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://mysqlIp:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
username: root
password: 123456
mvc:
servlet:
load-on-startup: 1
http:
encoding:
force: true
charset: utf-8
enabled: true
multipart:
max-file-size: 10MB
max-request-size: 10MB
dubbo:
registry:
id: my-registry
address: nacos://宿主机ip:8848
application:
name: dubbo-demo-client
qos-enable: false
server:
port: 28888
max-http-header-size: 8192
address: 0.0.0.0
tomcat:
max-http-post-size: 104857600

​ 4. 在每个所涉及的 db 执行 undo_log 脚本.

CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

​ 5.依次运行 test-service,test-client.

​ 6.查看 nacos 中服务列表是否如下图所示

20191203132351

总结

关于 nacos 与 seata 的 docker 部署已经完成了,更详细的内容希望希望大家访问以下地址阅读详细文档

nacos 官网

dubbo 官网

seata 官网

docker 官网

· 阅读需 7 分钟

项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

上次发布了直连方式的seata配置,详细可以看这篇博客

我们接着上一篇的基础上去配置nacos做配置中心跟dubbo注册中心.

准备工作

​ 1.首先去nacos的github上下载最新版本

​ 2.下载好了后,很简单,解压后到bin目录下去启动就好了,看到如图所示就成了:

​ 3.启动完毕后访问:http://127.0.0.1:8848/nacos/#/login

是不是看到这样的界面了?输入nacos(账号密码相同),先进去看看吧.

这时候可以发现没有任何服务注册

20191202204147

别急我们马上让seata服务连接进来.

Seata配置

​ 1.进入seata的conf文件夹看到这个木有?

就是它,编辑它:

20191202204353

20191202204437

​ 2.然后记得保存哦!接着我们把registry.conf文件打开编辑:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

都编辑好了后,我们运行nacos-config.sh,这时候我们配置的nacos-config.txt的内容已经被发送到nacos中了详细如图:

20191202205743

出现以上类似的代码就是说明成功了,接着我们登录nacos配置中心,查看配置列表,出现如图列表说明配置成功了:

20191202205912

看到了吧,你的配置已经全部都提交上去了,如果再git工具内运行sh不行的话,试着把编辑sh文件,试试改成如下操作

for line in $(cat nacos-config.txt)

do

key=${line%%=*}
value=${line#*=}
echo "\r\n set "${key}" = "${value}

result=`curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=$key&group=SEATA_GROUP&content=$value"`

if [ "$result"x == "true"x ]; then

echo "\033[42;37m $result \033[0m"

else

echo "\033[41;37 $result \033[0m"
let error++

fi

done


if [ $error -eq 0 ]; then

echo "\r\n\033[42;37m init nacos config finished, please start seata-server. \033[0m"

else

echo "\r\n\033[41;33m init nacos config fail. \033[0m"

fi

​ 3.目前我们的准备工作全部完成,我们去seata-service/bin去运行seata服务吧,如图所示就成功啦!

20191202210112

进行调试

​ 1.首先把springboot-dubbo-mybatsiplus-seata项目的pom的依赖更改,去除掉zk这些配置,因为我们使用nacos做注册中心了.

	<properties>
<webVersion>3.1</webVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<HikariCP.version>3.2.0</HikariCP.version>
<mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
<version>7.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!-- mybatis-plus begin -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- mybatis-plus end -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version> </dependency> -->

<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
<version>3.1.0</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- 加上这个才能辨认到log4j2.yml文件 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency> <!-- 引入log4j2依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
<version>2.11.0</version> </dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

​ 2.然后更改test-service的目录结构,删除zk的配置并更改application.yml文件,目录结构与代码:

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
loadbalance: leastactive
threadpool: cached
scan:
base-packages: org。test.service
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: nacos://127.0.0.1:8848
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
auto-mapping-unknown-column-behavior: none
20191202211833

​ 3.再更改registry.conf文件,如果你的nacos是其它服务器,请改成对应都ip跟端口

registry {
type = "nacos"
file {
name = "file.conf"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}

​ 4.接着我们运行provideApplication

20191202212000

启动成功啦,我们再去看seata的日志:

20191202212028

成功了,这下我们一样,去修改test-client的内容,首先一样application.yml,把zk换成nacos,这里就不详细描述了,把test-service内的registry.conf,复制到client项目的resources中覆盖原来的registry.conf.

然后我们可以运行clientApplication:

20191202212114

​ 5.确认服务已经被发布并测试事务运行是否正常

20191202212203

服务成功发布出来,也被成功消费了.这下我们再去swagger中去测试回滚是否一切正常,访问http://127.0.0.1:28888/swagger-ui.html

20191202212240

恭喜你,看到这一定跟我一样成功了!

总结

关于nacos的使用跟seata的简单搭建已经完成了,更详细的内容希望希望大家访问以下地址阅读详细文档

nacos官网

dubbo官网

seata官网

· 阅读需 2 分钟

活动介绍

亮点解读

分享嘉宾

  • 季敏(清铭) 《Seata 的过去、现在和未来》 slides

  • 吴江坷《我与SEATA的开源之路以及SEATA在互联网医疗系统中的应用》 slides

    1577282651

  • 申海强(煊檍)《带你读透 Seata AT 模式的精髓》 slides

    1577282652

  • 张森 《分布式事务Seata之TCC模式详解》

    1577282653

  • 陈龙(屹远)《Seata 长事务解决方案 Saga 模式》

    1577282654

  • 陈鹏志《Seata%20在滴滴两轮车业务的实践》 slides

    1577282655

特别嘉奖

· 阅读需 11 分钟

项目地址:https://gitee.com/itCjb/springboot-dubbo-mybatisplus-seata

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

介绍

Mybatis-Plus:MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。

MP配置:

<bean id="sqlSessionFactory" class="com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
</bean>

Seata:Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

AT模式机制:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:
    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

分析原因

​ 1.首先我们通过介绍,可以看到,mp是需要注册sqlSessionFactory,注入数据源,而Seata是通过代理数据源来保证事务的正常回滚跟提交。

​ 2.我们来看基于seata的官方demo提供的SeataAutoConfig的代码

package org.test.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "dataSource") // 声明其为Bean实例
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}",dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
return druidDataSource;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
logger.info("代理dataSource........");
return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*.xml"));
return factory.getObject();
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

首先看到我们的seata配置数据源的类里,我们配置了一个数据源,然后又配置了一个seata代理datasource的bean,这时候.

然后我们如果直接启动mp整合seata的项目会发现,分页之类的插件会直接失效,连扫描mapper都得从代码上写,这是为什么呢?

通过阅读以上代码,是因为我们另外的配置了一个sqlSessionFactory,导致mp的sqlSessionFactory失效了,这时候我们发现了问题的所在了,即使我们不配置sqlSessionFactoryl,也会因为mp所使用的数据源不是被seata代理过后的数据源,导致分布式事务失效.但是如何解决这个问题呢?

这时候我们需要去阅读mp的源码,找到他的启动类,一看便知

/*
* Copyright (c) 2011-2020, baomidou (jobob@qq.com).
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.baomidou.mybatisplus.autoconfigure;


import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.incrementer.IKeyGenerator;
import com.baomidou.mybatisplus.core.injector.ISqlInjector;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.scripting.LanguageDriver;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.TypeHandler;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.mapper.MapperFactoryBean;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
* {@link EnableAutoConfiguration Auto-Configuration} for Mybatis. Contributes a
* {@link SqlSessionFactory} and a {@link SqlSessionTemplate}.
* <p>
* If {@link org.mybatis.spring.annotation.MapperScan} is used, or a
* configuration file is specified as a property, those will be considered,
* otherwise this auto-configuration will attempt to register mappers based on
* the interface definitions in or under the root auto-configuration package.
* </p>
* <p> copy from {@link org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration}</p>
*
* @author Eddú Meléndez
* @author Josh Long
* @author Kazuki Shimizu
* @author Eduardo Macarrón
*/
@Configuration
@ConditionalOnClass({SqlSessionFactory.class, SqlSessionFactoryBean.class})
@ConditionalOnSingleCandidate(DataSource.class)
@EnableConfigurationProperties(MybatisPlusProperties.class)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class MybatisPlusAutoConfiguration implements InitializingBean {

private static final Logger logger = LoggerFactory.getLogger(MybatisPlusAutoConfiguration.class);

private final MybatisPlusProperties properties;

private final Interceptor[] interceptors;

private final TypeHandler[] typeHandlers;

private final LanguageDriver[] languageDrivers;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

private final List<MybatisPlusPropertiesCustomizer> mybatisPlusPropertiesCustomizers;

private final ApplicationContext applicationContext;


public MybatisPlusAutoConfiguration(MybatisPlusProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ObjectProvider<TypeHandler[]> typeHandlersProvider,
ObjectProvider<LanguageDriver[]> languageDriversProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider,
ApplicationContext applicationContext) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.typeHandlers = typeHandlersProvider.getIfAvailable();
this.languageDrivers = languageDriversProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
this.mybatisPlusPropertiesCustomizers = mybatisPlusPropertiesCustomizerProvider.getIfAvailable();
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
if (!CollectionUtils.isEmpty(mybatisPlusPropertiesCustomizers)) {
mybatisPlusPropertiesCustomizers.forEach(i -> i.customize(properties));
}
checkConfigFileExists();
}

private void checkConfigFileExists() {
if (this.properties.isCheckConfigLocation() && StringUtils.hasText(this.properties.getConfigLocation())) {
Resource resource = this.resourceLoader.getResource(this.properties.getConfigLocation());
Assert.state(resource.exists(),
"Cannot find config location: " + resource + " (please add config file or check your Mybatis configuration)");
}
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// TODO 使用 MybatisSqlSessionFactoryBean 而不是 SqlSessionFactoryBean
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
applyConfiguration(factory);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (this.properties.getTypeAliasesSuperType() != null) {
factory.setTypeAliasesSuperType(this.properties.getTypeAliasesSuperType());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.typeHandlers)) {
factory.setTypeHandlers(this.typeHandlers);
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

// TODO 对源码做了一定的修改(因为源码适配了老旧的mybatis版本,但我们不需要适配)
Class<? extends LanguageDriver> defaultLanguageDriver = this.properties.getDefaultScriptingLanguageDriver();
if (!ObjectUtils.isEmpty(this.languageDrivers)) {
factory.setScriptingLanguageDrivers(this.languageDrivers);
}
Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);

// TODO 自定义枚举包
if (StringUtils.hasLength(this.properties.getTypeEnumsPackage())) {
factory.setTypeEnumsPackage(this.properties.getTypeEnumsPackage());
}
// TODO 此处必为非 NULL
GlobalConfig globalConfig = this.properties.getGlobalConfig();
// TODO 注入填充器
if (this.applicationContext.getBeanNamesForType(MetaObjectHandler.class,
false, false).length > 0) {
MetaObjectHandler metaObjectHandler = this.applicationContext.getBean(MetaObjectHandler.class);
globalConfig.setMetaObjectHandler(metaObjectHandler);
}
// TODO 注入主键生成器
if (this.applicationContext.getBeanNamesForType(IKeyGenerator.class, false,
false).length > 0) {
IKeyGenerator keyGenerator = this.applicationContext.getBean(IKeyGenerator.class);
globalConfig.getDbConfig().setKeyGenerator(keyGenerator);
}
// TODO 注入sql注入器
if (this.applicationContext.getBeanNamesForType(ISqlInjector.class, false,
false).length > 0) {
ISqlInjector iSqlInjector = this.applicationContext.getBean(ISqlInjector.class);
globalConfig.setSqlInjector(iSqlInjector);
}
// TODO 设置 GlobalConfig 到 MybatisSqlSessionFactoryBean
factory.setGlobalConfig(globalConfig);
return factory.getObject();
}

// TODO 入参使用 MybatisSqlSessionFactoryBean
private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
// TODO 使用 MybatisConfiguration
MybatisConfiguration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new MybatisConfiguration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}

@Bean
@ConditionalOnMissingBean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

/**
* This will just scan the same base package as Spring Boot does. If you want more power, you can explicitly use
* {@link org.mybatis.spring.annotation.MapperScan} but this will get typed mappers working correctly, out-of-the-box,
* similar to using Spring Data JPA repositories.
*/
public static class AutoConfiguredMapperScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

private BeanFactory beanFactory;

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

if (!AutoConfigurationPackages.has(this.beanFactory)) {
logger.debug("Could not determine auto-configuration package, automatic mapper scanning disabled.");
return;
}

logger.debug("Searching for mappers annotated with @Mapper");

List<String> packages = AutoConfigurationPackages.get(this.beanFactory);
if (logger.isDebugEnabled()) {
packages.forEach(pkg -> logger.debug("Using auto-configuration base package '{}'", pkg));
}

BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MapperScannerConfigurer.class);
builder.addPropertyValue("processPropertyPlaceHolders", true);
builder.addPropertyValue("annotationClass", Mapper.class);
builder.addPropertyValue("basePackage", StringUtils.collectionToCommaDelimitedString(packages));
BeanWrapper beanWrapper = new BeanWrapperImpl(MapperScannerConfigurer.class);
Stream.of(beanWrapper.getPropertyDescriptors())
// Need to mybatis-spring 2.0.2+
.filter(x -> x.getName().equals("lazyInitialization")).findAny()
.ifPresent(x -> builder.addPropertyValue("lazyInitialization", "${mybatis.lazy-initialization:false}"));
registry.registerBeanDefinition(MapperScannerConfigurer.class.getName(), builder.getBeanDefinition());
}

@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
}

/**
* If mapper registering configuration or mapper scanning configuration not present, this configuration allow to scan
* mappers based on the same component-scanning path as Spring Boot itself.
*/
@Configuration
@Import(AutoConfiguredMapperScannerRegistrar.class)
@ConditionalOnMissingBean({MapperFactoryBean.class, MapperScannerConfigurer.class})
public static class MapperScannerRegistrarNotFoundConfiguration implements InitializingBean {

@Override
public void afterPropertiesSet() {
logger.debug(
"Not found configuration for registering mapper bean using @MapperScan, MapperFactoryBean and MapperScannerConfigurer.");
}
}
}

看到mp启动类里的sqlSessionFactory方法了吗,他也是一样的注入一个数据源,这时候大家应该都知道解决方法了吧?

没错,就是把被代理过的数据源给放到mp的sqlSessionFactory中.

很简单,我们需要稍微改动一下我们的seata配置类就行了

package org.test.config;

import javax.sql.DataSource;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
@MapperScan("com.baomidou.springboot.mapper*")
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);
private DataSourceProxy dataSourceProxy;

@Bean(name = "dataSource") // 声明其为Bean实例
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
dataSourceProxy = new DataSourceProxy(druidDataSource);
return dataSourceProxy;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy() {
logger.info("代理dataSource........");
return dataSourceProxy;
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

看代码,我们去掉了自己配置的sqlSessionFactory,直接让DataSource bean返回的是一个被代理过的bean,并且我们加入了@Primary,导致mp优先使用我们配置的数据源,这样就解决了mp因为seata代理了数据源跟创建了新的sqlSessionFactory,导致mp的插件,组件失效的bug了!

总结

踩到坑不可怕,主要又耐心的顺着每个组件实现的原理,再去思考,查找对应冲突的代码块,你一定能找到个兼容二者的方法。

· 阅读需 23 分钟

项目地址

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

事务:事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。 ​ 分布式事务:当一个操作牵涉到多个服务,多台数据库协力完成时(比如分表分库后,业务拆分),多个服务中,本地的Transaction已经无法应对这个情况了,为了保证数据一致性,就需要用到分布式事务。 ​ Seata :是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 ​ 本文目的:现如今微服务越来越流行,而市面上的分布式事务的方案可谓不少,参差不齐,比较流行的以MQ代表的保证的是消息最终一致性的解决方案(消费确认,消息回查,消息补偿机制等),以及TX-LCN的LCN模式协调本地事务来保证事务统一提交或回滚(已经停止更新,对Dubbo2.7不兼容)。而MQ的分布式事务太过复杂,TX-LCN断更,这时候需要一个高效可靠及易上手的分布式事务解决方案,Seata脱颖而出,本文要介绍的就是如何快速搭建一个整合Seata的Demo项目,一起来吧!

准备工作

1.首先安装mysql,eclipse之类常用的工具,这不展开了.

2.访问seata下载中心地址我们使用的0.9.0版本

3.下载并解压seata-server

建库建表

1.首先我们链接mysql创建一个名为seata的数据库,然后运行一下建表sql,这个在seata-server的conf文件夹内的db_store.sql就是我的所需要使用的sql了.

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : seata
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:18
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for branch_table

-- ----------------------------

DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of branch_table

-- ----------------------------

-- ----------------------------

-- Table structure for global_table

-- ----------------------------

DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of global_table

-- ----------------------------

-- ----------------------------

-- Table structure for lock_table

-- ----------------------------

DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of lock_table

-- ----------------------------

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

2.运行完上面的seata所需要的数据库后,我们进行搭建我们所需要写的demo的库,创建一个名为test的数据库,然后执行以下sql代码:

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : test
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:24
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for test

-- ----------------------------

DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`one` varchar(255) DEFAULT NULL,
`two` varchar(255) DEFAULT NULL,
`createTime` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of test

-- ----------------------------

INSERT INTO `test` VALUES ('1', '1', '2', '2019-11-23 16:07:34');

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

3.我们找到seata-server/conf 文件夹内的file编辑它:20191129132933

4.再次找到其中的db配置方法块,更改方法如下图:

好了,可以到bin目录去./seata-server.bat 运行看看了

创建项目

​ 首先我们使用的是eclipse,当然你也可以用idea之类的工具,详细请按下面步骤来运行

​ 1.创建一个新的maven项目,并删除多余文件夹:2019112913335420191129133441

​ 2.打开项目的pom.xml,加入以下依赖:

	<properties>
<webVersion>3.1</webVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<HikariCP.version>3.2.0</HikariCP.version>
<mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
<version>7.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!-- mybatis-plus begin -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- mybatis-plus end -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- Zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version> </dependency> -->

<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
<version>3.1.0</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- 加上这个才能辨认到log4j2.yml文件 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency> <!-- 引入log4j2依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
<version>2.11.0</version> </dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

​ 3.再切换父项目为pom模式,还是pom文件,切换为 overview ,做如图操作:20191129134127

4.创建我们的demo子项目,test-service:20191129135935

​ 目录如下:

20191129140048

创建EmbeddedZooKeeper.java文件,跟 ProviderApplication.java,代码如下:

package org.test;

import java.io.File;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.UUID;

import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ErrorHandler;
import org.springframework.util.SocketUtils;

/**
* from:
* https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java
*
* Helper class to start an embedded instance of standalone (non clustered) ZooKeeper.
*
* NOTE: at least an external standalone server (if not an ensemble) are recommended, even for
* {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication}
*
* @author Patrick Peralta
* @author Mark Fisher
* @author David Turanski
*/
public class EmbeddedZooKeeper implements SmartLifecycle {

/**
* Logger.
*/
private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);

/**
* ZooKeeper client port. This will be determined dynamically upon startup.
*/
private final int clientPort;

/**
* Whether to auto-start. Default is true.
*/
private boolean autoStartup = true;

/**
* Lifecycle phase. Default is 0.
*/
private int phase = 0;

/**
* Thread for running the ZooKeeper server.
*/
private volatile Thread zkServerThread;

/**
* ZooKeeper server.
*/
private volatile ZooKeeperServerMain zkServer;

/**
* {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread.
*/
private ErrorHandler errorHandler;

private boolean daemon = true;

/**
* Construct an EmbeddedZooKeeper with a random port.
*/
public EmbeddedZooKeeper() {
clientPort = SocketUtils.findAvailableTcpPort();
}

/**
* Construct an EmbeddedZooKeeper with the provided port.
*
* @param clientPort
* port for ZooKeeper server to bind to
*/
public EmbeddedZooKeeper(int clientPort, boolean daemon) {
this.clientPort = clientPort;
this.daemon = daemon;
}

/**
* Returns the port that clients should use to connect to this embedded server.
*
* @return dynamically determined client port
*/
public int getClientPort() {
return this.clientPort;
}

/**
* Specify whether to start automatically. Default is true.
*
* @param autoStartup
* whether to start automatically
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* {@inheritDoc}
*/
public boolean isAutoStartup() {
return this.autoStartup;
}

/**
* Specify the lifecycle phase for the embedded server.
*
* @param phase
* the lifecycle phase
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* {@inheritDoc}
*/
public int getPhase() {
return this.phase;
}

/**
* {@inheritDoc}
*/
public boolean isRunning() {
return (zkServerThread != null);
}

/**
* Start the ZooKeeper server in a background thread.
* <p>
* Register an error handler via {@link #setErrorHandler} in order to handle any exceptions thrown during startup or
* execution.
*/
public synchronized void start() {
if (zkServerThread == null) {
zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter");
zkServerThread.setDaemon(daemon);
zkServerThread.start();
}
}

/**
* Shutdown the ZooKeeper server.
*/
public synchronized void stop() {
if (zkServerThread != null) {
// The shutdown method is protected...thus this hack to invoke it.
// This will log an exception on shutdown; see
// https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details.
try {
Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown");
shutdown.setAccessible(true);
shutdown.invoke(zkServer);
}

catch (Exception e) {
throw new RuntimeException(e);
}

// It is expected that the thread will exit after
// the server is shutdown; this will block until
// the shutdown is complete.
try {
zkServerThread.join(5000);
zkServerThread = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for embedded ZooKeeper to exit");
// abandoning zk thread
zkServerThread = null;
}
}
}

/**
* Stop the server if running and invoke the callback when complete.
*/
public void stop(Runnable callback) {
stop();
callback.run();
}

/**
* Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none
* is provided, only error-level logging will occur.
*
* @param errorHandler
* the {@link ErrorHandler} to be invoked
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Runnable implementation that starts the ZooKeeper server.
*/
private class ServerRunnable implements Runnable {

public void run() {
try {
Properties properties = new Properties();
File file = new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID());
file.deleteOnExit();
properties.setProperty("dataDir", file.getAbsolutePath());
properties.setProperty("clientPort", String.valueOf(clientPort));

QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(properties);

zkServer = new ZooKeeperServerMain();
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumPeerConfig);

zkServer.runFromConfig(configuration);
} catch (Exception e) {
if (errorHandler != null) {
errorHandler.handleError(e);
} else {
logger.error("Exception running embedded ZooKeeper", e);
}
}
}
}

}

package org.test;

import org.apache.dubbo.config.spring.context.annotation.DubboComponentScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
*
* @author cjb
* @date 2019/10/24
*/
@EnableTransactionManagement
@ComponentScan(basePackages = {"org.test.config", "org.test.service.impl"})
@DubboComponentScan(basePackages = "org.test.service.impl")
@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
new EmbeddedZooKeeper(2181, false).start();
SpringApplication app = new SpringApplication(ProviderApplication.class);
app.run(args);
}

}

创建实体包 org.test.entity以及创建实体类Test 用到了lombok,详细百度一下,eclipse装lombok插件

package org.test.entity;

import java.io.Serializable;
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
* <p>
* 功能
* </p>
*
* @author Funkye
* @since 2019-04-23
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "test对象", description = "功能")
public class Test implements Serializable {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "主键")
@TableId(value = "id", type = IdType.AUTO)
private Integer id;

@ApiModelProperty(value = "one")
@TableField("one")
private String one;

@ApiModelProperty(value = "two")
@TableField("two")
private String two;

@ApiModelProperty(value = "createTime")
@TableField("createTime")
private LocalDateTime createTime;

}

​ 创建service,service.impl,mapper等包,依次创建ITestservice,以及实现类,mapper

package org.test.service;

import org.test.entity.Test;

import com.baomidou.mybatisplus.extension.service.IService;

/**
* <p>
* 功能 服务类
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface ITestService extends IService<Test> {

}

package org.test.service.impl;




import org.apache.dubbo.config.annotation.Service;
import org.test.entity.Test;
import org.test.mapper.TestMapper;
import org.test.service.ITestService;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

@Service(version = "1.0.0",interfaceClass =ITestService.class )
public class TestServiceImpl extends ServiceImpl<TestMapper, Test> implements ITestService {

}

package org.test.mapper;

import org.test.entity.Test;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* <p>
* 功能 Mapper 接口
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface TestMapper extends BaseMapper<Test> {

}

创建org.test.config包,创建SeataAutoConfig.java,配置信息都在此处,主要作用为代理数据,连接事务服务分组

package org.test.config;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "druidDataSource") // 声明其为Bean实例
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("装载dataSource........");
return druidDataSource;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean(name = "dataSource")
@Primary // 在同样的DataSource中,首先使用被标注的DataSource
public DataSourceProxy dataSourceProxy(@Qualifier(value = "druidDataSource") DruidDataSource druidDataSource) {
logger.info("代理dataSource........");
return new DataSourceProxy(druidDataSource);
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("配置seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

再创建mybatisplus所需的配置文件MybatisPlusConfig

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.baomidou.mybatisplus.core.parser.ISqlParser;
import com.baomidou.mybatisplus.extension.parsers.BlockAttackSqlParser;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;

@Configuration
// @MapperScan("com.baomidou.springboot.mapper*")//这个注解,作用相当于下面的@Bean
// MapperScannerConfigurer,2者配置1份即可
public class MybatisPlusConfig {

/**
* mybatis-plus分页插件<br>
* 文档:http://mp.baomidou.com<br>
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
List<ISqlParser> sqlParserList = new ArrayList<ISqlParser>();
// 攻击 SQL 阻断解析器、加入解析链
sqlParserList.add(new BlockAttackSqlParser());
paginationInterceptor.setSqlParserList(sqlParserList);
return paginationInterceptor;
}

/**
* 相当于顶部的: {@code @MapperScan("com.baomidou.springboot.mapper*")} 这里可以扩展,比如使用配置文件来配置扫描Mapper的路径
*/

@Bean
public MapperScannerConfigurer mapperScannerConfigurer() {
MapperScannerConfigurer scannerConfigurer = new MapperScannerConfigurer();
scannerConfigurer.setBasePackage("org.test.mapper");
return scannerConfigurer;
}

}

​ 再创建resources目录,创建mapper文件夹,application.yml等文件

server:
port: 38888
spring:
application:
name: test-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
dubbo:
protocol:
loadbalance: leastactive
threadpool: cached
scan:
base-packages: org。test.service
application:
qos-enable: false
name: testserver
registry:
id: my-registry
address: zookeeper://127.0.0.1:2181?client=curator
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
typeAliasesPackage: org.test.entity
global-config:
db-config:
field-strategy: not-empty
id-type: auto
db-type: mysql
configuration:
map-underscore-to-camel-case: true
cache-enabled: true
auto-mapping-unknown-column-behavior: none

​ 创建file.conf,此处的service 内的vgroup_mapping.你的事务分组,比如上面SeataAutoConfig内配置了test-group,那么这里也要改为test-group,然后下面ip端口都是seata运行的ip跟端口就行了

transport {
type = "TCP"
server = "NIO"
heartbeat = true
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
boss-thread-size = 1
worker-thread-size = 8
}
shutdown {
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroup_mapping.test-group = "default"
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
undo.log.table = "undo_log"
}

recovery {
committing-retry-period = 1000
asyn-committing-retry-period = 1000
rollbacking-retry-period = 1000
timeout-retry-period = 1000
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}

metrics {
enabled = false
registry-type = "compact"
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

support {
spring {
datasource.autoproxy = false
}
}

创建registry.conf,来指定file,zk的ip端口之类的配置

registry {
type = "file"
file {
name = "file.conf"
}
}
config {
type = "file"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
}

​ 大功告成,可以直接运行啦,这时候观察seata-server20191129142115

​ 接下来我们创建test-client项目项目,这里就不赘述了,跟上面的test-service一样的创建方式

​ 接下来我们复制test-service内的service跟实体过去,当然你嫌麻烦,可以单独搞个子项目放通用的service跟实体,一些工具类等等,我这边为了快速搭建这个demo,就选择复制黏贴的方式了.

目录结构:

然后我们创建ClientApplication:

package org.test;

import java.util.TimeZone;
import java.util.concurrent.Executor;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, MybatisPlusAutoConfiguration.class})
@EnableScheduling
@EnableAsync
@Configuration
@EnableDubbo(scanBasePackages = {"org.test.service"})
@ComponentScan(basePackages = {"org.test.service", "org.test.controller", "org.test.config"})
public class ClientApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication app = new SpringApplication(ClientApplication.class);
app.run(args);
}

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
return new ThreadPoolTaskExecutor();
}
}

再到config包内创建SwaggerConfig :

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
// swagger2的配置文件,这里可以配置swagger2的一些基本的内容,比如扫描的包等等
@Bean
public Docket createRestApi() {
List<Parameter> pars = new ArrayList<Parameter>();
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
// 为当前包路径
.apis(RequestHandlerSelectors.basePackage("org.test.controller")).paths(PathSelectors.any()).build()
.globalOperationParameters(pars);
}

// 构建 api文档的详细信息函数,注意这里的注解引用的是哪个
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
// 页面标题
.title("项目接口")
// 创建人
.contact(new Contact("FUNKYE", "", ""))
// 版本号
.version("1.0")
// 描述
.description("API 描述").build();
}
}

​ 再创建SpringMvcConfigure,再里面放入seata的配置,我为了偷懒直接集成在mvc配置的类里了,大家规范点可以另外创建个配置seata的类,大家可以发现下面还是有个组名称,我把两个项目都分配到一个组去,貌似另外取一个也没事的.

package org.test.config;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.dubbo.config.annotation.Reference;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import com.google.common.collect.Maps;

import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SpringMvcConfigure implements WebMvcConfigurer {

@Bean
public FilterRegistrationBean corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader(CorsConfiguration.ALL);
config.addAllowedMethod(CorsConfiguration.ALL);
source.registerCorsConfiguration("/**", config);
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new CorsFilter(source));
filterRegistrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE);
filterRegistrationBean.setOrder(1);
filterRegistrationBean.setEnabled(true);
filterRegistrationBean.addUrlPatterns("/**");
Map<String, String> initParameters = Maps.newHashMap();
initParameters.put("excludes", "/favicon.ico,/img/*,/js/*,/css/*");
initParameters.put("isIncludeRichText", "true");
filterRegistrationBean.setInitParameters(initParameters);
return filterRegistrationBean;
}

@Bean
public InternalResourceViewResolver viewResolver() {
InternalResourceViewResolver viewResolver = new InternalResourceViewResolver();
viewResolver.setPrefix("/WEB-INF/jsp/");
viewResolver.setSuffix(".jsp");
// viewResolver.setViewClass(JstlView.class);
// 这个属性通常并不需要手动配置,高版本的Spring会自动检测
return viewResolver;
}



/**
* 替换框架json为fastjson
*/
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter();
FastJsonConfig fastJsonConfig = new FastJsonConfig();
fastJsonConfig.setSerializerFeatures(SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.DisableCircularReferenceDetect);
// 处理中文乱码问题
List<MediaType> fastMediaTypes = new ArrayList<>();
fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
fastConverter.setSupportedMediaTypes(fastMediaTypes);
fastConverter.setFastJsonConfig(fastJsonConfig);
// 处理字符串, 避免直接返回字符串的时候被添加了引号
StringHttpMessageConverter smc = new StringHttpMessageConverter(Charset.forName("UTF-8"));
converters.add(smc);
converters.add(fastConverter);
}

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("test-client", "test-group");
}

}

再创建controller包,再包下创建TestController :

package org.test.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.test.service.DemoService;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
* <p>
* 文件表 前端控制器
* </p>
*
* @author funkye
* @since 2019-03-20
*/
@RestController
@RequestMapping("/test")
@Api(tags = "测试接口")
public class TestController {

private final static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
@Lazy
DemoService demoService;

@GetMapping(value = "testSeataOne")
@ApiOperation(value = "测试手动回滚分布式事务接口")
public Object testSeataOne() {
return demoService.One();
}

@GetMapping(value = "testSeataTwo")
@ApiOperation(value = "测试异常回滚分布式事务接口")
public Object testSeataTwo() {
return demoService.Two();
}

}

再到service去创建需要依赖的DemoService

package org.test.service;

import java.time.LocalDateTime;

import org.apache.dubbo.config.annotation.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.test.controller.TestController;
import org.test.entity.Test;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
public class DemoService {
@Reference(version = "1.0.0", timeout = 60000)
private ITestService testService;
private final static Logger logger = LoggerFactory.getLogger(DemoService.class);

/**
* 手动回滚示例
*
* @return
*/
@GlobalTransactional
public Object One() {
logger.info("seata分布式事务Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
try {
logger.info("载入事务id进行回滚");
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
return false;
}

/**
* 抛出异常进行回滚示例
*
* @return
*/
@GlobalTransactional
public Object Two() {
logger.info("seata分布式事务Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}
}

一样创建resources文件夹,先创建常用的application.yml

spring:
application:
name: test
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
username: root
password: 123456
mvc:
servlet:
load-on-startup: 1
http:
encoding:
force: true
charset: utf-8
enabled: true
multipart:
max-file-size: 10MB
max-request-size: 10MB
dubbo:
registry:
id: my-registry
address: zookeeper://127.0.0.1:2181?client=curator
# address: zookeeper://127.0.0.1:2181?client=curator
application:
name: dubbo-demo-client
qos-enable: false
server:
port: 28888
max-http-header-size: 8192
address: 0.0.0.0
tomcat:
max-http-post-size: 104857600

再把之前service配置好的file跟registry文件复制来,如果你的client组名称再配置类里修改了,那么这里的file文件内的组名称一样需要更改.

完整的目录结构如上,这时候可以启动test-service后,再启动test-client,到swagger里测试咯

​ 4.访问127.0.0.1:28888/swagger-ui.html做最后的收尾

20191129143124

这里数据我已经存了一条记录了,我们看看会不会成功回滚:

20191129143252

刷新数据库,发现还是只有一条数据:

20191129143124

再查看日志:

20191129143407

显示已经回滚,我们再看看seata-server的日志:

显示回滚成功,事务id也是一致的,这下我们的分布式事务就跑通咯,通过打断点方式,大家可以查看undo_log,会发现再事务提交前,会存入一条事务信息的数据,如果回滚成功,该信息就会被删除.

总结

seata的整合还是比较简单易入手,稍微用心一些你肯定写的比我更好!

欢迎大家也多去阅读seata,dubbo之类的源代码,能解决业务中遇到的大量的坑哦!

· 阅读需 4 分钟

在分析启动部分源码时,我发现 GlobalTransactionScanner 会同时启动 RM 和 TM client,但根据 Seata 的设计来看,TM 负责全局事务的操作,如果一个服务中不需要开启全局事务,此时是不需要启动 TM client的,也就是说项目中如果没有全局事务注解,此时是不是就不需要初始化 TM client 了,因为不是每个微服务,都需要 GlobalTransactional,它此时仅仅作为一个 RM client 而已。

于是我着手将 GlobalTransactionScanner 稍微更改了初始化的规则,由于之前 GlobalTransactionScanner 调用 初始化方法是在 InitializingBean 中的 afterPropertiesSet() 方法中进行,afterPropertySet() 仅仅是当前 bean 初始化后被调用,此时无法得知当前 Spring 容器是否有全局事务注解。

因此我去掉了 InitializingBean,改成了是实现 ApplicationListener,在实例化 bean 的过程中检查是否有 GlobalTransactional 注解的存在,最后在 Spring 容器初始化完成之后再调用 RM 和 TM client 初始化方法,这时候就可以根据项目是否有用到全局事务注解来决定是否启动 TM client 了。

这里附上 PR 地址:https://github.com/apache/incubator-seata/pull/1936

随后在 pr 中讨论中得知,目前 Seata 的设计是只有在发起方的 TM 才可以发起 GlobalRollbackRequest,RM 只能发送 BranchReport(false) 上报分支状态到 TC 服务端,无法直接发送 GlobalRollbackRequest 进行全局回滚操作。具体的交互逻辑如下:

那么根据上面的设计模型,自然可以按需启动 TM client 了。

但是 Seata 后面的优化迭代中,还需要考虑的一点是:

当参与方出现异常时,是否可以直接由参与方的 TM client 发起全局回滚?这也就意味着可以缩短分布式事务的周期时间,尽快释放全局锁让其他数据冲突的事务尽早的获取到锁执行。

也就是说在一个全局事务当中,只要有一个 RM client 执行本地事务失败了,直接当前服务的 TM client 发起全局事务回滚,不必要等待发起方的 TM 发起的决议回滚通知了。如果要实现这个优化,那么就需要每个服务都需要同时启动 TM client 和 RM client。

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 14 分钟

从上一篇文章「分布式事务中间件Seata的设计原理」讲了下 Seata AT 模式的一些设计原理,从中也知道了 AT 模式的三个角色(RM、TM、TC),接下来我会更新 Seata 源码分析系列文章。今天就来分析 Seata AT 模式在启动的时候都做了哪些操作。

客户端启动逻辑

TM 是负责整个全局事务的管理器,因此一个全局事务是由 TM 开启的,TM 有个全局管理类 GlobalTransaction,结构如下:

io.seata.tm.api.GlobalTransaction

public interface GlobalTransaction {

void begin() throws TransactionException;

void begin(int timeout) throws TransactionException;

void begin(int timeout, String name) throws TransactionException;

void commit() throws TransactionException;

void rollback() throws TransactionException;

GlobalStatus getStatus() throws TransactionException;

// ...
}

可以通过 GlobalTransactionContext 创建一个 GlobalTransaction,然后用 GlobalTransaction 进行全局事务的开启、提交、回滚等操作,因此我们直接用 API 方式使用 Seata AT 模式:

//init seata;
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
//trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
// 事务处理
// ...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}

如果每次使用全局事务都这样写,难免会造成代码冗余,我们的项目都是基于 Spring 容器,这时我们可以利用 Spring AOP 的特性,用模板模式把这些冗余代码封装模版里,参考 Mybatis-spring 也是做了这么一件事情,那么接下来我们来分析一下基于 Spring 的项目启动 Seata 并注册全局事务时都做了哪些工作。

我们开启一个全局事务是在方法上加上 @GlobalTransactional注解,Seata 的 Spring 模块中,有个 GlobalTransactionScanner,它的继承关系如下:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean {
// ...
}

在基于 Spring 项目的启动过程中,对该类会有如下初始化流程:

image-20191124155455309

InitializingBean 的 afterPropertiesSet() 方法调用了 initClient() 方法:

io.seata.spring.annotation.GlobalTransactionScanner#initClient

TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);

对 TM 和 RM 做了初始化操作。

  • TM 初始化

io.seata.tm.TMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
// 获取 TmRpcClient 实例
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 初始化 TM Client
tmRpcClient.init();
}

调用 TmRpcClient.getInstance() 方法会获取一个 TM 客户端实例,在获取过程中,会创建 Netty 客户端配置文件对象,以及创建 messageExecutor 线程池,该线程池用于在处理各种与服务端的消息交互,在创建 TmRpcClient 实例时,创建 ClientBootstrap,用于管理 Netty 服务的启停,以及 ClientChannelManager,它是专门用于管理 Netty 客户端对象池,Seata 的 Netty 部分配合使用了对象池,后面在分析网络模块会讲到。

io.seata.core.rpc.netty.AbstractRpcRemotingClient#init

public void init() {
clientBootstrap.start();
// 定时尝试连接服务端
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}

调用 TM 客户端 init() 方法,最终会启动 netty 客户端(此时还未真正启动,在对象池被调用时才会被真正启动);开启一个定时任务,定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端,具体逻辑是在 NettyClientChannelManager 中的 channels 中缓存了客户端 channel,如果此时 channels 不存在获取已过期,那么就会尝试连接服务端以重新获取 channel 并将其缓存到 channels 中;开启一条单独线程,用于处理异步请求发送,这里用得很巧妙,之后在分析网络模块在具体对其进行分析。

io.seata.core.rpc.netty.AbstractRpcRemoting#init

public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

在 AbstractRpcRemoting 的 init 方法中,又是开启了一个定时任务,该定时任务主要是用于定时清除 futures 已过期的 futrue,futures 是保存发送请求需要返回结果的 future 对象,该对象有个超时时间,过了超时时间就会自动抛异常,因此需要定时清除已过期的 future 对象。

  • RM 初始化

io.seata.rm.RMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}

RmRpcClient.getInstance 处理逻辑与 TM 大致相同;ResourceManager 是 RM 资源管理器,负责分支事务的注册、提交、上报、以及回滚操作,以及全局锁的查询操作,DefaultResourceManager 会持有当前所有的 RM 资源管理器,进行统一调用处理,而 get() 方法主要是加载当前的资源管理器,主要用了类似 SPI 的机制,进行灵活加载,如下图,Seata 会扫描 META-INF/services/ 目录下的配置类并进行动态加载。

ClientMessageListener 是 RM 消息处理监听器,用于负责处理从 TC 发送过来的指令,并对分支进行分支提交、分支回滚,以及 undo log 删除操作;最后 init 方法跟 TM 逻辑也大体一致;DefaultRMHandler 封装了 RM 分支事务的一些具体操作逻辑。

接下来再看看 wrapIfNecessary 方法究竟做了哪些操作。

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判断是否有开启全局事务
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// 判断 bean 中是否有 GlobalTransactional 和 GlobalLock 注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

if (interceptor == null) {
// 创建代理类
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}

LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 执行包装目标对象到代理对象
Advisor[] advisor = super.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

GlobalTransactionScanner 继承了 AbstractAutoProxyCreator,用于对 Spring AOP 支持,从代码中可看出,用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法。

GlobalTransactionalInterceptor 实现了 MethodInterceptor:

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
// 全局事务注解
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
// 全局锁注解
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}

以上是代理方法执行的逻辑逻辑,其中 handleGlobalTransaction() 方法里面调用了 TransactionalTemplate 模版:

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public TransactionInfo getTransactionInfo() {
// ...
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// ...
}
}

handleGlobalTransaction() 方法执行了就是 TransactionalTemplate 模版类的 execute 方法:

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {

// 2. begin transaction
beginTransaction(txInfo, tx);

Object rs = null;
try {

// Do Your Business
rs = business.execute();

} catch (Throwable ex) {

// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);

return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}

以上是不是有一种似曾相识的感觉?没错,以上就是我们使用 API 时经常写的冗余代码,现在 Spring 通过代理模式,把这些冗余代码都封装带模版里面了,它将那些冗余代码统统封装起来统一流程处理,并不需要你显示写出来了,有兴趣的也可以去看看 Mybatis-spring 的源码,也是写得非常精彩。

服务端处理逻辑

服务端收到客户端的连接,那当然是将其 channel 也缓存起来,前面也说到客户端会发送 RegisterRMRequest/RegisterTMRequest 请求给服务端,服务端收到后会调用 ServerMessageListener 监听器处理:

io.seata.core.rpc.ServerMessageListener

public interface ServerMessageListener {
// 处理各种事务,如分支注册、分支提交、分支上报、分支回滚等等
void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
// 处理 RM 客户端的注册连接
void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 处理 TM 客户端的注册连接
void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 服务端与客户端保持心跳
void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender)

}

ChannelManager 是服务端 channel 的管理器,服务端每次和客户端通信,都需要从 ChannelManager 中获取客户端对应的 channel,它用于保存 TM 和 RM 客户端 channel 的缓存结构如下:

/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>();

/**
* ip+appname,port
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

以上的 Map 结构有点复杂:

RM_CHANNELS:

  1. resourceId 指的是 RM client 的数据库地址;
  2. applicationId 指的是 RM client 的服务 Id,比如 springboot 的配置 spring.application.name=account-service 中的 account-service 即是 applicationId;
  3. ip 指的是 RM client 服务地址;
  4. port 指的是 RM client 服务地址;
  5. RpcContext 保存了本次注册请求的信息。

TM_CHANNELS:

  1. ip+appname:这里的注释应该是写错了,应该是 appname+ip,即 TM_CHANNELS 的 Map 结构第一个 key 为 appname+ip;
  2. port:客户端的端口号。

以下是 RM Client 注册逻辑:

io.seata.core.rpc.ChannelManager#registerRMChannel

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(resourceManagerRequest.getVersion());
// 将 ResourceIds 数据库连接连接信息放入一个set中
Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
RpcContext rpcContext;
// 从缓存中判断是否有该channel信息
if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
// 根据请求注册信息,构建 rpcContext
rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
resourceManagerRequest.getResourceIds(), channel);
// 将 rpcContext 放入缓存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
} else {
rpcContext = IDENTIFIED_CHANNELS.get(channel);
rpcContext.addResources(dbkeySet);
}
if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
for (String resourceId : dbkeySet) {
String clientIp;
// 将请求信息存入 RM_CHANNELS 中,这里用了 java8 的 computeIfAbsent 方法操作
ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
.computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
// 将当前 rpcContext 放入 portMap 中
rpcContext.holdInResourceManagerChannels(resourceId, portMap);
updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
}
}

从以上代码逻辑能够看出,注册 RM client 主要是将注册请求信息,放入 RM_CHANNELS 缓存中,同时还会从 IDENTIFIED_CHANNELS 中判断本次请求的 channel 是否已验证过,IDENTIFIED_CHANNELS 的结构如下:

private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS
= new ConcurrentHashMap<>();

IDENTIFIED_CHANNELS 包含了所有 TM 和 RM 已注册的 channel。

以下是 TM 注册逻辑:

io.seata.core.rpc.ChannelManager#registerTMChannel

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(request.getVersion());
// 根据请求注册信息,构建 RpcContext
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
// 将 RpcContext 放入 IDENTIFIED_CHANNELS 缓存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
// account-service:127.0.0.1:63353
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
+ getClientIpFromChannel(channel);
// 将请求信息存入 TM_CHANNELS 缓存中
TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
// 将上一步创建好的get出来,之后再将rpcContext放入这个map的value中
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
rpcContext.holdInClientChannels(clientIdentifiedMap);
}

TM client 的注册大体类似,把本次注册的信息放入对应的缓存中保存,但比 RM client 的注册逻辑简单一些,主要是 RM client 会涉及分支事务资源的信息,需要注册的信息也会比 TM client 多。

以上源码分析基于 0.9.0 版本。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。