Seata Transaction Isolation
This article aims to help users understand how to correctly implement transaction isolation when using Seata AT mode to prevent dirty reads and writes.
It is expected that readers have already read the introduction to the AT mode on the Seata official website and have an understanding of local database locks.
(For example, when two transactions are simultaneously updating the same record, only the transaction that holds the record lock can update successfully, while the other transaction must wait until the record lock is released, or until the transaction times out)
First, take a look at this piece of code. It looks very basic, but it is essentially what a persistence-layer framework does for us under the hood.
@Service
public class StorageService {
@Autowired
private DataSource dataSource;
@GlobalTransactional
public void batchUpdate() throws SQLException {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = "update storage_tbl set count = ?" +
" where id = ? and commodity_code = ?";
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 100);
preparedStatement.setLong(2, 1);
preparedStatement.setString(3, "2001");
preparedStatement.executeUpdate();
connection.commit();
} catch (Exception e) {
throw e;
} finally {
IOutils.close(preparedStatement);
IOutils.close(connection);
}
}
}
Starting with the Proxy Data Source
When using the AT mode, the most important thing is the proxy data source. So what is the purpose of using DataSourceProxy to proxy the data source?
DataSourceProxy can help us obtain several important proxy objects:
- Obtain ConnectionProxy through DataSourceProxy.getConnection()
- Obtain StatementProxy through ConnectionProxy.prepareStatement(...)
Seata's implementation of transaction isolation is hidden in these two proxies. Let me outline the implementation logic first.
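To picture how the proxy chain lets Seata intercept calls, here is a minimal sketch of the wrapping pattern. The interfaces SketchDataSource, SketchConnection and SketchStatement and all method bodies are hypothetical stand-ins invented for illustration; they are not the JDBC or Seata types, and the real proxies do far more.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper chain; the interface names are illustrative, not JDBC or Seata types. */
class ProxyChainSketch {

    interface SketchStatement { void execute(); }

    interface SketchConnection {
        SketchStatement prepareStatement(String sql);
        void commit();
    }

    interface SketchDataSource { SketchConnection getConnection(); }

    /** A "real" data source that only records what reaches the database. */
    static SketchDataSource plainDataSource(List<String> log) {
        return () -> new SketchConnection() {
            @Override public SketchStatement prepareStatement(String sql) {
                return () -> log.add("db: " + sql);
            }
            @Override public void commit() { log.add("db: commit"); }
        };
    }

    /** Wraps every level so that execute() and commit() can be intercepted. */
    static SketchDataSource proxy(SketchDataSource target, List<String> log) {
        return () -> {
            SketchConnection conn = target.getConnection();
            return new SketchConnection() {
                @Override public SketchStatement prepareStatement(String sql) {
                    SketchStatement stmt = conn.prepareStatement(sql);
                    return () -> {
                        log.add("proxy: around execute"); // images and undo log would go here
                        stmt.execute();
                    };
                }
                @Override public void commit() {
                    log.add("proxy: before commit");      // lock handling would go here
                    conn.commit();
                }
            };
        };
    }

    /** Runs one statement and one commit through the proxy and returns the call log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchConnection conn = proxy(plainDataSource(log), log).getConnection();
        conn.prepareStatement("update storage_tbl").execute();
        conn.commit();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the business code only ever sees the outermost object, it needs no changes for the interception to take effect.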
Processing logic of StatementProxy.executeXXX()
- When io.seata.rm.datasource.StatementProxy.executeXXX() is called, the SQL is passed to io.seata.rm.datasource.exec.ExecuteTemplate.execute(...) for processing.
  - In ExecuteTemplate.execute(...), Seata selects an Executor based on the dbType and the SQL statement type, and calls the execute(Object... args) method of the io.seata.rm.datasource.exec.Executor class.
  - If a DML Executor is chosen, the following main steps are performed:
    - Query the before image (select for update, which acquires the local lock at this point)
    - Execute the business SQL
    - Query the after image
    - Prepare the undoLog
  - If your SQL is select for update, SelectForUpdateExecutor is used (Seata proxies select for update). After proxying, the logic is as follows:
    - First, execute select for update (acquiring the database's local lock)
    - If inside @GlobalTransactional or @GlobalLock, check whether a global lock exists
    - If a global lock exists and the caller had not started a local transaction itself, roll back the local transaction, then compete again for the local lock and the global lock, repeating until the global lock is obtained.
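The four DML steps above can be sketched with an in-memory map standing in for storage_tbl. Everything here (DmlExecutorSketch, UndoRecord, the rollback method) is a hypothetical simplification for illustration and not Seata's actual executor code; in particular, the real before-image query uses select for update and also takes the local lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, in-memory sketch of the DML flow; not Seata's actual classes. */
class DmlExecutorSketch {

    /** One undo record: the row's value before and after the business SQL. */
    static final class UndoRecord {
        final long id;
        final Integer beforeImage;
        final Integer afterImage;

        UndoRecord(long id, Integer beforeImage, Integer afterImage) {
            this.id = id;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    /** Stand-in for storage_tbl: id -> count. */
    final Map<Long, Integer> table = new HashMap<>();

    /** Undo records buffered until commit. */
    final List<UndoRecord> undoBuffer = new ArrayList<>();

    /** Mirrors the four steps: before image, business SQL, after image, undo log. */
    void executeUpdate(long id, int newCount) {
        Integer before = table.get(id);                     // 1. before image
        table.put(id, newCount);                            // 2. business SQL
        Integer after = table.get(id);                      // 3. after image
        undoBuffer.add(new UndoRecord(id, before, after));  // 4. prepare undo log
    }

    /** Restores before images in reverse order, as a rollback would. */
    void rollback() {
        for (int i = undoBuffer.size() - 1; i >= 0; i--) {
            UndoRecord r = undoBuffer.get(i);
            if (r.beforeImage == null) {
                table.remove(r.id);
            } else {
                table.put(r.id, r.beforeImage);
            }
        }
        undoBuffer.clear();
    }

    public static void main(String[] args) {
        DmlExecutorSketch db = new DmlExecutorSketch();
        db.table.put(1L, 10);
        db.executeUpdate(1L, 100);
        System.out.println(db.table.get(1L)); // 100
        db.rollback();
        System.out.println(db.table.get(1L)); // 10
    }
}
```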
Handling logic of ConnectionProxy.commit()
- In a global transaction (i.e., the data persistence method has @GlobalTransactional)
  - Register the branch transaction and obtain the global lock
  - Persist the undoLog data
  - Let the database commit the current transaction
- In @GlobalLock (i.e., the data persistence method has @GlobalLock)
  - Query the TC for the existence of a global lock, and if it exists, throw an exception
  - Let the database commit the current transaction
- For other cases (the else branch)
  - Let the database commit the current transaction
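The three commit branches can be modelled roughly as follows. This is a sketch under stated assumptions: CommitSketch, its Mode enum, the map standing in for the TC's lock table, and the finish method (which assumes locks are released when the global transaction ends) are all hypothetical names introduced here, not Seata APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, in-memory model of the three commit branches; not Seata's real code. */
class CommitSketch {

    enum Mode { GLOBAL_TRANSACTION, GLOBAL_LOCK, PLAIN }

    /** Stand-in for the TC's lock table: row key -> xid of the holding global transaction. */
    private final Map<String, String> globalLocks = new HashMap<>();

    String commit(Mode mode, String xid, String rowKey) {
        switch (mode) {
            case GLOBAL_TRANSACTION: {
                String holder = globalLocks.get(rowKey);
                if (holder != null && !holder.equals(xid)) {
                    throw new IllegalStateException("global lock held by " + holder);
                }
                globalLocks.put(rowKey, xid); // register branch, obtain global lock
                // the undo log would be persisted here, before the local commit
                return "registered+undo+commit";
            }
            case GLOBAL_LOCK:
                // only checks the lock, never acquires it
                if (globalLocks.containsKey(rowKey)) {
                    throw new IllegalStateException("global lock held by " + globalLocks.get(rowKey));
                }
                return "checked+commit";
            default:
                return "commit";
        }
    }

    /** Assumption: locks are released once the global transaction finishes. */
    void finish(String xid) {
        globalLocks.values().removeIf(xid::equals);
    }

    public static void main(String[] args) {
        CommitSketch tc = new CommitSketch();
        System.out.println(tc.commit(Mode.GLOBAL_TRANSACTION, "xid-1", "storage_tbl:1"));
        try {
            tc.commit(Mode.GLOBAL_LOCK, null, "storage_tbl:1");
        } catch (IllegalStateException e) {
            System.out.println("conflict: " + e.getMessage());
        }
        tc.finish("xid-1");
        System.out.println(tc.commit(Mode.GLOBAL_LOCK, null, "storage_tbl:1"));
    }
}
```

Note that the plain branch never consults the lock table at all, which is why an unannotated method can write over data that a global transaction still holds.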
Purpose of @GlobalTransactional
Identifies a global transaction
Purpose of @GlobalLock + select for update
If a method like updateA() has @GlobalLock + select for update, Seata will first obtain the database local lock, then query whether a global lock exists for that record; if one does, it throws a LockConflictException.
Let's first look at an example of a dirty write, and then see how Seata prevents it.
Let's assume your business code looks like this:
- updateAll() updates records in both table A and table B; updateA() and updateB() update records in table A and table B respectively
- updateAll() is already annotated with @GlobalTransactional
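class YourBusinessService {
    DbServiceA serviceA;
    DbServiceB serviceB;

    // Global transaction: updates table A and table B together
    @GlobalTransactional
    public boolean updateAll(DTO dto) {
        serviceA.update(dto.getA());
        serviceB.update(dto.getB());
        return true;
    }

    // No Seata annotation: nothing here checks the global lock
    public boolean updateA(DTO dto) {
        serviceA.update(dto.getA());
        return true;
    }
}

class DbServiceA {
    @Transactional
    public boolean update(A a) {
        // local database update of table A (body omitted in the original)
        return true;
    }
}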
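To make the dirty write concrete, the following sketch replays the interleaving of updateAll() and the unannotated updateA() on a single row. It is a deliberately naive in-memory model: the class DirtyWriteSketch, the row key and the values are hypothetical, and it ignores any validation a real rollback might perform.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical replay of the dirty-write interleaving on one row of table A. */
class DirtyWriteSketch {

    /** Returns the final value of the row after the interleaving. */
    static int run() {
        Map<String, Integer> tableA = new HashMap<>();
        tableA.put("row1", 0);

        // updateAll(), phase one: record the before image, write, commit locally.
        int beforeImage = tableA.get("row1");
        tableA.put("row1", 1);

        // updateA(): no Seata annotation, so nothing checks the global lock.
        tableA.put("row1", 2);

        // updateAll() fails later (for example in serviceB) and is rolled back
        // by naively restoring the before image.
        tableA.put("row1", beforeImage);

        return tableA.get("row1");
    }

    public static void main(String[] args) {
        // updateA() committed the value 2, yet the row ends up at 0.
        System.out.println(run()); // 0
    }
}
```

The write made by updateA() is silently lost, which is exactly the situation the two methods below are meant to prevent.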
How to prevent dirty write using Seata?
Method 1: Add @GlobalTransactional to updateA() as well. How does Seata ensure transaction isolation in this case?
class DbServiceA {
@GlobalTransactional
@Transactional
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
updateAll() is called first (not completed), updateA() is called afterwards
Method 2: @GlobalLock + select for update
class DbServiceA {
@GlobalLock
@Transactional
public boolean updateA(DTO dto) {
serviceA.selectForUpdate(dto.getA());
serviceA.update(dto.getA());
}
}
updateAll() is called first (not completed), updateA() is called afterwards
- What if updateA() is called first (not completed), and then updateAll() is called? Since both transactions need to acquire the local lock first, a dirty write will not occur.
- Someone may ask, "Why do we need to add select for update here? Can't we prevent dirty writes with just @GlobalLock?" Yes, we can. But please refer to the diagram above; select for update brings a few advantages:
  - Lock conflicts are handled more gracefully. If only @GlobalLock is used, an exception is thrown immediately when a global lock is detected. With a little more "persistence" the global lock might well have been released, so throwing the exception right away would be a pity.
  - In updateA(), we can use select for update to read the latest A and then perform the update.
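The difference between failing fast and "persisting" can be sketched as a single check versus a bounded retry loop. This is only an illustration: LockRetrySketch, the maxAttempts and sleepMillis parameters, and the BooleanSupplier standing in for the global lock query are assumptions made for the example, not Seata's configuration or API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical comparison of a one-shot lock check and a retrying one. */
class LockRetrySketch {

    /** @GlobalLock alone: one check, and a conflict fails immediately. */
    static boolean checkOnce(BooleanSupplier globalLockIsFree) {
        return globalLockIsFree.getAsBoolean();
    }

    /**
     * With select for update: keep competing until the global lock is free.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int checkWithRetry(BooleanSupplier globalLockIsFree, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // select for update would take the local lock here
            if (globalLockIsFree.getAsBoolean()) {
                return attempt;
            }
            // conflict: roll back, then wait before competing again
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        BooleanSupplier freeOnThirdCheck = () -> ++calls[0] >= 3;
        System.out.println(checkOnce(freeOnThirdCheck));             // false
        calls[0] = 0;
        System.out.println(checkWithRetry(freeOnThirdCheck, 5, 1L)); // 3
    }
}
```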
How to prevent dirty reads?
Scenario: One business calls updateAll() first; before updateAll() completes, another business calls queryA().
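Assuming queryA() is given the same @GlobalLock + select for update treatment described earlier, the scenario can be sketched as below. DirtyReadSketch and its methods are hypothetical, and the set of locked keys is only a stand-in for the global lock held by the unfinished updateAll().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of a read that checks the global lock before returning data. */
class DirtyReadSketch {

    final Map<String, Integer> tableA = new HashMap<>();
    final Set<String> globallyLocked = new HashSet<>();

    /** Phase one of a global transaction: write, commit locally, keep the global lock. */
    int writeInGlobalTx(String key, int value) {
        int beforeImage = tableA.get(key);
        tableA.put(key, value);
        globallyLocked.add(key);
        return beforeImage;
    }

    /** Global rollback: restore the before image and release the global lock. */
    void rollbackGlobalTx(String key, int beforeImage) {
        tableA.put(key, beforeImage);
        globallyLocked.remove(key);
    }

    /** Unprotected read: sees whatever was committed locally. */
    int plainRead(String key) {
        return tableA.get(key);
    }

    /** Protected read: refuses to return a row that a global transaction still holds. */
    int lockCheckedRead(String key) {
        if (globallyLocked.contains(key)) {
            throw new IllegalStateException("global lock conflict on " + key);
        }
        return tableA.get(key);
    }

    public static void main(String[] args) {
        DirtyReadSketch db = new DirtyReadSketch();
        db.tableA.put("row1", 0);
        int before = db.writeInGlobalTx("row1", 1);
        System.out.println(db.plainRead("row1"));       // 1, a value that is later rolled back
        db.rollbackGlobalTx("row1", before);
        System.out.println(db.lockCheckedRead("row1")); // 0
    }
}
```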
Source Code Display
@Service
public class StorageService {
@Autowired
private DataSource dataSource;
@GlobalTransactional
public void update() throws SQLException {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = "update storage_tbl set count = ?" +
" where id = ? and commodity_code = ?";
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 100);
preparedStatement.setLong(2, 1);
preparedStatement.setString(3, "2001");
preparedStatement.execute();
connection.commit();
} catch (Exception e) {
throw e;
} finally {
IOutils.close(preparedStatement);
IOutils.close(connection);
}
}
}
Although this code looks very basic and does not use a persistence-layer framework, it is essentially what such a framework does for us once its abstractions are stripped away.
Brief outline of the source code walkthrough below (mainly focusing on source code related to transaction isolation)
- Purpose of the proxy data source
  - The role of DataSourceProxy (returns ConnectionProxy)
    - Introducing a small function of ConnectionProxy (storing undolog)
  - The role of ConnectionProxy (returns StatementProxy)
  - Processing logic of StatementProxy.execute()
    - Execution logic of io.seata.rm.datasource.exec.UpdateExecutor (query before image, execute sql, query after image, prepare undoLog)
    - Execution logic of SelectForUpdateExecutor (compete for the local lock, check the global lock; if there is a global lock, roll back and compete again...)
  - Processing logic of ConnectionProxy.commit() (register branch transaction (compete for the global lock), write undoLog, database commit)
- Introducing RootContext
- Different proxy logic for GlobalTransactionalInterceptor
  - How to handle @GlobalTransactional
  - How to handle @GlobalLock
The role of DataSourceProxy
DataSourceProxy helps us obtain several important proxy objects
- Obtain ConnectionProxy through DataSourceProxy.getConnection()
package io.seata.rm.datasource;
import java.sql.Connection;
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
}
- First, let's introduce ConnectionContext in ConnectionProxy. One of its functions is to store undoLog.
package io.seata.rm.datasource;
import io.seata.rm.datasource.undo.SQLUndoLog;
public class ConnectionProxy extends AbstractConnectionProxy {
private ConnectionContext context = new ConnectionContext();
public void appendUndoLog(SQLUndoLog sqlUndoLog) {
context.appendUndoItem(sqlUndoLog);
}
}
package io.seata.rm.datasource;
public class ConnectionContext {
private static final Savepoint DEFAULT_SAVEPOINT = new Savepoint() {
@Override
public int getSavepointId() throws SQLException {
return 0;
}
@Override
public String getSavepointName() throws SQLException {
return "DEFAULT_SEATA_SAVEPOINT";
}
};
private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;
void appendUndoItem(SQLUndoLog sqlUndoLog) {
sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
}
}
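The buffering behaviour of appendUndoItem can be tried out in isolation with a small stand-in. In this sketch plain strings replace Savepoint and SQLUndoLog, and the setSavepoint helper is a hypothetical addition for the example; only the computeIfAbsent grouping mirrors the excerpt above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the undo buffer: strings replace Savepoint and SQLUndoLog. */
class UndoBufferSketch {

    static final String DEFAULT_SAVEPOINT = "DEFAULT_SEATA_SAVEPOINT";

    /** Insertion-ordered, so savepoints keep the order in which they were first used. */
    final Map<String, List<String>> sqlUndoItemsBuffer = new LinkedHashMap<>();

    String currentSavepoint = DEFAULT_SAVEPOINT;

    /** Same grouping idea as the excerpt: one list of undo items per savepoint. */
    void appendUndoItem(String undoItem) {
        sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(undoItem);
    }

    /** Hypothetical helper: later items are grouped under the new savepoint. */
    void setSavepoint(String name) {
        currentSavepoint = name;
    }

    public static void main(String[] args) {
        UndoBufferSketch ctx = new UndoBufferSketch();
        ctx.appendUndoItem("undo-1");
        ctx.appendUndoItem("undo-2");
        ctx.setSavepoint("sp1");
        ctx.appendUndoItem("undo-3");
        System.out.println(ctx.sqlUndoItemsBuffer);
    }
}
```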
- Get StatementProxy through ConnectionProxy.prepareStatement(...)
package io.seata.rm.datasource;
public class ConnectionProxy extends AbstractConnectionProxy {
public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
super(dataSourceProxy, targetConnection);
}
}
package io.seata.rm.datasource;
import java.sql.Connection;
public abstract class AbstractConnectionProxy implements Connection {
protected Connection targetConnection;
public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
this.dataSourceProxy = dataSourceProxy;
this.targetConnection = targetConnection;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) { // Why does this return AT?
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
// If it is an insert statement, the PreparedStatement created here needs to be able to return the automatically generated primary key, so use this prepareStatement()
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
public Connection getTargetConnection() {
return targetConnection;
}
}
First, let's raise a question here and explain it later:
How could RootContext.getBranchType() return AT?