Skip to main content

· 4 min read

Current Situation & Pain Points

For Seata, it records the before and after data of DML operations to perform possible rollback operations, and stores this data in a blob field in the database. For batch operations such as insert, update, delete, etc., the number of affected rows may be significant, concatenated into a large field inserted into the database, which may lead to the following issues:

  1. Exceeding the maximum write limit for a single database operation (such as the max_allowed_package parameter in MySQL).
  2. Significant network IO and database disk IO overhead due to a large amount of data.

Brainstorming

For the first issue, the max_allowed_package parameter limit can be increased based on the actual situation of the business to avoid the "query is too large" problem. For the second issue, increasing bandwidth and using high-performance SSD as the database storage medium can help.

The above solutions involve external or costly measures. Is there a framework-level solution to address the pain points mentioned above?

Considering the root cause of the pain points mentioned above, the problem lies in the generation of excessively large data fields. Therefore, if the corresponding data can be compressed at the business level before data transmission and storage, theoretically, it can solve the problems mentioned above.

Feasibility Analysis

Combining the brainstorming above, in practical development, when large batch operations are required, they are often scheduled during periods of relatively low user activity and low concurrency. At such times, CPU and memory resources can be relatively more utilized to quickly complete the corresponding operations. Therefore, by consuming CPU and memory resources to compress rollback data, the size of data transmission and storage can be reduced.

At this point, two things need to be demonstrated:

  1. After compression, it can reduce the pressure on network IO and database disk IO. This can be measured by the total time taken for data compression + storage in the database.
  2. After compression, the efficiency of compression compared to the original data size. This can be measured by the data size before and after compression.

Testing the time spent on compressing network usage:

image

Compression Ratio Test:

image

The test results clearly indicate that using gzip or zip compression can significantly reduce the pressure on the database and network transmission. At the same time, it can substantially decrease the size of the stored data.

Implementation

Implementation Approach

Compression

Partial Code

# Whether to enable undo_log compression, default is true
seata.client.undo.compress.enable=true

# Compressor type, default is zip, generally recommended to be zip
seata.client.undo.compress.type=zip

# Compression threshold for enabling compression, default is 64k
seata.client.undo.compress.threshold=64k

Determining Whether the Undo_Log Compression Feature is Enabled and if the Compression Threshold is Reached

protected boolean needCompress(byte[] undoLogContent) {
// 1. Check whether undo_log compression is enabled (1.4.2 Enabled by Default).
// 2. Check whether the compression threshold has been reached (64k by default).
// If both return requirements are met, the corresponding undoLogContent is compressed
return ROLLBACK_INFO_COMPRESS_ENABLE
&& undoLogContent.length > ROLLBACK_INFO_COMPRESS_THRESHOLD;
}

Initiating Compression for Undo_Log After Determining the Need

// If you need to compress, compress undo_log
if (needCompress(undoLogContent)) {
// Gets the compression type, default zip
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
//Get the corresponding compressor and compress it
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
// else does not need to compress and does not need to do anything

Save the compression type synchronously to the database for use when rolling back:

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
// Save the compression type to the database
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
}

Decompress the corresponding information when rolling back:

protected byte[] getRollbackInfo(ResultSet rs) throws SQLException  {
// Gets a byte array of rollback information saved to the database
byte[] rollbackInfo = rs.getBytes(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
// Gets the compression type
// getOrDefault uses the default value CompressorType.NONE to directly upgrade 1.4.2+ to compatible versions earlier than 1.4.2
String rollbackInfoContext = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = CollectionUtils.decodeMap(rollbackInfoContext);
CompressorType compressorType = CompressorType.getByName(context.getOrDefault(UndoLogConstants.COMPRESSOR_TYPE_KEY,
CompressorType.NONE.name()));
// Get the corresponding compressor and uncompress it
return CompressorFactory.getCompressor(compressorType.getCode())
.decompress(rollbackInfo);
}

peroration

By compressing undo_log, Seata can further improve its performance when processing large amounts of data at the framework level. At the same time, it also provides the corresponding switch and relatively reasonable default value, which is convenient for users to use out of the box, but also convenient for users to adjust according to actual needs, so that the corresponding function is more suitable for the actual use scenario.

· 13 min read
  1. seata version: 1.4.0, but all versions below 1.4 also have this problem.
  2. Problem description: In a global transaction, a pure query operation on a branch transaction suddenly gets stuck without any feedback (logs/exceptions) until the RPC timeout on the consumer side

! image.png

Problem Troubleshooting

  1. The whole process is in a global transaction, the consumer and provider can be seen as two branches of the global transaction, consumer --> provider.
  2. the consumer first executes some local logic, and then sends an RPC request to the provider to make sure that the consumer has sent the request and the provider has received it.
  3. the provider first prints a log, and then executes a pure query SQL, if the SQL is executed properly, it will print the log, but the current phenomenon is that only the log before the execution of the SQL is printed, and no SQL-related logs are printed. Find DBA to check the SQL log, and found that the SQL is not executed.
  4. Determined that the operation is only a pure query operation under the global transaction, before the operation, the overall process of the global transaction is completely normal.
  5. In fact, the phenomenon here has been very obvious, but at that time the idea did not change over, has been concerned about the query SQL, always thinking that even if the query timeout and other reasons should be thrown exceptions ah, should not be nothing. DBA can not find the query record, that is not to say that the SQL may not have been executed ah, but in the execution of the SQL before the problem, such as the agent?
  6. With the help of arthas's watch command, there is no output. The first log output means that the method must have been executed, and the delay in outputting the result means that the current request is stuck, why is it stuck?
  7. With arthas's thread command thread -b, thread -n, is to find out the current busiest thread. This works very well, there is a thread CPU usage 92%, and because of this thread caused the other 20 or so Dubbo threads BLOCKED. The stack information is as follows
  8. Analysing the stack information, we can clearly find the interface related to seata, which is probably related to seata's data source proxy; at the same time, we found that the thread with the highest CPU usage is stuck in the ConcurrentHashMap#computeIfAbsent method. Is there a bug in the ConcurrentHashMap#computeIfAbsent method?
  9. By now, we don't know the exact cause of the problem, but it should have something to do with seata's data source proxy, and we need to analyse both the business code and the seata code to find out why.

! image.png

Problem analysis

ConcurrentHashMap##computeIfAbsent

This method does have the potential for problems: if two keys have the same hascode, and the computeIfAbsent operation is performed in the corresponding mappingFunction, it will lead to a dead loop, refer to this article for specific analysis: https://juejin.cn/post/ 6844904191077384200

Seata data source autoproxy

Related content has been analysed before, let's focus on the following core classes:

  1. SeataDataSourceBeanPostProcessor
  2. SeataAutoDataSourceProxyAdvice
  3. DataSourceProxyHolder
SeataDataSourceBeanPostProcessor

The SeataDataSourceBeanPostProcessor is a BeanPostProcessor implementation class that creates a seataAutoDataSourceProxyDataSource for the data source configured by the business side in the postProcessAfterInitialization method (i.e., after the bean is initialised). proxy data source

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy. if (!excludes.contains.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy."); return ((SeataDataSourceProxy); } }
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean.
}
}
SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice is a MethodInterceptor, SeataAutoDataSourceProxyCreator in seata creates a dynamic proxy object for Bean of type DataSource, the proxy logic is the SeataAutoDataSourceProxyAdvice#invoke logic. That is: when executing the relevant methods of the DataSourceAOPProxyAdvice, it will go through its invoke method, and in the invoke method, it will find the corresponding SeataAutoDataSourceProxyAdvice according to the native data source, which will ultimately execute the SeataAutoDataSourceProxyAdvice logic.

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
......
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (!RootContext.requireGlobalLock() && dataSourceProxyMode ! = RootContext.getBranchType()) {
return invocation.proceed();
}
Method method = invocation.getMethod();
Object[] args = invocation.getArguments(); } Method m = BeanUtils.getMethod(); }
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m ! = null) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode ); return m.invoke(dataSourceProxyHolder).
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
}
DataSourceProxyHolder

The process is clear to us, now there is a question, how to maintain the relationship between native data source and seata proxy data source? It is maintained by DataSourceProxyHolder, which is a singleton object that maintains the relationship between the two through a ConcurrentHashMap: native data source as key --> seata proxy data source as value.

public class DataSourceProxyHolder {
public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource = dataSource;
......
return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, originalDataSource, BranchType.
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}
}


// CollectionUtils.java
public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value ! = null) {
return value; }
}
return map.computeIfAbsent(key, mappingFunction);
}

Client data source configuration

  1. Two data sources are configured: DynamicDataSource, P6DataSource, P6DataSource, P6DataSource and P6DataSource.
  2. P6DataSource can be seen as a wrapper for DynamicDataSource. 3.
  3. Let's not worry about whether this configuration makes sense or not, now we just analyse the problem purely based on this data source configuration.
@Qualifier("dsMaster")
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(dsMaster());
return p6DataSource;
}

Analyse the process

Assuming that by now everyone is aware of the problems that may arise from ConcurrentHashMap#computeIfAbsent, it is known that this problem has now arisen, and in combination with the stack information, we can see roughly where this problem has arisen.

1, ConcurrentHashMap#computeIfAbsent will produce this problem precondition is: two key hashcode is the same; mappingFunction corresponds to a put operation. Combined with our seata usage scenario, the mappingFunction corresponds to DataSourceProxy::new, suggesting that the put operation may be triggered in the DataSourceProxy's constructor method

! image.png

Execute AOP proxy data source related methods =>
Enter SeataAutoDataSourceProxyAdvice cutover logic =>
Execute DataSourceProxyHolder#putDataSource method =>
Execute DataSourceProxy::new =>
AOP proxy data source's getConnection method =>
The getConnection method of the native data source =>
Enter SeataAutoDataSourceProxyAdvice cutover logic =>
Execute DataSourceProxyHolder#putDataSource method =>
Execute DataSourceProxy::new =>
DuridDataSource's getConnection method

2, What is the AOP proxy data source and native data source stated in step 1? Look at the following diagram ! image.png

3, the above also said to produce this problem there is a condition two key hashcode the same, but I see that these two data source objects are not overriding the hashcode method, so by definition, these two objects must be different hashcode. After looking at the ConcurrentHashMap problem again, I feel that the statement two keys have the same hashcode is not correct, and two keys will generate hash conflict is more reasonable, which explains why two objects with different hashcodes will encounter this problem. To prove this, I've given an example below.

public class Test {
public static void main(String[] args) {
ConcurrentHashMap map = new ConcurrentHashMap(8); Num n1 = new Num(8)
Num n1 = new Num(3);
Num n2 = new Num(19); Num n3 = new Num(19); Num n3 = new Num(19)
Num n3 = new Num(20);

// map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)); // This line of code does not cause the program to loop.
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)); // this line of code will cause the program to die
}

static class Num{
private int i; public Num(int i){
public Num(int i){
this.i = i; } static class Num{ private int i; public Num(int i){ this.
}

public int hashCode() { this.i = i; this.i = i; }
public int hashCode() {
return i; } @Override public int hashCode() { this.i = i; }
}
}
}
  1. To make it easier to reproduce the problem, we rewrite the Num#hashCode method to ensure that the constructor input is the hashcode value.
  2. create a ConcurrentHashMap object, initialCapacity is 8, sizeCtl calculated value is 16, that is, the default length of the array in the map is 16
  3. create object n1, the input parameter is 3, that is, the hashcode is 3, the calculation of its corresponding array subscript 3
  4. create object n2, the input parameter is 19, that is, the hashcode is 19, calculate its corresponding array subscript is 3, at this time we can think of n1 and n2 hash conflict.
  5. create object n3 with input 20, i.e., hashcode 20, and its corresponding array subscript is 4.
  6. execute map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)), the programme exits normally: Because there is no hash conflict between n1 and n3, the programme terminates normally.
  7. Execute map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)), the programme exits normally: because n1 and n2 have a hash conflict, so it is in a dead loop.

4、During the initialisation of the object, hasn't SeataDataSourceBeanPostProcessor already initialised the corresponding data source proxy of the object? Why the corresponding data source proxy is still created in SeataAutoDataSourceProxyAdvice?

  1. First of all, the SeataDataSourceBeanPostProcessor execution period is later than the creation of the AOP proxy object, so when executing the SeataDataSourceBeanPostProcessor related methods, the SeataAutoDataSourceBeanPostProcessor method is executed. SeataAutoDataSourceProxyAdviceshould actually take effect when theSeataDataSourceBeanPostProcessor` related methods are executed.
  2. when adding elements to the map in SeataDataSourceBeanPostProcessor, the key is AOP proxy datasource; in SeataAutoDataSourceProxyAdvice, the key is native datasource, so the key is not the same as invocation.getThis(), so the key is not the same. `, so the key is not the same

! image.png

5, there is another problem, SeataAutoDataSourceProxyAdvic#invoke method does not filter toString, hashCode and other methods, the proxy object created by cglib will override these methods by default, if these methods of the proxy object are triggered when putting elements into the map. If these methods are triggered when putting elements into the map, the proxy object will re-enter the SeataAutoDataSourceProxyAdvic#invoke cut until the thread stack benefits.

Summary of the problem

  1. in two key will produce hash conflict, will trigger ConcurrentHashMap#computeIfAbsent BUG, the performance of this BUG is to make the current thread into a dead loop
  2. business feedback, the problem is occasional, occasional for two reasons: first, the application is a multi-node deployment, but only one node on the line triggered the BUG (hashcode conflict), so only when the request hits this node may trigger the BUG; second, because each restart object address (hashcode) are not sure, so not triggered after every app restart, but if once triggered, the node will always have this problem. Having a thread that keeps dying and blocking other threads that are trying to get the proxy data source from the map is business feedback that the request is stuck. If this happens to successive requests, the business side may restart the service, and then, ``because the hash conflict does not necessarily exist after the restart, the business may behave normally after the restart, but it is also possible that the bug will be triggered again on the next restart.
  3. when encountering this problem, from the perspective of the whole problem, it is indeed a deadlock, because the dead loop thread occupant lock has not been released, resulting in other threads operating on the map is BLOCKED!
  4. essentially because ConcurrentHashMap#computeIfAbsent method may trigger a bug, and seata's usage scenario just triggered the bug.
  5. The following demo is actually a complete simulation of what happens when something goes wrong online, as follows:
public class Test {
public static void main(String[] args) {

ConcurrentHashMap map = new ConcurrentHashMap(8);

Num n1 = new Num(3);
Num n2 = new Num(19);

for(int i = 0; i< 20; i++){
new Thread(()-> {
try {
Thread.sleep(1000); } catch (InterruptedException e.g.
} catch (InterruptedException e) {
e.printStackTrace();
}

map.computeIfAbsent(n1, k-> 200); }).start(); }
}).start();
}
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200));
}


static class Num{
private int i; public Num(int i){

public Num(int i){
this.i = i; }
}
public int hashCode() { this.i = i; this.i = i; }
public int hashCode() {
return i; } @Override public int hashCode() { this.i = i; }
}
}
}

! image.png

Solving the problem

The problem can be solved in two ways:

  1. business changes: P6DataSource and DynamicDataSource do not need to be proxied, directly proxy P6DataSource can be, DynamicDataSource does not need to be declared as a bean; or through the excludes property excludes P6DataSource, so that there is no duplicate proxy problem. There will be no problem of duplicate proxies
  2. Seata refinement: improve the logic related to data source proxy
