在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:
- 单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即
为 undo_log 生成了 undo_log, 假设为undo_log2
,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2
生成的有问题,在undo_log对应的事务分支
回滚时会将业务表关联的undo_log
也一起删除,从而导致业务表对应的事务分支
回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。这时候整体逻辑已经乱了,很严重的问题 - 多数据源和
逻 辑数据源被代理
情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的select for update
和delete
操作,被分散在多个事务中执行,导致一个事务在执行完select for update
后一直不提交,一个事务在执行delete
时一直等待锁,直到超时
代理描述
即对DataSource代理一层,重写一些方法。比如getConnection
方法,这时不直接返回一个Connection
,而是返回ConnectionProxy
,其它的以此类推
// DataSourceProxy
public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}
private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this);
}
public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
手动代理
即手动注入一个DataSourceProxy
,如下
@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
自动代理
针对DataSource
创建一个代理类,在代理类里面基于DataSource
获取DataSourceProxy(如果没有就创建)
,然后调用DataSourceProxy
的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator
中
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}
数据源多层代理
@Bean
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
doxx
return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
- 首先我们在配置类里面注入了两个
DataSource
,分别为:DruidDataSource
和DataSourceProxy
, 其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource 属性
,并且DataSourceProxy
为使用了@Primary
注解声明 - 应用默认开启了数据源自动代理,所以在调用
DruidDataSource
相关方法时,又会为为DruidDataSource
创建一个对应的数据源代理DataSourceProxy2
- 当我们在程序中想获取一个Connection时会发生什么?
- 先获取一个
DataSource
,因为DataSourceProxy
为Primary
,所以此时拿到的是DataSourceProxy
- 基于
DataSource
获取一个Connection
,即通过DataSourceProxy
获取Connection
。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法
,但因为切面会对DruidDataSource
进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2
,然后调用DataSourceProxy2#getConnection
,然后再调用DruidDataSource#getConnection
。最终形成了双层代理, 返回的Connection
也是一个双层的ConnectionProxy
- 先获取一个
上面其实是改造之后的代理逻辑,Seata默认的自动代理会对DataSourceProxy
再次进行代理,后果就是代理多了一层此时对应的图如下
数据源多层代理会导致的两个问题在文章开头处已经总结了,下面会有案例介绍。
分支事务提交
通过ConnectionProxy
中执行对应的方法,会发生什么?以update操作涉及到的一个分支事务提交为例:
- 执行
ConnectionProxy#prepareStatement
, 返回一个PreparedStatementProxy
- 执行
PreparedStatementProxy#executeUpdate
,PreparedStatementProxy#executeUpdate
大概会帮做两件事情: 执行业务SQL和提交undo_log
提交业务SQL
// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
主要流程就是: 先执行业务SQL,然后执行ConnectionProxy的commit方法,在这个方法中,会先帮我们执行对应的 undo_log SQL,然后提交事务
PreparedStatementProxy#executeUpdate =>
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => 在这一步操中,会触发statementCallback#execute方法,即调用调用原生PreparedStatement#executeUpdate方法
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit
UNDO_LOG插入
// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务,简单理解向server发一个请求,然后server在branch_table表里插入一条记录,不关注
register();
} catch (TransactionException e) {
// 如果没有for update 的sql,会直接在commit之前做注册,此时不止插入一条branch记录,而会附带锁信息进行竞争,下方的异常一般就是在注册时没拿到锁抛出,一般就是纯update语句的并发下会触发竞争锁失败的异常 @FUNKYE
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// undo_log处理,期望用 targetConnection 处理 @1
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 提交本地事务,期望用 targetConnection 处理 @2
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
- undo_log处理@1,解析当前事务分支涉及到的
undo_log
,然后使用TargetConnection
, 写到数据库
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}
String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,cp.getTargetConnection());
}
- 提交本地事务@2,即通过
TargetConnection
提交事务。即务SQL执行
、undo_log写入
、即事务提交
用的都是同一个TargetConnection
lcn的内置数据库方案,lcn是将undolog写到他内嵌的h2(忘了是不是这个来着了)数据库上,此时会变成2个本地事务,一个是h2的undolog插入事务,一个是业务数据库的事务,如果在h2插入后,业务数 据库异常,lcn的方案就会出现数据冗余,回滚数据的时候也是一样,删除undolog跟回滚业务数据不是一个本地事务. 但是lcn这样的好处就是入侵小,不需要另外添加undolog表。 感谢@FUNKYE大佬给的建议,对lcn不太了解,有机会好好研究一下
分支事务回滚
- Server端向Client端发送回滚请求
- Client端接收到Server发过来的请求,经过一系列处理,最终会到
DataSourceManager#branchRollback
方法 - 先根据resourceId从
DataSourceManager.dataSourceCache
中获取对应的DataSourceProxy
,此时为masterSlaveProxy
(回滚阶段我们就不考代理数据源问题,简单直接一些,反正最终拿到的都是TragetConnection
) - 根据Server端发过来的xid和branchId查找对应的undo_log并解析其
rollback_info
属性,每条undo_log可能会解析出多条SQLUndoLog
,每个SQLUndoLog
可以理解成是一个操作。比如一个分支事务先更新A表,再更新B表,这时候针对该分支事务生成的undo_log就包含两个SQLUndoLog
:第一个SQLUndoLog
对应的是更新A表的前后快照;第二个SQLUndoLog
对应的是更新B表的前后快照 - 针对每条
SQLUndoLog
执行对应的回滚操作,比如一个SQLUndoLog
对应的操作是INSERT
,则其对应的回滚操作就是DELETE
- 根据xid和branchId删除该undo_log
// AbstractUndoLogManager#undo 删除了部分非关键代码
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
// 获取原生数据源的Connection, 回滚阶段我们不管代理数据源问题,最终拿到的都是 TargetConnection
conn = dataSourceProxy.getPlainConnection();
// 将回滚操作放在一个本地事务中,手动提交,确保最终业务SQL操作和undo_log删除操作一起提交
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// 根据xid 和 branchId 查询 undo_log,注意此时的SQL语句 SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// status == 1 undo_log不处理,和防悬挂相关
if (!canUndo(state)) {
return;
}
// 解析undo_log
byte[] rollbackInfo = getRollbackInfo(rs);
BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance(serializer).decode(rollbackInfo);
try {
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
// 执行对应的回滚操作
undoExecutor.executeOn(conn);
}
}
}
//
if (exists) {
LOGGER.error("\n delete from undo_log where xid={} AND branchId={} \n", xid, branchId);
deleteUndoLog(xid, branchId, conn);
conn.commit();
// 和防悬挂相关 如果根据 xid和branchId 没有查到undo_log,说明这个分支事务有异常:例如业务处理超时,导致全局事务回滚,但这时候业务undo_log并没有插入
} else {
LOGGER.error("\n insert into undo_log xid={},branchId={} \n", xid, branchId);
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}
return;
} catch (Throwable e) {
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,branchId, e.getMessage()), e);
}
}
}
有以下几个注意点:
- 回滚时不考虑数据源代理问题,最终都是使用
TargetConnection
- 设置atuoCommit为false,即需要手动提交事务
- 根据xid和branchId查询undo_log时加了
for update
,也就是说,这个事务会持有这条undo_log的锁直到所有回滚操作都完成,因为完成之后才会
多层代理问题
数据源多层代理会导致的几个问题在文章开头的时候已经提到过了,重点分析一下为什么会造成以上问题:
对分支事务提交的影响
先分析一下,如果使用双层代理会发生什么?我们从两个方面来分析:业务SQL
和undo_log
- 业务SQL
PreparedStatementProxy1.executeUpdate =>
statementCallback#executeUpdate(PreparedStatementProxy2#executeUpdate) =>
PreparedStatement#executeUpdate
好像没啥影响,就是多绕了一圈,最终还是通过PreparedStatement
执行
- undo_log
ConnectionProxy1#getTargetConnection ->
ConnectionProxy2#prepareStatement ->
PreparedStatementProxy2#executeUpdate ->
PreparedStatement#executeUpdate(原生的undo_log写入,在此之前会对为该 undo_log 生成 undo_log2(即 undo_log 的 undo_log)) ->
ConnectionProxy2#commit ->
ConnectionProxy2#processGlobalTransactionCommit(写入undo_log2) ->
ConnectionProxy2#getTargetConnection ->
TargetConnection#prepareStatement ->
PreparedStatement#executeUpdate
对分支事务回滚的影响
在事务回滚之后,为何undo_log没有被删除呢?
其实并不是没有被删除。前面已经说过,双层代理会导致undo_log
被当作分支事务来处理,所以也会为该 undo_log
生成一个undo_log(假设为undo_log2
),而undo_log2
生成的有问题(其实也没问题,就应该这样生成),从而导致回滚时会将业务表关联的undo_log
也一起删除,最终导致业务表对应的事务分支
回滚时发现undo_log不存在,从而又多生成一条状态为为1的undo_log
回滚之前
// undo_log
84 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.1KB 0
85 59734075254222849 172.16.120.59:23004:59734061438185472 serializer=jackson 4.0KB 0
// branch_table
59734070967644161 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
59734075254222849 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
// lock_table
jdbc:mysql://xx^^^seata_storage^^^1 59734070967644161 jdbc:mysql://172.16.248.10:3306/tuya_middleware seata_storage 1
jdbc:mysql://xx^^^undo_log^^^84 59734075254222849 jdbc:mysql://172.16.248.10:3306/tuya_middleware undo_log 84
回滚之后
// 生成了一条状态为1的undo_log,对应的日志为: undo_log added with GlobalFinished
86 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.0Byte 1
问题分析
- 根据xid和branchId找到对应的undo_log日志
- 对undo_log进行解析,主要就是解析它的
rollback_info
字段,rollback_info
解析出来就是一个SQLUndoLog集合
,每条SQLUndoLog
对应着一个操作,里面包含了该操作的前后的快照,然后执行对应的回滚 - 根据xid和branchId删除undo_log日志
因为双层代理问题,导致一条undo_log变成了一个分支事务,所以发生回滚时,我们也需要对undo_log分支事务进行回滚:
1、首先根据xid和branchId找到对应的undo_log
并解析其rollback_info
属性,这里解析出来的rollback_info包含了两条SQLUndoLog
。为什么有两条?
仔细想想也可以可以理解,第一层代理针对
seata_storage
的操作,放到缓存中,本来执行完之后是需要清掉的,但因为这里是双层代理,所以这时候这个流程并没有结束。轮到第二层代理对undo_log
操作时,将该操作放到缓存中,此时缓存中有两个操作,分别为seata_storage的UPDATE
和undo_log的INSERT
。所以这也就很好理解为什么针对undo_log操作
的那条undo_log格外大(4KB),因为它的rollback_info
包含了两个操作。
有一点需要注意的是,第一条SQLUndoLog
对应的after快照,里面的branchId=59734070967644161
pk=84
, 即 seata_storage分支对应的branchId
和 seata_storage对应的undo_log PK
。也就是说,undo_log回滚时候 把seata_storage对应的undo_log
删掉了。
那undo_log本身对应的undo_log 如何删除呢?在接下来的逻辑中会根据xid和branchId删除
2、解析第一条SQLUndoLog
,此时对应的是undo_log的INSERT
操作,所以其对应的回滚操作是DELETE
。因为undo_log
此时被当作了业务表。所以这一步会将59734075254222849
对应的undo_log删除,但这个其实是业务表对应的对应的undo_log
3、解析第二条SQLUndoLog
,此时对应的是seata_storage的UPDATE
操作,这时会通过快照将seata_storage
对应的记录恢复
4、根据xid和branchId删除undo_log日志,这里删除的是undo_log 的 undo_log , 即 undo_log2
。所以,执行到这里,两条undo_log就已经被删除了
5、接下来回滚seata_storage
,因为这时候它对应的undo_log已经在步骤2删掉了, 所以此时查不到undo_log,然后重新生成一条status == 1 的 undo_log
案例分析
背景
1、配置了三个数据源: 两个物理数据源、一个逻辑数据源,但是两个物理数据源对应的连接地址是一样的。这样做有意思吗?
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}
@Bean("dsSlave")
DynamicDataSource dsSlave() {
return new DynamicDataSource(slaveDsRoute);
}
@Primary
@Bean("masterSlave")
DataSource masterSlave(@Qualifier("dsMaster") DataSource dataSourceMaster,
@Qualifier("dsSlave") DataSource dataSourceSlave) throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(2);
//主库
dataSourceMap.put("dsMaster", dataSourceMaster);
//从库
dataSourceMap.put("dsSlave", dataSourceSlave);
// 配置读写分离规则
MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
"masterSlave", "dsMaster", Lists.newArrayList("dsSlave")
);
Properties shardingProperties = new Properties();
shardingProperties.setProperty("sql.show", "true");
shardingProperties.setProperty("sql.simple", "true");
// 获取数据源对象
DataSource dataSource = MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, shardingProperties);
log.info("datasource initialized!");
return dataSource;˚
}
2、开启seata的数据源动态代理,根据seata的数据源代理逻辑可以知道,最终会生成三个代理数据源,原生数据源和代理数据源的关系缓存在DataSourceProxyHolder.dataSourceProxyMap
中,假如原生数据源和代理数据源对应的关系如下:
dsMaster(DynamicDataSource) => dsMasterProxy(DataSourceProxy)
dsSlave(DynamicDataSource) => dsSlaveProxy(DataSourceProxy)
masterSlave(MasterSlaveDataSource) => masterSlaveProxy(DataSourceProxy)
所以,最终在IOC容器中存在的数据源是这三个: dsMasterProxy 、 dsSlaveProxy 、 masterSlaveProxy 。根据@Primary的特性可以知道,当我们从容器中获取一个DataSource的时候,默认返回的就是代理数据源 masterSlaveProxy
对shardingjdbc没有具体的研究过,只是根据debug时看到的代码猜测它的工作机制,又不对的地方,还请大佬指出来
masterSlaveProxy
可以看成是被 DataSourceProxy 包装后的 MasterSlaveDataSource
。我们可以大胆的猜测MasterSlaveDataSource