
· 12 min read

Seata currently supports AT mode, XA mode, TCC mode, and SAGA mode. Previous articles have talked more about non-intrusive AT mode. Today, we will introduce TCC mode, which is also a two-phase commit.

What is TCC

TCC is a two-phase commit protocol in distributed transactions. Its full name is Try-Confirm-Cancel. Their specific meanings are as follows:

  1. Try: Check and reserve business resources;
  2. Confirm: Commit the business transaction, i.e., the commit operation. If Try is successful, this step will definitely be successful;
  3. Cancel: Cancel the business transaction, i.e., the rollback operation. This step will release the resources reserved in Try.

TCC is an intrusive distributed transaction solution. All three operations need to be implemented by the business system itself, which has a significant impact on the business system. The design is relatively complex, but the advantage is that TCC does not rely on the database. It can manage resources across databases and applications, and can implement an atomic operation for different data access through intrusive coding, better solving the distributed transaction problems in various complex business scenarios.
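
To make the three operations concrete, here is a minimal sketch of what they might look like for a hypothetical account-deduction service (the class name, method names, and the SQL in the comments are illustrative only, not part of Seata):

// Hypothetical account service, illustrating Try-Confirm-Cancel semantics
public class AccountTccAction {

    // Try: check the balance and freeze (reserve) the amount instead of deducting it directly
    public boolean tryDeduct(String accountId, int amount) {
        // e.g. UPDATE account SET frozen = frozen + ? WHERE id = ? AND balance - frozen >= ?
        return true;
    }

    // Confirm: really deduct the reserved amount; must succeed if Try succeeded
    public boolean confirmDeduct(String accountId, int amount) {
        // e.g. UPDATE account SET balance = balance - ?, frozen = frozen - ? WHERE id = ?
        return true;
    }

    // Cancel: release the amount reserved by Try
    public boolean cancelDeduct(String accountId, int amount) {
        // e.g. UPDATE account SET frozen = frozen - ? WHERE id = ?
        return true;
    }
}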

img

Seata TCC mode

Seata TCC mode follows the same principle as the general TCC mode. Let's first use Seata TCC mode to implement a distributed transaction:

Suppose there is a business that needs to use service A and service B to complete a transaction operation. We define a TCC interface for this service in service A:

public interface TccActionOne {

    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}

Similarly, we define a TCC interface for this service in service B:

public interface TccActionTwo {

    @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public void prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "b") String b);

    public void commit(BusinessActionContext actionContext);

    public void rollback(BusinessActionContext actionContext);
}

In the business system, we start a global transaction and execute the TCC reserve resource methods for service A and service B:

@GlobalTransactional
public String doTransactionCommit() {
    // Service A transaction participant
    tccActionOne.prepare(null, "one");
    // Service B transaction participant
    tccActionTwo.prepare(null, "two");
    return "success";
}

The example above demonstrates the implementation of a global transaction using Seata TCC mode. It can be seen that the TCC mode also uses the @GlobalTransactional annotation to initiate a global transaction, while the TCC interfaces of Service A and Service B are transaction participants. Seata treats a TCC interface as a Resource, also known as a TCC Resource.

TCC interfaces can be RPC or internal JVM calls, meaning that a TCC interface has both a sender and a caller identity. In the example above, the TCC interface is the sender in Service A and Service B, and the caller in the business system. If the TCC interface is a Dubbo RPC, the caller is a dubbo:reference and the sender is a dubbo:service.

img

When Seata starts, it scans and parses the TCC interfaces. If a TCC interface is a sender, Seata registers the TCC Resource with the TC during startup, and each TCC Resource has a resource ID. If a TCC interface is a caller, Seata proxies the caller and intercepts the TCC interface calls. Similar to the AT mode, the proxy intercepts the call to the Try method, registers a branch transaction with the TC, and then executes the original RPC call.

When the global transaction decides to commit/rollback, the TC will callback to the corresponding participant service to execute the Confirm/Cancel method of the TCC Resource using the resource ID registered by the branch.

How Seata Implements TCC Mode

From the above Seata TCC model, it can be seen that TCC mode in Seata also follows the TC, TM, RM three-role model. How is TCC mode implemented within this three-role model? I summarize the implementation in three parts: resource parsing, resource management, and transaction processing.

Resource Parsing

Resource parsing is the process of parsing and registering TCC interfaces. As mentioned earlier, TCC interfaces can be RPC or internal JVM calls. In the Seata TCC module, there is a remoting module that is specifically used to parse TCC interfaces with the TwoPhaseBusinessAction annotation:

img

The RemotingParser interface mainly has methods such as isRemoting, isReference, isService, getServiceDesc, etc. The default implementation is DefaultRemotingParser, and the parsing of various RPC protocols is executed in DefaultRemotingParser. Seata has already implemented parsing of Dubbo, HSF, SofaRpc, and LocalTCC RPC protocols while also providing SPI extensibility for additional RPC protocol parsing classes.

During the Seata startup process, GlobalTransactionScanner scans the beans and executes the following method:

io.seata.spring.util.TCCBeanParserUtils#isTccAutoProxy

The purpose of this method is to determine whether the bean should be auto-proxied for TCC. In the process, it first checks whether the bean is a remoting bean; if it is, it calls the getServiceDesc method to parse the remoting bean, and if the bean is a sender, it registers the resource:

io.seata.rm.tcc.remoting.parser.DefaultRemotingParser#parserRemotingServiceInfo

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);

    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
                        twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
                        twoPhaseBusinessAction.rollbackArgsClasses()));
                    // set argsClasses
                    tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
                    tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
                    // set phase two method's keys
                    tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
                        twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
                        twoPhaseBusinessAction.rollbackArgsClasses()));
                    // registry tcc resource
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        // reference bean, TCC proxy
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}

The above method first calls the parser's getServiceDesc method to parse the remoting bean and puts the parsed remotingBeanDesc into the local cache remotingServiceMap. It then calls the parser's isService method to determine whether the bean is the sender; if so, it parses the content of the TwoPhaseBusinessAction annotation, generates a TCCResource, and registers it as a resource.

Resource Management

1. Resource Registration

The resource for Seata TCC mode is called TCCResource, and its resource manager is called TCCResourceManager. As mentioned earlier, after parsing the TCC interface RPC resource, if it is the initiator, it will be registered as a resource:

io.seata.rm.tcc.TCCResourceManager#registerResource

public void registerResource(Resource resource) {
    TCCResource tccResource = (TCCResource) resource;
    tccResourceCache.put(tccResource.getResourceId(), tccResource);
    super.registerResource(tccResource);
}

TCCResource contains the relevant information of the TCC interface and is cached locally. The parent class's registerResource method (which encapsulates the communication logic) is then called to register the resource with the TC. The TCC resource's resourceId is the actionName, which is the name specified in the @TwoPhaseBusinessAction annotation.

2. Resource Commit/Rollback

io.seata.rm.tcc.TCCResourceManager#branchCommit

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // ... ...
        ret = commitMethod.invoke(targetTCCBean, args);
        // ... ...
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}

When the TM decides on the phase-two commit, the TC calls back the corresponding participant service (i.e., the sender of the TCC interface) and, using the resource ID registered by the branch, executes the Confirm/Cancel method of the TCC Resource.

In the resource manager, the corresponding TCCResource is looked up in the local cache by resourceId, and the corresponding BusinessActionContext is built from xid, branchId, resourceId, and applicationData; the parameters needed for execution are carried in this context. Finally, the commit method of the TCCResource is invoked to complete the phase-two commit.

The phase two rollback is similar.
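
For reference, here is a simplified sketch of what the rollback path looks like; it mirrors branchCommit above, looking up the TCCResource by resourceId and invoking the Cancel method instead (details are elided just as in the commit snippet, so treat this as an outline rather than the exact source):

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    // ... same existence checks as in branchCommit ...
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    try {
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // ... build the args from the context and invoke the Cancel method ...
        Object ret = rollbackMethod.invoke(targetTCCBean, new Object[]{businessActionContext});
        boolean result = (boolean) ret;
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}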

Transaction Processing

As mentioned earlier, if the TCC interface is a caller, the Seata TCC proxy will be used to intercept the caller and register the branch before processing the actual RPC method call.

The method io.seata.spring.util.TCCBeanParserUtils#isTccAutoProxy not only parses the TCC interface resources, but also determines whether the TCC interface is a caller. If it is a caller, it returns true:

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

img

As shown in the figure, when GlobalTransactionScanner scans a TCC interface caller (reference), it proxies it and intercepts it with TccActionInterceptor, which implements MethodInterceptor.

In TccActionInterceptor, it will also call ActionInterceptorHandler to execute the interception logic, and the transaction-related processing is in the ActionInterceptorHandler#proceed method:

public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                      Callback<Object> targetCallback) throws Throwable {
    //Get action context from arguments, or create a new one and then reset to arguments
    BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
    //Creating Branch Record
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    // ... ...
    try {
        // ... ...
        return targetCallback.execute();
    } finally {
        try {
            //to report business action context finally if the actionContext.getUpdated() is true
            BusinessActionContextUtil.reportContext(actionContext);
        } finally {
            // ... ...
        }
    }
}

In the process of executing the first phase of the TCC interface, the doTccActionLogStore method is called for branch registration, and the TCC-related information such as parameters is placed in the context. This context will be used for resource submission/rollback as mentioned above.
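
To make the role of this context concrete, here is a hedged sketch of what a phase-two method might do with it; the parameter name "a" matches the @BusinessActionContextParameter in the TccActionOne example earlier, and the business logic is only indicated by comments:

public boolean commit(BusinessActionContext actionContext) {
    // Parameters recorded during the Try phase can be read back from the context in phase two
    String a = (String) actionContext.getActionContext("a");
    String xid = actionContext.getXid();
    // ... execute the actual Confirm logic with these values ...
    return true;
}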

How to control exceptions

In the process of executing the TCC model, various abnormal situations may occur, the most common being empty rollback, duplicate invocation (idempotency), and suspension. Here I will explain how Seata handles these three types of exceptions.

How to handle empty rollback

What is an empty rollback?

An empty rollback refers to a situation in a distributed transaction where the TM drives the second-phase rollback of the participant's Cancel method without calling the participant's Try method.

How does an empty rollback occur?

img

As shown in the above figure, after the global transaction is opened, participant A will execute the first-phase RPC method after completing branch registration. If the machine where participant A is located crashes or there is a network anomaly at this time, the RPC call will fail, meaning that participant A's first-phase method did not execute successfully. However, the global transaction has already been opened, so Seata must progress to the final state. When the global transaction is rolled back, participant A's Cancel method will be called, resulting in an empty rollback.

To prevent empty rollback, it is necessary to identify it in the Cancel method. How does Seata do this?

Seata's approach is to add a TCC transaction control table, which contains the XID and BranchID information of the transaction. A record is inserted when the Try method is executed, indicating that phase one has been executed. When the Cancel method is executed, this record is read. If the record does not exist, it means that the Try method was not executed.
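
As a hedged sketch of the idea (the control table is modeled here with an in-memory map keyed by xid; a real implementation would use a database table written in the same local transaction as the business operation):

import java.util.concurrent.ConcurrentHashMap;
import io.seata.rm.tcc.api.BusinessActionContext;

public class EmptyRollbackAwareAction {

    // stand-in for the TCC transaction control table, keyed by xid
    private final ConcurrentHashMap<String, Integer> controlTable = new ConcurrentHashMap<>();

    public boolean prepare(BusinessActionContext ctx) {
        controlTable.put(ctx.getXid(), 1); // 1 = tried, recorded when Try executes
        // ... reserve business resources ...
        return true;
    }

    public boolean rollback(BusinessActionContext ctx) {
        if (!controlTable.containsKey(ctx.getXid())) {
            // no record: Try never ran, so this is an empty rollback; just report success
            return true;
        }
        // ... release the resources reserved in Try ...
        return true;
    }
}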

How to Handle Idempotent Operations

Idempotency here means that the TC may repeat the phase-two commit/rollback, so the Confirm/Cancel interfaces need to support idempotent processing; that is, repeated calls must not cause resources to be committed or released more than once.

So how does the need for idempotency arise?

img

As shown in the figure above, after participant A completes phase two, network jitter or a machine failure may prevent the TC from receiving the result of participant A's phase-two execution. The TC will keep making repeated calls until it receives a successful phase-two result.

How does Seata handle idempotent operations?

Similarly, a status field is added to the TCC transaction control table. This field has 3 values:

  1. tried: 1
  2. committed: 2
  3. rollbacked: 3

After the execution of the two-phase Confirm/Cancel method, the status is changed to committed or rollbacked. When the two-phase Confirm/Cancel method is called repeatedly, checking the transaction status can solve the idempotent problem.

How to Handle Suspension

Suspension means that the phase-two Cancel method is executed before the phase-one Try method. Because empty rollback is allowed, the Cancel method returns success directly and the global transaction ends; but since the Try method is executed afterwards, the resources reserved by the phase-one Try method can never be committed or released.

So how does suspension arise?

img

As shown in the figure above, when participant A's phase-one Try method is invoked, network congestion occurs; because of Seata's global transaction timeout limit, once the Try call times out, the TM decides to roll back the global transaction. If, after the rollback completes, the RPC request finally reaches participant A and the Try method executes and reserves resources, suspension occurs.

How does Seata handle suspension?

Add a status to the TCC transaction control table:

  1. suspended: 4

When the phase-two Cancel method executes and finds no related record in the TCC transaction control table, it means that Cancel is running before the phase-one Try, so it inserts a record with status=4. Later, when the phase-one Try method executes and finds status=4, it knows that the phase-two Cancel has already executed and returns false, preventing the Try method from succeeding.
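
Putting the status values together, here is a hedged sketch of how Cancel and Try can cooperate through the control table to handle both suspension and the empty rollback above (again an in-memory map stands in for the database table; a real implementation needs the read and write to happen atomically, e.g. inside the local transaction):

import java.util.concurrent.ConcurrentHashMap;
import io.seata.rm.tcc.api.BusinessActionContext;

public class AntiSuspensionAction {

    private static final int STATUS_TRIED = 1;
    private static final int STATUS_ROLLBACKED = 3;
    private static final int STATUS_SUSPENDED = 4;

    // stand-in for the TCC transaction control table, keyed by xid
    private final ConcurrentHashMap<String, Integer> controlTable = new ConcurrentHashMap<>();

    public boolean rollback(BusinessActionContext ctx) {
        // no record means Cancel arrived before Try: mark the xid as suspended and return success
        Integer previous = controlTable.putIfAbsent(ctx.getXid(), STATUS_SUSPENDED);
        if (previous == null) {
            return true; // empty rollback, nothing to release
        }
        // ... release the reserved resources ...
        controlTable.put(ctx.getXid(), STATUS_ROLLBACKED);
        return true;
    }

    public boolean prepare(BusinessActionContext ctx) {
        // if Cancel already marked this xid as suspended, refuse to reserve resources
        Integer previous = controlTable.putIfAbsent(ctx.getXid(), STATUS_TRIED);
        if (previous != null && previous == STATUS_SUSPENDED) {
            return false;
        }
        // ... reserve business resources ...
        return true;
    }
}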

Author Introduction

Zhang Chenghui, currently working at Ant Group, loves to share technology. He is the author of the WeChat public account "Advanced Backend," the author of the technical blog (https://objcoding.com/), and his GitHub ID is: objcoding.

· 8 min read

Seata AT mode is a non-intrusive distributed transaction solution. Seata internally implements a proxy layer for database operations. When using Seata AT mode, we actually use the built-in data source proxy DataSourceProxy provided by Seata. Seata adds a lot of logic in this proxy layer, such as inserting rollback undo_log records and checking global locks.

Why check global locks? This is because the transaction isolation of Seata AT mode is built on top of the local transactions' isolation levels. On the premise that the local database isolation level is read committed or above, Seata designs a global write-exclusive lock, maintained by the transaction coordinator, to guarantee write isolation between global transactions. At the same time, global transactions work at the read uncommitted isolation level by default.

Understanding Seata Transaction Isolation Levels

Before discussing Seata transaction isolation levels, let's review the isolation levels of database transactions. Currently, there are four types of database transaction isolation levels, from lowest to highest:

  1. Read uncommitted
  2. Read committed
  3. Repeatable read
  4. Serializable

The default isolation level for databases is usually read committed, such as Oracle, while some databases default to repeatable read, such as MySQL. Generally, the read committed isolation level of databases can satisfy the majority of business scenarios.

We know that a Seata transaction is a global transaction composed of several local transaction branches. During the execution of a global transaction (before it completes), if a local branch has already committed and Seata takes no measures, other business may read that locally committed data, causing a dirty read at the global level. If data already committed by a local branch is modified by another business before the global transaction commits, a dirty write may occur.

From this, we can see that traditional dirty reads involve reading uncommitted data, while Seata's dirty reads involve reading data that has not been committed under the global transaction, where the global transaction may include multiple local transactions. The fact that one local transaction commits does not mean that the global transaction commits.

Working under the read committed isolation level is fine for the vast majority of applications. In fact, the majority of scenarios that work under the read uncommitted isolation level also work fine.

In extreme scenarios, if an application needs to achieve global read committed, Seata also provides a global lock mechanism to implement global transaction read committed. However, by default, Seata's global transactions work under the read uncommitted isolation level to ensure efficiency in the majority of scenarios.

Implementation of Global Locks

In AT mode, Seata uses the internal data source proxy DataSourceProxy, and the implementation of global locks is hidden within this proxy. Let's see what happens during the execution and commit processes.

1. Execution Process

The execution process is in the StatementProxy class. During execution, if the executed SQL is select for update, the SelectForUpdateExecutor class is used. If the executed method is annotated with @GlobalTransactional or @GlobalLock, it checks if there is a global lock. If a global lock exists, it rolls back the local transaction and continuously competes to obtain local and global locks through a while loop.

io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

public T doExecute(Object... args) throws Throwable {
    Connection conn = statementProxy.getConnection();
    // ... ...
    try {
        // ... ...
        while (true) {
            try {
                // ... ...
                if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    // Do the same thing under either @GlobalTransactional or @GlobalLock,
                    // that only check the global lock here.
                    statementProxy.getConnectionProxy().checkLock(lockKeys);
                } else {
                    throw new RuntimeException("Unknown situation!");
                }
                break;
            } catch (LockConflictException lce) {
                if (sp != null) {
                    conn.rollback(sp);
                } else {
                    conn.rollback();
                }
                // trigger retry
                lockRetryController.sleep(lce);
            }
        }
    } finally {
        // ...
    }
}
2. Commit Process

The commit process occurs in the doCommit method of ConnectionProxy.

  1. If the executed method is annotated with @GlobalTransactional, it will acquire the global lock during branch registration:
  • Requesting TC to register a branch

io.seata.rm.datasource.ConnectionProxy#register

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}
  • When a TC registers a branch, it obtains a global lock

io.seata.server.transaction.at.ATCore#branchSessionLock

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    if (!branchSession.lock()) {
        throw new BranchTransactionException(LockKeyConflict, String
            .format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                branchSession.getBranchId()));
    }
}

  2. If the executed method is annotated with @GlobalLock, the global lock is checked for existence before the local commit, and an exception is thrown if it exists:

io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks

private void processLocalCommitWithGlobalLocks() throws SQLException {
    checkLock(context.buildLockKeys());
    try {
        targetConnection.commit();
    } catch (Throwable ex) {
        throw new SQLException(ex);
    }
    context.reset();
}

GlobalLock Annotation Explanation

From the execution and commit processes above, a question arises: since opening a global transaction with the @GlobalTransactional annotation already checks the global lock before the transaction commits, why does Seata still provide a @GlobalLock annotation?

This is because not every database operation needs a global transaction, and opening one is a relatively heavy operation that involves RPC interactions with the TC. The @GlobalLock annotation only checks the existence of the global lock during execution and does not open a global transaction. Therefore, when there is no need for a global transaction but the global lock still needs to be checked to avoid dirty reads and dirty writes, using the @GlobalLock annotation is the lighter option.
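
A hedged usage sketch (the service method and mapper are illustrative): a local operation that only needs to respect Seata's global lock, without opening a global transaction of its own, could look like this:

@GlobalLock
@Transactional
public void updateBalance(Long accountId, int delta) {
    // runs in a plain local transaction; before the local commit, Seata's ConnectionProxy
    // checks the global lock for the rows written here and throws if another
    // global transaction still holds it
    accountMapper.incrementBalance(accountId, delta);
}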

How to Prevent Dirty Writes

Let's first understand how dirty writes occur when using Seata AT mode:

Note: Other processes in the branch transaction execution are omitted.

When Business One starts a global transaction containing branch transaction A (modifying A) and branch transaction B (modifying B), Business Two also modifies A. Business One's branch transaction A acquires the local lock on A before Business Two does; Business Two waits until branch transaction A finishes, then acquires the local lock, modifies A, and commits it to the database. However, Business One later encounters an exception while executing branch transaction B. Since the data of branch transaction A has already been modified by Business Two, Business One's global transaction cannot be rolled back.

How to prevent dirty writes?

  1. Business Two uses @GlobalTransactional annotation:

Note: Other processes in the branch transaction execution are omitted.

During the execution of the global transaction by Business Two, when registering the branch transaction before the submission of branch transaction A and acquiring the global lock, it finds that Business One's global lock has not been released yet. Therefore, Business Two cannot commit and throws an exception to roll back, thus preventing dirty writes.

  2. Business Two uses @GlobalLock annotation:

Note: Other processes in the branch transaction execution are omitted.

Similar to the effect of @GlobalTransactional annotation, but without the need to open a global transaction, it only checks the existence of the global lock before local transaction submission.

  3. Business Two uses @GlobalLock annotation + select for update statement:

If a select for update statement is added, it checks the existence of the global lock before the update operation. Business Two can only execute the updateA operation after the global lock is released.

If Business Two only uses @Transactional, dirty writes are possible. The root cause is that without the @GlobalLock annotation the global lock is never checked, so another global transaction may find, when rolling back, that the data of one of its branch transactions has already been modified. An additional benefit of adding select for update is that the global-lock check can be retried (through the lock-retry loop) instead of failing only at commit time.

How to Prevent Dirty Reads

Dirty reads in Seata AT mode refer to the scenario where data from a branch transaction that has been committed is read by another business before the global transaction is committed. Essentially, this is because Seata's default global transaction isolation level is read uncommitted.

So how to prevent dirty reads?

Business Two queries A with @GlobalLock annotation + select for update statement:

Adding the select for update statement checks the existence of the global lock before executing the SQL. The SQL can only be executed after the global lock is acquired, thus preventing dirty reads.
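
A hedged sketch of that query pattern (the entity and mapper names are illustrative); the for update clause is what routes the statement through SelectForUpdateExecutor, so the global lock is checked before the row is returned:

@GlobalLock
@Transactional
public Account queryAccount(Long accountId) {
    // the mapper is assumed to issue: SELECT * FROM account WHERE id = ? FOR UPDATE
    return accountMapper.selectByIdForUpdate(accountId);
}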

Author Bio:

Zhang Chenghui currently works at Ant Group and is passionate about sharing technology. He is the author of the WeChat public account "后端进阶" (Backend Advancement) and the owner of the technical blog (https://objcoding.com/). He is also a Seata Contributor with GitHub ID: objcoding.

· 8 min read

In the previous analysis of the new version of the Snowflake algorithm, we mentioned two changes made in the new version:

  1. The timestamp no longer constantly follows the system clock.
  2. The exchange of positions between node ID and timestamp. From the original: Original Bit Allocation Strategy to: Improved Bit Allocation Strategy

A careful student raised a question: In the new version, the algorithm is indeed monotonically increasing within a single node, but in a multi-instance deployment, it is no longer globally monotonically increasing! Because it is obvious that the node ID is in the high bits, so the generated ID with a larger node ID will definitely be greater than the ID with a smaller node ID, regardless of the chronological order. In contrast, the original algorithm, with the timestamp in the high bits and always following the system clock, can ensure that IDs generated earlier are smaller than those generated later. Only when two nodes happen to generate IDs at the same timestamp, the order of the two IDs is determined by the node ID. So, does it mean that the new version of the algorithm is wrong?

This is a great question! The fact that students can raise this question indicates a deep understanding of the essential differences between the standard Snowflake algorithm and the new version. This is commendable! Here, let's first state the conclusion: indeed, the new version of the algorithm does not possess global monotonicity, but this does not affect our original intention (to reduce database page splits). This conclusion may seem counterintuitive but can be proven.

Before providing the proof, let's briefly review some knowledge about page splits in databases. Taking the classic MySQL InnoDB as an example, InnoDB uses a B+ tree index in which the leaf nodes of the primary key index also store the complete data rows, and the leaf nodes are linked together as a doubly linked list. The physical storage unit of a leaf node is a data page, and each data page can hold up to N rows (where N is inversely proportional to the row size), as shown in the diagram: Data Page

The B+ tree requires the left node to be smaller than the right node. What happens if we want to insert a record with an ID of 25 at this point (assuming each data page can only hold 4 records)? The answer is a page split, as shown in the diagram: Page Split

Page splits are unfriendly to I/O: a new data page must be created and part of the records copied and moved over from the old page, so they should be avoided as much as possible.

Ideally, the primary key ID should be sequentially increasing (for example, setting the primary key as auto_increment). This way, a new page will only be needed when the current data page is full, and the doubly linked list will always grow sequentially at the tail, avoiding any mid-node splits.

In the worst-case scenario, if the primary key ID is randomly generated and unordered (for example, a UUID string in Java), new records will be randomly assigned to any data page. If the page is already full, it will trigger a page split.

If the primary key ID is generated by the standard Snowflake algorithm, in the best-case scenario, only one node is generating IDs within each timestamp. In this case, the algorithm's effect is equivalent to the ideal situation of sequential incrementation, similar to auto_increment. In the worst-case scenario, all nodes within each timestamp are generating IDs, and the algorithm's effect is close to unordered (but still much better than completely unordered UUIDs, as the workerId with only 10 bits limits the nodes to a maximum of 1024). In actual production, the algorithm's effectiveness depends on business traffic, and the lower the concurrency, the closer the algorithm is to the ideal scenario.

So, how does it fare with the new version of the algorithm?

The new version of the algorithm, viewed globally, produces IDs in an unordered manner; however, for each workerId the generated IDs are strictly monotonically increasing. And since the workerId is limited to 10 bits, the overall sequence can be divided into at most 1024 subsequences, each of which is monotonically increasing.

For a database, initially, the received IDs may be unordered, coming from various subsequences, as illustrated here: Initial State

If, at this point, a worker1-seq2 arrives, it will clearly cause a page split: First Split

However, after the split, interesting things happen. For worker1, subsequent seq3, seq4 will not cause page splits anymore (because there is still space), and seq5 only needs to link to a new page for sequential growth (the difference is that this new page is not at the tail of the doubly linked list). Note that the subsequent IDs of worker1 will not be placed after any nodes from worker2 or beyond (thus avoiding page splits for later nodes) because they are always smaller than the IDs of worker2; nor will they be placed before the current node of worker1 (thus avoiding page splits for previous nodes) because the subsequences of worker1 are always monotonically increasing. Here, we refer to such subsequences as reaching a steady state, meaning that the subsequence has "stabilized," and its subsequent growth will only occur at the end of the subsequence without causing page splits for other nodes.

The same principle can be extended to all subsequences. Regardless of how chaotic the IDs received by the database are initially, after a finite number of page splits, the doubly linked list can always reach a stable state: Steady State

After reaching the steady state, subsequent IDs will only grow sequentially within their respective subsequences, without causing page splits. The difference between this sequential growth and the sequential growth of auto_increment is that the former has 1024 growth points (the ends of various subsequences), while the latter only has one at the end.

At this point, we can answer the question posed at the beginning: indeed, the new algorithm is not globally monotonically increasing, but the algorithm converges. After reaching a steady state, the new algorithm can achieve the same effect as global sequential incrementation.

Further Considerations

The discussion so far has focused on the continuous growth of sequences. However, in practical production, there is not only the insertion of new data but also the deletion of old data. Data deletion may lead to page merging (InnoDB, if it finds that the space utilization of two adjacent data pages is both less than 50%, it will merge them). How does this affect the new algorithm?

As we have seen in the above process, the essence of the new algorithm is to utilize early page splits to gradually separate different subsequences, allowing the algorithm to continuously converge to a steady state. Page merging, on the other hand, may reverse this process by merging different subsequences back into the same data page, hindering the convergence of the algorithm. Especially in the early stages of convergence, frequent page merging may even prevent the algorithm from converging forever (I just separated them, and now I'm merging them back together, back to square one~)! However, after convergence, only page merging at the end nodes of each subsequence has the potential to disrupt the steady state (merging the end node of one subsequence with the head node of the next subsequence). Merging on the remaining nodes of the subsequence does not affect the steady state because the subsequence remains ordered, albeit with a shorter length.

Taking Seata's server as an example, the data in the three tables of the server has a relatively short lifecycle. After a global transaction ends, the data is cleared. This is not friendly to the new algorithm, as it does not provide enough time for convergence. However, there is already a pull request (PR) for delayed deletion in the review process, and with this PR, the effect will be much better. For example, periodic weekly cleanup allows sufficient time for the algorithm to converge in the early stages, and for most of the time, the database can benefit from it. At the time of cleanup, the worst-case result is that the table is cleared, and the algorithm starts from scratch.

If you wish to apply the new algorithm to a business system, make sure to ensure that the algorithm has time to converge. For example, for user tables or similar, where data is intended to be stored for a long time, the algorithm can naturally converge. Alternatively, implement a mechanism for delayed deletion, providing enough time for the algorithm to converge.

If you have better opinions and suggestions, feel free to contact the Seata community!

· 6 min read

Seata incorporates a distributed UUID generator to assist in generating global transaction IDs and branch transaction IDs. The desired characteristics for this generator include high performance, global uniqueness, and trend incrementation.

High performance is self-explanatory, and global uniqueness is crucial to prevent confusion between different global transactions or branch transactions. Additionally, trend incrementation is valuable for users employing databases as the storage tool for TC clusters, as it can reduce the frequency of data page splits, thereby minimizing database IO pressure (the branch_table table uses the branch transaction ID as the primary key).

In the older version of Seata (prior to 1.4), the implementation of this generator was based on the standard version of the Snowflake algorithm. The standard Snowflake algorithm has been well-documented online, so we won't delve into it here. If you're unfamiliar with it, consider referring to existing resources before continuing with this article.

Here, we discuss some drawbacks of the standard Snowflake algorithm:

  1. Clock Sensitivity: Since ID generation is always tied to the current operating system's timestamp (leveraging the monotonicity of time), a clock rollback may result in repeated IDs. Seata's strategy to handle this is by recording the last timestamp and rejecting service if the current timestamp is less than the recorded value (indicating a clock rollback). The service waits until the timestamp catches up with the recorded value. However, this means the TC will be in an unavailable state during this period.

  2. Burst Performance Limit: The standard Snowflake algorithm claims a high QPS, approximately 4 million/s. However, strictly speaking, this is a bit misleading. The timestamp unit of the algorithm is milliseconds, and the bit length allocated to the sequence number is 12, allowing for 4096 sequence spaces per millisecond. So, a more accurate description would be 4096/ms. The distinction between 4 million/s and 4096/ms lies in the fact that the former doesn't require every millisecond's concurrency to be below 4096. Seata also adheres to this limitation. If the sequence space for the current timestamp is exhausted, it will spin-wait for the next timestamp.

In newer versions (1.4 and beyond), the generator has undergone optimizations and improvements to address these issues effectively. The core idea of the improvement is to decouple from the operating system's timestamp, with the generator obtaining the system's current timestamp only during initialization as the initial timestamp. Subsequently, it no longer synchronizes with the system timestamp. The incrementation is solely driven by the incrementation of the sequence number. For example, when the sequence number reaches its maximum value (4095), the next request causes an overflow of the 12-bit space. The sequence number resets to zero, and the overflow carry is added to the timestamp, incrementing it by 1. Thus, the timestamp and sequence number can be considered as a single entity. In practice, we adjusted the bit allocation strategy for the 64-bit ID, swapping the positions of the timestamp and node ID for easier handling of this overflow carry:

Original Bit Allocation Strategy: Original Bit Allocation Strategy

Modified Bit Allocation Strategy (swapping timestamp and node ID): Modified Bit Allocation Strategy

This arrangement allows the timestamp and sequence number to be contiguous in memory, making it easy to use an AtomicLong to simultaneously store them.

/**
* timestamp and sequence mix in one Long
* highest 11 bit: not used
* middle 41 bit: timestamp
* lowest 12 bit: sequence
*/
private AtomicLong timestampAndSequence;

The highest 11 bits can be determined during initialization and remain unchanged thereafter:

/**
* business meaning: machine ID (0 ~ 1023)
* actual layout in memory:
* highest 1 bit: 0
* middle 10 bit: workerId
* lowest 53 bit: all 0
*/
private long workerId;

Producing an ID is then straightforward:

public long nextId() {
    // Obtain the incremented timestamp and sequence number
    long next = timestampAndSequence.incrementAndGet();
    // Extract the lowest 53 bits
    long timestampWithSequence = next & timestampAndSequenceMask;
    // Perform a bitwise OR operation with the previously saved top 11 bits
    return workerId | timestampWithSequence;
}
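
To see the overflow carry concretely, here is a tiny standalone illustration (not Seata code) using the same bit layout; once the 12-bit sequence is full, the next increment naturally advances the timestamp part:

public class CarryDemo {
    public static void main(String[] args) {
        // same layout as above: middle bits hold the timestamp, lowest 12 bits the sequence
        long timestampAndSequence = (5L << 12) | 4095L; // timestamp = 5, sequence full (4095)
        long next = timestampAndSequence + 1;           // what incrementAndGet would yield
        System.out.println(next >>> 12);                // 6 -> the carry advanced the timestamp
        System.out.println(next & 4095);                // 0 -> the sequence wrapped to zero
    }
}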

At this point, we can observe the following:

  1. The generator no longer has a burst performance limit of 4096/ms. If the sequence number space for a timestamp is exhausted, it will directly advance to the next timestamp, "borrowing" the sequence number space of the next timestamp (there is no need to worry about serious consequences of this "advance consumption," as the reasons will be explained below).

  2. The generator has a weak dependency on the operating system clock. During runtime, the generator is not affected by clock backtracking (whether it is manually backtracked or due to machine clock drift) because the generator only fetches the system clock once at startup, and thereafter, they no longer stay synchronized. The only possible scenario for duplicate IDs is a significant clock backtracking during restart (either deliberate human backtracking or modification of the operating system time zone, such as changing Beijing time to London time~ Machine clock drift is typically in the millisecond range and won't have such a large impact).

  3. Will continuous "advance consumption" cause the generator's timestamps to be significantly ahead of the system timestamps, resulting in ID duplicates upon restart? In theory, yes, but practically almost impossible. To achieve this effect, the QPS received by the generator would have to stay consistently above 4 million/s. To be honest, even the TC can't handle such high traffic, so the bottleneck is definitely not in the generator.

In addition, we also adjusted the strategy for generating node IDs. In the original version, when the user did not manually specify a node ID, it would take the low 10 bits of the local IPv4 address as the node ID. In practical production, it was found that there were occasional occurrences of duplicate node IDs (mostly users deploying with k8s). For example, the following IPs would result in duplicates:

  • 192.168.4.10
  • 192.168.8.10

In other words, duplicates would occur whenever the fourth byte of the IP and the low 2 bits of the third byte were the same. The new version's strategy is to prefer the low 10 bits of the local network card's MAC address; if the machine has no valid network card configuration, it randomly picks a value from [0, 1023] as the node ID. Since this adjustment, users of the new version no longer seem to report this issue (of course, it remains to be tested over time, but in any case it won't be worse than the IP-based strategy).
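
A small illustration of why the old IP-based strategy collides: taking the low 10 bits of an IPv4 address keeps only the low 2 bits of the third byte plus the whole fourth byte, so the two addresses above map to the same node ID (the helper below is purely illustrative, not Seata's exact code):

public class NodeIdCollisionDemo {
    // low 10 bits of an IPv4 address: 2 bits from the third byte, 8 bits from the fourth
    static int low10Bits(int thirdByte, int fourthByte) {
        return ((thirdByte & 0x03) << 8) | fourthByte;
    }

    public static void main(String[] args) {
        System.out.println(low10Bits(4, 10)); // 192.168.4.10 -> 10
        System.out.println(low10Bits(8, 10)); // 192.168.8.10 -> 10, same node ID
    }
}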

The above is a brief analysis of Seata's distributed UUID generator. If you find this generator useful, you can directly use it in your project. Its class declaration is public, and the full class name is: io.seata.common.util.IdWorker

Of course, if you have better ideas, you are also welcome to discuss them with the Seata community.

· 4 min read

Current Situation & Pain Points

For Seata, it records the before and after data of DML operations to perform possible rollback operations, and stores this data in a blob field in the database. For batch operations such as insert, update, delete, etc., the number of affected rows may be significant, concatenated into a large field inserted into the database, which may lead to the following issues:

  1. Exceeding the maximum write limit for a single database operation (such as the max_allowed_packet parameter in MySQL).
  2. Significant network IO and database disk IO overhead due to a large amount of data.

Brainstorming

For the first issue, the max_allowed_packet parameter limit can be increased based on the actual situation of the business to avoid the "query is too large" problem. For the second issue, increasing bandwidth and using high-performance SSDs as the database storage medium can help.

The above solutions involve external or costly measures. Is there a framework-level solution to address the pain points mentioned above?

Considering the root cause of the pain points mentioned above, the problem lies in the generation of excessively large data fields. Therefore, if the corresponding data can be compressed at the business level before data transmission and storage, theoretically, it can solve the problems mentioned above.
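
As a minimal standalone illustration of the idea (plain JDK gzip, not Seata's CompressorFactory, which is shown further below), compressing a large, repetitive rollback payload trades a little CPU for a much smaller byte array to transmit and store:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class UndoLogCompressionDemo {
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // a large, repetitive payload standing in for a batch-update undo_log
        byte[] undoLog = "{\"beforeImage\":...,\"afterImage\":...}".repeat(10_000).getBytes();
        byte[] compressed = gzip(undoLog);
        System.out.println(undoLog.length + " bytes -> " + compressed.length + " bytes");
    }
}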

Feasibility Analysis

Combining the brainstorming above, in practical development, when large batch operations are required, they are often scheduled during periods of relatively low user activity and low concurrency. At such times, CPU and memory resources can be relatively more utilized to quickly complete the corresponding operations. Therefore, by consuming CPU and memory resources to compress rollback data, the size of data transmission and storage can be reduced.

At this point, two things need to be demonstrated:

  1. That compression reduces the pressure on network IO and database disk IO. This can be measured by the total time taken for compressing the data plus storing it in the database.
  2. The compression ratio achieved relative to the original data. This can be measured by comparing the data size before and after compression.

Test of the time taken to compress the data and store it in the database:

image

Compression Ratio Test:

image

The test results clearly indicate that using gzip or zip compression can significantly reduce the pressure on the database and network transmission. At the same time, it can substantially decrease the size of the stored data.

Implementation

Implementation Approach

Compression

Partial Code

# Whether to enable undo_log compression, default is true
seata.client.undo.compress.enable=true

# Compressor type, default is zip, generally recommended to be zip
seata.client.undo.compress.type=zip

# Compression threshold for enabling compression, default is 64k
seata.client.undo.compress.threshold=64k

Determining Whether the Undo_Log Compression Feature is Enabled and if the Compression Threshold is Reached

protected boolean needCompress(byte[] undoLogContent) {
    // 1. Check whether undo_log compression is enabled (enabled by default since 1.4.2).
    // 2. Check whether the compression threshold has been reached (64k by default).
    // If both requirements are met, the corresponding undoLogContent is compressed
    return ROLLBACK_INFO_COMPRESS_ENABLE
        && undoLogContent.length > ROLLBACK_INFO_COMPRESS_THRESHOLD;
}

Initiating Compression for Undo_Log After Determining the Need

// If compression is needed, compress undo_log
if (needCompress(undoLogContent)) {
    // Get the compression type, default zip
    compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
    // Get the corresponding compressor and compress
    undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
// else no compression is needed, and nothing has to be done

Save the compression type synchronously to the database for use when rolling back:

protected String buildContext(String serializer, CompressorType compressorType) {
    Map<String, String> map = new HashMap<>();
    map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
    // Save the compression type to the database
    map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
    return CollectionUtils.encodeMap(map);
}

Decompress the corresponding information when rolling back:

protected byte[] getRollbackInfo(ResultSet rs) throws SQLException {
    // Get the byte array of rollback information saved to the database
    byte[] rollbackInfo = rs.getBytes(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
    // Get the compression type
    // getOrDefault uses the default value CompressorType.NONE so that 1.4.2+ stays compatible with versions earlier than 1.4.2
    String rollbackInfoContext = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
    Map<String, String> context = CollectionUtils.decodeMap(rollbackInfoContext);
    CompressorType compressorType = CompressorType.getByName(context.getOrDefault(UndoLogConstants.COMPRESSOR_TYPE_KEY,
        CompressorType.NONE.name()));
    // Get the corresponding compressor and decompress
    return CompressorFactory.getCompressor(compressorType.getCode())
        .decompress(rollbackInfo);
}

Conclusion

By compressing undo_log, Seata further improves its performance at the framework level when processing large amounts of data. At the same time, it provides the corresponding switch and reasonably chosen default values, so users can use the feature out of the box while still tuning it to their actual scenarios.

· 13 min read
  1. seata version: 1.4.0, but all versions below 1.4 also have this problem.
  2. Problem description: In a global transaction, a pure query operation on a branch transaction suddenly gets stuck without any feedback (logs/exceptions) until the RPC timeout on the consumer side

image.png

Problem Troubleshooting

  1. The whole process is within a global transaction; the consumer and the provider can be seen as two branches of the global transaction, consumer --> provider.
  2. The consumer first executes some local logic and then sends an RPC request to the provider. It is confirmed that the consumer sent the request and the provider received it.
  3. The provider first prints a log and then executes a pure query SQL. If the SQL executed normally, another log would be printed, but currently only the log before the SQL is printed and no SQL-related logs appear. The DBA checked the SQL log and found that the SQL was never executed.
  4. It was confirmed that the operation is only a pure query under the global transaction, and the overall flow of the global transaction before this operation was completely normal.
  5. In fact, the phenomenon was already quite telling, but at the time our thinking didn't shift: we kept focusing on the query SQL itself, assuming that even a query timeout should throw an exception rather than return nothing. Since the DBA could not find any query record, the SQL may never have been executed at all, and the problem occurred before the SQL was executed, for example in the proxy.
  6. With the help of arthas's watch command, there was no output. The first log output means the method must have been entered, and the missing result means the current request is stuck. Why is it stuck?
  7. With arthas's thread commands (thread -b, thread -n) we can find the busiest threads. This worked very well: one thread had 92% CPU usage, and it caused the other 20 or so Dubbo threads to be BLOCKED. The stack information is as follows.
  8. Analyzing the stack information, we can clearly see Seata-related classes, so it is probably related to Seata's data source proxy. At the same time, the thread with the highest CPU usage is stuck in the ConcurrentHashMap#computeIfAbsent method. Is there a bug in ConcurrentHashMap#computeIfAbsent?
  9. By now we still don't know the exact root cause, but it should have something to do with Seata's data source proxy. We need to analyze both the business code and the Seata code to find out why.

image.png

Problem analysis

ConcurrentHashMap#computeIfAbsent

This method can indeed cause problems: if two keys have the same hashcode and a computeIfAbsent operation is performed inside the corresponding mappingFunction, it will lead to an infinite loop. For a detailed analysis, refer to this article: https://juejin.cn/post/6844904191077384200

Seata data source autoproxy

Related content has been analysed before, let's focus on the following core classes:

  1. SeataDataSourceBeanPostProcessor
  2. SeataAutoDataSourceProxyAdvice
  3. DataSourceProxyHolder

SeataDataSourceBeanPostProcessor

SeataDataSourceBeanPostProcessor is a BeanPostProcessor implementation class. In its postProcessAfterInitialization method (i.e., after the bean has been initialized), it creates the corresponding Seata data source proxy for the data source configured by the business side.

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource) {
            //When not in the excludes, put and init proxy.
            if (!excludes.contains(bean.getClass().getName())) {
                //Only put and init proxy, not return proxy.
                DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
            }
            //If is SeataDataSourceProxy, return the original data source.
            if (bean instanceof SeataDataSourceProxy) {
                LOGGER.info("Unwrap the bean of the data source," +
                    " and return the original data source to replace the data source proxy.");
                return ((SeataDataSourceProxy) bean).getTargetDataSource();
            }
        }
        return bean;
    }
}

SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice is a MethodInterceptor. The SeataAutoDataSourceProxyCreator in Seata creates a dynamic proxy object for every bean of type DataSource, and the proxy logic is the SeataAutoDataSourceProxyAdvice#invoke logic. That is: when a relevant method of the AOP-proxied data source is executed, the call goes through its invoke method; inside invoke, the corresponding Seata data source proxy is looked up based on the native data source, and ultimately the corresponding method of that Seata data source proxy is executed.

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
    ......
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
            return invocation.proceed();
        }
        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
        if (m != null) {
            SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
            return m.invoke(dataSourceProxy, args);
        } else {
            return invocation.proceed();
        }
    }
}

DataSourceProxyHolder

Now that the process is clear, one question remains: how is the relationship between the native data source and the Seata proxy data source maintained? It is maintained by DataSourceProxyHolder, a singleton that keeps the mapping in a ConcurrentHashMap: native data source as key --> Seata proxy data source as value.

public class DataSourceProxyHolder {
    public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
        DataSource originalDataSource = dataSource;
        ......
        return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, originalDataSource,
            BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
    }
}


// CollectionUtils.java
public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
    V value = map.get(key);
    if (value != null) {
        return value;
    }
    return map.computeIfAbsent(key, mappingFunction);
}

Client data source configuration

  1. Two data sources are configured: DynamicDataSource and P6DataSource.
  2. P6DataSource can be seen as a wrapper around DynamicDataSource.
  3. Let's not worry about whether this configuration makes sense; here we simply analyse the problem based on this data source configuration.
@Qualifier("dsMaster")
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(dsMaster());
return p6DataSource;
}

Analyse the process

Assuming that everyone is now aware of the problem that ConcurrentHashMap#computeIfAbsent can cause, and knowing that this problem has indeed occurred here, we can combine it with the stack information to see roughly where it arose.

1. The preconditions for ConcurrentHashMap#computeIfAbsent to produce this problem are: the two keys have the same hashcode, and the mappingFunction corresponds to a put operation. Combined with our Seata usage scenario, the mappingFunction here corresponds to DataSourceProxy::new, which suggests that the put operation may be triggered inside the DataSourceProxy constructor.

image.png

Execute a method on the AOP-proxied data source =>
Enter the SeataAutoDataSourceProxyAdvice aspect logic =>
Execute the DataSourceProxyHolder#putDataSource method =>
Execute DataSourceProxy::new =>
getConnection method of the AOP-proxied data source =>
getConnection method of the native data source =>
Enter the SeataAutoDataSourceProxyAdvice aspect logic =>
Execute the DataSourceProxyHolder#putDataSource method =>
Execute DataSourceProxy::new =>
getConnection method of DruidDataSource

2. What are the AOP-proxied data source and the native data source mentioned in step 1? See the following diagram.

3. It was also said above that one precondition for this problem is that the two keys have the same hashcode, yet neither of these data source objects overrides hashCode, so by definition they must have different hashcodes. After looking at the ConcurrentHashMap issue again, I think the statement "the two keys have the same hashcode" is inaccurate; "the two keys produce a hash conflict (land in the same bucket)" is more precise, which explains why two objects with different hashcodes can still hit this problem. To prove this, I have given an example below.

public class Test {
    public static void main(String[] args) {
        ConcurrentHashMap map = new ConcurrentHashMap(8);
        Num n1 = new Num(3);
        Num n2 = new Num(19);
        Num n3 = new Num(20);

        // map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)); // This line does not cause the program to loop.
        map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)); // This line makes the program loop forever.
    }

    static class Num {
        private int i;

        public Num(int i) {
            this.i = i;
        }

        @Override
        public int hashCode() {
            return i;
        }
    }
}
  1. To make the problem easier to reproduce, Num#hashCode is overridden so that the constructor argument is the hashcode value.
  2. Create a ConcurrentHashMap with initialCapacity 8; the computed sizeCtl is 16, i.e. the default length of the internal array is 16.
  3. Create object n1 with argument 3, i.e. hashcode 3; its array index is 3.
  4. Create object n2 with argument 19, i.e. hashcode 19; its array index is also 3, so n1 and n2 have a hash conflict (a quick check of the bucket indices is sketched after this list).
  5. Create object n3 with argument 20, i.e. hashcode 20; its array index is 4.
  6. Executing map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)) terminates normally, because n1 and n3 do not conflict.
  7. Executing map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)) never returns, because n1 and n2 conflict and the inner call ends up in an endless loop.
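
For reference, here is a minimal standalone check of those bucket indices. It reuses the same spreading step that ConcurrentHashMap (JDK 8+) applies to a key's hashCode(); the class name and the output comment are illustrative only:

public class BucketIndexCheck {
    // Same spreading step ConcurrentHashMap applies to a key's hashCode()
    static int spread(int h) {
        return (h ^ (h >>> 16)) & 0x7fffffff;
    }

    public static void main(String[] args) {
        int tableLength = 16; // table length after initialCapacity = 8
        for (int hash : new int[]{3, 19, 20}) {
            System.out.println("hash=" + hash + " -> bucket " + (spread(hash) & (tableLength - 1)));
        }
        // hash=3 and hash=19 both map to bucket 3, while hash=20 maps to bucket 4
    }
}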

4. During bean initialisation, hasn't SeataDataSourceBeanPostProcessor already created the corresponding data source proxy for the object? Why is the data source proxy created again in SeataAutoDataSourceProxyAdvice?

  1. First, SeataDataSourceBeanPostProcessor runs after the AOP proxy object has been created, so by the time the SeataDataSourceBeanPostProcessor methods execute, the SeataAutoDataSourceProxyAdvice is already in effect.
  2. When SeataDataSourceBeanPostProcessor adds an element to the map, the key is the AOP-proxied data source; in SeataAutoDataSourceProxyAdvice the key is invocation.getThis(), i.e. the native data source, so the two keys are not the same.


5. There is another problem: the SeataAutoDataSourceProxyAdvice#invoke method does not filter out methods such as toString and hashCode, which the CGLIB-generated proxy overrides by default. If these methods of the proxy object are triggered while putting elements into the map, the proxy object re-enters the SeataAutoDataSourceProxyAdvice#invoke interception again and again until the thread stack overflows.

Summary of the problem

  1. When two keys produce a hash conflict, the ConcurrentHashMap#computeIfAbsent bug can be triggered; the symptom of this bug is that the current thread spins in an endless loop.
  2. The business side reported that the problem was intermittent, for two reasons: first, the application is deployed on multiple nodes, but only one node happened to trigger the bug (hash conflict), so the bug only showed up when a request hit that node; second, object addresses (and thus hashcodes) differ on every restart, so the bug is not triggered after every restart, but once it is triggered, that node keeps having the problem. A thread stuck in an endless loop holds the bucket and blocks other threads trying to get the proxy data source from the map, which is why the business side saw requests hang. If several consecutive requests hang, the business side may restart the service; since the hash conflict does not necessarily reappear after a restart, the application may then behave normally, but the bug may just as well be triggered again after the next restart.
  3. From the overall point of view, this is effectively a deadlock: the looping thread never releases the bucket it occupies, so other threads operating on the map are BLOCKED.
  4. Essentially, the ConcurrentHashMap#computeIfAbsent method can trigger this bug, and Seata's usage scenario happened to trigger it.
  5. The following demo fully simulates what happened online:
public class Test {
    public static void main(String[] args) {

        ConcurrentHashMap map = new ConcurrentHashMap(8);

        Num n1 = new Num(3);
        Num n2 = new Num(19);

        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // These threads end up BLOCKED on the bucket occupied by the looping thread
                map.computeIfAbsent(n1, k -> 200);
            }).start();
        }
        // This call loops forever because n1 and n2 hash to the same bucket
        map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200));
    }

    static class Num {
        private int i;

        public Num(int i) {
            this.i = i;
        }

        @Override
        public int hashCode() {
            return i;
        }
    }
}


Solving the problem

The problem can be solved in two ways:

  1. Business change: P6DataSource and DynamicDataSource do not both need to be proxied. Either proxy only P6DataSource and stop declaring DynamicDataSource as a bean, or exclude P6DataSource via the excludes property; either way there is no duplicate-proxy problem.
  2. Seata improvement: improve the data source proxy related logic.
Business changes
  1. The data-source-related configuration can be changed to the following:
@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
    P6DataSource p6DataSource = new P6DataSource(new TuYaDynamicDataSource(masterDsRoute));
    logger.warn("dsMaster={}, hashcode={}", p6DataSource, p6DataSource.hashCode());
    return p6DataSource;
}
  2. Or leave the current data source configuration unchanged and add the excludes property:
@EnableAutoDataSourceProxy(excludes={"P6DataSource"})
Seata refinement
  1. Replace the ConcurrentHashMap#computeIfAbsent call with a double-checked locking pattern, as follows:
SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
    synchronized (dataSourceProxyMap) {
        dsProxy = dataSourceProxyMap.get(originalDataSource);
        if (dsProxy == null) {
            dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
            dataSourceProxyMap.put(originalDataSource, dsProxy);
        }
    }
}
return dsProxy;

I originally wanted to change the CollectionUtils#computeIfAbsent method directly, but the feedback from the group was that this might cause the data source to be created multiple times, which is indeed a problem (a small demo of this race is given at the end of this section). The proposed change was as follows:

public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
    V value = map.get(key);
    if (value != null) {
        return value;
    }
    value = mappingFunction.apply(key);
    return map.computeIfAbsent(key, v -> value);
}
  2. Add some filtering to the SeataAutoDataSourceProxyAdvice interception logic:
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
    SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
    return m.invoke(dataSourceProxy, args);
} else {
    return invocation.proceed();
}
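
To illustrate the concern raised about changing CollectionUtils#computeIfAbsent, here is a minimal, self-contained sketch (class and variable names are illustrative) of how building the value before calling computeIfAbsent lets several racing threads each construct a value, even though only one ends up in the map; for Seata this would mean constructing a data source proxy more than once:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerApplyDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
        AtomicInteger constructed = new AtomicInteger();

        Runnable task = () -> {
            if (map.get("ds") == null) {
                // The value is built outside computeIfAbsent, so every racing thread builds one
                Object candidate = new Object();
                constructed.incrementAndGet();
                map.computeIfAbsent("ds", k -> candidate);
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // May report 2 constructions while the map still holds exactly one entry
        System.out.println("constructed=" + constructed.get() + ", map size=" + map.size());
    }
}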

Legacy issues

In the corresponding methods of SeataDataSourceBeanPostProcessor and SeataAutoDataSourceProxyAdvice, the keys used to put the Seata data source proxy into the map are fundamentally different: in SeataDataSourceBeanPostProcessor the key is the AOP-proxied data source, while in SeataAutoDataSourceProxyAdvice the key is the native object, which results in the unnecessary creation of an extra Seata data source proxy object.

What is the best way to fix this? Would it be possible to give SeataDataSourceBeanPostProcessor an order so that it takes effect before the AOP proxy object is created?

Link to original article

https://juejin.cn/post/6939041336964153352/

· 15 min read

"Just getting started with Seata and don't know enough about its modules?
Want to dive into the Seata source code, but haven't done so yet?
Want to find out what your application is doing "on the sly" during startup after integrating Seata?
Want to learn the design concepts and best practices of Seata as a great open source framework?
If any of the above apply to you, then today's article is for you!

Preface

During the startup of Seata's application side (RM, TM), the first thing to do is establish communication with the coordinator side (TC); this is a prerequisite for Seata to coordinate distributed transactions at all. So, while completing application-side initialisation and establishing the connection with the TC, how does Seata find the cluster and addresses of the TC (Transaction Coordinator)? And how does it obtain the various pieces of configuration information from the configuration module? That is what this article explores.

Defining the scope

As a middleware-level component, Seata is very careful about introducing third-party frameworks for specific implementations. Interested readers can dig into Seata's SPI mechanism to see how Seata uses a large number of extension points (Extension) to invert its dependencies on concrete components into dependencies on abstract interfaces. At the same time, in order to integrate better with the ecosystems around microservices, cloud-native and other popular architectures, Seata also uses the SPI mechanism to actively integrate with a number of mainstream microservice frameworks, registries, configuration centres, and the "leader" among Java development frameworks, Spring Boot. This keeps the microkernel architecture loosely coupled and extensible while still playing well with all kinds of components, so that environments with different technology stacks can adopt Seata more conveniently.

In this article, in order to stay close to the scenario of someone who has just started trying out Seata, the application side described below is qualified as follows: it uses File (file) as both the configuration centre and the registry, and it is started based on Spring Boot.

With this scope defined, let's dive into the Seata source code and find out what is going on.

RM/TM Initialisation Process: Multi-Module Collaboration

In Seata Client Startup Process Dissection (I), we analysed the initialisation of TM and RM on the Seata application side, and how the application side creates a Netty Channel and sends a registration request to the TC Server. In addition, during RM initialisation, several other Seata modules (registry, configuration centre, load balancing) come into play and collaborate to complete the process of connecting to the TC Server.

When executing the method that reconnects the client to the TC Server, NettyClientChannelManager.reconnect(), the first step is to get the list of available TC Server addresses for the current transaction group:

/**
 * NettyClientChannelManager.reconnect()
 * Reconnect to remote server of current transaction service group.
 *
 * @param transactionServiceGroup transaction service group
 */
void reconnect(String transactionServiceGroup) {
    List<String> availList = null;
    try {
        // Get the available TC Server addresses from the registry
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    // The following code is omitted
}

For a detailed introduction to the concept of transaction grouping, you can refer to the official document Introduction to Transaction Grouping. Here is a brief introduction.

  • Each Seata application-side RM, TM, has a transaction grouping name
  • Each TC on the Seata coordinator side has a cluster name and address. The application side goes through the following two steps when connecting to the coordinator side:
  • Through the name of the transaction grouping, the cluster name of the TC corresponding to this application side is obtained from the configuration
  • By using the cluster name, the address list of the TC cluster can be obtained from the registry. The above concepts, relationships and processes are shown in the following figure: ! Relationship between Seata transaction grouping and connection establishment

Getting TC Server cluster addresses from Registry

After understanding the main concepts and steps involved in RM/TM connecting to the TC, let's explore the getAvailServerList method:

private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
    //① Use the registry factory to get a registry instance.
    //② Call the registry's lookup() method to get the list of available Server addresses in the TC cluster based on the transaction group name.
    List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
    if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
        return Collections.emptyList();
    }

    return availInetSocketAddressList.stream()
        .map(NetUtil::toStringAddress)
        .collect(Collectors.toList());
}

Which registry to use? The Seata meta-configuration file gives the answer

As mentioned above, Seata supports a variety of registry implementations, so Seata first needs to get the "which registry type to use" information from somewhere.

Seata has designed a "configuration file" to store some basic information about the components used within the framework. I prefer to call this file the "meta-configuration file", because the information it contains is really the "configuration of the configuration", i.e. the concept of "meta". You can understand it by comparing the data in a database table with the metadata describing the table's structure (table data versus table metadata).

We can think of the information held in the registry and the configuration centre as the configuration itself; then what is the configuration of this configuration? That is exactly what Seata's meta-configuration file contains. In fact, the meta-configuration file contains only two kinds of information:

  • First, the type of registry, registry.type, plus some basic information for that registry type. For example, when the registry type is file, the meta-configuration file stores the file name; when the registry type is Nacos, it stores the Nacos address, namespace, cluster name and other information.
  • Second, the type of configuration centre, config.type, plus some basic information for that configuration centre type. For example, when the configuration centre is file, the meta-configuration file stores the file name; when the configuration centre type is Consul, it stores the Consul address.

Seata's meta-configuration file supports YAML, properties and other formats, and can be merged into Spring Boot's application.yaml (by using seata-spring-boot-starter), making it easy to integrate with Spring Boot.
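
For example, with seata-spring-boot-starter the meta-configuration can live directly in application.yaml; a minimal sketch might look like this (the transaction group name below is only an illustrative value):

seata:
  tx-service-group: my_test_tx_group   # illustrative transaction group name
  registry:
    type: file
  config:
    type: file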

The default meta-configuration file that comes with Seata is registry.conf, and when we use a file as the registration and configuration centre, the content in registry.conf is set as follows:

registry {
# file , nacos , eureka, redis, zk, consul, etcd3, sofa
type = "file"
file {
name = "file.conf"
}
}

config {
# file, nacos, apollo, zk, consul, etcd3
type = "file"
file {
name = "file.conf"
}
}

In the following source code, we can see that the registry type used by Seata is taken from ConfigurationFactory.CURRENT_FILE_INSTANCE, and this CURRENT_FILE_INSTANCE is exactly what we call the instance of Seata's meta-configuration file.

// In getInstance(), call buildRegistryService to build the specific registry instance
public static RegistryService getInstance() {
    if (instance == null) {
        synchronized (RegistryFactory.class) {
            if (instance == null) {
                instance = buildRegistryService();
            }
        }
    }
    return instance;
}

private static RegistryService buildRegistryService() {
    RegistryType registryType;
    // Get the registry type
    String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
        ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
            + ConfigurationKeys.FILE_ROOT_TYPE);
    try {
        registryType = RegistryType.getType(registryTypeName);
    } catch (Exception exx) {
        throw new NotSupportYetException("not support registry type: " + registryTypeName);
    }
    if (RegistryType.File == registryType) {
        return FileRegistryServiceImpl.getInstance();
    } else {
        // Load the registry instance via SPI based on the registry type
        return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
    }
}

Let's look at the initialisation process of the meta-configuration file, which triggers the initialisation of the ConfigurationFactory class when the static field CURRENT_FILE_INSTANCE is fetched for the first time:

// Static block of the ConfigurationFactory class
static {
    load();
}

/**
 * In the load() method, load Seata's meta-configuration file
 */
private static void load() {
    // The name of the meta-configuration file; can be overridden via a system variable or environment variable
    String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
    if (seataConfigName == null) {
        seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
    }
    if (seataConfigName == null) {
        seataConfigName = REGISTRY_CONF_DEFAULT;
    }
    String envValue = System.getProperty(ENV_PROPERTY_KEY);
    if (envValue == null) {
        envValue = System.getenv(ENV_SYSTEM_KEY);
    }
    // Create a file configuration instance that implements the Configuration interface, based on the meta-configuration file name
    Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
        false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
    Configuration extConfiguration = null;
    // Determine whether an extended configuration provider exists by loading it through SPI.
    // When the application side uses seata-spring-boot-starter, SpringBootConfigurationProvider acts as the extended configuration provider; meta-configuration items are then no longer read from file.conf (the default) but from application.properties/application.yaml.
    try {
        // Replace the original Configuration instance with the extended configuration via ExtConfigurationProvider's provide method
        extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
                : extConfiguration.getClass().getSimpleName());
        }
    } catch (EnhancedServiceNotFoundException ignore) {

    } catch (Exception e) {
        LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
    }
    // If an extended configuration exists, use it; otherwise use the file configuration instance
    CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

The call sequence diagram for the load() method is as follows: ! Seata meta-configuration file loading process

In the above sequence diagram, you can focus on the following points:

  • The name of the Seata meta-configuration file can be customised.
  • The Seata meta-configuration file supports three suffixes: yaml/properties/conf, which are tried in turn when the meta-configuration file instance is created.
  • The top-level interface for Seata's configuration capability is Configuration; every kind of configuration centre must implement this interface, and Seata's meta-configuration file itself is handled by FileConfiguration (the file-type configuration centre), which also implements it.
/**
 * Seata configuration capability interface
 * package: io.seata.config
 */
public interface Configuration {
    /**
     * Gets short.
     *
     * @param dataId the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the short
     */
    short getShort(String dataId, int defaultValue, long timeoutMills);

    // The rest is omitted; mainly methods for adding, removing and retrieving configuration
}
  • Seata provides an extension point of type ExtConfigurationProvider, which opens up the ability to extend the concrete configuration implementation. Its provide() method receives the original Configuration and returns a brand-new Configuration; the shape of this method means that, in general, static proxies, dynamic proxies, decorators and similar design patterns can be used to enhance the original Configuration.
/**
 * Seata extended configuration provider interface
 * package: io.seata.config
 */
public interface ExtConfigurationProvider {
    /**
     * provide an AbstractConfiguration implementation instance
     * @param originalConfiguration
     * @return configuration
     */
    Configuration provide(Configuration originalConfiguration);
}
  • When the application side is started with seata-spring-boot-starter, SpringBootConfigurationProvider is used as the extended configuration provider. In its provide method, it uses dynamic bytecode generation (CGLIB) to create a dynamic proxy for the FileConfiguration instance, intercepting all methods starting with "get" so that meta-configuration items are read from application.properties/application.yaml instead.

This article only explains the idea behind the SpringBootConfigurationProvider class and does not analyse its source code further; it is just one implementation of the ExtConfigurationProvider interface. From the point of view that Configuration can be extended and replaced, it is precisely through the ExtConfigurationProvider extension point that Seata provides a broad stage for all kinds of configuration implementations and access methods.

After the loading process above, if we have not extended the configuration provider, we get the registry type file from the Seata meta-configuration file and, at the same time, create a file registry instance: FileRegistryServiceImpl.

Getting the TC Server address from the registry centre

After obtaining the registry instance, the lookup() method is executed (RegistryFactory.getInstance().lookup(transactionServiceGroup)). FileRegistryServiceImpl.lookup() is implemented as follows:

/**
 * Get the list of available TC Server addresses based on the transaction group name
 * package: io.seata.discovery.registry
 * class: FileRegistryServiceImpl
 */
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
    // Get the TC Server cluster name
    String clusterName = getServiceGroup(key);
    if (clusterName == null) {
        return null;
    }
    // Get all available Server addresses in the TC cluster from the configuration centre
    String endpointStr = CONFIG.getConfig(
        PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
    if (StringUtils.isNullOrEmpty(endpointStr)) {
        throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
    }
    // Wrap each address as an InetSocketAddress and return them
    String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
    List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
    for (String endpoint : endpoints) {
        String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
        if (ipAndPort.length != 2) {
            throw new IllegalArgumentException("endpoint format should be like ip:port");
        }
        inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
    }
    return inetSocketAddresses;
}

/**
 * default method in the registry interface
 * package: io.seata.discovery.registry
 * class: RegistryService
 */
default String getServiceGroup(String key) {
    key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
    // In the configuration cache, add a listener for changes to the transaction group name
    if (!SERVICE_GROUP_NAME.contains(key)) {
        ConfigurationCache.addConfigListener(key);
        SERVICE_GROUP_NAME.add(key);
    }
    // Get the TC cluster name corresponding to the transaction group from the configuration centre
    return ConfigurationFactory.getInstance().getConfig(key);
}
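
To make the two lookups above concrete: when file.conf is used as the configuration centre, they typically resolve against entries of the following shape (the group name, cluster name and address below are placeholder values):

service {
  # transaction group name -> TC cluster name (read by getServiceGroup)
  vgroupMapping.my_test_tx_group = "default"
  # TC cluster name -> list of TC Server addresses (read by lookup)
  default.grouplist = "127.0.0.1:8091"
}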

As you can see, the code logic matches the flow shown in the figure Relationship between Seata transaction grouping and connection establishment in the first section. At this point, the registry needs the help of the configuration centre to get the cluster name for the transaction group and to find the available service addresses in that cluster.

Get TC cluster name from Configuration Centre

Configuration Centre initialisation

The initialisation of the configuration centre (in ConfigurationFactory.buildConfiguration()) is similar to that of the registry: the type of configuration centre and other information are first obtained from the meta-configuration file, and then the concrete configuration centre instance is initialised. With the analysis above as a foundation, this is not repeated here.

Getting the value of a configuration item

The two methods in the above snippet, FileRegistryServiceImpl.lookup() and RegistryService.getServiceGroup(), both get the values of the configuration items from the configuration centre:

  • lookup() has to be implemented by each concrete registry. When file is used as the registry, the client in effect connects directly to the TC Server; the special point is that the TC Server addresses are hard-coded in the configuration (normally they would be stored in a registry), so the FileRegistryServiceImpl.lookup() method obtains the Server addresses of the TC cluster through the configuration centre.
  • getServiceGroup() is a default method in the RegistryService interface, i.e. the common implementation for all registries. Whichever registry Seata uses, the TC cluster name is always obtained from the configuration centre based on the transaction group name.

Load Balancing

After the collaboration between the configuration centre and the registry described above, we now have all the available TC Server addresses for the current application side. Before sending an actual request, a specific load-balancing strategy must still pick one TC Server address. This part of the source code is relatively simple and is not walked through here.

For the load-balancing source code, you can read AbstractNettyRemotingClient.doSelect(). Because the code analysed in this article is the reconnect method of RMClient/TMClient, which traverses and connects (reconnects) to all obtained Server addresses in turn, no load balancing is needed here.

The above covers how the registry and the configuration centre, two key modules, collaborate during the startup of the Seata application side. You are welcome to discuss and learn together!

Postscript: this article and its predecessor, Seata Client Startup Process Dissection (I), are my first technical blog posts. While getting started with Seata, I analysed and wrote down the parts of Seata's source code that I consider relatively complex and worth studying and figuring out. I welcome any suggestions for improvement from readers, thank you!

· 9 min read

"Just started with Seata and don't have a deep understanding of its various modules?
Want to delve into Seata's source code but haven't taken the plunge yet?
Curious about what your application does 'secretly' during startup after integrating Seata?
Want to learn about the design principles and best practices embodied in Seata as an excellent open-source framework?
If you have any of the above thoughts, then this article is tailor-made for you~

Introduction

Those who have seen the first picture in the official README should know that Seata coordinates distributed transactions through its coordinator side TC, which communicates and interacts with the application side TM and RM to ensure data consistency among multiple transaction participants in distributed transactions. So, how does Seata establish connections and communicate between the coordinator side and the application side?

That's right, the answer is Netty. Netty, as a high-performance RPC communication framework, ensures efficient communication between TC and RM. This article will not go into detail about Netty; instead, our focus today is on how the application side, during startup, uses a series of Seata's key modules (such as RPC, Config/Registry Center, etc.) to establish communication with the coordinator side.

Starting with GlobalTransactionScanner

We know that Seata provides several development annotations, such as @GlobalTransactional for enabling distributed transactions and @TwoPhaseBusinessAction for declaring TCC two-phase services. These annotations rely on the Spring AOP mechanism: the corresponding bean methods are enhanced by interceptors that carry out the associated processing logic. GlobalTransactionScanner, itself a Spring bean, carries the responsibility of wiring those interceptors to the annotations. As its name, scanner, suggests, it is designed to perform this scanning during the startup of the Spring application.

In addition, the process of initialising the application-side RPC clients (TMClient, RMClient) and establishing a connection with the TC is also initiated in GlobalTransactionScanner#afterPropertiesSet():

/**
 * package: io.seata.spring.annotation
 * class: GlobalTransactionScanner
 */
@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    // Perform TM and RM initialisation after the bean properties have been set
    initClient();
}
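
For orientation, initClient() essentially boils down to initialising the two clients with the application id and transaction group. A heavily simplified sketch (argument checks and logging omitted, so not the literal source) looks like this:

private void initClient() {
    // TM initialisation: registers the transaction manager with the TC
    TMClient.init(applicationId, txServiceGroup);
    // RM initialisation: registers the resource manager and its resources with the TC
    RMClient.init(applicationId, txServiceGroup);
    // register a shutdown hook so the clients are destroyed gracefully
    registerSpringShutdownHook();
}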

Initialisation and connection process of RM & TM

Here, we take RMClient.init() as an example, and the initialisation process of TMClient is the same.

Design of class relationship

Looking at the source code of RMClient#init(), we find that RMClient first constructs an RmNettyRemotingClient and then executes its init() method. The constructor and init() of RmNettyRemotingClient in turn call the constructors and init() methods of their parent classes, layer by layer.

/**
 * RMClient's initialisation logic
 * package: io.seata.rm
 * class: RMClient
 */
public static void init(String applicationId, String transactionServiceGroup) {
    //① Starting from the RmNettyRemotingClient class, the parent-class constructors are called in turn
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    //② Then, starting from the RmNettyRemotingClient class, init() of each parent class is called in turn
    rmNettyRemotingClient.init();
}

The relationship between the above RMClient family classes and the process of calling the constructor and init() initialisation method is illustrated in the following diagram: Relationship between the simplified version of the RMClient.init process and the main classes

So why is RMClient designed with such a relatively complex inheritance hierarchy? It is to divide the responsibilities and boundaries of each layer clearly, so that each layer can focus on its own logic and achieve better extensibility. For the detailed design ideas behind this, you can refer to the article by brother Hui, the author of the Seata RPC module refactoring PR: The Road to Seata-RPC Refactoring.

The complete flow of initialisation

The main logic in the constructors and init() methods of each class can be sorted out with the help of the following schematic sequence diagram. You can also skip it for now and come back after we have analysed a few key classes below, to see when these classes come on stage and how they interact with each other. ! Initialisation flow of RMClient

Grabbing the core - Channel creation

First of all, we need to know that communication between the application side and the coordinator side is carried out over Netty Channels, so the key to the communication process lies in the creation of the Channel, which in Seata is created and managed through pooling (using the object pool in common-pool).

Here we briefly introduce the concept of the object pool and its implementation in Seata. The main classes involved in common-pool are:

  • GenericKeyedObjectPool<K, V>: a K-V generic object pool that provides access to all pooled objects, while object creation is delegated to its internal factory class.
  • KeyedPoolableObjectFactory<K, V>: a K-V generic object factory responsible for creating pooled objects; it is held by the object pool.

The main classes involved in Seata's object-pool implementation are:

  • First, the pooled objects are Channels, which correspond to the generic V in common-pool.

  • NettyPoolKey: the key for a Channel, corresponding to the generic K in common-pool. A NettyPoolKey contains two main pieces of information:

    • address: the TC Server address used when the Channel is created.
    • message: the RPC message sent to the TC Server when the Channel is created.
  • GenericKeyedObjectPool<NettyPoolKey, Channel>: the pool of Channel objects.

  • NettyPoolableFactory: the factory class for creating Channels.

Having recognised the main classes related to object pooling above, let's look at the main RPC-related classes in Seata that are involved in Channel management:

  • NettyClientChannelManager:

    • Holds the pool of Channel objects.
    • Interacts with the channel object pool to manage application-side channels (acquisition, release, destruction, caching, etc.).
  • RpcClientBootstrap: core bootstrap class for RPC clients, holds the Netty framework bootstrap object with start/stop capability; has the ability to get a new Channel based on the connection address for the Channel factory class to call.

  • AbstractNettyRemotingClient:

    • Initialises and holds the RpcClientBootstrap.
    • Application-side Netty client top-level abstraction, abstracts the ability of application-side RM/TM to obtain the NettyPoolKey corresponding to their respective Channel, for NettyClientChannelManager to call.
    • Initialising the NettyPoolableFactory

Understanding the above concepts, we can simplify the process of creating a channel in Seata as follows: Process of creating a Channel object

When you reach this point, you can go back and take another look at the RMClient initialisation sequence diagram above; you should now have a clearer picture of the responsibilities and relationships of the various classes in the diagram, as well as the intent of the entire initialisation process.

Timing and flow of establishing a connection

So, when does RMClient establish a connection with Server?

During the initialisation of RMClient, you will find that many init() methods set up some timed tasks, and the mechanism of reconnecting (connecting) the Seata application side to the coordinator is achieved through timed tasks:

/**
 * package: io.seata.core.rpc.netty
 * class: AbstractNettyRemotingClient
 */
public void init() {
    // Set a timer to reconnect to the TC Server at regular intervals
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

Let's see how the classes explored above work together to connect RMClient to the TC by tracing the execution of a reconnect (the first connection may actually happen during registerResource, but the process is the same). ! RMClient and TC Server connection process

In this diagram, you can focus on these points:

  • NettyClientChannelManager executes a callback function (getPoolKeyFunction()) in the concrete AbstractNettyRemotingClient to get the NettyPoolKey: the different clients on the application side (RMClient and TMClient) use different keys when creating a Channel, so they send different registration messages when (re)connecting to the TC Server; this is determined by the different roles they play in Seata:
    • TMClient: plays the role of transaction manager, when creating a Channel, it only sends a TM registration request (RegisterTMRequest) to the TC.
    • RMClient: plays the role of resource manager, needs to manage all transaction resources on the application side, therefore, when creating a Channel, it needs to get all transaction resource information on the application side before sending RM registration request (RegisterRMRequest), and register it to TC Server.
  • In the Channel object factory's NettyPoolableFactory's makeObject (create Channel) method, two tasks are completed using the two pieces of information in NettyPoolKey:
    • A new Channel is created using the address from NettyPoolKey.
    • A registration request is sent to the TC Server using the message from NettyPoolKey and the new Channel. This is the Client's initial connection (first execution) or reconnection (subsequent executions driven by scheduled tasks) request to the TC Server.

The above content covers the entire process of the Seata application's initialization and its connection establishment with the TC Server coordinator side.

For deeper details, it is recommended to thoroughly read the source code based on the outline and key points mentioned in this article. This will undoubtedly lead to a deeper understanding and new insights!

Postscript: Considering the length and to maintain a suitable amount of information for a source code analysis article, the collaboration of configuration and registration modules mentioned in the introduction was not expanded upon in this article.
In the next source code analysis, I will focus on the configuration center and registration center, analyzing how the Seata application side discovers the TC Server through service discovery and how it obtains various information from the configuration module before establishing connections between RMClient/TM Client and the TC Server.

· 6 min read

This article will introduce how to integrate Seata (1.4.0) with Spring Cloud and Feign using the TCC mode. In practice, Seata's AT mode can meet about 80% of our distributed transaction needs. However, when dealing with operations on databases and middleware (such as Redis) that do not support transactions, or when using databases that are not currently supported by the AT mode (currently AT supports MySQL, Oracle, and PostgreSQL), cross-company service invocations, cross-language application invocations, or the need for manual control of the entire two-phase commit process, we need to combine the TCC mode. Moreover, the TCC mode also supports mixed usage with the AT mode.

1. The concept of TCC mode

In Seata, a distributed global transaction follows a two-phase commit model with a Try-[Confirm/Cancel] pattern. Both the AT (Automatic Transaction) mode and the TCC (Try-Confirm-Cancel) mode in Seata are implementations of the two-phase commit. The main differences between them are as follows:

AT mode is based on relational databases that support local ACID transactions (currently supporting MySQL, Oracle, and PostgreSQL):

The first phase, prepare: In the local transaction, it combines the submission of business data updates and the recording of corresponding rollback logs. The second phase, commit: It immediately completes successfully and automatically asynchronously cleans up the rollback logs. The second phase, rollback: It automatically generates compensation operations through the rollback logs to complete data rollback.

On the other hand, TCC mode does not rely on transaction support from underlying data resources:

The first phase, prepare: It calls a custom-defined prepare logic. The second phase, commit: It calls a custom-defined commit logic. The second phase, rollback: It calls a custom-defined rollback logic.

TCC mode refers to the ability to include custom-defined branch transactions in the management of global transactions.

In summary, Seata's TCC mode is a manual implementation of the AT mode that allows you to define the processing logic for the two phases without relying on the undo_log used in the AT mode.

2. Preparation

3. Building TM and TCC-RM

This chapter focuses on the implementation of TCC using Spring Cloud + Feign. For the project setup, please refer to the source code (this project provides demos for both AT mode and TCC mode).

DEMO

3.1 build seata server

build server doc

3.2 build TM

service-tm

3.3 build RM-TCC

3.3.1 Defining TCC Interface

Since we are using Spring Cloud + Feign, which relies on HTTP for communication, we can use @LocalTCC here. It is important to note that @LocalTCC must be annotated on the interface. This interface can be a regular business interface as long as it implements the corresponding methods for the two-phase commit in TCC. The TCC-related annotations are as follows:

  • @LocalTCC: Used for TCC in the Spring Cloud + Feign mode.
  • @TwoPhaseBusinessAction: Annotates the try method. The name attribute represents the bean name of the current TCC method, which can be the method name (globally unique). The commitMethod attribute points to the commit method, and the rollbackMethod attribute points to the transaction rollback method. After specifying these three methods, Seata will automatically invoke the commit or rollback method based on the success or failure of the global transaction.
  • @BusinessActionContextParameter: Annotates the parameters to be passed to the second phase (commitMethod/rollbackMethod) methods.
  • BusinessActionContext: Represents the TCC transaction context.

Here is an example:

/**
* Here we define the TCC interface.
* It must be defined on the interface.
* We are using Spring Cloud for remote invocation.
* Therefore, we can use LocalTCC here.
*
*/
@LocalTCC
public interface TccService {

/**
* Define the two-phase commit.
* name = The bean name of this TCC, globally unique.
* commitMethod = The method for the second phase confirmation.
* rollbackMethod = The method for the second phase cancellation.
* Use the BusinessActionContextParameter annotation to pass parameters to the second phase.
*
* @param params
* @return String
*/
@TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
String insert(
@BusinessActionContextParameter(paramName = "params") Map<String, String> params
);

/**
* The confirmation method can be named differently, but it must be consistent with the commitMethod.
* The context can be used to pass the parameters from the try method.
* @param context
* @return boolean
*/
boolean commitTcc(BusinessActionContext context);

/**
* two phase cancel
*
* @param context
* @return boolean
*/
boolean cancel(BusinessActionContext context);
}

3.3.2 Business Implementation of TCC Interface

To keep the code concise, we will combine the routing layer with the business layer for explanation here. However, in actual projects, this may not be the case.

  • Using @Transactional in the try method allows for direct rollback of operations in relational databases through Spring transactions. The rollback of operations in non-relational databases or other middleware can be handled in the rollbackMethod.
  • By using context.getActionContext("params"), you can retrieve the parameters defined in the try phase and perform business rollback operations on these parameters in the second phase.
  • Note 1: It is not advisable to catch exceptions here (similarly, handle exceptions with aspects), as doing so would cause TCC to recognize the operation as successful, and the second phase would directly execute the commitMethod.
  • Note 2: In TCC mode, it is the responsibility of the developer to ensure idempotence and to guard against transaction suspension (a minimal sketch of such guards follows the implementation below).
@Slf4j
@RestController
public class TccServiceImpl implements TccService {

@Autowired
TccDAO tccDAO;

/**
* tcc t(try)method
* Choose the actual business execution logic or resource reservation logic based on the actual business scenario.
*
* @param params - name
* @return String
*/
@Override
@PostMapping("/tcc-insert")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public String insert(@RequestBody Map<String, String> params) {
log.info("xid = " + RootContext.getXID());
//todo Perform actual operations or operations on MQ, Redis, etc.
tccDAO.insert(params);
//Uncomment the following line to throw an exception
//throw new RuntimeException("TCC service test rollback");
return "success";
}

/**
* TCC service confirm method
* If resource reservation is used in the first phase, the reserved resources should be committed during the second phase confirmation
* @param context
* @return boolean
*/
@Override
public boolean commitTcc(BusinessActionContext context) {
log.info("xid = " + context.getXid() + "提交成功");
//todo If resource reservation is used in the first phase, resources should be committed here.
return true;
}

/**
* tcc cancel method
*
* @param context
* @return boolean
*/
@Override
public boolean cancel(BusinessActionContext context) {
//todo Here, write the rollback operations for middleware or non-relational databases.
System.out.println("please manually rollback this data:" + context.getActionContext("params"));
return true;
}
}
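
As a follow-up to Note 2 above, here is a minimal, in-memory sketch of the idempotence and anti-suspension bookkeeping that the developer must provide. The class and method names are purely illustrative; a real implementation would persist the branch status (for example in a fence table) inside the same local transaction as the business operation:

import java.util.concurrent.ConcurrentHashMap;

public class TccBranchGuard {
    private static final String TRIED = "TRIED";
    private static final String COMMITTED = "COMMITTED";
    private static final String CANCELLED = "CANCELLED";

    private final ConcurrentHashMap<String, String> branchStatus = new ConcurrentHashMap<>();

    /** Called at the start of the try phase; returns false if cancel already ran (suspension). */
    public boolean markTried(String xid) {
        return branchStatus.putIfAbsent(xid, TRIED) == null;
    }

    /** Idempotence for commit: only the first transition TRIED -> COMMITTED should do real work. */
    public boolean markCommitted(String xid) {
        return branchStatus.replace(xid, TRIED, COMMITTED);
    }

    /**
     * Anti-suspension for cancel: if cancel arrives before try (empty rollback), record
     * CANCELLED so that a late try can detect it and refuse to reserve resources.
     */
    public boolean markCancelled(String xid) {
        return branchStatus.putIfAbsent(xid, CANCELLED) == null
            || branchStatus.replace(xid, TRIED, CANCELLED);
    }
}

In insert() one would call markTried(xid) before reserving resources, and in commitTcc()/cancel() return early when markCommitted(xid)/markCancelled(xid) reports that the work was already done.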

3.3.3 Starting a Global Transaction in TM and Invoking RM-TCC Interface

Please refer to the project source code in section 3.2.

With this, the integration of TCC mode with Spring Cloud is complete.

· 10 min read

When it comes to Seata configuration management, you may think of Seata's adaptation of the various configuration centres. In fact, that is not the main topic today: although the process of adapting to configuration centres will also be analysed briefly, the focus is on the core implementation of Seata's configuration management.

Before talking about the configuration centre, let's briefly introduce the startup process of the Server side, because it involves the initialisation of configuration management. The core steps are as follows:

  1. The entry point is the Server#main method.
  2. The port can be obtained in several ways: from the container, from the command line, or the default port.
  3. Parse the command-line parameters: host, port, storeMode and so on; this step may involve the initialisation of configuration management.
  4. Metric-related: irrelevant here, skipped.
  5. NettyServer initialisation.
  6. Core controller initialisation: the core of the Server side, which also includes several scheduled tasks.
  7. NettyServer startup.

The code is as follows, with non-core code removed

public static void main(String[] args) throws IOException {
    // Get the port in several ways: from the container; from the command line; the default port; also used by logback.xml
    int port = PortHelper.getPort(args);
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

    // Parse startup parameters, both container and non-container
    ParameterParser parameterParser = new ParameterParser(args);

    // Metric-related
    MetricsManager.get().init();

    // NettyServer initialisation
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

    // Core controller initialisation
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init();

    // NettyServer startup
    nettyRemotingServer.init();
}

Why does step 3 involve the initialisation of configuration management? The core code is as follows:

if (StringUtils.isBlank(storeMode)) {
    storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
        SERVER_DEFAULT_STORE_MODE);
}

If storeMode is not explicitly specified in the startup parameters, the configuration is fetched through the ConfigurationFactory API. The line ConfigurationFactory.getInstance() involves two parts: the ConfigurationFactory static initialisation and the Configuration initialisation. Let's focus on analysing these.

Configuration management initialisation

ConfigurationFactory initialisation

We know there are two key configuration files in Seata: registry.conf, the core configuration file which must be present, and file.conf, which is only needed when the configuration centre is File. The static block of ConfigurationFactory mainly loads the registry.conf file, roughly as follows:

1. If nothing is configured manually, the registry.conf file is read by default and wrapped into a FileConfiguration object; the core code is as follows:

Configuration configuration = new FileConfiguration(seataConfigName,false);
FileConfigFactory.load("registry.conf", "registry");
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, "CONF", argsType, args);

2. Configuration enhancement: similar to a proxy pattern, some extra processing is done inside the proxy object when configuration is fetched; details below.

Configuration extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  3. Assign the proxy object from step 2 to the CURRENT_FILE_INSTANCE reference, which is used directly in many places as a static entry point.
   CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;

It is easy to see that CURRENT_FILE_INSTANCE corresponds to the contents of registry.conf. Personally, I don't think registry.conf is a good name for this file; it is too ambiguous. Would bootstrap.conf be a better name?

Configuration initialisation

Configuration corresponds to the configuration centre. Seata currently supports almost all mainstream configuration centres, such as apollo, consul, etcd, nacos, zk, springCloud and local files. When a local file is used as the configuration centre, the file.conf file is loaded; of course this is only the default name and can be configured. By now you basically know the relationship between registry.conf and file.conf.

Configuration is a singleton inside ConfigurationFactory, so the initialisation logic of Configuration is also in ConfigurationFactory. The rough process is as follows: 1. First read the config.type property from the registry.conf file; it is file by default.

configTypeName = CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR+ ConfigurationKeys.FILE_ROOT_TYPE);
  2. Load the configuration centre based on the value of config.type. For example, with the default file, the file name of the local file configuration centre is first read from config.file.name in registry.conf, and a FileConfiguration object is then created from that name, loading the configuration in file.conf into memory. Likewise, if another configuration centre is configured, it is initialised via SPI. Note also that, at this stage, if the configuration centre is a local file, a proxy object is created for it; if not, the corresponding configuration centre is simply loaded via SPI:
if (ConfigType.File == configType) {
    String pathDataId = "config.file.name";
    String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
    configuration = new FileConfiguration(name);
    try {
        // Configuration enhancement proxy
        extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
    } catch (Exception e) {
        LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
    }
} else {
    configuration = EnhancedServiceLoader
        .load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}

3. Based on the Configuration object created in step 2, another layer of proxy is created for it. This proxy has two roles: one is a local cache, so the configuration does not have to be fetched from the configuration centre on every read; the other is a listener, which updates the cache it maintains when the configuration changes. As follows:

if (null != extConfiguration) {
    configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
    configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}
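
Once this cache proxy is in place, the rest of the Server reads configuration through it. A hedged usage sketch (the key and default value here are only illustrative):

// Reads go through the cached, listener-aware proxy returned by the factory
Configuration config = ConfigurationFactory.getInstance();
String storeMode = config.getConfig("store.mode", "file");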

At this point, the initialisation of configuration management is complete. To summarise: Seata first loads the registry.conf file to get the registry and configuration centre information, then initialises the configuration centre based on that information; when a local file is used as the configuration centre, file.conf is loaded by default. Finally, a proxy object is created for the configuration centre to support local caching and configuration listening.

The whole process is fairly straightforward, so let me throw out a few questions here:

  1. What is configuration enhancement, and how is it implemented in Seata?
  2. When a local file is used as the configuration centre, the configuration has to be defined in file.conf. For a Spring application, can the corresponding configuration items be defined directly in application.yaml instead?
  3. In step 2 above, why is a configuration-enhancement proxy object created for Configuration only when the local-file configuration centre is used, and not for the other configuration centres?

These three questions are closely related, and all of them come down to Seata's configuration enhancement. The details follow.

Configuration Management Enhancements

Configuration enhancement is, simply put, creating a proxy object for a target object so that proxy logic is executed when the target object's methods are invoked. From the configuration centre's perspective, it means that proxy logic runs whenever a configuration value is fetched through the configuration centre.

  1. Get a configuration value through ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(key).
  2. Load the registry.conf file and create the FileConfiguration object configuration.
  3. Create a proxy object configurationProxy for configuration based on SpringBootConfigurationProvider.
  4. Obtain the configuration centre connection information (file, zk, nacos, etc.) from configurationProxy.
  5. Create the configuration centre's Configuration object configuration2 based on that connection information.
  6. Create a proxy object configurationProxy2 for configuration2 based on SpringBootConfigurationProvider.
  7. Execute the proxy logic of configurationProxy2.
  8. Find the corresponding Spring bean based on the key.
  9. Call the getXxx method of that Spring bean.


Configuration Enhancement Implementation

Configuration enhancement was also briefly mentioned above and the related code is as follows:

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  1. First, obtain an ExtConfigurationProvider object through the SPI mechanism (a sketch of the SPI registration follows this list); by default there is only one implementation in Seata: SpringBootConfigurationProvider.
  2. Then create a proxy object for the Configuration through the ExtConfigurationProvider#provide method.
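The SPI registration itself is just a service file named after the interface that EnhancedServiceLoader discovers; roughly like this (the exact package paths are from memory and may differ between versions):

# file: META-INF/services/io.seata.config.ExtConfigurationProvider
io.seata.spring.boot.autoconfigure.provider.SpringBootConfigurationProvider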

The core code is as follows.

public Configuration provide(Configuration originalConfiguration) {
    return (Configuration) Enhancer.create(originalConfiguration.getClass(), new MethodInterceptor() {
        @Override
        public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy)
                throws Throwable {
            if (method.getName().startsWith("get") && args.length > 0) {
                Object result = null;
                String rawDataId = (String) args[0];
                if (args.length == 1) {
                    result = get(convertDataId(rawDataId));
                } else if (args.length == 2) {
                    result = get(convertDataId(rawDataId), args[1]);
                } else if (args.length == 3) {
                    result = get(convertDataId(rawDataId), args[1], (Long) args[2]);
                }
                if (result != null) {
                    // if the return type is String, the result needs to be converted to a string
                    if (method.getReturnType().equals(String.class)) {
                        return String.valueOf(result);
                    }
                    return result;
                }
            }

            return method.invoke(originalConfiguration, args);
        }
    });
}

private Object get(String dataId) throws IllegalAccessException, InstantiationException {
    String propertyPrefix = getPropertyPrefix(dataId);
    String propertySuffix = getPropertySuffix(dataId);
    ApplicationContext applicationContext = (ApplicationContext) ObjectHolder.INSTANCE
        .getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT);
    Class<?> propertyClass = PROPERTY_BEAN_MAP.get(propertyPrefix);
    Object valueObject = null;
    if (propertyClass != null) {
        try {
            Object propertyBean = applicationContext.getBean(propertyClass);
            valueObject = getFieldValue(propertyBean, propertySuffix, dataId);
        } catch (NoSuchBeanDefinitionException ignore) {
        }
    } else {
        throw new ShouldNeverHappenException("PropertyClass for prefix: [" + propertyPrefix + "] should not be null.");
    }
    if (valueObject == null) {
        valueObject = getFieldValue(propertyClass.newInstance(), propertySuffix, dataId);
    }

    return valueObject;
}

  1. If the method name starts with get and the number of arguments is 1, 2, or 3, execute the alternative configuration-lookup logic; otherwise, fall back to the logic of the native Configuration object.
  2. Don't worry too much about why this rule exists; it is simply a Seata convention.
  3. The alternative lookup logic fetches the corresponding configuration value through Spring.
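The 1/2/3-argument convention corresponds to the getter overloads on the Configuration interface, for example the getConfig family; a sketch of what the proxy intercepts (key and values are illustrative):

// 1 argument: key only
String v1 = configuration.getConfig("some.key");
// 2 arguments: key plus default value
String v2 = configuration.getConfig("some.key", "defaultValue");
// 3 arguments: key, default value and a timeout in milliseconds
String v3 = configuration.getConfig("some.key", "defaultValue", 3000L);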

By now the principle of configuration enhancement should be clear. You can also guess that SpringBootConfigurationProvider, being the only ExtConfigurationProvider implementation, must be related to Spring.

Configuration Enhancement and Spring

Before we introduce this piece, let's briefly describe how Seata is used:

  1. Non-starter way: introduce the seata-all dependency, then manually configure a few core beans.
  2. Starter way: introduce the seata-spring-boot-starter dependency, which is fully auto-configured; the core beans do not need to be injected manually. (Illustrative dependency snippets follow this list.)
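For illustration, the two dependencies would be declared roughly as follows (the version is only a placeholder):

<!-- non-starter way -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.4.2</version>
</dependency>

<!-- starter way -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.2</version>
</dependency>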

SpringBootConfigurationProvider lives in the seata-spring-boot-starter module. In other words, when we use Seata by introducing seata-all, configuration enhancement does nothing, because no ExtConfigurationProvider implementation class can be found on the classpath, so the Configuration is naturally not enhanced.

So how does seata-spring-boot-starter tie all this together?

  1. First, in the resources/META-INF directory of the seata-spring-boot-starter module there is a spring.factories file with the following contents:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

(HttpAutoConfiguration is not relevant here and can be ignored for now.)

  2. In the SeataAutoConfiguration class, the following beans are created: GlobalTransactionScanner, SeataDataSourceBeanPostProcessor, SeataAutoDataSourceProxyCreator, and SpringApplicationContextProvider. The first three are not relevant to this article; the one to focus on is SpringApplicationContextProvider. Its core code is very simple: it saves the ApplicationContext:

public class SpringApplicationContextProvider implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
    }
}
  3. Also in SeataAutoConfiguration, the various xxxProperties classes and their corresponding key prefixes are cached into PROPERTY_BEAN_MAP. The xxxProperties classes can simply be understood as the various configuration items in application.yaml (an illustrative application.yaml sketch follows the code below):
static {
    PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);
    PROPERTY_BEAN_MAP.put(CLIENT_RM_PREFIX, RmProperties.class);
    PROPERTY_BEAN_MAP.put(SHUTDOWN_PREFIX, ShutdownProperties.class);
    // ... omitted ...
}
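As an illustration of what those prefixes map to, an application.yaml might contain entries like the following (keys and values are illustrative and may differ by version):

seata:
  enabled: true
  tx-service-group: my_tx_group
  client:
    rm:
      report-retry-count: 5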

At this point the whole process should be clear. With the SpringBootConfigurationProvider enhancement in place, a configuration item is obtained as follows:

  1. First, based on the configuration key, find the corresponding xxxProperties class.
  2. Get the corresponding xxxProperties Spring bean through the ApplicationContext held in ObjectHolder.
  3. Read the value of the corresponding configuration item from that xxxProperties Spring bean.
  4. In this way, the values in application.yaml are obtained through configuration enhancement.
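Putting it all together, here is a hedged walk-through of a single enhanced lookup; the key, prefix, and field names below are assumptions for illustration only:

// illustrative walk-through of an enhanced lookup (names are assumptions, not confirmed by the source)
// 1. the caller asks for a value through the proxied Configuration
int retryCount = ConfigurationFactory.getInstance().getInt("client.rm.reportRetryCount", 5);
// 2. the proxy converts the raw key into its Spring-style form, e.g. "seata.client.rm.reportRetryCount"
// 3. the prefix (e.g. "seata.client.rm") is looked up in PROPERTY_BEAN_MAP and resolves to RmProperties.class
// 4. the RmProperties bean is fetched from the ApplicationContext stored in ObjectHolder
// 5. getFieldValue reads the reportRetryCount field, which Spring bound from application.yaml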