Business changes
  1. Data source related configuration can be changed to the following:
@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(new TuYaDynamicDataSource(masterDsRoute));
logger.warn("dsMaster={}, hashcode={}",p6DataSource, p6DataSource.hashCode());
return p6DataSource.
}
  1. Or leave the current data source configuration unchanged and add the excludes property
@EnableAutoDataSourceProxy(excludes={"P6DataSource"})
Seata refinement
  1. ConcurrentHashMap#computeIfAbsent method is changed to double check as follows:
SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
synchronized (dataSourceProxyMap) {
dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
dataSourceProxyMap.put(originalDataSource, dsProxy);
}
}
}
return dsProxy;

I wanted to change the CollectionUtils#computeIfAbsent method directly, and the feedback from the group was that this might cause the data source to be created multiple times, which is indeed a problem: as follows

public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value ! = null) {
return value; }
}
value = mappingFunction.apply(key);
return map.computeIfAbsent(key, value);
}
  1. Add some filtering to the SeataAutoDataSourceProxyAdvice cutout logic
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m ! = null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode ); return m.invoke(dataSourceProxyHolder).
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}

Legacy issues

In the corresponding methods of SeataDataSourceBeanPostProcessor and SeataAutoDataSourceProxyAdvice, the keys corresponding to initialising the seata datasource proxy into the map are fundamentally different, the The key inSeataDataSourceBeanPostProcessoris theAOP proxy data source; the key in SeataAutoDataSourceProxyAdviceis the native object, which results in the unnecessary creation of theseata data source proxy` object.

What is the best suggestion for this problem? Is it possible to specify an order for SeataDataSourceBeanPostProcessor to take effect before the AOP proxy object is created?

Link to original article

https://juejin.cn/post/6939041336964153352/

· 15 min read

"Just getting started with Seata and don't know enough about its modules?
Want to dive into the Seata source code, but haven't done so yet?
Want to find out what your application is doing "on the sly" during startup after integrating Seata?
Want to learn the design concepts and best practices of Seata as a great open source framework?
If any of the above apply to you, then today's article is for you!

Preface

In Seata's application-side (RM, TM) startup process, the first thing to do is to establish communication with the coordinator side (TC), which is a prerequisite for Seata to be able to complete the distributed transaction coordination, so Seata in the process of completing the initialisation of the application side and establishing a connection with the TC, it is How to find the cluster and address of the TC Transaction Coordinator? And how does it get various configuration information from the configuration module? That's what this article is going to explore.

Give a qualification

Seata as a middleware level of the underlying components, is very careful to introduce third-party frameworks for specific implementations, interested students can learn more about Seata's SPI mechanism, to see how Seata is through a large number of extension points (Extension), to invert the specific implementation of the dependent components out of the turn rely on abstract interfaces, and at the same time, Seata in order to better At the same time, Seata in order to better integrate into microservices, cloud native and other popular architectures derived from the ecosystem, but also based on the SPI mechanism on a number of mainstream microservice frameworks, registry, configuration centre and Java development frameworks, "the leader" - SpringBoot and so on. Do the active integration , in order to ensure that the microkernel architecture , loosely coupled , scalable at the same time , but also can be very good with all kinds of components "to play with", so that the environment using a variety of technology stacks can be more convenient to introduce Seata.

In this paper, in order to be close to everyone ** just introduced Seata trial ** scene , in the following introduction , select ** application side ** qualifications are as follows : the use of **File (file) as the configuration centre and registration centre **, and based on ** SpringBoot ** start.

With this qualification, let's dive into the Seata source code and find out what's going on.

RM/TM Initialisation Process with Alternating Multi-Module Collaboration

In Seata Client Startup Process Dissection (I) , we analysed the initialization of TM and RM on the application side of Seata, and how the application side creates a Netty Channel and sends a registration request to the TC Server to send a registration request. In addition to this, during RM initialisation, several other Seata modules (Registration Centre, Configuration Centre, Load Balancing) come into play and collaborate with each other to complete the process of connecting to the TC Server.

When executing the Client reconnect to TC Server method: NettyClientChannelManager.Channreconnect(), you first need to get the list of available TC Server addresses based on the current transaction grouping:

/**
* NettyClientChannelManager.reconnect()
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList = null; }
try {
// Get the available TC Server addresses from the registry
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e); return; {// Get the available TC Server addresses from the registry.
return; }
}
// The following code is omitted
}

For a detailed introduction to the concept of transaction grouping, you can refer to the official document Introduction to Transaction Grouping. Here is a brief introduction.

  • Each Seata application-side RM, TM, has a transaction grouping name
  • Each TC on the Seata coordinator side has a cluster name and address. The application side goes through the following two steps when connecting to the coordinator side:
  • Through the name of the transaction grouping, the cluster name of the TC corresponding to this application side is obtained from the configuration
  • By using the cluster name, the address list of the TC cluster can be obtained from the registry. The above concepts, relationships and processes are shown in the following figure: ! Relationship between Seata transaction grouping and connection establishment

Getting TC Server cluster addresses from Registry

After understanding the main concepts and steps involved in connecting TCs from RM/TC, let's move on to explore the getAvailServerList method:

private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
//① Use the registry factory to get a registry instance.
//② Call the registry's lookup method lookUp() to get a list of the addresses of the available Servers in the TC cluster based on the transaction group name.
List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
return Collections.emptyList();
}

return availInetSocketAddressList.stream()
.map(NetUtil::toStringAddress)
.collect(Collectors.toList()); }
}

Which registry to use? The Seata meta-configuration file gives the answer

As mentioned above, Seata supports a variety of registry implementations, so Seata first needs to get the "type of registry" information from a place first.

Seata has designed a "configuration file" to store some basic information about the components used in its framework. I prefer to call this configuration file "meta-configuration file ", because the information it contains is actually the "configuration of the configuration", i.e., the "configuration of the configuration", i.e., the "configuration of the configuration". This is because the information it contains is actually the "configuration of the configuration", i.e., the concept of "meta", which can be understood by comparing the information in the database table with the information in the structure of the database table itself (table data and table metadata).

We can think of the information in the Registry and Configuration Centre as configuration information itself, and what is the configuration** of this **configuration information? This information, then, is contained in Seata's meta-configuration file. In fact, there are only two types of information contained in the 'meta-configuration file':

  • The first is the type of registry: registry.type, as well as some basic information about that type of registry, for example, when the registry type is a file, the meta configuration file stores the file's name information; when the registry type is Nacos, the meta configuration file stores Nacos addresses, namespaces, cluster names and other information.
  • Second, the type of configuration centre: config.type, as well as some basic information about the type of configuration centre, such as when the configuration centre is a file, the meta-configuration file stores information about the name of the file; when the type of registry is Consul, the meta-configuration file stores information about the address of the Consul

Seata's meta-configuration file supports Yaml, Properties and other formats , and can be integrated into the SpringBoot application.yaml file ( use seata-spring-boot-starter can be ) , easy to integrate with SpringBoot .

The default meta-configuration file that comes with Seata is registry.conf, and when we use a file as the registration and configuration centre, the content in registry.conf is set as follows:

registry {
# file , nacos , eureka, redis, zk, consul, etcd3, sofa
type = "file"
file {
name = "file.conf"
}
}

config {
# file, nacos, apollo, zk, consul, etcd3
type = "file"
file {
name = "file.conf"
}
}

In the following source code, we can find that the type of registry used by Seata is taken from ConfigurationFactory.CURRENT_FILE_INSTANCE, and this CURRENT_FILE_INSTANCE is what we call, an instance of the Seata **meta-configuration file **

// In getInstance(), call buildRegistryService to build the specific registry instance
public static RegistryService getInstance() {
if (instance == null) {
synchronized (RegistryFactory.class) {
if (instance == null) {
instance = buildRegistryService();
}
}
}
return instance; }
}

private static RegistryService buildRegistryService() {
RegistryType registryType.
// Get the registry type
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
registryType = RegistryType.getType(registryTypeName); } catch (Exception exx); exx = RegistryType.
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName); }
}
if (RegistryType.File == registryType) {
return FileRegistryServiceImpl.getInstance(); } else {
} else {
// Load the registry instance using the SPI method based on the registry type
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
}

Let's look at the initialisation process of the meta-configuration file, which triggers the initialisation of the ConfigurationFactory class when the static field CURRENT_FILE_INSTANCE is fetched for the first time:

   // Static block of the ConfigurationFactory class
static {
load();
}

/**
* In the load() method, load Seata's meta configuration file
*/
private static void load() {
// The name of the meta configuration file, support through the system variable, environment variable expansion
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_DEFAULT;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY); }
}
// Create a file configuration instance that implements the Configuration interface based on the meta-configuration file name
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = null;
// Determine if an extended configuration provider exists by loading it through SPI
//When the application side uses seata-spring-boot-starer, it will pass the SpringBootConfigurationProvider as the extended configuration provider, at this point, when getting the meta-configuration item, it will no longer get it from file.conf (the default), but from application. properties/application.yaml.
try {
// Replace the original Configuration instance with an instance of the extended configuration via the ExtConfigurationProvider's provide method
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
// Existence of an extended configuration returns an instance of the extended configuration, otherwise it returns an instance of the file configuration
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

The call sequence diagram for the load() method is as follows: ! Seata metaconfiguration file loading process

In the above sequence diagram, you can focus on the following points:

  • Seata meta configuration file Name support extension
  • Seata meta-configuration file suffixes** support 3 suffixes**, yaml/properties/conf, which will be attempted to match in turn when the meta-configuration file instance is created
  • Seata ** configuration capabilities related to the top-level interface for the Configuration **, a variety of configuration centres are required to implement this interface, Seata's meta-configuration file is the use of FileConfiguration (file type configuration centre) to implement this interface
/**
* Seata Configuration Capability Interface
* package: io.seata.config
*/

public interface Configuration {
/**
* Gets short.
*
* @param dataId the data id
* @param defaultValue the default value
* @param timeoutMills the timeout mills
* @return the short
*/short getShort(String dataId)
short getShort(String dataId, int defaultValue, long timeoutMills);; short getShort(String dataId, int defaultValue, long timeoutMills)

// The following content is omitted, the main ability to add, delete and retrieve configuration
}
  • Seata provides an extension point of type ExtConfigurationProvider, opening up the ability to extend the specific implementation of the configuration, which has a provide() method to receive the original Configuration, return a completely new Configuration, the form of the methods of this interface determines that the general The form of this interface method determines that, in general, static proxies, dynamic proxies, decorators and other design patterns can be used to implement this method to achieve the original Configuration enhancement.
/**
* Seata extends the Configuration Provider interface
* package: io.seata.configuration
*/
public interface ExtConfigurationProvider {
/**
* provide a AbstractConfiguration implementation instance
* @param originalConfiguration
* @return configuration
*/
Configuration provide(Configuration originalConfiguration); }
}
  • When the application side is started based on seata-seata-spring-boot-starter, it will ** use "SpringBootConfigurationProvider" as the extended configuration provider ** and in its provide method, it uses dynamic bytecode generation (CGLIB) to create a dynamic proxy class for the "FileConfiguration" instance. FileConfiguration' instance using dynamic bytecode generation (CGLIB) to create a dynamic proxy class that intercepts all methods starting with "get" to get meta-configuration items from application.properties/application.yaml.

SpringBootConfigurationProvider class, this article only explains the implementation of the idea , no longer unfolding the analysis of the source code, which is only an implementation of the ExtConfigurationProvider interface, from the point of view of the Configuration can be extended, can be replaced , Seata is precisely through the ExtConfigurationProvider such an extension point for the implementation of a variety of configurations provides a broad stage , allowing a variety of configuration implementation and access options.

After going through the above loading process, if we didn't extend the configuration provider, we would get the registry type of file from the Seata meta-configuration file, and at the same time create a file registry instance: FileRegistryServiceImpl

Getting the TC Server address from the registry centre

After getting the registry instance, you need to execute the lookup() method (RegistryFactory.getInstance(). lookup(transactionServiceGroup)), FileRegistryServiceImpl.lookup() is implemented as follows:

/**
* Get a list of available addresses for TC Server based on the transaction group name
* package: io.seata.discovery.registry
* class: FileRegistryServiceImpl
*/
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
// Get TC Server cluster name
String clusterName = getServiceGroup(key);
if (clusterName == null) {
if (clusterName == null) { return null; }
}
//Get all available Server addresses in the TC cluster from the Configuration Centre
String endpointStr = CONFIG.getConfig(
PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
if (StringUtils.isNullOrEmpty(endpointStr)) {
throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
}
// Encapsulate the address as InetSocketAddress and return it
String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
for (String endpoint : endpoints) {
String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
if (ipAndPort.length ! = 2) {
throw new IllegalArgumentException("endpoint format should be like ip:port");;
}
inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]))); }
}
return inetSocketAddresses;
}

/**
* default method in the registry interface
* package: io.seata.discovery.registry
* class: RegistryService
*/
default String getServiceGroup(String key) {
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
// In the configuration cache, add a transaction group name change listening event.
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
// Get the TC cluster name corresponding to the transaction grouping from the Configuration Centre
return ConfigurationFactory.getInstance().getConfig(key);
}

As you can see, the code logic matches the flow in Figure Seata Transaction Grouping in Relation to Establishing Connections in Section I. At this point, the registry will need assistance from the Configuration Centre to get the cluster name corresponding to the transaction grouping and to find the available service addresses in the cluster.

Get TC cluster name from Configuration Centre

Configuration Centre initialisation

The initialisation of the configuration centre (in ConfigurationFactory.buildConfiguration()) is similar to the initialisation process of the registration centre, which is to get the type of the configuration centre and other information from the meta-configuration file first, and then initialise a specific instance of the configuration centre, which is no longer repeated here, with the foundation of the previous analysis.

Getting the value of a configuration item

The two methods in the above snippet, FileRegistryServiceImpl.lookup() and RegistryService.getServiceGroup(), both get the values of the configuration items from the configuration centre:

  • lookup() need to be implemented by the specific registry, the use of file as a registry, in fact, is a direct connection to the TC Server, the special point is that **TC Server's address is written to death in the configuration ** (normal should be stored in the registry), so FileRegistryServiceImpl.lookup() method, is the address information of the Server in the TC cluster obtained through the configuration centre.
  • getServiceGroup() is the default method in the RegistryServer interface, which is the public implementation of all registries. Any kind of registry in Seata needs to be configured to get the TC cluster name based on the name of the transaction group.

Load Balancing

After the above link configuration centre, registration centre collaboration, now we have obtained the current application side of all the available TC Server address, then before sending the real request, you also need to pass a specific load balancing policy, select a TC Server address, this part of the source code is relatively simple, will not take you to analyse.

About the load balancing source code, you can read AbstractNettyRemotingClient.doSelect(), because the code analysed in this article is the reconnection method of RMClient/TMClient, in this method, all the obtained Server addresses will be connected (reconnected) sequentially by traversing, so here There is no need to do load balancing.

The above is the Seata application side in the startup process, the registration centre and configuration centre of the two key modules between the collaboration and workflow, welcome to discuss and learn together!

Postscript: This article and its predecessor Seata client startup process dissection (a), is the first batch of technical blogs written by me, will be on the hands of Seata, I personally believe that Seata in the more complex, need to study and figure out. When I started Seata, I have analysed and documented some of the more complex parts of Seata's source code that I think need to be researched and figured out. I welcome any suggestions for improvement from readers, thank you!

· 9 min read

"Just started with Seata and don't have a deep understanding of its various modules?
Want to delve into Seata's source code but haven't taken the plunge yet?
Curious about what your application does 'secretly' during startup after integrating Seata?
Want to learn about the design principles and best practices embodied in Seata as an excellent open-source framework?
If you have any of the above thoughts, then this article is tailor-made for you~

Introduction

Those who have seen the first picture in the official README should know that Seata coordinates distributed transactions through its coordinator side TC, which communicates and interacts with the application side TM and RM to ensure data consistency among multiple transaction participants in distributed transactions. So, how does Seata establish connections and communicate between the coordinator side and the application side?

That's right, the answer is Netty. Netty, as a high-performance RPC communication framework, ensures efficient communication between TC and RM. This article will not go into detail about Netty; instead, our focus today is on how the **application side, during startup, uses a series of Seata's key modules (such as RPC, Config/Registry Center, etc.) to establish communication with the coordinator side.

Starting with GlobalTransactionScanner

We know that Seata provides several development annotations, such as @GlobalTransactional for enabling distributed transactions, @TwoPhraseBusinessAction for declaring TCC two-phase services, and so on, which are based on the Spring AOP mechanism to enhance the annotations by assigning the corresponding bean methods to interceptors. Interceptors are enhanced to complete the corresponding processing logic. GlobalTransactionScanner, a Spring bean, carries the responsibility of assigning interceptors to annotations. From the name of its scanner, it is not difficult to deduce that it is designed for the startup of the Spring application, and the global transaction (GlobalTransactionScanner). GlobalTransactionScanner) during Spring application startup.

In addition, the process of initialising the application-side RPC clients (TMClient, RMClient) and establishing a connection with the TC is also initiated in GlobalTransactionScanner#afterPropertiesSet():

/**
* package: io.seata.spring.annotation
* class: GlobalTransactionScanner
*/
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return.
}
// Perform TM, RM initialisation after the bean properties are initialised
initClient();

}

Initialisation and connection process of RM & TM

Here, we take RMClient.init() as an example, and the initialisation process of TMClient is the same.

Design of class relationship

Looking at the source code of RMClient#init(), we find that RMClient first constructs an RmNettyRemotingClient, and then executes its initialisation init() method. The constructor and initialisation methods of RmNettyRemotingClient call the constructor and initialisation methods of the parent class layer by layer

    /**
* RMClient's initialisation logic
* package: io.seata.rm
* class: RMClient
*/
public static void init(String applicationId, String transactionServiceGroup) {
//① Start with the RmNettyRemotingClient class and call the constructor of the parent class in turn

rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get()); rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//② Then, starting with the RmNettyRemotingClient class, call init() of the parent class in turn
rmNettyRemotingClient.init();
}

The relationship between the above RMClient family classes and the process of calling the constructor and init() initialisation method is illustrated in the following diagram: Relationship between the simplified version of the RMClient.init process and the main classes

So why did you design RMClient with such a more complex inheritance relationship? In fact, it is in order to divide the responsibilities and boundaries of each layer clearly, so that each layer can focus on specific logic processing, to achieve better scalability, this part of the detailed design ideas, you can refer to the Seata RPC module refactoring PR of the operator by Hui brother's article! The Road to Seata-RPC Refactoring)

The complete flow of initialisation

The main logic in the constructor and initialisation methods of each class can be sorted out with the help of the following ideographic sequence diagram, which can also be skipped first, and then looked back to see when these classes debut and how they interact with each other after we have analysed a few key classes below. Initialisation flow of RMClient

Grabbing the core - Channel creation

First of all, we need to know that the communication between the application side and the coordinator side is done with the help of Netty's Channel, so the key to the communication process lies in the creation of the Channel**, which is created and managed in Seata by means of pooling (with the help of the object pool in common-pool).

Here we need to briefly introduce the simple concept of object pool and its implementation in Seata: The main classes in common-pool are involved:

  • GenericKeydObjectPool<K, V>: A KV generic object pool that provides access to all objects, while object creation is done by its internal factory class.
  • KeyedPoolableObjectFactory<K, V>: KV generic object factory responsible for the creation of pooled objects, held by the object pool

The main classes involved are related to the implementation of object pooling in Seata:

  • First, the pooled objects are Channel, which corresponds to the generic V in common-pool.

  • NettyPoolKey: Key for Channel, corresponding to generic K in common-pool, NettyPoolKey contains two main information:

    • address:Address of TC Server when the Channel is created.
    • message:The RPC message sent to TC Server when the Channel is created.
  • GenericKeydObjectPool<NettyPoolKey,Channel>: Pool of Channel objects.

  • NettyPoolableFactory: the factory class for creating Channel. Having recognised the main classes related to object pooling above, let's take a look at some of the main classes in Seata that are involved in channel management and are related to RPC:

  • NettyClientChannelManager:

    • Holds the pool of Channel objects.
    • Interacts with the channel object pool to manage application-side channels (acquisition, release, destruction, caching, etc.).
  • RpcClientBootstrap: core bootstrap class for RPC clients, holds the Netty framework bootstrap object with start/stop capability; has the ability to get a new Channel based on the connection address for the Channel factory class to call.

  • AbstractNettyRemotingClient:

    • Initialises and holds the RpcClientBootstrap.
    • Application-side Netty client top-level abstraction, abstracts the ability of application-side RM/TM to obtain the NettyPoolKey corresponding to their respective Channel, for NettyClientChannelManager to call.
    • Initialising the NettyPoolableFactory

Understanding the above concepts, we can simplify the process of creating a channel in Seata as follows: Process of creating a Channel object

When you see this, you can go back and take a look at the above Initialisation Sequence Diagram for RMClient, and you should have a clearer understanding of the responsibilities and relationships of the various categories in the diagram, as well as the intent of the entire initialisation process.

Timing and flow of establishing a connection

So, when does RMClient establish a connection with Server?

During the initialisation of RMClient, you will find that many init() methods set up some timed tasks, and the mechanism of reconnecting (connecting) the Seata application side to the coordinator is achieved through timed tasks:

/**
* package: io.seata.core.rpcn.netty
* class: AbstractNettyRemotingClient
*/
public void init() {
// Set the timer to reconnect to the TC Server at regular intervals.
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

Let's see how the classes we explored above work together to connect RMClient to TC by tracing the execution of a reconnect (the first connection may actually occur during registerResource, but the process is the same) RMClient and TC Server connection process

In this diagram, you can focus on these points:

  • NettyClientChannelManager executes the callback function (getPoolKeyFunction()) to get the NettyPoolKey in the concrete AbstractNettyRemotingClient: the different Clients (RMClient and TMClient) on the application side, when they create the NettyPoolKey, they create the NettyChannelManager. TMClient) on the application side, the Key used when creating the Channel is different, so that they send different registration messages when reconnecting to the TC Server, which is also determined by the different roles they play in Seata:
    • TMClient: plays the role of transaction manager, when creating a Channel, it only sends a TM registration request (RegisterTMRequest) to the TC.
    • RMClient: plays the role of resource manager, needs to manage all transaction resources on the application side, therefore, when creating a Channel, it needs to get all transaction resource information on the application side before sending RM registration request (RegisterRMRequest), and register it to TC Server.
  • In the Channel object factory's NettyPoolableFactory's makeObject (create Channel) method, two tasks are completed using the two pieces of information in NettyPoolKey:
    • A new Channel is created using the address from NettyPoolKey.
    • A registration request is sent to the TC Server using the message from NettyPoolKey and the new Channel. This is the Client's initial connection (first execution) or reconnection (subsequent executions driven by scheduled tasks) request to the TC Server.

The above content covers the entire process of the Seata application's initialization and its connection establishment with the TC Server coordinator side.

For deeper details, it is recommended to thoroughly read the source code based on the outline and key points mentioned in this article. This will undoubtedly lead to a deeper understanding and new insights!

Postscript: Considering the length and to maintain a suitable amount of information for a source code analysis article, the collaboration of configuration and registration modules mentioned in the introduction was not expanded upon in this article.
In the next source code analysis, I will focus on the configuration center and registration center, analyzing how the Seata application side discovers the TC Server through service discovery and how it obtains various information from the configuration module before establishing connections between RMClient/TM Client and the TC Server.

· 6 min read

This article will introduce how to integrate Seata (1.4.0) with Spring Cloud and Feign using the TCC mode. In practice, Seata's AT mode can meet about 80% of our distributed transaction needs. However, when dealing with operations on databases and middleware (such as Redis) that do not support transactions, or when using databases that are not currently supported by the AT mode (currently AT supports MySQL, Oracle, and PostgreSQL), cross-company service invocations, cross-language application invocations, or the need for manual control of the entire two-phase commit process, we need to combine the TCC mode. Moreover, the TCC mode also supports mixed usage with the AT mode.

一、The concept of TCC mode

In Seata, a distributed global transaction follows a two-phase commit model with a Try-[Confirm/Cancel] pattern. Both the AT (Automatic Transaction) mode and the TCC (Try-Confirm-Cancel) mode in Seata are implementations of the two-phase commit. The main differences between them are as follows:

AT mode is based on relational databases that support local ACID transactions (currently supporting MySQL, Oracle, and PostgreSQL):

The first phase, prepare: In the local transaction, it combines the submission of business data updates and the recording of corresponding rollback logs. The second phase, commit: It immediately completes successfully and automatically asynchronously cleans up the rollback logs. The second phase, rollback: It automatically generates compensation operations through the rollback logs to complete data rollback.

On the other hand, TCC mode does not rely on transaction support from underlying data resources:

The first phase, prepare: It calls a custom-defined prepare logic. The second phase, commit: It calls a custom-defined commit logic. The second phase, rollback: It calls a custom-defined rollback logic.

TCC mode refers to the ability to include custom-defined branch transactions in the management of global transactions.

In summary, Seata's TCC mode is a manual implementation of the AT mode that allows you to define the processing logic for the two phases without relying on the undo_log used in the AT mode.

二、prepare

三、Building TM and TCC-RM

This chapter focuses on the implementation of TCC using Spring Cloud + Feign. For the project setup, please refer to the source code (this project provides demos for both AT mode and TCC mode).

DEMO

3.1 build seata server

build server doc

3.2 build TM

service-tm

3.3 build RM-TCC

3.3.1 Defining TCC Interface

Since we are using Spring Cloud + Feign, which relies on HTTP for communication, we can use @LocalTCC here. It is important to note that @LocalTCC must be annotated on the interface. This interface can be a regular business interface as long as it implements the corresponding methods for the two-phase commit in TCC. The TCC-related annotations are as follows:

  • @LocalTCC: Used for TCC in the Spring Cloud + Feign mode.
  • @TwoPhaseBusinessAction: Annotates the try method. The name attribute represents the bean name of the current TCC method, which can be the method name (globally unique). The commitMethod attribute points to the commit method, and the rollbackMethod attribute points to the transaction rollback method. After specifying these three methods, Seata will automatically invoke the commit or rollback method based on the success or failure of the global transaction.
  • @BusinessActionContextParameter: Annotates the parameters to be passed to the second phase (commitMethod/rollbackMethod) methods.
  • BusinessActionContext: Represents the TCC transaction context.

Here is an example:

/**
* Here we define the TCC interface.
* It must be defined on the interface.
* We are using Spring Cloud for remote invocation.
* Therefore, we can use LocalTCC here.
*
*/
@LocalTCC
public interface TccService {

/**
* Define the two-phase commit.
* name = The bean name of this TCC, globally unique.
* commitMethod = The method for the second phase confirmation.
* rollbackMethod = The method for the second phase cancellation.
* Use the BusinessActionContextParameter annotation to pass parameters to the second phase.
*
* @param params
* @return String
*/
@TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
String insert(
@BusinessActionContextParameter(paramName = "params") Map<String, String> params
);

/**
* The confirmation method can be named differently, but it must be consistent with the commitMethod.
* The context can be used to pass the parameters from the try method.
* @param context
* @return boolean
*/
boolean commitTcc(BusinessActionContext context);

/**
* two phase cancel
*
* @param context
* @return boolean
*/
boolean cancel(BusinessActionContext context);
}

3.3.2 Business Implementation of TCC Interface

To keep the code concise, we will combine the routing layer with the business layer for explanation here. However, in actual projects, this may not be the case.

  • Using @Transactional in the try method allows for direct rollback of operations in relational databases through Spring transactions. The rollback of operations in non-relational databases or other middleware can be handled in the rollbackMethod.
  • By using context.getActionContext("params"), you can retrieve the parameters defined in the try phase and perform business rollback operations on these parameters in the second phase.
  • Note 1: It is not advisable to catch exceptions here (similarly, handle exceptions with aspects), as doing so would cause TCC to recognize the operation as successful, and the second phase would directly execute the commitMethod.
  • Note 2: In TCC mode, it is the responsibility of the developer to ensure idempotence and transaction suspension prevention.
@Slf4j
@RestController
public class TccServiceImpl implements TccService {

@Autowired
TccDAO tccDAO;

/**
* tcc t(try)method
* Choose the actual business execution logic or resource reservation logic based on the actual business scenario.
*
* @param params - name
* @return String
*/
@Override
@PostMapping("/tcc-insert")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public String insert(@RequestBody Map<String, String> params) {
log.info("xid = " + RootContext.getXID());
//todo Perform actual operations or operations on MQ, Redis, etc.
tccDAO.insert(params);
//Remove the following annotations to throw an exception
//throw new RuntimeException("服务tcc测试回滚");
return "success";
}

/**
* TCC service confirm method
* If resource reservation is used in the first phase, the reserved resources should be committed during the second phase confirmation
* @param context
* @return boolean
*/
@Override
public boolean commitTcc(BusinessActionContext context) {
log.info("xid = " + context.getXid() + "提交成功");
//todo If resource reservation is used in the first phase, resources should be committed here.
return true;
}

/**
* tcc cancel method
*
* @param context
* @return boolean
*/
@Override
public boolean cancel(BusinessActionContext context) {
//todo Here, write the rollback operations for middleware or non-relational databases.
System.out.println("please manually rollback this data:" + context.getActionContext("params"));
return true;
}
}

3.3.3 Starting a Global Transaction in TM and Invoking RM-TCC Interface

Please refer to the project source code in section 3.2.

With this, the integration of TCC mode with Spring Cloud is complete.

· 10 min read

When it comes to Seata configuration management, you may think of Seata in the adaptation of the various configuration centre, in fact, today to say that this is not the case, although it will also be a simple analysis of Seata and the process of adapting to the configuration centre, but the main still explain the core implementation of Seata configuration management

Before talking about the configuration centre, first briefly introduce the startup process of the Server side, because this piece involves the initialisation of the configuration management, the core process is as follows:

  1. The entry point of the process is in the Server#main method.
  2. several forms of obtaining the port: from the container; from the command line; the default port
  3. Parse the command line parameters: host, port, storeMode and other parameters, this process may be involved in the initialisation of the configuration management
  4. Metric-related: irrelevant, skip
  5. NettyServer initialisation
  6. core controller initialisation: the core of the Server side, but also includes a few timed tasks
  7. NettyServer startup

The code is as follows, with non-core code removed

public static void main(String[] args) throws IOException {
// Get the port in several forms: from the container; from the command line; the default port, use to logback.xml
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// Parsing startup parameters, container and non-container.
ParameterParser parameterParser = new ParameterParser(args); // Parsing startup parameters, both container and non-container.

// Metric-related
MetricsManager.get().init(); // MetricsManager.get().init(); // NettyServer initialisation.

// NettyServer initialisation
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); // NettyServer initialisation.

// Core controller initialisation
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); // Core controller initialisation.
coordinator.init(); // Initialise the core controller.

// NettyServer startup
nettyRemotingServer.init(); // NettyServer starts.
}
``

Why does ``step 3`` involve the initialisation of the configuration management? The core code is as follows:
```java
if (StringUtils.isBlank(storeMode)) {
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}

