在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:
- 单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即
为 undo_log 生成了 undo_log, 假设为undo_log2
,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2
生成的有问题,在undo_log对应的事务分支
回滚时会将业务表关联的undo_log
也一起删除,从而导致业务表对应的事务分支
回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。这时候整体逻辑已经乱了,很严重的问题 - 多数据源和
逻辑数据源被代理
情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的select for update
和delete
操作,被分散在多个事务中执行,导致一个事务在执行完select for update
后一直不提交,一个事务在执行delete
时一直等待锁,直到超时
代理描述
即对DataSource代理一层,重写一些方法。比如getConnection
方法,这时不直接返回一个Connection
,而是返回ConnectionProxy
,其它的以此类推
// DataSourceProxy
public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}
private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this);
}
public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
手动代理
即手动注入一个DataSourceProxy
,如下
@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
自动代理
针对DataSource
创建一个代理类,在代理类里面基于DataSource
获取DataSourceProxy(如果没有就创建)
,然后调用DataSourceProxy
的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator
中
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}
数据源多层代理
@Bean
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
doxx
return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
- 首先我们在配置类里面注入了两个
DataSource
,分别为:DruidDataSource
和DataSourceProxy
, 其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource 属性
,并且DataSourceProxy
为使用了@Primary
注解声明 - 应用默认开启了数据源自动代理,所以在调用
DruidDataSource
相关方法时,又会为为DruidDataSource
创建一个对应的数据源代理DataSourceProxy2
- 当我们在程序中想获取一个Connection时会发生什么?
- 先获取一个
DataSource
,因为DataSourceProxy
为Primary
,所以此时拿到的是DataSourceProxy
- 基于
DataSource
获取一个Connection
,即通过DataSourceProxy
获取Connection
。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法
,但因为切面会对DruidDataSource
进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2
,然后调用DataSourceProxy2#getConnection
,然后再调用DruidDataSource#getConnection
。最终形成了双层代理, 返回的Connection
也是一个双层的ConnectionProxy
- 先获取一个
上面其实是改造之后的代理逻辑,Seata默认的自动代理会对DataSourceProxy
再次进行代理,后果就是代理多了一层此时对应的图如下
数据源多层代理会导致的两个问题在文章开头处已经总结了,下面会有案例介绍。
分支事务提交
通过ConnectionProxy
中执行对应的方法,会发生什么?以update操作涉及到的一个分支事务提交为例:
- 执行
ConnectionProxy#prepareStatement
, 返回一个PreparedStatementProxy
- 执行
PreparedStatementProxy#executeUpdate
,PreparedStatementProxy#executeUpdate
大概会帮做两件事情: 执行业务SQL和提交undo_log
提交业务SQL
// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
主要流程就是: 先执行业务SQL,然后执行ConnectionProxy的commit方法,在这个方法中,会先帮我们执行对应的 undo_log SQL,然后提交事务
PreparedStatementProxy#executeUpdate =>
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => 在这一步操中,会触发statementCallback#execute方法,即调用调用原生PreparedStatement#executeUpdate方法
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit
UNDO_LOG插入
// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务,简单理解向server发一个请求,然后server在branch_table表里插入一条记录,不关注
register();
} catch (TransactionException e) {
// 如果没有for update 的sql,会直接在commit之前做注册,此时不止插入一条branch记录,而会附带锁信息进行竞争,下方的异常一般就是在注册时没拿到锁抛出,一般就是纯update语句的并发下会触发竞争锁失败的异常 @FUNKYE
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// undo_log处理,期望用 targetConnection 处理 @1
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 提交本地事务,期望用 targetConnection 处理 @2
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
- undo_log处理@1,解析当前事务分支涉及到的
undo_log
,然后使用TargetConnection
, 写到数据库
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}
String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,cp.getTargetConnection());
}
- 提交本地事务@2,即通过
TargetConnection
提交事务。即务SQL执行
、undo_log写入