If storeMode is not specifically specified in the startup parameters, the configuration will be fetched through the ConfigurationFactory related API, which involves two parts in the ConfigurationFactory.getInstance() line of code: ConfigurationFactory static code initialisation and Configuration initialisation. Let's focus on analysing this part

Configuration management initialisation

ConfigurationFactory initialisation

We know that there are two key configuration files in Seata: registry.conf, which is the core configuration file and must be there, and file.conf, which is only needed if the configuration centre is File. The ConfigurationFactory static code block actually mainly loads the registry.conf file, roughly as follows:

1, in the case of no manual configuration, the default read registry.conf file, encapsulated into a FileConfiguration object, the core code is as follows:

Configuration configuration = new FileConfiguration(seataConfigName,false);
FileConfigFactory.load("registry.conf", "registry");
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, "CONF", argsType, args);

2、Configuration enhancement: similar to the proxy model, to get the configuration, do some other processing inside the proxy object, the following details

Configuration extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  1. Assign the proxy object in step 2 to the CURRENT_FILE_INSTANCE reference, which is used directly in many places as a static reference to CURRENT_FILE_INSTANCE.
   CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;

It's easy to assume that CURRENT_FILE_INSTANCE corresponds to the contents of registry.conf. I don't think registry.conf is a good name for the file, it's too ambiguous, would it be better to call it bootstrap.conf?

Configuration initialisation

Configuration actually corresponds to the configuration centre, Seata currently supports a lot of configuration centres, almost all the mainstream configuration centres are supported, such as: apollo, consul, etcd, nacos, zk, springCloud, local files. When using local files as a configuration centre, it involves the loading of file.conf, which of course is the default name and can be configured by yourself. By now, you basically know the relationship between registry.conf and file.conf.

Configuration as a single instance in ConfigurationFactory, so the initialisation logic of Configuration is also in ConfigurationFactory, the approximate process is as follows: 1, first read the config.type attribute from the registry.conf file, which is file by default.

configTypeName = CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR+ ConfigurationKeys.FILE_ROOT_TYPE);
  1. Load the configuration centre based on the value of the config.type attribute, e.g., the default is: file, then first read the registry.conf file from config.file.name to read the corresponding file name of the local file configuration centre, and then create a FileConfiguration object based on the name of the file. This loads the configuration in file.conf into memory. Similarly, if the configuration is for another Configuration Centre, the other Configuration Centre will be initialised via SPI. Another thing to note is that at this stage, if the configuration centre is a local file, a proxy object is created for it; if it is not a local file, the corresponding configuration centre is loaded via SPI
if (ConfigType.File == configType) {
String pathDataId = String.join("config.file.name");
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId); configuration = new FileConfiguration(name);
try {
// Configure the Enhanced Proxy
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration); } catch (Exception e) { { new FileConfiguration(name); configuration = new FileConfiguration(name); }
} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}

3, based on the Configuration object created in step 2, create another layer of proxy for it, this proxy object has two roles: one is a local cache, you do not need to get the configuration from the configuration every time you get the configuration; the other is a listener, when the configuration changes will update the cache it maintains. The following:

if (null ! = extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}

At this point, the initialisation of the configuration management is complete. Seata initialises the configuration centre by first loading the registry.conf file to get the corresponding configuration centre information, the registry, and then initialising the configuration centre based on the corresponding information obtained. In the case of using a local file as the configuration centre, the default is to load the file.conf file. Then create a proxy object for the corresponding configuration centre to support local caching and configuration listening.

The finishing process is still relatively simple, so I'm going to throw out a few questions here:

  1. what is configuration enhancement and how is it done in Seata?
  2. if using a local file as a configuration centre, the configuration has to be defined in the file.conf file. If it is a Spring application, can the corresponding configuration items be defined directly in application.yaml?
  3. In step 2 mentioned above, why is it necessary to create the corresponding configuration enhancement proxy object for Configuration first in the case of using a local file configuration centre, but not for other configuration centres?

These 3 questions are all closely related and are all related to Seata's configuration additions. Here are the details

Configuration Management Enhancements

Configuration enhancement, in simple terms, is to create a proxy object for which proxy logic is executed when executing the target method of the target unique object, and from the perspective of the configuration centre, it is to execute the proxy logic when obtaining the corresponding configuration through the configuration centre.

  1. get the configuration through ConfigurationFactory.CURRENT_FILE_INSTANCE.getgetConfig(key).
  2. Load the registry.conf file to create the FileConfiguration object configuration.
  3. Create a proxy object configurationProxy for configuration based on SpringBootConfigurationProvider.
  4. Get the configuration centre connection information file zk nacos etc from configurationProxy.
  5. Create a configuration centre configuration object configuration2 based on the connection information.
  6. Create a proxy object configurationProxy2 for configuration2 based on SpringBootConfigurationProvider.
  7. Execute the proxy logic for configurationProxy2.
  8. Find the corresponding SpringBean based on the key.
  9. Execute the getXxx method of the SpringBean.

!

Configuration Enhancement Implementation

Configuration enhancement was also briefly mentioned above and the related code is as follows:

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  1. First get an ExtConfigurationProvider object through the SPI machine, there is only one implementation in Seata by default: SpringBootConfigurationProvider.
  2. Through the ExtConfigurationProvider#provider method to create a proxy object for the Configuration.

The core code is as follows.

public Configuration provide(Configuration originalConfiguration) {
return (Configuration) Enhancer.create(originalConfiguration.getClass(), new MethodInterceptor() {
@Override
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable {
if (method.getName().startsWith("get") && args.length > 0) {
Object result = null;
String rawDataId = (String) args[0];
if (args.length == 1) {
result = get(convertDataId(rawDataId));
} else if (args.length == 2) {
result = get(convertDataId(rawDataId), args[1]); } else if (args.length == 2) { result = get(convertDataId(rawDataId))
} else if (args.length == 3) {
result = get(convertDataId(rawDataId), args[1], (Long) args[2]); } else if (Long) args.length == 3); }
}
if (result ! = null) {
//If the return type is String,need to convert the object to string
if (method.getReturnType().equals(String.class)) {
return String.valueOf(result); }
}
return result; }
}
}

return method.invoke(originalConfiguration, args); }
}
}); }
}

private Object get(String dataId) throws IllegalAccessException, InstantiationException {
String propertyPrefix = getPropertyPrefix(dataId); }; private Object get(String dataId); }; }; }
String propertySuffix = getPropertySuffix(dataId);
ApplicationContext applicationContext = (ApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT);
Class<? > propertyClass = PROPERTY_BEAN_MAP.get(propertyPrefix);
Object valueObject = null;
if (propertyClass ! = null) {
try {
Object propertyBean = applicationContext.getBean(propertyClass);
valueObject = getFieldValue(propertyBean, propertySuffix, dataId);
} catch (NoSuchBeanDefinitionException ignore) {

}
} else {
throw new ShouldNeverHappenException("PropertyClass for prefix: [" + propertyPrefix + "] should not be null."); }
}
if (valueObject == null) {
valueObject = getFieldValue(propertyClass.newInstance(), propertySuffix, dataId);
}

return valueObject; }
}

1, if the method starts with get and the number of arguments is 1/2/3, then perform the other logic of getting the configuration, otherwise perform the logic of the native Configuration object 2, we do not need to bother why this rule, this is a Seata agreement 3, Other logic to get the configuration, that is, through the Spring way to get the corresponding configuration value

Here has been clear about the principle of configuration enhancement, at the same time, can also be guessed that the only ExtConfigurationProvider implementation of SpringBootConfigurationProvider, must be related to the Spring

Configuration Enhancement and Spring

Before we introduce this piece, let's briefly describe how Seata is used:

  1. Non-Starter way: introduce dependency seata-all, then manually configure a few core beans.
  2. Starter way: Introduce the dependency seata-spring-boot-starter, fully automated quasi-configuration, do not need to automatically inject the core bean

The SpringBootConfigurationProvider is in the seata-spring-boot-starter module, i.e. when we use Seata by introducing seata-all, the configuration enhancement doesn't really do anything, because at this point there is no ExtConfigurationProvider to be found. ExtConfigurationProvider` implementation class can't be found at this point, so naturally it won't be enhanced.

So how does seata-spring-boot-starter tie all this together?

  1. First, in the resources/META-INF/services directory of the seata-spring-boot-starter module, there exists a spring.factors file with the following contents
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\

# Ignore for now
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

2, in the SeataAutoConfiguration file, the following beans will be created: GlobalTransactionScanner , SeataDataSourceBeanPostProcessor, SeataAutoDataSourceProxyCreator SpringApplicationContextProvider. The first three are not related to what we are going to talk about in this article, mainly focus on SpringApplicationContextProvider, the core code is very simple, is to save the ApplicationContext:

public class SpringApplicationContextProvider implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
}
}
  1. Then, in the SeataAutoConfiguration file, some xxxProperties.Class and the corresponding Key prefixes are also cached into PROPERTY_BEAN_MAP. The xxxProperties are simply understood as the various configuration items in application.yaml:
   static {
PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);
PROPERTY_BEAN_MAP.put(CLIENT_RM_PREFIX, RmProperties.class);
PROPERTY_BEAN_MAP.put(SHUTDOWN_PREFIX, ShutdownProperties.class); ...
... Omit ...
}

At this point, the whole process is actually clear, when there is SpringBootConfigurationProvider configuration enhancement, we get a configuration item as follows:

  1. first according to the p Configuration Key to get the corresponding xxxProperties object
  2. get the corresponding xxxProperties SpringBean through the ApplicationContext in the ObjectHolder.
  3. Get the value of the corresponding configuration based on the xxxProperties SpringBean.
  4. At this point, we have successfully obtained the values in application.yaml through configuration enhancement.

· 17 min read

Author | Liu Xiaomin Yu Yu

I. Introduction

In the Java world, netty is a widely used high-performance network communication framework, and many RPC frameworks are based on netty. In the golang world, getty is also a high-performance network communication library similar to netty. getty was originally developed by Yu Yu, the leader of the dubbogo project, and is available in dubbo-go as an underlying communication library. github.com/apache/dubbo-go). With the donation of dubbo-go to the apache foundation, getty eventually made its way into the apache family and was renamed dubbo-getty, thanks to the efforts of the community.

In '18, I was practicing microservices in my company, and the biggest problem I encountered at that time was distributed transactions. In the same year, Ali open-sourced their distributed transaction solution in the community, and I quickly noticed this project, which was initially called fescar, but later renamed seata. Since I was very interested in open source technology, I added a lot of community groups, and at that time, I also paid attention to the dubbo-go project, and silently dived in it. As I learnt more about seata, the idea of making a go version of a distributed transaction framework gradually emerged.

To make a golang version of distributed transaction framework, one of the first problems is how to achieve RPC communication. dubbo-go is a very good example in front of us, so we started to study the underlying getty of dubbo-go.

How to implement RPC communication based on getty?

The overall model of the getty framework is as follows:

! [image.png]( https://img.alicdn.com/imgextra/i1/O1CN011TIcL01jY4JaweOfV_! !6000000004559-2-tps-954-853.png)

The following is a detailed description of the RPC communication process of seata-golang with related code.

1. Establish Connection

To implement RPC communication, we need to establish a network connection first, let's start from client.go.

func (c *client) connect() {
var (
err error
ss Session
)

for {
// Create a session connection
ss = c.dial()
if ss == nil {
if ss == nil { // client has been closed
if ss == nil { // client has been closed
}
err = c.newSession(ss)
if err == nil {
// send and receive messages
ss.(*session).run()
// Omit some code here

break
}
// don't distinguish between tcp connection and websocket connection. because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
Close()
}

The connect() method gets a session connection via the dial() method into the dial() method:

func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT.
return c.dialTCP()
case UDP_CLIENT: return c.dialUDP()
return c.dialUDP()
case WS_CLIENT: return c.dialWS()
return c.dialWS()
case WSS_CLIENT: return c.dialWSS()
return c.dialWSS()
}

return nil
}

We're concerned with TCP connections, so we continue into the c.dialTCP() method:

func (c *client) dialTCP() Session {
var (
err error
conn net.
)

for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig ! = nil {
d := &net.Dialer{Timeout: connectTimeout}
// Establish an encrypted connection
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// Establish a tcp connection
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// Return a TCPSession
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}

At this point, we know how getty establishes a TCP connection and returns a TCPSession.

2. Sending and Receiving Messages

How does it send and receive messages? Let's go back to the connection method and look at the next line, which is ss.(*session).run(). After this line of code, the code is a very simple operation, so we guess that the logic of this line of code must include sending and receiving messages, and then go to the run() method:

func (s *session) run() {
// Omit some of the code

go s.handleLoop()
go s.handlePackage()
}

There are two goroutines up here, handleLoop and handlePackage, which literally match our guesses into the handleLoop() method:

func (s *session) handleLoop() {
// Omit some of the code

for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
It choose one at random if multiple are ready.
// Otherwise it choose default branch if none is ready.

case outPkg, ok = <-s.wQ.
// Omit some of the code

iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// Encode interface{} type outPkg into binary bits via s.writer
pkgBytes, err = s.writer.Write(s, outPkg)
// Omit some of the code

iovec = append(iovec, pkgBytes)

// omit some code
}
// Send these binary bits out
err = s.WriteBytesArray(iovec[:]...)
if err ! = nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}

case <-wheel.After(s.period).
if flag {
if wsFlag {
err := wsConn.writePing()
if err ! = nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// Logic for timed execution, heartbeat, etc.
s.listener.OnCron(s)
}
}
}
}

With the above code, it is easy to see that the handleLoop() method handles the logic of sending the message, which is encoded into binary bits by s.writer and then sent over the established TCP connection. This s.writer corresponds to the Writer interface, which is an interface that must be implemented by the RPC framework.

Moving on to the handlePackage() method:

func (s *session) handlePackage() {
// Omit some of the code

if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}

err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}

Go to the handleTCPPackage() method:

func (s *session) handleTCPPackage() error {
// Omit some of the code

conn = s.Connection.(*gettyTCPConn)
for {
// omit some code

bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// Receive a message from the TCP connection
bufLen, err = conn.recv(buf)
// Omit some of the code

break
}
// Omit part of the code

// Write the binary bits of the received message to pkgBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
Write(buf[:bufLen]) for { if pktBuf.
}
// Decode the received message into an RPC message via s.reader
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// Omit some of the code

s.UpdateActive()
// Put the received message into a TaskQueue for consumption by the RPC consumer.
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
If exit { pktBuf.Next(pkgLen) // continue to handle case 5
if exit {
pktBuf.Next(pkgLen) // continue to handle case 5 } if exit {
}
}

return perrors.WithStack(err)
}

From the above code logic, we analyse that the RPC consumer needs to decode the binary bits received from the TCP connection into messages that can be consumed by RPC, and this work is implemented by s.reader, so we need to implement the Reader interface corresponding to s.reader in order to build the RPC communication layer.

3. How to decouple the underlying network message processing logic from the business logic

We all know that netty decouples the underlying network logic from the business logic through the boss thread and the worker thread. So how does getty do it?

At the end of the handlePackage() method, we see that the incoming message is put into the s.addTask(pkg) method, so let's move on:

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool ! = nil {
taskPool.AddTaskAlways(f)
return
}
f()
}

The pkg argument is passed to an anonymous method that ends up in taskPool. This method is critical, and I ran into a pitfall later on when I wrote the seata-golang code, which is analysed later.

Next we look at the definition of taskPool:

// NewTaskPoolSimple builds a simple task pool.
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
NumCPU() * 100 }
return &taskPoolSimple{
work: make(chan task), sem: make(chan struct{task
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}

Builds a channel sem with a buffer size of size (defaults to runtime.NumCPU() * 100). Then look at the method AddTaskAlways(t task):

func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done.
return
default.
}

select {
case p.work <- t.
return
default: }
}
select {
case p.work <- t: return default: }
case p.sem <- struct{}{}.
p.wg.Add(1)
go p.worker(t)
default.
goSafely(t)
}
}

When a task is added, it is consumed by len(p.sem) goroutines, and if no goroutine is free, a temporary goroutine is started to run t(). This is equivalent to having len(p.sem) goroutines to form a goroutine pool, and the goroutines in the pool process business logic instead of the goroutines that process network messages to run business logic, thus achieving decoupling. One of the pitfalls I encountered when writing seata-golang was that I forgot to set the taskPool, which resulted in the same goroutine handling the business logic and the underlying network message logic. When I blocked the business logic and waited for a task to complete, I blocked the entire goroutine, and I couldn't receive any messages during the blocking period.

4. Implementation

The following code is available at getty.go:

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal a pkg and write to session.
type Writer interface {
// If @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
Write(Session, interface{}) ([]byte, error) }

// ReadWriter interface use for handle application packages.
type ReadWriter interface {
Writer
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error

OnOpen(Session) error // invoked when session closed.
EventListener { // invoked when session opened // If the return error is not nil, @Session will be closed.)

OnOpen(Session) error // invoked when session closed.
OnError(Session, error)

// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)

// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
If the message's length is greater than it, u should should return err in // Reader{Read} and getty will close this connection soon.
// If ur logic processing in this func
// If ur logic processing in this func will take a long time, u should start a goroutine
// If ur logic processing in this func will take a long time, u should start a goroutine pool (like working thread pool in cpp) to handle the processing asynchronously.
// can do the logic processing in other asynchronous way.
Or u // can do the logic processing in other asynchronous way. !In short, ur OnMessage callback func should return asap.
// In short, ur OnMessage callback func should return asap.
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}

By analysing the entire getty code, we only need to implement ReadWriter to encode and decode RPC messages, and then implement EventListener to handle the corresponding specific logic of RPC messages, and then inject the ReadWriter implementation and the EventLister implementation into the Client and Server sides of RPC, then we can implement RPC communication. Inject the ReadWriter implementation and EventLister implementation into the Client and Server side of RPC to achieve RPC communication.

4.1 Codec Protocol Implementation

The following is the definition of the seata protocol: ! [image-20201205214556457.png](https://cdn.nlark.com/yuque/0/2020/png/737378/1607180799872-5f96afb6-680d-4e69-8c95-b8fd1ac4c3a7.png #align=left&display=inline&height=209&margin=%5Bobject%20Object%5D&name=image-20201205214556457.png& originHeight=209&originWidth=690&size=18407&status=done&style=none&width=690)

In the ReadWriter interface implementation RpcPackageHandler, call the Codec method to codec the message body in the above format:

// Encode the message into binary bits
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA.
return SeataEncoder(in)
default.
log.Errorf("not support codecType, %s", codecType)
return nil
}
}

// Decode the binary bits into the message body
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA.
return SeataDecoder(in)
default.
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}

4.2 Client Side Implementation

Let's look at the client-side implementation of EventListener [RpcRemotingClient](https://github.com/opentrx/seata-golang/blob/dev/pkg/client/rpc_remoting_client. go):

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func()
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.
ApplicationId: client.conf.
TransactionServiceGroup: client.conf.
}}
// Once the connection is established, make a request to the Transaction Coordinator to register the TransactionManager.
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// Save the connection to the Transaction Coordinator in the connection pool for future use.
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Info("received message:{%v}", pkg)
rpcMessage, ok := pkg.(clientRpcRemoteClient.Session, pkg interface{}) { log.Info("received message:{%v}", pkg)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)

// Process the transaction message, commit or rollback
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// Send a heartbeat
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

The logic of clientSessionManager.RegisterGettySession(session) is analysed in subsection 4.4.

4.3 Server-side Transaction Coordinator Implementation

See DefaultCoordinator for code:

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
error { log.Infof("got getty_session:%s", session.Stat())
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
// Release the TCP connection
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Info("getty_session{%s} is closing......" , session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.)
RpcMessage) if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// Map the TransactionManager information to the TCP connection.
coordinator.OnRegTmMessage(rpcMessage, session)
OnRegTmMessage(rpcMessage, session)
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
OnCheckMessage(rpcMessage, session)
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// Map the ResourceManager information to the TCP connection.
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
if err := recover(); } else { if SessionManager.IsRegistered(session) {
if err := recover(); err ! = nil { log.Errorf(); err !
log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
}
}()
// Handle transaction messages, global transaction registration, branch transaction registration, branch transaction commit, global transaction rollback, etc.
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("Close an unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}

coordinator.OnRegTmMessage(rpcMessage, session) registers the Transaction Manager, coordinator.OnRegRmMessage(rpcMessage, session) registers the Resource The logic is analysed in Section 4.4. The message enters the coordinator.OnTrxMessage(rpcMessage, session) method and is routed to the specific logic according to the message type code:

switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus.
TypeGlobalStatus. req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport.
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit.
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback.
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister.
TypeBranchRegister. req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport.
TypeBranchStatusReport: req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default.
return nil
}

4.4 Session Manager Analysis

After the Client establishes a connection with the Transaction Coordinator, it saves the connection in the map serverSessions = sync.Map{} by using clientSessionManager.RegisterGettySession(session). The key of the map is the RemoteAddress of the Transaction Coordinator obtained from the session, and the value is the session. This allows the Client to register the Transaction Manager and Resource Manager with the Transaction Coordinator through a session in the map. See [getty_client_session_manager.go]. (https://github.com/opentrx/seata-golang/blob/dev/pkg/client/getty_client_session_manager.go) After the Transaction Manager and Resource Manager are registered with the Transaction Coordinator, a connection can be used to send either TM messages or RM messages. We identify a connection with an RpcContext:

type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.
Session getty.
Session }

When a transaction message is received, we need to construct such an RpcContext to be used by the subsequent transaction logic. So, we will construct the following map to cache the mapping relationships:

var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered, // it will be the RM registered.
// it will be the RM registered
session_transactionroles = sync.Map{}

// session -> applicationId
identified_sessions = sync.Map{}

// applicationId -> ip -> port -> session
client_sessions = sync.Map{}

// applicationId -> resourceIds
client_resources = sync.Map{}
)

In this way, the Transaction Manager and Resource Manager are registered to the Transaction Coordinator via coordinator.OnRegTmMessage(rpcMessage, session) and coordinator.OnRegRmMessage(rpcMessage, session) respectively. session) are registered with the Transaction Coordinator, the relationship between applicationId, ip, port and session is cached in the above client_sessions map, and the relationship between applicationId, ip, port and resourceIds (an application may be able to register with the Transaction Coordinator) is cached in the client_resources map. and resourceIds (there may be multiple Resource Managers for an application) in the client_resources map. When needed, we can construct an RpcContext from these mappings, which is very different from the java version of seata, so if you're interested, you can dig a little deeper. See [getty_session_manager.go`]. (https://github.com/opentrx/seata-golang/blob/dev/tc/server/getty_session_manager.go) At this point, we have analysed seata-golang the entire mechanism of the RPC communication model.

III. The Future of seata-golang

The development of seata-golang started in April this year, and in August it basically realised the interoperability with the java version of seata 1.2 protocol. seata) protocol, implemented AT mode for mysql database (automatically coordinating the commit rollback of distributed transactions), implemented TCC mode, and used mysql to store data on the TC side, which turned TC into a stateless application to support high-availability deployment. The following figure shows the principle of AT mode: ! [image20201205-232516.png]( https://img.alicdn.com/imgextra/i3/O1CN01alqsQS1G2oQecFYIs_! !6000000000565-2-tps-1025-573.png)

There is still a lot of work to be done, such as support for the registry, support for the configuration centre, protocol interoperability with the java version of seata 1.4, support for other databases, implementation of the craft transaction coordinator, etc. We hope that developers interested in the distributed transaction problem can join in to build a perfect golang's distributed transaction framework.

If you have any questions, please feel free to join the group [group number 33069364]:

Author Bio

Xiaomin Liu (GitHubID dk-lockdown), currently working at h3c Chengdu, is good at using Java/Go language, and has dabbled in cloud-native and microservices related technologies, currently specialising in distributed transactions. Yu Yu (github @AlexStocks), dubbo-go project and community leader, a programmer with more than 10 years of frontline experience in server-side infrastructure R&D, has participated in the improvement of Muduo/Pika/Dubbo/Sentinel-go and other well-known projects, and is currently engaged in container orchestration and service mesh work in the Trusted Native Department of ants. Currently, he is working on container orchestration and service mesh in the Trusted Native Department of AntGold.

References

seata official: https://seata.apache.org

java version seata:https://github.com/apache/incubator-seata

seata-golang project address: https://github.com/apache/incubator-seata-go

seata-golang go night reading b站分享:https://www.bilibili.com/video/BV1oz411e72T

· 30 min read

In Seata version 1.3.0, data source auto-proxy and manual proxy must not be mixed, otherwise it will lead to multi-layer proxy, which will lead to the following problems:

  1. single data source case: cause branch transaction commit, undo_log itself is also proxied, i.e. generated undo_log for undo_log, assumed to be undo_log2, at this time, undo_log will be treated as a branch transaction; branch transaction rollback, because of the undo_log2 generated by the faulty in undo_log corresponding transaction branch rollback. When the branch transaction is rolled back, because there is a problem with the generation of undo_log2, when the transaction branch corresponding to the undo_log is rolled back, it will delete the undo_log associated with the business table, which will lead to the discovery that the business table corresponding to the business tableis rolled back and theundo_logdoesn't exist, and thus generate an additional status of 1 for theundo_log.' This time, the overall logic is already messed up, which is a very serious problem!
  2. multiple data sources and logical data sources are proxied case: in addition to the problems that will occur in the case of a single data source, may also cause deadlock problems. The reason for the deadlock is that for the undo_log operation, the select for update and delete operations that should have been performed in one transaction are spread out over multiple transactions, resulting in one transaction not committing after executing the select for update, and one transaction waiting for a lock when executing the delete until the timeout expires, and then the lock will not lock until the timeout expires. until it times out.

Proxy description

This is a layer of DataSource proxying that overrides some methods. For example, the getConnection method does not return a Connection, but a ConnectionProxy, and so on.

// DataSourceProxy

public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this); }
}

public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection(); } public Connection getPlainConnection(); return targetDataSource.
}

@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection(); } @Override public ConnectionProxy getConnection(); }
return new ConnectionProxy(this, targetConnection);
}

Manual Proxy

That is, manually inject a DataSourceProxy as follows

@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource); }
}

AutoProxy

Create a proxy class for DataSource, get DataSourceProxy (create it if it doesn't exist) based on DataSource inside the proxy class, and then call the relevant methods of DataSourceProxy. The core logic is in SeataAutoDataSourceProxyCreator.

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes; private final Advisor advisor = new SeataAutoDataSourceProxyCreator.class
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<? > beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}

@Override
protected boolean shouldSkip(Class<? > beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
}
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m ! = null) {
return m.invoke(dataSourceProxy, args); } else { m.invoke(DataSourceProxy.class, method.getName(), method.getParameterTypes())
} else {
return invocation.proceed();
}
}

@Override
public Class<? >[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}

Data Source Multi-Level Proxy

@Bean.
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
druidDataSource(StrangeAdapter strangeAdapter) { doxx
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource); }
}
  1. First we inject two DataSources into the configuration class: DruidDataSource and DataSourceProxy, where DruidDataSource is used as the targetDataSource attribute of DataSourceProxy and DataSourceProxy is used as the targetDataSource attribute of DruidDataSource. DataSourceProxyis declared using the@Primary` annotation.
  2. The application has automatic data source proxying enabled by default, so when calling methods related to DruidDataSource, a corresponding data source proxy DataSourceProxy2 will be created for DruidDataSource.
  3. What happens when we want to get a Connection in our application?
  4. first get a DataSource, because the DataSourceProxy is Primary, so we get a DataSourceProxy. 2. based on the DataSource, we create a corresponding DataSourceProxy2.
  5. get a Connection based on the DataSource, i.e. get the Connection through the DataSourceProxy. At this time, we will first call the getConnection method of targetDataSource, i.e. DruidDataSource, but since the cutover will intercept DruidDataSource, according to the interception logic in step 2, we can know that a DataSourceProxy2will be created automatically, and then call theDataSourceProxy2. Then call DataSourceProxy2#getConnection, and then call DruidDataSource#getConnection. This results in a two-tier proxy, and the returned Connectionis also a two-tierConnectionProxy`.

!

The above is actually the modified proxy logic, Seata's default autoproxy will proxy the DataSourceProxy again, the consequence is that there is one more layer of proxy at this time the corresponding diagram is as follows

!

The two problems that can result from multiple layers of proxies for a data source are summarised at the beginning of the article, with case studies below.

Branching Transaction Commits

What happens when the corresponding method is executed through the ConnectionProxy? Let's take an example of a branching transaction commit involving an update operation:

  1. Execute ConnectionProxy#prepareStatement, which returns a PreparedStatementProxy.
  2. Execute PreparedStatementProxy#executeUpdate, PreparedStatementProxy#executeUpdate will probably do two things: execute the business SQL and commit the undo_log.

Commit business SQL

// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT.
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementLoader.load(InsertExecutor.class, dbType)) { case INSERT.
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new
new Object[]{statementProxy, statementCallback, sqlRecognizer});
statementProxy, statementCallback, sqlRecognizer}); break;
case UPDATE: executor = new UpdateExecutor
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case UPDATE.
break;
case DELETE.
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE.
break; case SELECT_FOR_UPDATE.
case SELECT_FOR_UPDATE: executor = new SelectForUpdate.
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE.
break; break
default: executor = new PlainExecutor
executor = new PlainExecutor<>(statementProxy, statementCallback); break; default.
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } else { executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); }
}

The main process is: first execute the business SQL, then execute the commit method of the ConnectionProxy, in which the corresponding undo_log SQL will be executed for us, and then commit the transaction.

PreparedStatementProxy#executeUpdate =>
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => In this step, the statementCallback#execute method will be triggered, i.e. the native PreparedStatement#executeUpdate method will be called.
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit

UNDO_LOG insert

// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// Register for a branch transaction, simply understand that a request is sent to the server, and then the server inserts a record into the branch_table table.
register();
} catch (TransactionException e) {
// If there is no for update sql, it will register directly before commit, then not only insert a branch record, but also lock information for the competition, the following exception is generally thrown in the registration did not get the lock, generally is pure update statement concurrency will trigger the competition lock failure exception @FUNKYE
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// undo_log handling, expect targetConnection handling @1
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // Commit local transaction, expect targetConnection.

// Commit the local transaction, expecting it to be handled by targetConnection @2
targetConnection.commit(); } catch (Throwable ex)
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); report(false); }
report(false); } catch (Throwable ex); }
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true); }
}
context.reset();
}
  1. undo_log processing @1, parses the undo_log involved in the current transaction branch and writes it to the database using TargetConnection.
   public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}

String xid = connectionContext.getXid(); long branchId = connectionContext.hasUndoLog(); { return; }
long branchId = connectionContext.getBranchId(); }

BranchUndoLog branchUndoLog = new BranchUndoLog(); branchUndoLog.setBranchId = connectionContext.getBranchId(); }
branchUndoLog.setXid(xid); branchUndoLog.
branchUndoLog.setBranchId(branchId); branchUndoLog.
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}

insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,cp.getTargetConnection());
}
  1. Commit local transaction @2, i.e., commit the transaction via TargetConnection. That is, the same TargetConnection is used for service SQL execution, undo_log write, and i.e. transaction commit.

lcn's built-in database solution, lcn is to write undolog to his embedded h2 (I forget if it's this one) database, at this time it will become two local transactions, one is h2's undolog insertion transaction, one is the transaction of the business database, if the business database is abnormal after the insertion of the h2, lcn's solution will be data redundancy, roll back the data. data is the same, delete undolog and rollback business data is not a local transaction. But the advantage of lcn is the invasion of small, do not need to add another undolog table. Thanks to @FUNKYE for the advice, I don't know much about lcn, I'll look into it when I get a chance!

Branch Transaction Rollback

  1. Server sends a rollback request to Client. 2.
  2. Client receives the request from Server, and after a series of processing, it ends up in the DataSourceManager#branchRollback method. 3.
  3. first according to the resourceId from the DataSourceManager.dataSourceCache to get the corresponding DataSourceProxy, at this time for the masterSlaveProxy (rollback stage we do not test the proxy data source, simple and direct, anyway, the final get all the TragetConnection)
  4. According to the xid and branchId sent from the Server side to find the corresponding undo_log and parse its rollback_info attribute, each undo_log may be parsed out of more than one SQLUndoLog, each SQLUndoLog can be interpreted as an operation. For example, if a branch transaction updates table A and then table B, the undo_log generated for the branch transaction contains two SQLUndoLogs: the first SQLUndoLog corresponds to the snapshot before and after the update of table A; the second SQLUndoLog corresponds to the snapshot before and after the update of table B.
  5. for each SQLUndoLog execute the corresponding rollback operation, for example, a SQLUndoLog corresponds to the operation INSERT, then its corresponding rollback operation is DELETE.
  6. Delete the undo_log based on the xid and branchId.
// AbstractUndoLogManager#undo removes some non-critical code

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true; for (; ; ) {

for (; ; ) {
try {
// Get the connection to the original data source, we don't care about the proxy data source in the rollback phase, we'll end up with the TargetConnection.
conn = dataSourceProxy.getPlainConnection(); // Get the connection to the native data source.

// Put the rollback operation in a local transaction and commit it manually, making sure that the final business SQL operation is committed along with the undo_log delete operation.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}

// Query undo_log based on xid and branchId, note the SQL statement SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId); selectPST.setString(1, branchId); selectPST.setString(1, branchId)
selectPST.setString(2, xid);
rs = selectPST.executeQuery(); boolean exists = false; rs = selectPST.

boolean exists = false; while (rs.next())
while (rs.next()) {
exists = true; boolean exists = false; while (rs.next()) {
// status == 1 undo_log is not processed, related to anti-suspension
if (!canUndo(state)) {
return; }
}

// Parsing the undo_log
byte[] rollbackInfo = getRollbackInfo(rs); // Parsing the undo_log.
BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance(serialiser).decode(rollbackInfo);
try {
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); if (sqlUndoLog.getSqlUndoLogs(parser.getName()); } }
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
// Execute the corresponding rollback operation
undoExecutor.executeOn(conn);
}
}
}

// If (exists) { undoExecutor.executeOn(conn); }
if (exists) {
LOGGER.error("\n delete from undo_log where xid={} AND branchId={} \n", xid, branchId);
deleteUndoLog(xid, branchId, conn);
conn.commit();
// and anti-suspension related If no undo_log is found based on xid and branchId, it means that there is an exception in the branch transaction: for example, the business process timed out, resulting in a global transaction rollback, but the business undo_log was not inserted at that time.
} else {
LOGGER.error("\n insert into undo_log xid={},branchId={} \n", xid, branchId);
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}
return; }
} catch (Throwable e) {
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,branchId, e.getMessage()), e); }
}
}
}

There are several notes:

  1. rollback does not take into account data source proxying, and ends up using TargetConnection.
  2. set atuoCommit to false, i.e. you need to commit the transaction manually
  3. for update is added when querying the undo_log based on xid and branchId, i.e. the transaction will hold the lock for this undo_log until all rollbacks are complete, as it is not until they are done that the

Multi-Tier Proxy Issues

Several issues that can be caused by multi-tier proxying of data sources have been mentioned at the beginning of the article, focusing on analysing why the above issues are caused:

Impact on branch transaction commits

Let's start by analysing what happens if we use a two-tier proxy. Let's analyse it from two aspects: business SQL and undo_log

  1. business SQL
   PreparedStatementProxy1.executeUpdate =>
statementCallback#executeUpdate(PreparedStatementProxy2#executeUpdate) =>
PreparedStatement#executeUpdate

It doesn't seem to matter, it's just an extra loop, and it's still executed through PreparedStatement in the end.

  1. undo_log
ConnectionProxy1#getTargetConnection ->
ConnectionProxy2#prepareStatement ->
PreparedStatementProxy2#executeUpdate ->
PreparedStatement#executeUpdate (native undo_log write, before generating undo_log2 (the undo_log of undo_log) for that undo_log) ->
ConnectionProxy2#commit ->
ConnectionProxy2#processGlobalTransactionCommit(write undo_log2) ->
ConnectionProxy2#getTargetConnection ->
TargetConnection#prepareStatement ->
PreparedStatement#executeUpdate

Impact on branch transaction rollback

Why isn't the undo_log deleted after a transaction rollback?

It is not actually not deleted. As I said before, the two-tier proxy causes the undo_log to be treated as a branch transaction, so it generates an undo_log for that undo_log (assuming it's undo_log2), and undo_log2 is generated wrongly (which is fine, it should be generated this way), which results in the business-table-associated undo_log being deleted when rolling back. This leads to a rollback that deletes the undo_log associated with the business table, which ultimately leads to the business table corresponding to the transaction branch rolling back to find that the undo_log does not exist, thus generating one more undo_log with a status of 1.

Before the rollback

// undo_log
84 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.1KB 0
85 59734075254222849 172.16.120.59:23004:59734061438185472 serializer=jackson 4.0KB 0

// branch_table
59734070967644161 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
59734075254222849 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// lock_table
jdbc:mysql://xx^^^seata_storage^^^1 59734070967644161 jdbc:mysql://172.16.248.10:3306/tuya_middleware seata_storage 1
jdbc:mysql://xx^^^^undo_log^^^^84 59734075254222849 jdbc:mysql://172.16.248.10:3306/tuya_middleware undo_log 84

After the rollback

// An undo_log with status 1 was generated, corresponding to the log: undo_log added with GlobalFinished
86 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.0Byte 1

Problem analysis

  1. find the corresponding undo_log log based on xid and branchId
  2. parse the undo_log, mainly its rollback_info field, rollback_info is a SQLUndoLog collection, each SQLUndoLog corresponds to an operation, which contains a snapshot before and after the operation, and then perform a corresponding rollback
  3. Delete undo_log logs based on xid and branchId.

Because of the two-tier proxy problem, an undo_log becomes a branch transaction, so when a rollback occurs, we also need to rollback the undo_log branch transaction: 1, first according to the xid and branchId to find the corresponding undo_log and parse its rollback_info attribute, here the parsed rollback_info contains two SQLUndoLog. Why are there two?

If you think about it, you can understand that the first layer of proxy operations on seata_storage are put into the cache, which should be cleared after execution, but because of the two-tier proxy, the process is not finished at this time. When it's the second tier proxy's turn to operate on undo_log, it puts that operation into the cache, and at that point there are two operations in the cache, UPDATE for seata_storage and INSERT for undo_log. So it's easy to see why the undo_log operation is extra large (4KB) because it has two operations in its rollback_info.

One thing to note is that the first SQLUndoLog corresponds to the after snapshot, which has branchId=59734070967644161 pk=84, i.e., branchIdcorresponding to theseata_storage branch and undo_log corresponding to the seata_storage PK. In other words, the undo_log rollback deletes the seata_storage corresponding undo_log`. How to delete the undo_log itself? In the next logic, it will be deleted according to xid and branchId.

  1. Parsing the first SQLUndoLog, it corresponds to the INSERToperation ofundo_log, so its corresponding rollback operation is DELETE. Because undo_logis treated as a business table at this point. So this step will delete the59734075254222849corresponding to theundo_log, **but this is actually the corresponding business table corresponding to the corresponding undo_log`**.

3, parse the second SQLUndoLog, at this time corresponds to the seata_storage UPDATE operation, this time will be through the snapshot of the seata_storage corresponding to the recovery of records

4、Delete the undo_log log according to xid and branchId, here the deletion is the undo_log of undo_log , i.e. undo_log2. So, by this point, both undo_logs have been deleted.

  1. Next, roll back seata_storage, because at this time its corresponding undo_log has been deleted in step 2, so at this time can not check the undo_log, and then regenerate a status == 1 undo_log.

Case Study

Background

  1. Three data sources are configured: two physical data sources and one logical data source, but the corresponding connection addresses of the two physical data sources are the same. Is this interesting?
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Bean("dsSlave")
DynamicDataSource dsSlave() {
return new DynamicDataSource(slaveDsRoute); }
}

@Primary
@Bean("masterSlave")
DataSource masterSlave(@Qualifier("dsMaster") DataSource dataSourceMaster,
@Qualifier("dsSlave") DataSource dataSourceSlave) throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(2);
// Master database
dataSourceMap.put("dsMaster", dataSourceMaster);
//slave
dataSourceMap.put("dsSlave", dataSourceSlave); // Configure read/write separation rules.
// Configure read/write separation rules
MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
"masterSlave", "dsMaster", Lists.newArrayList("dsSlave")
);
Properties shardingProperties = new Properties();
shardingProperties.setProperty("sql.show", "true");
shardingProperties.setProperty("sql.simple", "true");
// Get the data source object
DataSource dataSource = MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, shardingProperties);
log.info("datasource initialised!");
return dataSource;˚
}

!

2, open seata's data source dynamic proxy, according to seata's data source proxy logic can be known, will eventually generate three proxy data sources, the relationship between the native data source and the proxy data source is cached in the DataSourceProxyHolder.dataSourceProxyMap, if the native data source and the proxy data source corresponds to the relationship is as follows:

dsMaster(DynamicDataSource) => dsMasterProxy(DataSourceProxy)
dsSlave(DynamicDataSource) => dsSlaveProxy(DataSourceProxy)
masterSlave(MasterSlaveDataSource) => masterSlaveProxy(DataSourceProxy)

So, ultimately, the three data sources that exist in the IOC container are: dsMasterProxy, dsSlaveProxy, masterSlaveProxy. According to the @Primary feature, when we get a DataSource from the container, the default data source returned is the proxy masterSlaveProxy.

I haven't studied shardingjdbc specifically, but just guessed its working mechanism based on the code I saw during the debug.

masterSlaveProxy can be seen as MasterSlaveDataSource wrapped by DataSourceProxy. We can venture to guess that MasterSlaveDataSource is not a physical data source, but a logical data source, which can simply be thought of as containing routing logic. When we get a connection, we will use the routing rules inside to select a specific physical data source, and then get a real connection through that physical data source. The routing rules should be able to be defined by yourself. According to the phenomenon observed when debugging, the default routing rules should be:

  1. for select read operations, will be routed to the slave library, that is, our dsSlave

  2. for update write operations, will be routed to the master library, that is, our dsMaster

  3. When each DataSourceProxy is initialised, it will parse the connection address of that real DataSource, and then maintain that connection address and the DataSourceProxy itself in DataSourceManager.dataSourceCache. The DataSourceManager.dataSourceCache is used for rollback: when rolling back, it finds the corresponding DataSourceProxy based on the connection address, and then does the rollback operation based on that DataSourceProxy. But we can find this problem, these three data sources are resolved to the same connection address, that is, the key is duplicated, so in the DataSourceManager.dataSourceCache, when the connection place is the same, after the registration of the data source will overwrite the existing one. That is: DataSourceManager.dataSourceCache ultimately exists masterSlaveProxy, that is to say, will ultimately be rolled back through the masterSlaveProxy, this point is very important.

4, the table involved: very simple, we expect a business table seata_account, but because of the duplicate proxy problem, resulting in seata will undo_log also as a business table

  1. seata_account
  2. undo_log

OK, here's a brief background, moving on to the Seata session

Requirements

We have a simple requirement to perform a simple update operation inside a branch transaction to update the count value of seata_account. After the update, manually throw an exception that triggers a rollback of the global transaction. To make it easier to troubleshoot and reduce interference, we use one branch transaction in the global transaction and no other branch transactions.SQL is as follows.

update seata_account set count = count - 1 where id = 100;

Problems

Client: In the console log, the following logs are printed over and over again

  1. the above logs are printed at 20s intervals, and I checked the value of the innodb_lock_wait_timeout property of the database, and it happens to be 20, which means that every time a rollback request comes through, the rollback fails because of the timeout for acquiring the lock (20).
  2. Why is it not printed once after 20s? Because the server side will have a timer to process the rollback request.
// Branch rollback starts
Branch rollback start: 172.16.120.59:23004:59991911632711680 59991915571163137 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// undo_log transaction branch The original action corresponds to insert, so it rolls back to delete.
undoSQL undoSQL=DELETE FROM undo_log WHERE id = ? and PK=[[id,139]]
// Since the corresponding operation of the first-level agent is also in the context, when the undo_log branch transaction commits, the corresponding undo_log contains two actions
undoSQL undoSQL=UPDATE seata_account SET money = ? WHERE id = ? and PK=[[id,1]].

// After the branch transaction has been rolled back, delete the corresponding undo_log for that branch transaction
delete from undo_log where xid=172.16.120.59:23004:59991911632711680 AND branchId=59991915571163137

// Threw an exception indicating that the rollback failed because `Lock wait timeout exceeded`, and failed when deleting the undo_log based on the xid and branchId because a lock acquisition timeout occurred, indicating that there was another operation that held a lock on the record that was not released.
branchRollback failed. branchType:[AT], xid:[172.16.120.59:23004:59991911632711680], branchId:[59991915571163137], resourceId:[jdbc. mysql://172.16.248.10:3306/tuya_middleware], applicationData:[null]. reason:[Branch session rollback failed and try again later xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137 Lock wait timeout exceeded; try restarting transaction]

Server: the following log is printed every 20s, indicating that the server is constantly retrying to send a rollback request

Rollback branch transaction failed and will retry, xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137

The SQL involved in the process is roughly as follows:

1. SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE slaveDS
2. SELECT * FROM undo_log WHERE (id ) in ( (?) ) slaveDS
3. DELETE FROM undo_log WHERE id = ? masterDS
4. SELECT * FROM seata_account WHERE (id ) in ( (?) ) masterDS
5. UPDATE seata_account SET money = ? WHERE id = ? masterDS
6. DELETE FROM undo_log WHERE branch_id = ? AND xid = ? masterDS

At this point, check the database transaction status, lock status, lock wait relationship 1, check the current transaction being executed

SELECT * FROM information_schema.INNODB_TRX.

!

  1. Check the current lock status
SELECT * FROM information_schema.INNODB_LOCKs;

!

  1. Check the current lock wait relationship
SELECT * FROM information_schema.INNODB_LOCK_waits;

SELECT
block_trx.trx_mysql_thread_id AS sessionID that already holds a lock, request_trx.
request_trx.trx_mysql_thread_id AS the sessionID that is requesting the lock,
block_trx.trx_query AS the SQL statement that already holds the lock, request_trx.
request_trx.trx_query AS the SQL statement for which the lock is being requested,
waits.blocking_trx_id AS Transaction ID that already holds the lock, waits.requesting_trx.trx_query
waits.requesting_trx_id AS 正在申请锁的事务ID,
waits.requested_lock_id AS the ID of the lock object, waits.
locks.lock_table AS lock_table, -- table locked by the lock object
locks.lock_type AS lock_type, -- lock type
locks.lock_mode AS lock_mode -- lock mode
FROM
information_schema.innodb_lock_waits AS waits
INNER JOIN information_schema.innodb_trx AS block_trx ON waits.blocking_trx_id = block_trx.trx_id
INNER JOIN information_schema.innodb_trx AS request_trx ON waits.requesting_trx_id = request_trx.trx_id
INNER JOIN information_schema.innodb_locks AS locks ON waits.requested_lock_id = locks.lock_id;

!

  1. the record involved is branch_id = 59991915571163137 AND xid = 172.16.120.59:23004:59991911632711680.
  2. transaction ID 1539483284 holds the lock for this record, but its corresponding SQL is empty, so it should be waiting for a commit.
  3. transaction ID 1539483286 is trying to acquire a lock on this record, but the logs show that it is waiting for a lock timeout.

Probably a good guess is that select for update and delete from undo ... are in conflict. According to the logic in the code, these two operations should have been committed in a single transaction, so why have they been separated into two transactions?

Problem Analysis

In conjunction with the rollback process described above, let's look at what happens during the rollback of our example.

  1. first get the data source, at this time dataSourceProxy.getPlainConnection() to get the MasterSlaveDataSource data source
  2. during the select for update operation, get a Connection from the MasterSlaveDataSource, as I said before, the MasterSlaveDataSource is a logical datasource, which has a routing logic, according to the above, this time we get the dsSlave's Connection, and then we get the ddsSlave's Connection. dsSlave's Connection`.
  3. When executing the delete from undo ... 3. When performing the delete from undo ...' operation, you get the Connection from the `dsMaster'.
  4. Although dsSlave and dsMaster correspond to the same address, they must be getting different connections, so the two operations must be spread across two transactions.
  5. the transaction that executes select for update will wait until the deletion of the undo_log is complete before committing.
  6. the transaction that executes delete from undo ... The transaction executing delete from undo ...' waits for the select for update transaction to release the lock.
  7. Typical deadlock problem

Verify the conjecture

I tried to verify this problem in two ways:

  1. change the Seata code from select for update to select, then the query to undo_log does not need to hold a lock on the record, and will not cause a deadlock.

  2. change the data source proxy logic, this is the key to the problem, the main cause of the problem is not select for update. The main cause of the problem is not select for update. The multi-layer proxy problem has already been created before that, and then it will cause the deadlock problem. We should never have proxied the masterSlave datasource in the first place. It's just a logical data source, so why proxy it? If we proxy the masterSlave, we won't cause multiple layers of proxies, and we won't cause the deadlock problem when deleting the undo_log!

Final implementation

masterSlave is also a DataSource type, how to proxy just dsMaster and dsSlave but not masterSlave? Observing the SeataAutoDataSourceProxyCreator#shouldSkip method, we can solve this problem with the excludes attribute of the EnableAutoDataSourceProxy annotation

@Override
protected boolean shouldSkip(Class<? > beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}

i.e.: turn off the data source autoproxy, then add this annotation to the startup class

@EnableAutoDataSourceProxy(excludes = {"org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource"})

Autoproxy optimisation in new releases

Since Seata 1.4.0 has not been officially released yet, I'm currently looking at the 1.4.0-SNAPSHOT version of the code, which is the latest code in the ddevelop branch at the current time

Code changes

The main changes are as follows, but I won't go into too much detail on the minor ones:

  1. DataSourceProxyHolder adjustment
  2. DataSourceProxy adjustment
  3. SeataDataSourceBeanPostProcessor is added.

DataSourceProxyHolder

The most significant of the changes to this class are to its putDataSource method

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource; if (dataSource instanceof SeataDataSource)
if (dataSource instanceof SeataDataSourceProxy) {
SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
// If this is a proxy data source and it is the same as the current application's configured data source proxy mode (AT/XA), then return it directly
if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
return (SeataDataSourceProxy)dataSource; }
}

// If it's a proxy data source, and the data source proxy mode (AT/XA) is different from the one configured by the current application, then you need to get its TargetDataSource and create a proxy data source for it.
originalDataSource = dataSourceProxy.getTargetDataSource(); } else { dataSourceProxy.getTargetDataSource()
} else {
originalDataSource = dataSource; } else { originalDataSource = dataSource.
}

// If necessary, create a proxy data source based on the TargetDataSource.
return this.dataSourceProxyMap.computeIfAbsent(originalDataSource, originalDataSource, BranchType.
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new); }
}

The DataSourceProxyHolder#putDataSource method is used in two main places: in the SeataAutoDataSourceProxyAdvice cutout; and in the SeataDataSourceBeanPostProcessor. What problem does this judgement solve for us? The problem of multi-tier proxying of data sources. Think about the following scenarios with automatic data source proxying turned on:

  1. If we manually injected a DataSourceProxy into our project, a call to the DataSourceProxyHolder#putDataSource method in a cutover would return the DataSourceProxy itself directly, without creating another ` DataSourceProxy
  2. if we manually inject a DruidSource into the project, then the DataSourceProxyHolder#putDataSource method will create another DataSourceProxy for it and return it when it is called from the facet.

It looks like the problem is solved, but is it possible that there are other problems? Take a look at the following code

@Bean
public DataSourceProxy dsA(){
return new DataSourceProxy(druidA)
}

@Bean
public DataSourceProxy dsB(DataSourceProxy dsA){
return new DataSourceProxy(dsA)
}
  1. this is definitely wrong, but you can't help it if he wants to write it this way
  2. there's nothing wrong with dsA, but dsB still has a double proxy problem, because the TargetDataSource of dsB is dsA.
  3. This brings us to the DataSourceProxy change.

DataSourceProxy

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
// The following judgement ensures that we don't have a two-tier proxy problem even when we pass in a DataSourceProxy
if (targetDataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
init(targetDataSource, resourceGroupId);
}

SeataDataSourceBeanPostProcessor

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

......

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy. if (!excludes.contains.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}

//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy."); return ((SeataDataSourceProxy); } }
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean.
}
}
  1. SeataDataSourceBeanPostProcessor implements the BeanPostProcessor interface, which executes the BeanPostProcessor#postProcessAfterInitialization method after a bean is initialised. That is, in the postProcessAfterInitialization method, the bean is already available at this point.
  2. Why provide such a class? From its code, it is just to initialise the corresponding DataSourceProxy for the data source after the bean has been initialised, but why is this necessary?

Because some data sources may not be initialised (i.e. the relevant methods of the data source will not be called) after the application is started. If the SeataDataSourceBeanPostProcessor class is not provided, then the DataSourceProxyHolder#putDataSource method will only be triggered in the SeataAutoDataSourceProxyAdvice cutout. If a client goes down during the rollback, after restarting, the Server sends it a rollback request via a timed task, at which point the client needs to first find the corresponding DatasourceProxy based on the rsourceId (connection address). However, if the client hasn't triggered the data source's related methods before then, it won't enter the SeataAutoDataSourceProxyAdvice cutover logic, and won't initialise the corresponding DataSourceProxy for the data source, which will result in the failure of the rollback.

Multi-Layer Proxy Summary

Through the above analysis, we probably already know some optimisations of seata in avoiding multi-layer proxies, but there is actually one more issue to pay attention to:** Logical data source proxies** !

The calling relationship at this point is: masterSlaveProxy -> masterSlave -> masterproxy/slaveProxy -> master/slave

At this point you can exclude the logical datasource via the excludes attribute so that no datasource proxy is created for it.

To summarise:

  1. when initialising the corresponding DataSourceProxy for a DataSource, determine whether it is necessary to create a corresponding DataSourceProxy for it, and if it is a DataSourceProxy itself, return it directly.
  2. For the case of manual injection of some DataSource, in order to avoid the problem of multi-layer proxy caused by human error, we add a judgement in the constructor of DataSourceProxy, If the input parameter TragetDatasource is a DataSourceProxy itself, then we get the target attribute of TragetDatasource as the target attribute of the new DataSourceProxy. TragetDatasource of the new DataSourceProxy.
  3. for other cases, such as logical data source proxy issues, add exclusions to the excludes attribute to avoid creating a DataSourceProxy for the logical data source.

Suggestions for using global and local transactions

There is a question, if there are multiple DB operations involved in a method, say 3 update operations are involved, do we need to use @Transactional annotation in spring for this method? We consider this question from two perspectives: without @Transactional annotation and with @Transactional annotation.

Not using the @Transactional annotation

  1. in the commit phase, since the branch transaction has 3 update operations, each time the update operation is executed, a branch transaction will be registered with the TC through the data broker and a corresponding undo_log will be generated for it, so that the 3 update operations will be treated as 3 branch transactions
  2. In the rollback phase, the three branch transactions need to be rolled back.
  3. data consistency is ensured by the seata global transaction.

Use the @Transactional annotation.

  1. in the commit phase, the three update operations are committed as one branch transaction, so only one branch transaction will be registered in the end
  2. in the rollback phase, 1 branch transaction needs to be rolled back.
  3. data consistency: the 3 update operations are guaranteed by the consistency of the local transaction; global consistency is guaranteed by the seata global transaction. At this point, the 3 updates are just a branch transaction.

Conclusion

Through the above comparison, the answer is obvious, the reasonable use of local transactions can greatly improve the processing speed of global transactions. The above is just 3 DB operations, what if there are more DB operations involved in a method, then the difference between the two ways is not greater?

Finally, thanks to @FUNKYE for answering a lot of questions and providing valuable suggestions!

· 11 min read

【Distributed Transaction Seata source code interpretation II】 Client-side startup process

In this paper, we analyse the Client-side startup process in AT mode from the source code point of view, the so-called Client-side, i.e. the business application side. Distributed transactions are divided into three modules: TC, TM, RM, where TC is located in the seata-server side, while TM, RM through the SDK way to run in the client side.

The following figure shows a distributed transaction scenario of Seata's official demo, divided into the following several microservices, which together implement a distributed transaction of placing an order, deducting inventory, and deducting balance.

  • **BusinessService: ** business service, the entrance to the order placing service
  • StorageService: Inventory microservice, used to deduct the inventory of goods
  • OrderService: Order microservice, to create orders
  • AccountService: Account microservice, debits the balance of the user's account

! [Insert image description here](https://img-blog.csdnimg.cn/20200820184156758.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10, text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,colour_FFFFFF,t_70#pic_center)

It can also be seen from the above figure that in AT mode Seata Client side implements distributed transactions mainly through the following three modules:

  • GlobalTransactionScanner: GlobalTransactionScanner is responsible for initialising the TM, RM module and adding interceptors for methods that add distributed transaction annotations, the interceptors are responsible for the opening, committing or rolling back of the global transaction
  • DatasourceProxy: DatasourceProxy for DataSource to add interception , the interceptor will intercept all SQL execution , and as RM transaction participant role in the distributed transaction execution .
  • Rpc Interceptor: In the previous article Distributed Transaction Seata Source Code Interpretation I there are a few core points of distributed transaction mentioned, one of which is Cross-Service Instance Propagation of Distributed Transactions The Rpc Interceptor is responsible for propagating transactions across multiple microservices.

seata-spring-boot-starter

There are two ways to refer to the seata Distributed Transaction SDK, relying on seata-all or seata-spring-boot-starter. It is recommended to use the seata-spring-boot-starter because the starter has automatically injected the three modules mentioned above, and the user only needs to add the corresponding configuration in the business code to add a global distributed transaction annotation can be. Here's how to start with the code in the seata-spring-boot-starter project:

The following figure shows the project structure of seata-spring-boot-starter: ! [Insert image description here](https://img-blog.csdnimg.cn/20200810204518853.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10, text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,colour_FFFFFF,t_70) It is mainly divided into the following modules:

  • properties: The properties directory contains the configuration classes that Springboot adapts to seata, i.e., you can use SpringBoot's configuration to configure the parameters of seata.
  • provider: The classes in the provider directory are responsible for adapting Springboot and SpringCloud configurations to the Seata configuration.
  • resources: There are two main files in the resources directory, spring.facts for registering Springboot auto-assembly classes and ExtConfigurationProvider for registering the SpringbootConfigurationProvider class, the Provider class is responsible for adapting SpringBoot related configuration classes to Seata.

For the springboot-starter project, let's first look at the resources/META-INF/spring.factors file:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration

You can see that the autoconfiguration class is configured in spring.facts: SeataAutoConfiguration, in which two instances of GlobalTransactionScanner and seataAutoDataSourceProxyCreator are injected. The code is as follows:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled",
havingValue = "true",
matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {

...

// GlobalTransactionScanner is responsible for adding interceptors to methods that add the GlobalTransaction annotation.
// and is responsible for initialising the RM, TM
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(),
seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler); }
failureHandler); }
}

// The SeataAutoDataSourceProxyCreator is responsible for generating proxies for all DataSources in Spring.
// This enables the interception of all SQL execution.
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {
"enableAutoDataSourceProxy", "enable-auto" +
"-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExpressionCreator(seataProperties.getExpressionCreator))
seataProperties.getExcludesForAutoProxying());
}
}

GlobalTransactionScanner

GlobalTransactionScanner inherits from AutoProxyCreator, which is a way to implement AOP in Spring to intercept all instances in Spring and determine whether they need to be proxied. Below is a list of some of the more important fields in GlobalTransactionScanner and the core methods for intercepting proxies:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitialisingBean, ApplicationContextAware,
DisposableBean {
...
// The interceptor field is the interceptor corresponding to a proxy object.
// It can be thought of as a temporary variable with an expiration date of a proxied object.
private MethodInterceptor interceptor; // globalTransactionalInterceptor.

// globalTransactionalInterceptor is the generic Interceptor.
// It is used by all non-TCC transactional methods.
private MethodInterceptor globalTransactionalInterceptor; // PROXYED_SETTING_OBJECT

// PROXYED_SET stores instances that have already been proxied to prevent duplicate processing.
private static final Set<String> PROXYED_SET = new HashSet<>(); // applicationId is the name of a service.

// applicationId is a unique identifier for a service.
// corresponds to spring.application.name in the springcloud project
private final String applicationId; // The group identifier of the transaction.
// Grouping identifier for the transaction, refer to the wiki article: https://seata.apache.org/zh-cn/docs/user/txgroup/transaction-group/
private final String txServiceGroup; // The group identifier of the transaction.

...

// Determine whether the target object needs to be proxied, and if so, generate an interceptor and assign it to the class variable interceptor.
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// Determine if distributed transactions are disabled
if (disableGlobalTransaction) {
return bean; }
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean; }
}

// Each time a proxied object is processed, the intermediary is set to null, so the intermediary's // lifecycle is that of a proxied object.
// lifecycle is a proxied object, and since the intermediary is used in a separate method, getAdvicesAndAdvisorsForBean
// Since the interceptor is used in a separate method getAdvicesAndAdvisorsForBean, the interceptor is defined as a class variable
interceptor = null; // Determine if this is a TCC transaction.

// Determine whether this is TCC transaction mode, primarily based on the presence of the TwoPhaseBusinessAction annotation on the method
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName,
applicationContext)) {
// Create an interceptor for the TCC transaction
interceptor =
new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
// Get the class type of the object to be processed
Class<? > serviceInterface = SpringProxyUtils.findTargetClass(bean); } else { // Get the class type of the object to be processed.
// Get all interfaces inherited by the object to be processed
Class<? >[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // Get all interfaces inherited by the pending object.

// If there is a GlobalTransactional annotation on the class of the pending object or on the inherited interfaces.
// or any of the methods of the class of the object to be handled have a GlobalTransactional or
// GlobalLock annotation on any of the methods of the class of the object to be handled returns true, i.e., it needs to be proxied.
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

// If the interceptor is null, i.e. not in TCC mode.
// then use globalTransactionalInterceptor as the interceptor
if (interceptor == null) {
// globalTransactionalInterceptor will only be created once
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor =
new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener.addConfigListener(
(ConfigurationChangeListener) globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}

if (!AopUtils.isAopProxy(bean)) {
// If the bean itself is not a Proxy object, then the parent class wrapIfNecessary is called to generate the proxy object
// In the parent class, getAdvicesAndAdvisorsForBean is called to get the interceptor defined above.
bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { getAdvicesAndAdvisorsForBean(bean, beanName, cacheKey); }
} else {
// If the bean is already a proxy, add a new interceptor directly to the proxy's interceptor call chain, AdvisedSupport
// and add the new interceptor directly to the proxy's interception invocation chain.
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName,
getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// Mark that the beanName has been processed
PROXYED_SET.add(beanName);
return bean; }
}
} catch (Exception exx) {
throw new RuntimeException(exx); }
}
}

// Return the interceptor object computed in the wrapIfNecessary method.
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,
TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
}

The above describes how GlobalTransactionScanner intercepts global transactions via annotations, the specific interceptor implementations are TccActionInterceptor and GlobalTransactionalInterceptor, for the AT pattern we are mainly concerned with the GlobalTransactionalInterceptor, in subsequent articles will introduce the specific implementation of GlobalTransactionalInterceptor.

In addition GloabalTransactionScanner is also responsible for the initialisation of TM, RM, which is implemented in the initClient method:

private void initClient() {
...

// Initialise the TM
TMClient.init(applicationId, txServiceGroup); ...
...

//Initialise RM
RMClient.init(applicationId, txServiceGroup); ...
...

// Register the Spring shutdown callback to free up resources.
registerSpringShutdownHook(); ... // Register the Spring shutdown callback for releasing resources.

}

TMClient, RMClient are Seata based on Netty implementation of the Rpc framework of the client class, just business logic is different, due to TMClient is relatively more simple, we take RMClient as an example to see the source code:

public class RMClient {
// RMClient's init is a static method that creates an instance of RmNettyRemotingClient and calls the init method.
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient =
RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get()); rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}

RmNettyRemotingClient is implemented as follows:

@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
// ResourceManager is responsible for handling transaction participants, supports AT, TCC and Saga modes.

// RmNettyRemotingClient singleton.
private static volatile RmNettyRemotingClient instance; // RmNettyRemotingClient instance; // RmNettyRemotingClient instance.
private final AtomicBoolean initialised = new AtomicBoolean(false); // The unique identifier of the microservice.
// Unique identifier of the microservice
private String applicationId; // Distributed transaction group name.
// Distributed transaction group name
private String transactionServiceGroup; // The name of the distributed transaction group.

// The init method is called by the init method in RMClient.
public void init() {
// Register the Processor for Seata's custom Rpc.
registerProcessor(); // If (initialised.compareAndAndroid)
if (initialised.compareAndSet(false, true)) {
// Call the init method of the parent class, which is responsible for initialising Netty and establishing a connection to the Seata-Server in the parent class
super.init();
}
}

// Register the Processor for the Seata custom Rpc.
private void registerProcessor() {
// 1. Register the Processor for the Seata-Server initiating the branchCommit.
RmBranchCommitProcessor rmBranchCommitProcessor =
new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor,
messageExecutor); messageExecutor

// 2. Register the Processor for the Seata-Server initiating the branchRollback.
RmBranchRollbackProcessor rmBranchRollbackProcessor =
new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor
, messageExecutor);

// 3. Register the Processor for the Seata-Server initiating the deletion of the undoLog.
RmUndoLogProcessor rmUndoLogProcessor =
new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor,
rmUndoLogProcessor, rmUndoLogProcessor); messageExecutor);

// 4. Register the Processor for the response returned by Seata-Server, ClientOnResponseProcessor.
// Used to process the Request initiated by the Client and the Response returned by the Seata-Server.
The ClientOnResponseProcessor // is responsible for processing the Request sent by the Client and the Response returned by the Seata-Server.
// Response returned by the Seata-Server, thus implementing Rpc.
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor,
null); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null)
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.
onResponseProcessor, null); super.registerProcessor(MessageType.
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT,
onResponseProcessor, null); super.registerProcessor(MessageType.
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

// 5. Processing Pong messages returned by Seata-Server
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor,
null);
}
}

The above logic seems to be quite complex, and there are many related classes, such as Processor, MessageType, TransactionMessageHandler, ResourceManager, etc. In fact, it's essentially an Rpc call, which can be divided into Rm-initiated and Seata-initiated calls.

  • Rm active call methods: such as: registering branches, reporting branch status, applying global locks, etc. Rm active call methods need to be in the ClientOnResponseProcessor to handle the Response returned by Seata-Server.
  • Seata-Server active call methods: such as: commit branch transactions, rollback branch transactions, delete undolog log. Seata-Server active call methods, the Client side corresponds to a different Processor to deal with, and after the end of processing to return to the Seata-Server processing results. Response. The core implementation logic of transaction commit and rollback are in TransactionMessageHandler and ResourceManager.

About TransactionMessageHandler, ResourceManager implementation will also be described in detail in subsequent chapters.

The next article will introduce the SeataAutoDataSourceProxyCreator, Rpc Interceptor is how to initialise and intercept.

· 8 min read

Seata Demo environment build under Mac (AT mode)

Preface

Recently, because of work needs, research and study Seata distributed transaction framework, this article to learn the knowledge of their own record!

Seata overview

cloc code statistics

First look at the seata project cloc code statistics (as of 2020-07-20)

! cloc-seata

The number of Java code lines is about 97K

Code quality

Unit test coverage 50%

! cloc-seata

Demo code

The demo code in this article is the seata-samples-dubbo module under the seata-samples project at the following address:

https://github.com/apache/incubator-seata-samples/tree/master/dubbo

Core problem solved

The AT pattern Demo example gives a typical distributed transaction scenario:

  • In a purchase transaction, it is necessary to:
  1. deduct the inventory of a product
  2. deduct the user account balance
  3. generate a purchase order
  • Obviously, all three steps must either succeed or fail, otherwise the system's data will be messed up.
  • With the popular microservices architecture, generally speaking, inventory, account balance, and purchase order are three separate systems.
  • Each microservice has its own database and is independent of each other.

Here is the scenario for distributed transactions.

! Design diagram

Solution

The idea of the AT pattern to solve this problem is actually quite simple and is summarised in one sentence:

In the distributed transaction process, record the data to be modified before and after the modification of the value to the undo_log table, in case of abnormalities in the transaction, through the data in this to do a rollback!

Of course, the specific code to implement, I believe that many details are far from being so simple.

Demo code structure

Clone the latest code from github.

git clone git@github.com:apache/incubator-seata-samples.git
``

Read the Demo code structure

```sh
$ cd seata-samples/dubbo/
$ tree -C -I 'target' .
.
├── README.md
├─ pom.xml
├── seata-samples-dubbo.iml
└── src
└── main
├─ java
│ └── io
│ └── seata
│ └── samples
│ └─ dubbo
│ ├── ApplicationKeeper.java
│ ├── Order.java
│ ├── service
│ │ ├── AccountService.java
│ │ ├── BusinessService.java
│ ├── OrderService.java │ ├── OrderService.java
│ │ ├── StorageService.java
│ │ └── impl
│ │ ├── AccountServiceImpl.java
│ │ ├── BusinessServiceImpl.java
│ │ ├── OrderServiceImpl.java
│ │ └── StorageServiceImpl.java
│ └── starter
│ ├── DubboAccountServiceStarter.java │ ├── DubboAccountServiceStarter.java
│ ├── DubboBusinessTester.java
│ ├── DubboOrderServiceStarter.java
│ └── DubboStorageServiceStarter.java
└── resources
├── file.conf
├── jdbc.properties
├── log4j.properties
├── registry.conf
├─ spring
│ ├── dubbo-account-service.xml
│ ├── dubbo-business.xml
│ ├── dubbo-order-service.xml
│ └── dubbo-storage-service.xml
└── sql
├── dubbo_biz.sql
└── undo_log.sql

13 directories, 27 files
  • The four *Starter classes under the io.seata.samples.dubbo.starter package emulate each of the four microservices described above

  • Account

  • Business

  • Order

  • Storage

  • 4 services are standard dubbo services, configuration files in the seata-samples/dubbo/src/main/resources/spring directory

  • To run the demo, you need to start all four services, and Business is the last one to start.

  • The main logic is in io.seata.samples.dubbo.service, and the four implementation classes correspond to the business logic of the four microservices.

  • Configuration file for database information: src/main/resources/jdbc.properties

Timing diagram

! cloc-seata

Ok, get going, Make It Happen!

Run the demo

MySQL

Create a table

Execute the scripts dubbo_biz.sql and undo_log.sql in seata-samples/dubbo/src/main/resources/sql.

mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| account_tbl |
| order_tbl |
| storage_tbl |
| undo_log |
+-----------------+
4 rows in set (0.01 sec)

After execution, there should be 4 tables in the database

Modify the seata-samples/dubbo/src/main/resources/jdbc.properties file

Modify the values of the variables according to the environment in which you are running MySQL

jdbc.account.url=jdbc:mysql://localhost:3306/seata
jdbc.account.username=your_username
jdbc.account.password=your_password
jdbc.account.driver=com.mysql.jdbc.
# storage db config
jdbc.storage.url=jdbc:mysql://localhost:3306/seata
jdbc.storage.username=your_username
jdbc.storage.password=your_password
jdbc.storage.driver=com.mysql.jdbc.
# order db config
jdbc.order.url=jdbc:mysql://localhost:3306/seata
jdbc.order.username=your_username
jdbc.order.password=your_password
jdbc.order.driver=com.mysql.jdbc.

ZooKeeper

Start ZooKeeper, my local Mac is using Homebrew installation to start it

$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.

$ brew services list
Name Status User Plist
docker-machine stopped
elasticsearch stopped
kafka stopped
kibana stopped
mysql started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.mysql.plist
nginx stopped
postgresql stopped
postgresql stopped
zookeeper started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.zookeeper.plist

Start the TC transaction coordinator

In this link page, download the corresponding version of seata-server, I downloaded version 1.2.0 locally.

  1. Go to the directory where the file is located and extract the file.
  2. Enter the seata directory
  3. Execute the startup script
$ tar -zxvf seata-server-1.2.0.tar.gz
$ cd seata
$ bin/seata-server.sh

Observe the startup log for error messages, if everything is fine and you see the following Server started message, the startup was successful.

2020-07-23 13:45:13.810 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...

Starting a simulated microservice in the IDE

  1. First import the seata-samples project into your local IDE, I'm using IntelliJ IDEA here.
  2. Refresh the Maven project dependencies.
  3. Start the Account, Order and Storage services before Business can invoke them, the corresponding startup classes are:

The corresponding startup classes are:

io.seata.samples.dubbo.starter.DubboStorageServiceStarter
io.seata.samples.dubbo.starter.DubboOrderServiceStarter
io.seata.samples.dubbo.starter.DubboStorageServiceStarter

After each service is started, you see this message indicating that the service was started successfully

Application is keep running ...

! cloc-seata

After successful startup, the account_tbl, storage_tbl tables will have two initialised data, the account balance and the product inventory respectively

mysql> SELECT * FROM account_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+ | id | user_id | money | ----+---------+-------+
| 1 | U100001 | 999 |
+----+---------+-------+ | 1 row in set (0.00.00)
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+ | id | commodity_code | count | ----+----------------+-------+
| 1 | C00321 | 100 |
+----+----------------+-------+
1 row in set (0.00 sec)

Use Business to verify results

Normal

Still executing the main function of the DubboBusinessTester class in the IDE, the programme will exit automatically after running.

If everything is working properly, everything should be committed for each microservice, and the data should be consistent.

Let's take a look at the data changes in MySQL

mysql> SELECT * FROM account_tbl; SELECT * FROM order_tbl; SELECT * FROM storage_tbl.
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+ | id | user_id | money | ----+---------+-------+
| 1 | U100001 | 599 |
+----+---------+-------+ | 1 row in set (0.00.00)
1 row in set (0.00 sec)

+----+---------+----------------+-------+-------+
| id | user_id | commodity_code | count | money |
+----+---------+----------------+-------+-------+
| 1 | U100001 | C00321 | 2 | 400 |
+----+---------+----------------+-------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+ | id | commodity_code | count | ----+----------------+-------+
| 1 | C00321 | 98 |
+----+----------------+-------+
1 row in set (0.00 sec)

From the data of the 3 tables, we can see: account balance is deducted by 400; the order table is increased by 1 record; the product inventory is deducted by 2

This result is consistent with the logic of the programme, which means that there is no problem with the transaction.

exception

In fact, even if you do not join the distributed transaction control, everything is normal, the transaction itself will not be a problem

So let's focus on what happens when an exception occurs.

Now I'm going to comment out the exception-throwing code in BusinessServiceImpl and execute DubboBusinessTester once more to see what happens.

		@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount); orderService.create(userId)
orderService.create(userId, commodityCode, orderCount); // release this exception throw.

//Leave this exception comment alone to simulate an exception in the application.
throw new RuntimeException("portman's foooooobar error.");;

}

Next, I executed DubboBusinessTester once again, and during the execution I could see the exception message on the console

Exception in thread "main" java.lang.RuntimeException: portman's foooooobar error.

Now we look again at the data changes in MySQL and see that there are no changes in the data, indicating that the distributed transaction control has worked

Questions to ponder

The above steps just demonstrates seata's simplest demo programme, more complex cases can be discussed and verified later!

There are still some questions and doubts in the learning process, followed by further study

  • Global lock on the performance of the degree of impact
  • undo_log log can be rolled back to the original state, but if the data state has changed how to deal with (for example, increased user points have been spent by other local transactions)

References

  • [What is Seata?] (/docs/overview/what-is-seata)
  • [Quickstart] (/docs/user/quickstart)

Author information

Xu Xiaoga, Software Architect, Kingdee

Github