

Project address: https://gitee.com/itCjb/springboot-dubbo-mybatisplus-seata

Author: FUNKYE (Chen Jianbin), a programmer at an Internet company in Hangzhou.

Introduction

MyBatis-Plus: MyBatis-Plus (MP for short) is an enhancement tool for MyBatis. It only adds features on top of MyBatis without changing it, and was born to simplify development and improve efficiency.

MP configuration:

<bean id="sqlSessionFactory" class="com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
</bean

Seata: Seata is an open-source distributed transaction solution dedicated to providing high-performance and easy-to-use distributed transaction services. Seata provides AT, TCC, SAGA and XA transaction modes, building a one-stop distributed transaction solution for its users.

AT mode mechanism:

  • Phase 1: business data and the rollback log are committed in the same local transaction, then the local lock and connection resources are released.
  • Phase 2:
    • Commit is asynchronous and completes very quickly.
    • Rollback compensates in reverse using the Phase 1 rollback log (a code-level sketch follows below).
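From the application's point of view, both phases are transparent: a business method only has to open a global transaction. Below is a minimal sketch of what AT mode looks like in code, assuming a DataSource already proxied by Seata; the account table, service name and SQL are made up for illustration and are not part of the original demo:

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class TransferService {

    private final JdbcTemplate jdbc; // backed by a Seata-proxied DataSource

    public TransferService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @GlobalTransactional(timeoutMills = 300000, name = "transfer-tx")
    public void transfer(long from, long to, int amount) {
        // Phase 1: each update commits locally together with its undo_log record
        jdbc.update("update account set balance = balance - ? where id = ?", amount, from);
        jdbc.update("update account set balance = balance + ? where id = ?", amount, to);
        // If an exception escapes, Phase 2 rolls back by replaying the undo logs;
        // otherwise Phase 2 commits asynchronously and simply deletes the undo logs.
    }
}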

Analyse the causes

  1. First of all, the introduction shows that MP needs to register a sqlSessionFactory and inject a data source into it, while Seata guarantees that transactions commit and roll back correctly by proxying the data source.

  2. Let's look at the SeataAutoConfig code written according to the official Seata demo.

package org.test.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
    private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

    @Autowired(required = true)
    private DataSourceProperties dataSourceProperties;

    @Bean(name = "dataSource") // Declare it as a bean instance.
    @Primary // Among DataSources of the same type, use the annotated one first.
    public DataSource druidDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
        druidDataSource.setUrl(dataSourceProperties.getUrl());
        druidDataSource.setUsername(dataSourceProperties.getUsername());
        druidDataSource.setPassword(dataSourceProperties.getPassword());
        druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
        druidDataSource.setInitialSize(0);
        druidDataSource.setMaxActive(180);
        druidDataSource.setMaxWait(60000);
        druidDataSource.setMinIdle(0);
        druidDataSource.setValidationQuery("Select 1 from DUAL");
        druidDataSource.setTestOnBorrow(false);
        druidDataSource.setTestOnReturn(false);
        druidDataSource.setTestWhileIdle(true);
        druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
        druidDataSource.setMinEvictableIdleTimeMillis(25200000);
        druidDataSource.setRemoveAbandoned(true);
        druidDataSource.setRemoveAbandonedTimeout(1800);
        druidDataSource.setLogAbandoned(true);
        logger.info("Loading dataSource ........");
        return druidDataSource;
    }

    /**
     * init datasource proxy
     * @Param: druidDataSource datasource bean instance
     * @Return: DataSourceProxy datasource proxy
     */
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        logger.info("Proxy dataSource ........");
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
        factory.setDataSource(dataSourceProxy);
        factory.setMapperLocations(
            new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));
        return factory.getObject();
    }

    /**
     * init global transaction scanner
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        logger.info("Configuring seata........");
        return new GlobalTransactionScanner("test-service", "test-group");
    }
}

First, we see that in our Seata configuration class we configure a data source, and then register a Seata proxy data-source bean on top of it.

If we now start the MP-integrated Seata project as-is, we will find that the paging plugin and other plugins simply stop working, and even mapper scanning has to be configured in code. Why is that?

Reading the code above: because we configured another sqlSessionFactory, MP's own sqlSessionFactory is invalidated. And here is the real problem: even if we did not configure a sqlSessionFactory, the data source used by MP would still not be the one proxied by Seata, so distributed transactions would fail. But how do we solve this problem?

We need to read the MP source code and find its startup class.

/*
 * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * <p>
 * https://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.baomidou.mybatisplus.autoconfigure;


import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.incrementer.IKeyGenerator;
import com.baomidou.mybatisplus.core.injector.ISqlInjector;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.scripting.LanguageDriver;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.TypeHandler;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.mapper.MapperFactoryBean;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * {@link EnableAutoConfiguration Auto-Configuration} for Mybatis. Contributes a
 * {@link SqlSessionFactory} and a {@link SqlSessionTemplate}.
 * <p>
 * If {@link org.mybatis.spring.annotation.MapperScan} is used, or a
 * configuration file is specified as a property, those will be considered,
 * otherwise this auto-configuration will attempt to register mappers based on
 * the interface definitions in or under the root auto-configuration package.
 * </p>
 * <p>copy from {@link org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration}</p>
 *
 * @author Eddú Meléndez
 * @author Josh Long
 * @author Kazuki Shimizu
 * @author Eduardo Macarrón
 */
@Configuration
@ConditionalOnClass({SqlSessionFactory.class, SqlSessionFactoryBean.class})
@ConditionalOnSingleCandidate(DataSource.class)
@EnableConfigurationProperties(MybatisPlusProperties.class)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class MybatisPlusAutoConfiguration implements InitializingBean {

private static final Logger logger = LoggerFactory.getLogger(MybatisPlusAutoConfiguration.class);

private final MybatisPlusProperties properties;

private final Interceptor[] interceptors;

private final TypeHandler[] typeHandlers;

private final LanguageDriver[] languageDrivers;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

private final List<MybatisPlusPropertiesCustomizer> mybatisPlusPropertiesCustomizers;

private final ApplicationContext applicationContext;


public MybatisPlusAutoConfiguration(MybatisPlusProperties properties,
                                    ObjectProvider<Interceptor[]> interceptorsProvider,
                                    ObjectProvider<TypeHandler[]> typeHandlersProvider,
                                    ObjectProvider<LanguageDriver[]> languageDriversProvider,
                                    ResourceLoader resourceLoader,
                                    ObjectProvider<DatabaseIdProvider> databaseIdProvider,
                                    ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
                                    ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider,
                                    ApplicationContext applicationContext) {
    this.properties = properties;
    this.interceptors = interceptorsProvider.getIfAvailable();
    this.typeHandlers = typeHandlersProvider.getIfAvailable();
    this.languageDrivers = languageDriversProvider.getIfAvailable();
    this.resourceLoader = resourceLoader;
    this.databaseIdProvider = databaseIdProvider.getIfAvailable();
    this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
    this.mybatisPlusPropertiesCustomizers = mybatisPlusPropertiesCustomizerProvider.getIfAvailable();
    this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
    if (!CollectionUtils.isEmpty(mybatisPlusPropertiesCustomizers)) {
        mybatisPlusPropertiesCustomizers.forEach(i -> i.customize(properties));
    }
    checkConfigFileExists();
}

private void checkConfigFileExists() {
    if (this.properties.isCheckConfigLocation() && StringUtils.hasText(this.properties.getConfigLocation())) {
        Resource resource = this.resourceLoader.getResource(this.properties.getConfigLocation());
        Assert.state(resource.exists(),
            "Cannot find config location: " + resource + " (please add config file or check your Mybatis configuration)");
    }
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
    // TODO uses MybatisSqlSessionFactoryBean instead of SqlSessionFactoryBean
    MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
    factory.setDataSource(dataSource);
    factory.setVfs(SpringBootVFS.class);
    if (StringUtils.hasText(this.properties.getConfigLocation())) {
        factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
    }
    applyConfiguration(factory);
    if (this.properties.getConfigurationProperties() != null) {
        factory.setConfigurationProperties(this.properties.getConfigurationProperties());
    }
    if (!ObjectUtils.isEmpty(this.interceptors)) {
        factory.setPlugins(this.interceptors);
    }
    if (this.databaseIdProvider != null) {
        factory.setDatabaseIdProvider(this.databaseIdProvider);
    }
    if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
        factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
    }
    if (this.properties.getTypeAliasesSuperType() != null) {
        factory.setTypeAliasesSuperType(this.properties.getTypeAliasesSuperType());
    }
    if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
        factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
    }
    if (!ObjectUtils.isEmpty(this.typeHandlers)) {
        factory.setTypeHandlers(this.typeHandlers);
    }
    if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
        factory.setMapperLocations(this.properties.resolveMapperLocations());
    }

    // TODO makes some changes to the source code (it adapts to older versions of MyBatis, which we don't need)
    Class<? extends LanguageDriver> defaultLanguageDriver = this.properties.getDefaultScriptingLanguageDriver();
    if (!ObjectUtils.isEmpty(this.languageDrivers)) {
        factory.setScriptingLanguageDrivers(this.languageDrivers);
    }
    Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);

    // TODO custom enum package
    if (StringUtils.hasLength(this.properties.getTypeEnumsPackage())) {
        factory.setTypeEnumsPackage(this.properties.getTypeEnumsPackage());
    }
    // TODO this must be non-null
    GlobalConfig globalConfig = this.properties.getGlobalConfig();
    // TODO inject the meta-object filler
    if (this.applicationContext.getBeanNamesForType(MetaObjectHandler.class, false, false).length > 0) {
        MetaObjectHandler metaObjectHandler = this.applicationContext.getBean(MetaObjectHandler.class);
        globalConfig.setMetaObjectHandler(metaObjectHandler);
    }
    // TODO inject the primary key generator
    if (this.applicationContext.getBeanNamesForType(IKeyGenerator.class, false, false).length > 0) {
        IKeyGenerator keyGenerator = this.applicationContext.getBean(IKeyGenerator.class);
        globalConfig.getDbConfig().setKeyGenerator(keyGenerator);
    }
    // TODO inject the sql injector
    if (this.applicationContext.getBeanNamesForType(ISqlInjector.class, false, false).length > 0) {
        ISqlInjector iSqlInjector = this.applicationContext.getBean(ISqlInjector.class);
        globalConfig.setSqlInjector(iSqlInjector);
    }
    // TODO set GlobalConfig to MybatisSqlSessionFactoryBean
    factory.setGlobalConfig(globalConfig);
    return factory.getObject();
}

// TODO uses MybatisSqlSessionFactoryBean
private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
    // TODO uses MybatisConfiguration
    MybatisConfiguration configuration = this.properties.getConfiguration();
    if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
        configuration = new MybatisConfiguration();
    }
    if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
        for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
            customizer.customize(configuration);
        }
    }
    factory.setConfiguration(configuration);
}

@Bean
@ConditionalOnMissingBean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
    ExecutorType executorType = this.properties.getExecutorType();
    if (executorType != null) {
        return new SqlSessionTemplate(sqlSessionFactory, executorType);
    } else {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

/**
 * This will just scan the same base package as Spring Boot does. If you want more power, you can explicitly use
 * {@link org.mybatis.spring.annotation.MapperScan} but this will get typed mappers working correctly, out-of-the-box,
 * similar to using Spring Data JPA repositories.
 */
public static class AutoConfiguredMapperScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

    private BeanFactory beanFactory;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        if (!AutoConfigurationPackages.has(this.beanFactory)) {
            logger.debug("Could not determine auto-configuration package, automatic mapper scanning disabled.");
            return;
        }

        logger.debug("Searching for mappers annotated with @Mapper");

        List<String> packages = AutoConfigurationPackages.get(this.beanFactory);
        if (logger.isDebugEnabled()) {
            packages.forEach(pkg -> logger.debug("Using auto-configuration base package '{}'", pkg));
        }

        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MapperScannerConfigurer.class);
        builder.addPropertyValue("processPropertyPlaceHolders", true);
        builder.addPropertyValue("annotationClass", Mapper.class);
        builder.addPropertyValue("basePackage", StringUtils.collectionToCommaDelimitedString(packages));
        BeanWrapper beanWrapper = new BeanWrapperImpl(MapperScannerConfigurer.class);
        Stream.of(beanWrapper.getPropertyDescriptors())
            // Needs mybatis-spring 2.0.2+
            .filter(x -> x.getName().equals("lazyInitialization")).findAny()
            .ifPresent(x -> builder.addPropertyValue("lazyInitialization", "${mybatis.lazy-initialization:false}"));
        registry.registerBeanDefinition(MapperScannerConfigurer.class.getName(), builder.getBeanDefinition());
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }
}

/**
* If mapper registering configuration or mapper scanning configuration not present, this configuration allow to scan
* mappers based on the same component-scanning path as Spring Boot itself.
*/
@Configuration
@Import(AutoConfiguredMapperScannerRegistrar.class)
@ConditionalOnMissingBean({MapperFactoryBean.class, MapperScannerConfigurer.class})
public static class MapperScannerRegistrarNotFoundConfiguration implements InitializingBean {

    @Override
    public void afterPropertiesSet() {
        logger.debug(
            "Not found configuration for registering mapper bean using @MapperScan, MapperFactoryBean and MapperScannerConfigurer.");
    }
}
}

Look at the sqlSessionFactory method in the MP startup class: it injects a data source in exactly the same way. At this point you should see the solution, right?

That's right: hand the proxied data source to MP's sqlSessionFactory.

It's very simple; we just need to tweak our Seata configuration class a little:

package org.test.config;

import javax.sql.DataSource;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
@MapperScan("com.baomidou.springboot.mapper*")
public class SeataAutoConfig {
    private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

    @Autowired(required = true)
    private DataSourceProperties dataSourceProperties;

    private DataSourceProxy dataSourceProxy;

    @Bean(name = "dataSource") // Declare it as a bean instance.
    @Primary // Among DataSources of the same type, use the annotated one first.
    public DataSource druidDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
        druidDataSource.setUrl(dataSourceProperties.getUrl());
        druidDataSource.setUsername(dataSourceProperties.getUsername());
        druidDataSource.setPassword(dataSourceProperties.getPassword());
        druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
        druidDataSource.setInitialSize(0);
        druidDataSource.setMaxActive(180);
        druidDataSource.setMaxWait(60000);
        druidDataSource.setMinIdle(0);
        druidDataSource.setValidationQuery("Select 1 from DUAL");
        druidDataSource.setTestOnBorrow(false);
        druidDataSource.setTestOnReturn(false);
        druidDataSource.setTestWhileIdle(true);
        druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
        druidDataSource.setMinEvictableIdleTimeMillis(25200000);
        druidDataSource.setRemoveAbandoned(true);
        druidDataSource.setRemoveAbandonedTimeout(1800);
        druidDataSource.setLogAbandoned(true);
        logger.info("Loading dataSource ........");
        dataSourceProxy = new DataSourceProxy(druidDataSource);
        return dataSourceProxy;
    }

    /**
     * init datasource proxy
     * @Return: DataSourceProxy datasource proxy
     */
    @Bean
    public DataSourceProxy dataSourceProxy() {
        logger.info("Proxy dataSource ........");
        return dataSourceProxy;
    }

    /**
     * init global transaction scanner
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        logger.info("Configuring seata........");
        return new GlobalTransactionScanner("test-service", "test-group");
    }
}

Looking at the code: we removed our own sqlSessionFactory configuration and made the DataSource bean return the proxied bean directly, adding @Primary so that MP uses our data source first. This fixes the bug where MP's plugins and components failed because the Seata proxy data source came with a newly created sqlSessionFactory!
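If you want to confirm the wiring at startup, here is a minimal sanity check; it is only a sketch assuming the bean definitions above (the runner class itself is not part of the original project):

package org.test.config;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import io.seata.rm.datasource.DataSourceProxy;

@Component
public class DataSourceCheckRunner implements CommandLineRunner {

    @Autowired
    private DataSource dataSource;

    @Override
    public void run(String... args) {
        // With the configuration above, the primary "dataSource" bean is the Seata
        // proxy, so the SqlSessionFactory that MP auto-configures uses it as well.
        System.out.println("dataSource proxied by seata: " + (dataSource instanceof DataSourceProxy));
    }
}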

Summary

Stepping into a pit is not terrible. Patiently follow the implementation principles of each component, think them through, and look for the conflicting code blocks; you will then find a way to make the two compatible.


Project address: https://gitee.com/itCjb/springboot-dubbo-mybatisplus-seata

This article was written by FUNKYE (Chen Jianbin), a principal programmer at an Internet company in Hangzhou.

Preface

Transaction: A transaction is a reliable, independent unit of work composed of a set of operations. Transactions have the ACID properties: atomicity, consistency, isolation and durability.

Distributed transaction: When an operation requires multiple services and multiple databases to collaborate (for example, with split tables and databases, or business split into services), local transactions can no longer cope; to guarantee data consistency, distributed transactions are needed.

Seata: An open-source distributed transaction solution dedicated to providing high-performance and easy-to-use distributed transaction services in a microservices architecture.

Purpose of this article: Microservices are more and more popular nowadays, and the market offers many distributed transaction solutions of uneven quality. Popular ones include MQ-based solutions guaranteeing eventual message consistency (consumption confirmation, message look-back, message compensation, and so on), and TX-LCN's LCN mode, which coordinates local transactions so that they all commit or all roll back (it has stopped updating and is incompatible with Dubbo 2.7). MQ-style distributed transactions are too complex, and TX-LCN has broken off, so we need an efficient, reliable and easy-to-start distributed transaction solution. Seata stands out. This article introduces how to quickly build a demo project integrating Seata; let's get started!

Preparation

  1. First, install MySQL, Eclipse and other commonly used tools; this will not be expanded on here.

  2. Visit the Seata download centre; we use version 0.9.0.

  3. Download and unzip seata-server.

Build the library and table

1. First we connect to MySQL and create a database named seata, then run the table-building SQL; db_store.sql in the seata-server conf folder is the SQL we need.

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : seata
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:18
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for branch_table

-- ----------------------------

DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of branch_table

-- ----------------------------

-- ----------------------------

-- Table structure for global_table

-- ----------------------------

DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of global_table

-- ----------------------------

-- ----------------------------

-- Table structure for lock_table

-- ----------------------------

DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
  `row_key` varchar(128) NOT NULL,
  `xid` varchar(96) DEFAULT NULL,
  `transaction_id` mediumtext,
  `branch_id` mediumtext,
  `resource_id` varchar(256) DEFAULT NULL,
  `table_name` varchar(32) DEFAULT NULL,
  `pk` varchar(36) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of lock_table

-- ----------------------------

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log
  2. After setting up the database Seata needs, we build the database for our demo: create a database named test, then execute the following SQL.
/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : test
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:24
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for test

-- ----------------------------

DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `one` varchar(255) DEFAULT NULL,
  `two` varchar(255) DEFAULT NULL,
  `createTime` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of test

-- ----------------------------

INSERT INTO `test` VALUES ('1', '1', '2', '2019-11-23 16:07:34');

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log
  3. Find the file.conf file inside the seata-server/conf folder and edit it.

  4. Then find the db configuration block and change the connection settings to your own database, as sketched below.
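The screenshots are not reproduced here; the edit amounts to switching the store mode to db and pointing it at the seata database created above. A sketch of the relevant store block in file.conf, assuming a local MySQL with user root (adjust url, user and password to your environment):

store {
  ## store mode: file or db
  mode = "db"

  db {
    datasource = "dbcp"
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123456"
  }
}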

Then you can go to the bin directory and run ./seata-server.bat to see it start.

Create a project

First of all, we use Eclipse; of course you can also use IDEA and other tools. Please follow the steps below in detail.

  1. Create a new Maven project, and delete the extra folders.

  2. Open the project's pom.xml and add the following dependencies.

<properties>
    <webVersion>3.1</webVersion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <HikariCP.version>3.2.0</HikariCP.version>
    <mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.8.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.7.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
    </dependency>
    <!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
        <version>7.0</version> <scope>provided</scope> </dependency> -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>2.9.2</version>
    </dependency>

    <!-- mybatis-plus begin -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus-boot-starter.version}</version>
    </dependency>
    <!-- mybatis-plus end -->
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-all</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <!-- Zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>2.5.4</version> </dependency> -->

    <!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
        <version>3.1.0</version> </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
    <dependency>
        <groupId>org.freemarker</groupId>
        <artifactId>freemarker</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.20</version>
    </dependency>
    <!-- Add this to recognise the log4j2.yml file -->
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
    </dependency>
    <dependency> <!-- Introducing the log4j2 dependency -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
        <version>2.11.0</version> </dependency> -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
  3. Then switch the parent project to pom packaging: edit the pom file directly, or switch to the overview tab and set it there.

  4. Create our demo sub-project, test-service:

The directory is as follows.


Create the EmbeddedZooKeeper.java file, along with ProviderApplication.java, with the following code.

package org.test;

import java.io.File;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.UUID;

import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ErrorHandler;
import org.springframework.util.SocketUtils;

/**
* from:
* https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java
*
* Helper class to start an embedded instance of standalone (non clustered) ZooKeeper.
*
* NOTE: at least an external standalone server (if not an ensemble) are recommended, even for
* {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication}
*
* @author Patrick Peralta
* @author Mark Fisher
* @author David Turanski
*/
public class EmbeddedZooKeeper implements SmartLifecycle {

/**
* Logger.
*/
private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);

/**
* ZooKeeper client port. This will be determined dynamically upon startup.
*/
private final int clientPort;

/**
* Whether to auto-start. Default is true.
*/
private boolean autoStartup = true;

/**
* Lifecycle phase. Default is 0.
*/
private int phase = 0;

/**
* Thread for running the ZooKeeper server.
*/
private volatile Thread zkServerThread;

/**
* ZooKeeper server.
*/
private volatile ZooKeeperServerMain zkServer;

/**
* {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread.
*/
private ErrorHandler errorHandler;

private boolean daemon = true;

/**
* Construct an EmbeddedZooKeeper with a random port.
*/
public EmbeddedZooKeeper() {
clientPort = SocketUtils.findAvailableTcpPort();
}

/**
* Construct an EmbeddedZooKeeper with the provided port.
*
* @param clientPort
* port for ZooKeeper server to bind to
*/
public EmbeddedZooKeeper(int clientPort, boolean daemon) {
this.clientPort = clientPort;
this.daemon = daemon;
}

/**
* Returns the port that clients should use to connect to this embedded server.
*
* @return dynamically determined client port
*/
public int getClientPort() {
return this.clientPort;
}

/**
* Specify whether to start automatically. Default is true.
*
* @param autoStartup
* whether to start automatically
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* {@inheritDoc}
*/
public boolean isAutoStartup() {
return this.autoStartup;
}

/**
* Specify the lifecycle phase for the embedded server.
*
* @param phase
* the lifecycle phase
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* {@inheritDoc}
*/
public int getPhase() {
return this.phase;
}

/**
* {@inheritDoc}
*/
public boolean isRunning() {
return (zkServerThread != null);
}

/**
* Start the ZooKeeper server in a background thread.
* <p>
* Register an error handler via {@link #setErrorHandler} in order to handle any exceptions thrown during startup or
* execution.
*/
public synchronized void start() {
if (zkServerThread == null) {
zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter");
zkServerThread.setDaemon(daemon);
zkServerThread.start();
}
}

/**
* Shutdown the ZooKeeper server.
*/
public synchronized void stop() {
if (zkServerThread != null) {
// The shutdown method is protected...thus this hack to invoke it.
// This will log an exception on shutdown; see
// https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details.
try {
Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown");
shutdown.setAccessible(true);
shutdown.invoke(zkServer);
}

catch (Exception e) {
throw new RuntimeException(e);
}

// It is expected that the thread will exit after
// the server is shutdown; this will block until
// the shutdown is complete.
try {
zkServerThread.join(5000);
zkServerThread = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for embedded ZooKeeper to exit");
// abandoning zk thread
zkServerThread = null;
}
}
}

/**
* Stop the server if running and invoke the callback when complete.
*/
public void stop(Runnable callback) {
stop();
callback.run();
}

/**
* Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none
* is provided, only error-level logging will occur.
*
* @param errorHandler
* the {@link ErrorHandler} to be invoked
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Runnable implementation that starts the ZooKeeper server.
*/
private class ServerRunnable implements Runnable {

public void run() {
try {
Properties properties = new Properties();
File file = new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID());
file.deleteOnExit();
properties.setProperty("dataDir", file.getAbsolutePath());
properties.setProperty("clientPort", String.valueOf(clientPort));

QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(properties);

zkServer = new ZooKeeperServerMain();
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumPeerConfig);

zkServer.runFromConfig(configuration);
} catch (Exception e) {
if (errorHandler != null) {
errorHandler.handleError(e);
} else {
logger.error("Exception running embedded ZooKeeper", e);
}
}
}
}

}
package org.test;

import org.apache.dubbo.config.spring.context.annotation.DubboComponentScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * @author cjb
 * @date 2019/10/24
 */
@EnableTransactionManagement
@ComponentScan(basePackages = {"org.test.config", "org.test.service.impl"})
@DubboComponentScan(basePackages = "org.test.service.impl")
@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
new EmbeddedZooKeeper(2181, false).start();
SpringApplication app = new SpringApplication(ProviderApplication.class);
app.run(args);
}

}

Create the entity package org.test.entity and, inside it, the entity class Test. Lombok is used here (search online for details; Eclipse needs the Lombok plugin installed).

package org.test.entity;

import java.io.Serializable;
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
* <p>
* Functions
 * </p>
*
* @author Funkye
* @since 2019-04-23
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "test object", description = "function")
public class Test implements Serializable {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "主键")
@TableId(value = "id", type = IdType.AUTO)
private Integer id;

@ApiModelProperty(value = "one")
@TableField("one")
private String one;

@ApiModelProperty(value = "two")
@TableField("two")
private String two;

@ApiModelProperty(value = "createTime")
@TableField("createTime")
private LocalDateTime createTime;

}

Create the service, service.impl and mapper packages, and in turn create ITestService, its implementation class, and the mapper.

package org.test.service;

import org.test.entity.Test;

import com.baomidou.mybatisplus.extension.service.IService;

/**
* <p>
* Function Service class
 * </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface ITestService extends IService<Test> {

}

package org.test.service.impl;

import org.apache.dubbo.config.annotation.Service;
import org.test.entity.Test;
import org.test.mapper.TestMapper;
import org.test.service.ITestService;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

@Service(version = "1.0.0",interfaceClass =ITestService.class )
public class TestServiceImpl extends ServiceImpl<TestMapper, Test> implements ITestService {

}

 package org.test.mapper;

import org.test.entity.Test;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* <p>
* Functional Mapper interface
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface TestMapper extends BaseMapper<Test> {

}

Create the org.test.config package and create SeataAutoConfig.java inside it. The configuration information is all here; its main roles are proxying the data source and connecting to the transaction service group.

package org.test.config;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "druidDataSource")
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("load dataSource........");
return druidDataSource;
}

/**
 * init datasource proxy
 * @Param: druidDataSource datasource bean instance
 * @Return: DataSourceProxy datasource proxy
 */
@Bean(name = "dataSource")
@Primary // In the same DataSource, first use the labelled DataSource
public DataSourceProxy dataSourceProxy(@Qualifier(value = "druidDataSource") DruidDataSource druidDataSource) {
logger.info("Proxy dataSource ........") ;
return new DataSourceProxy(druidDataSource);
}

/**
 * init global transaction scanner
 * @Return: GlobalTransactionScanner
 */
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("Configuring seata........") ;
return new GlobalTransactionScanner("test-service", "test-group");
}
}

Then create the MybatisPlusConfig configuration class, which MyBatis-Plus requires.

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.baomidou.mybatisplus.core.parser.ISqlParser;
import com.baomidou.mybatisplus.extension.parsers.BlockAttackSqlParser;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;

@Configuration
// @MapperScan("com.baomidou.springboot.mapper*") // This annotation is equivalent to the @Bean
// MapperScannerConfigurer below; only one of the two configurations is needed.
public class MybatisPlusConfig {

/**
 * mybatis-plus paging plugin <br>
* Documentation: http://mp.baomidou.com<br>
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
List<ISqlParser> sqlParserList = new ArrayList<ISqlParser>();
// Add the blocking parser for full-table (SQL attack) operations to the parse chain.
sqlParserList.add(new BlockAttackSqlParser());
paginationInterceptor.setSqlParserList(sqlParserList);
return paginationInterceptor;
}

/**
* Equivalent to the top: {@code @MapperScan("com.baomidou.springboot.mapper*")} Here it can be extended, e.g., using a configuration file to configure the path to scan the Mapper
*/

@Bean
public MapperScannerConfigurer mapperScannerConfigurer() {
MapperScannerConfigurer scannerConfigurer = new MapperScannerConfigurer();
scannerConfigurer.setBasePackage("org.test.mapper");
return scannerConfigurer;
}

}
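With the interceptor registered, pagination works through MP's standard IService API. A small usage sketch, assuming the ITestService defined earlier is injected (the demo class below is made up for illustration):

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

import org.test.entity.Test;
import org.test.service.ITestService;

public class PageQueryDemo {

    private final ITestService testService;

    public PageQueryDemo(ITestService testService) {
        this.testService = testService;
    }

    public void printFirstPage() {
        // The PaginationInterceptor rewrites this into a LIMIT query; without it
        // (the failure analysed in the previous article) the whole table comes back.
        IPage<Test> page = testService.page(new Page<>(1, 10));
        System.out.println("total=" + page.getTotal() + ", pageSize=" + page.getRecords().size());
    }
}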

Create the resources directory, create the mapper folder, application.yml and other files.

server:
  port: 38888
spring:
  application:
    name: test-service
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
dubbo:
  protocol:
    loadbalance: leastactive
    threadpool: cached
  scan:
    base-packages: org.test.service
  application:
    qos-enable: false
    name: testserver
  registry:
    id: my-registry
    address: zookeeper://127.0.0.1:2181?client=curator
mybatis-plus:
  mapper-locations: classpath:/mapper/*Mapper.xml
  typeAliasesPackage: org.test.entity
  global-config:
    db-config:
      field-strategy: not-empty
      id-type: auto
      db-type: mysql
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: true
    auto-mapping-unknown-column-behavior: none

Create file.conf. What matters here is vgroup_mapping.<your transaction group> inside the service block: for example, SeataAutoConfig above configures the group test-group, so it must also be test-group here. The IP and port below are simply those of the running Seata server.

transport {
type = "TCP"
server = "NIO"
heartbeat = true
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
boss-thread-size = 1
worker-thread-size = 8
}
shutdown {
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroup_mapping.test-group = "default"
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
undo.log.table = "undo_log"
}

recovery {
committing-retry-period = 1000
asyn-committing-retry-period = 1000
rollbacking-retry-period = 1000
timeout-retry-period = 1000
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}

metrics {
enabled = false
registry-type = "compact"
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

support {
spring {
datasource.autoproxy = false
}
}

Create registry.conf to specify the IPs and ports of registries and config sources such as file and zk.

registry {
type = "file"
file {
name = "file.conf"
}
}
config {
type = "file"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
}

Great success! You can run it directly now and observe the seata-server console.

Next, we create the test-client project. The steps are not repeated here; create it the same way as test-service above.

Then copy the service interfaces and entities from test-service into it. Of course, you could instead create a separate sub-project holding the shared interfaces, entities, and utilities; to build this demo quickly, I chose copy and paste.

Directory structure:

Then we create ClientApplication.

package org.test;

import java.util.TimeZone;
import java.util.concurrent.Executor;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, MybatisPlusAutoConfiguration.class})
@EnableScheduling
@EnableAsync
@Configuration
@EnableDubbo(scanBasePackages = {"org.test.service"})
@ComponentScan(basePackages = {"org.test.service", "org.test.controller", "org.test.config"})
public class ClientApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication app = new SpringApplication(ClientApplication.class);
app.run(args);
}

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
return new ThreadPoolTaskExecutor();
}
}

Then go to the config package and create SwaggerConfig:

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
public class SwaggerConfig {
// swagger2 configuration file, here you can configure the swagger2 some basic content, such as scanning packages and so on
@Bean
public Docket createRestApi() {
List<Parameter> pars = new ArrayList<Parameter>();
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
// Path to the current package
.apis(RequestHandlerSelectors.basePackage("org.test.controller")).paths(PathSelectors.any()).build()
.globalOperationParameters(pars);
}

// Build the API documentation details: title, contact, version, description
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
// The title of the page
.title("Project Interface")
// Creator
.contact(new Contact("FUNKYE", "", ""))
// Version number
.version("1.0")
// Description
.description("API description").build();
}
}

Then create SpringMvcConfigure and put the Seata configuration inside it. To keep things simple I integrated it directly into the MVC configuration class; to be more standard, you can create a separate Seata configuration class instead. Note that the group name below is the same one as before: I assigned both projects to the same group, although giving each its own group seems to work as well.

package org.test.config;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.dubbo.config.annotation.Reference;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import com.google.common.collect.Maps;

import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SpringMvcConfigure implements WebMvcConfigurer {

@Bean
public FilterRegistrationBean corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader(CorsConfiguration.ALL);
config.addAllowedMethod(CorsConfiguration.ALL);
source.registerCorsConfiguration("/**", config);
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new CorsFilter(source));
filterRegistrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE);
filterRegistrationBean.setOrder(1);
filterRegistrationBean.setEnabled(true);
filterRegistrationBean.addUrlPatterns("/**");
Map<String, String> initParameters = Maps.newHashMap();
initParameters.put("excludes", "/favicon.ico,/img/*,/js/*,/css/*");
initParameters.put("isIncludeRichText", "true");
filterRegistrationBean.setInitParameters(initParameters);
return filterRegistrationBean;
}

@Bean
public InternalResourceViewResolver viewResolver() {
InternalResourceViewResolver viewResolver = new InternalResourceViewResolver();
viewResolver.setPrefix("/WEB-INF/jsp/");
viewResolver.setSuffix(".jsp");
// viewResolver.setViewClass(JstlView.class);
// This property does not usually need to be configured manually; newer Spring versions detect it automatically.
return viewResolver;
}



/**
* Replacing frame json with fastjson
*/
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter();
FastJsonConfig fastJsonConfig = new FastJsonConfig();
fastJsonConfig.setSerializerFeatures(SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.DisableCircularReferenceDetect);
// Handle garbled Chinese characters
List<MediaType> fastMediaTypes = new ArrayList<>();
fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
fastConverter.setSupportedMediaTypes(fastMediaTypes);
fastConverter.setFastJsonConfig(fastJsonConfig);
// Handle strings, avoiding quotes when returning strings directly.
StringHttpMessageConverter smc = new StringHttpMessageConverter(Charset.forName("UTF-8"));
converters.add(smc);
converters.add(fastConverter);
}

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("test-client", "test-group");
}
}

Create the controller package, and then create the TestController under the package.

package org.test.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.test.service.DemoService;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
* <p>
* Documentation table Front-end controller
* </p>
*
* @author funkye
* @since 2019-03-20
*/
@RestController
@RequestMapping("/test")
@Api(tags = "test interface")
public class TestController {

private final static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
@Lazy
DemoService demoService;

@GetMapping(value = "testSeataOne")
@ApiOperation(value = "Test the manual rollback distributed transaction interface")
public Object testSeataOne() {
return demoService.One();
}

@GetMapping(value = "testSeataTwo")
@ApiOperation(value = "Test Exception Rollback Distributed Transaction Interface")
public Object testSeataTwo() {
return demoService.Two();
}

}

Then go to the service package and create the DemoService it depends on.

package org.test.service;

import java.time.LocalDateTime;

import org.apache.dubbo.config.annotation.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.test.controller.TestController;
import org.test.entity.Test;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
public class DemoService {
@Reference(version = "1.0.0", timeout = 60000)
private ITestService testService;
private final static Logger logger = LoggerFactory.getLogger(DemoService.class);

/**
* manual rollback example
*
* @return
*/
@GlobalTransactional
public Object One() {
logger.info("seata distribute transaction Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
try {
logger.info("load transaction id for rollback");
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
return false;
}

/**
* throw exception and rollback
*
* @return
*/
@GlobalTransactional
public Object Two() {
logger.info("seata分布式事务Id:{}", RootContext.getXID());
logger.info("seata distribute transaction Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}
}

Create the resources folder as usual, starting with the common application.yml.

spring:
  application:
    name: test
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test?useSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
  mvc:
    servlet:
      load-on-startup: 1
  http:
    encoding:
      force: true
      charset: utf-8
      enabled: true
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
dubbo:
  registry:
    id: my-registry
    address: zookeeper://127.0.0.1:2181?client=curator
  application:
    name: dubbo-demo-client
    qos-enable: false
server:
  port: 28888
  max-http-header-size: 8192
  address: 0.0.0.0
  tomcat:
    max-http-post-size: 104857600

Copy over the file.conf and registry.conf files from the service project as well. If you changed the group name in the client's configuration class, remember to change the group name in file.conf to match.

With the complete directory structure as above, you can now start test-service, then start test-client, and test it via swagger!

  1. Visit 127.0.0.1:28888/swagger-ui.html to run the final test.


Here I have saved one record; let's see whether the rollback succeeds:


Refresh the database: there is still only the one record:


And then check the log.


It shows that it has been rolled back, let's look at the log from seata-server again:

It shows the rollback succeeded, and the transaction id matches, so our distributed transaction has run through. By pausing at a breakpoint, you can inspect the undo_log table: before the transaction commits, a row of transaction information is inserted, and once the rollback succeeds, that row is deleted.
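If you want to inspect this yourself, here is a minimal sketch that reads the undo_log table while paused at such a breakpoint. It assumes the default undo_log schema (xid, branch_id, log_status columns) and reuses the connection settings from the application.yml above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class UndoLogPeek {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT xid, branch_id, log_status FROM undo_log");
             ResultSet rs = ps.executeQuery()) {
            // One row per branch transaction; rows are deleted once the global
            // transaction finishes committing or rolling back
            while (rs.next()) {
                System.out.printf("xid=%s branch=%d status=%d%n",
                    rs.getString("xid"), rs.getLong("branch_id"), rs.getInt("log_status"));
            }
        }
    }
}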

Summary

Seata's integration is still relatively simple and easy to get started with; with a little more care, you will certainly write it better than I did!

You are welcome to read more of the Seata and Dubbo source code; it can save you from many pitfalls you would otherwise hit in your business!

· 3 min read

When analysing the startup source code, I found that GlobalTransactionScanner starts both the RM and TM clients. However, in Seata's design the TM is responsible for global transaction operations: if a service never opens a global transaction, there is no need to start a TM client for it. In other words, if a project contains no global transaction annotation, the TM client does not need to be initialised, because not every microservice needs GlobalTransactional; such a service acts purely as an RM client.

So I set out to change GlobalTransactionScanner's initialisation rules slightly. Previously, GlobalTransactionScanner called the client initialisation methods in InitializingBean's afterPropertiesSet(). But afterPropertiesSet() is called as soon as the current bean has been initialised, at which point there is no way to know whether any bean in the Spring container carries a global transaction annotation.

Therefore, I removed InitializingBean and implemented ApplicationListener instead: the GlobalTransactional annotation is checked for while beans are instantiated, and the RM and TM client initialisation methods are called once the Spring container has finished initialising. This way, whether to start the TM client can be decided by whether the GlobalTransactional annotation is used anywhere in the project.
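To make the idea concrete, here is a minimal sketch of that approach. It is not the actual PR code: the class name, flag, and wiring are illustrative assumptions; only the TMClient/RMClient init signatures match those shown elsewhere in this series.

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import io.seata.rm.RMClient;
import io.seata.tm.TMClient;

public class ConditionalSeataInitializer implements ApplicationListener<ContextRefreshedEvent> {

    private static final String APPLICATION_ID = "demo-app";      // placeholder
    private static final String TX_SERVICE_GROUP = "my_tx_group"; // placeholder

    // Set by the scanner when it proxies a bean carrying @GlobalTransactional
    private static volatile boolean hasGlobalTransactionalBean = false;

    public static void markGlobalTransactionalFound() {
        hasGlobalTransactionalBean = true;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // The RM client is always needed; the TM client only if the annotation was found
        RMClient.init(APPLICATION_ID, TX_SERVICE_GROUP);
        if (hasGlobalTransactionalBean) {
            TMClient.init(APPLICATION_ID, TX_SERVICE_GROUP);
        }
    }
}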

Here is the PR address: https://github.com/apache/incubator-seata/pull/1936

As we discussed in the PR, Seata's current design is that only the TM at the initiator can send a GlobalRollbackRequest. An RM can only send BranchReport(false) to report its branch status to the TC server; it cannot send a GlobalRollbackRequest directly to perform a global rollback. The interaction logic is as follows:


According to the above design model, the TM client can be started on demand.

However, in the later optimisation iterations of Seata, there is one more point that needs to be considered:

When an exception occurs in a participant, could a global rollback be initiated directly from that participant's TM client? This would shorten the cycle time of the distributed transaction and release global locks sooner, so that other transactions contending for the same data can acquire the locks and execute as soon as possible.


That is to say, in a global transaction, as soon as one RM client fails to execute its local transaction, the TM client of that same service directly initiates the global rollback, without waiting for the initiator's TM to decide and send the rollback notification. To achieve this optimisation, each service needs to start both the TM client and the RM client.

Zhang Chenghui, currently working in the Information Centre of China Communication Technology, Technology Platform Department, as a Java engineer, mainly responsible for the development of China Communication messaging platform and the whole link pressure test project, love to share technology, WeChat public number "back-end advanced" author, technology blog (https://objcoding.com/) Blogger, Seata Contributor, GitHub ID: objcoding.

· 13 min read

From the previous article "Design Principles of Distributed Transaction Middleware Seata", we learned some design principles of the Seata AT mode, including its three roles (RM, TM, TC). I will keep updating the Seata source code analysis series; today, we analyse what the Seata AT mode does at startup.

Client Startup Logic

TM is the global transaction manager, so a global transaction is started by the TM. The TM has a global management interface, GlobalTransaction, structured as follows:

io.seata.tm.api.GlobalTransaction

public interface GlobalTransaction {

    void begin() throws TransactionException;

    void begin(int timeout) throws TransactionException;

    void begin(int timeout, String name) throws TransactionException;

    void commit() throws TransactionException;

    void rollback() throws TransactionException;

    GlobalStatus getStatus() throws TransactionException;

    // ...
}

A GlobalTransaction can be created via GlobalTransactionContext and then used to begin, commit, or roll back a global transaction, so we can use the Seata AT mode directly in an API fashion:

// init seata
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);

// trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
    tx.begin(60000, "testBiz");
    // Transaction
    // ...
    tx.commit();
} catch (Exception exx) {
    tx.rollback();
    throw exx;
}

Writing this out every time you use a global transaction inevitably causes code redundancy. Our projects are based on the Spring container, so we can use Spring AOP and the template pattern to encapsulate this boilerplate in a template (Mybatis-spring does the same thing). So let's analyse what a Spring-based project does at startup to initialise Seata and register global transactions.

We enable a global transaction by adding the @GlobalTransactional annotation to the method. Seata's Spring module has a GlobalTransactionScanner, which has the following inheritance relationship:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements InitializingBean, ApplicationContextAware, DisposableBean {
    // ...
}

During the startup of a Spring-based project, the following initialisation process occurs for this class:

(diagram: initialisation process of GlobalTransactionScanner)

The afterPropertiesSet() method of InitializingBean calls the initClient() method:

io.seata.spring.annotation.GlobalTransactionScanner#initClient

TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);

Initialisation operations are done for TM and RM.

  • TM initialisation

io.seata.tm.TMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
    // Get the TmRpcClient instance
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // Initialise the TM client
    tmRpcClient.init();
}

Calling TmRpcClient.getInstance() acquires the TM client instance. During that acquisition, the Netty client configuration object and the messageExecutor thread pool (used to handle the various message interactions with the server) are created. While creating the TmRpcClient instance, a ClientBootstrap, which manages the start and stop of the Netty client, and a ClientChannelManager, which manages the Netty client object pool used by Seata's Netty layer, are also created; these will be discussed later in the network-analysis module.

io.seata.core.rpc.netty.AbstractRpcRemotingClient#init

public void init() {
clientBootstrap.start();
// Timer to try to connect to the server
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}

Calling the TM client's init() method eventually starts the Netty client (it is not really started yet; it starts when the object pool is invoked). A timed task is started to resend RegisterTMRequest (the RM client sends RegisterRMRequest) to try to connect to the server. The logic is that the client channel is cached in the channels field of NettyClientChannelManager; if the channel does not exist or has expired, it tries to connect to the server to fetch the channel again and cache it. A separate thread is also started to handle asynchronous request sending, which is a very clever use of the network module and will be analysed later.

io.seata.core.rpc.netty.AbstractRpcRemoting#init

public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

In the init method of AbstractRpcRemoting, a timer task is started whose main job is to clear expired entries from futures. futures stores the future objects of sent requests; each has a timeout period, after which an exception is thrown, so expired futures need to be cleared regularly.
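The mechanism is easy to reproduce in miniature. Below is a self-contained sketch of the same idea, assuming a simplified MessageFuture that keeps only the fields needed here (the real Seata class carries more state):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MessageFuture {
    final Object requestMessage;                                  // the pending request
    final long deadline;                                          // absolute expiry time in millis
    final CompletableFuture<Object> result = new CompletableFuture<>();

    MessageFuture(Object requestMessage, long timeoutMillis) {
        this.requestMessage = requestMessage;
        this.deadline = System.currentTimeMillis() + timeoutMillis;
    }

    boolean isTimeout() {
        return System.currentTimeMillis() >= deadline;
    }
}

class FutureSweeper {
    final Map<Integer, MessageFuture> futures = new ConcurrentHashMap<>();
    final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    void start() {
        // Periodically drop expired futures so callers blocked on them fail fast
        timer.scheduleAtFixedRate(() -> futures.entrySet().removeIf(e -> {
            if (e.getValue().isTimeout()) {
                // completing exceptionally wakes up any caller waiting on the result
                e.getValue().result.completeExceptionally(new TimeoutException());
                return true;
            }
            return false;
        }), 3000, 3000, TimeUnit.MILLISECONDS);
    }
}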

  • RM Initialisation
io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}

RmRpcClient.getInstance follows the same logic as the TM. The ResourceManager is the RM's resource manager, responsible for registering, committing, reporting, and rolling back branch transactions, as well as querying global locks. DefaultResourceManager holds all current RM resource managers and dispatches calls to them uniformly; its get() method mainly loads the current resource managers, using an SPI-like mechanism for flexibility: Seata scans the META-INF/services/ directory for configuration files and dynamically loads the classes listed there.
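The JDK's built-in java.util.ServiceLoader illustrates the same META-INF/services idea. Seata actually uses its own EnhancedServiceLoader, so this is only a sketch of the mechanism, with a hypothetical GreetingProvider interface standing in for the real extension points:

import java.util.ServiceLoader;

// Hypothetical extension point, standing in for interfaces such as ResourceManager
interface GreetingProvider {
    String greet();
}

public class SpiDemo {
    public static void main(String[] args) {
        // Scans META-INF/services/<fully-qualified interface name> files on the
        // classpath (one implementation class name per line) and instantiates each
        ServiceLoader<GreetingProvider> loader = ServiceLoader.load(GreetingProvider.class);
        for (GreetingProvider provider : loader) {
            System.out.println(provider.getClass().getName() + ": " + provider.greet());
        }
    }
}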

ClientMessageListener is the RM's message listener, responsible for processing commands sent by the TC and performing branch commit, branch rollback, and undo log deletion on the branch. Finally, the init method follows the same logic as the TM's; DefaultRMHandler encapsulates the concrete operation logic of RM branch transactions.

Let's take a look at what the wrapIfNecessary method does.

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // Determine if global transactions are enabled
    if (disableGlobalTransaction) {
        return bean;
    }
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            // Check for a TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // TCC interceptor, proxies beans for sofa:reference/dubbo:reference and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // Determine if the bean has the GlobalTransactional or GlobalLock annotation
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (interceptor == null) {
                    // Create the proxy class
                    interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                }
            }

            LOGGER.info("Bean [{}] with name [{}] would use interceptor [{}]",
                bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                // Wrap the target object into a proxy object
                Advisor[] advisor = super.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

GlobalTransactionScanner inherits AbstractAutoProxyCreator for Spring AOP support. As you can see from the code, GlobalTransactionalInterceptor is used to proxy methods annotated with GlobalTransactional and GlobalLock.

GlobalTransactionalInterceptor implements MethodInterceptor: method interceptor.


io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = methodInvocation.getThis() != null
        ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    if (globalTransactionalAnnotation != null) {
        // global transaction annotation
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        // global lock annotation
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}

The above is the logic executed by the proxied method, in which handleGlobalTransaction() invokes the TransactionalTemplate template:

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public TransactionInfo getTransactionInfo() {
// ...
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// ...
}
}

The handleGlobalTransaction() method executes the execute method of the TransactionalTemplate template class:

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {

        // 2. begin transaction
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // Do Your Business
            rs = business.execute();

        } catch (Throwable ex) {

            // 3. the needed business exception to rollback
            completeTransactionAfterThrowing(txInfo, tx, ex);
            throw ex;
        }

        // 4. everything is fine, commit
        commitTransaction(tx);

        return rs;
    } finally {
        // 5. clear
        triggerAfterCompletion();
        cleanUp();
    }
}

Does the above give you a sense of déjà vu? That's right: it is the boilerplate we often write when using the API directly. Spring, through the proxy pattern, encapsulates that redundant code inside the template, handling it in one unified process so it no longer needs to be written out explicitly. If you're interested, take a look at the Mybatis-spring source code as well; it is also very well written.

Server-Side Processing Logic

The server receives and caches the client's connection (channel). As mentioned, the client sends a RegisterRMRequest/RegisterTMRequest to the server; when the server receives it, the ServerMessageListener is called to handle it:

io.seata.core.rpc.ServerMessageListener

public interface ServerMessageListener {
    // Handles various transaction messages, e.g. branch registration, branch commit, branch report, branch rollback
    void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);

    // Handle the registration of the RM client's connection
    void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx,
        ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);

    // Handle the registration of the TM client's connection
    void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx,
        ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);

    // The server maintains a heartbeat with the client
    void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
}

ChannelManager is the manager of the server's channels. Whenever the server communicates with a client, it needs to fetch that client's channel from the ChannelManager. The cache structures used to hold TM and RM client channels are as follows:

/**
 * resourceId -> applicationId -> ip -> port -> RpcContext
 */
private static final ConcurrentMap<String,
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>>
    RM_CHANNELS = new ConcurrentHashMap<>();

/**
 * ip+appname,port
 */
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
    = new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

The above Map structure is a bit complicated:

RM_CHANNELS:

  1. resourceId refers to the database address of the RM client;
  2. applicationId refers to the service id of the RM client; for example, with spring.application.name=account-service in Spring Boot, account-service is the applicationId;
  3. ip refers to the RM client's service address;
  4. port refers to the RM client's service port;
  5. RpcContext saves the information of this registration request.

TM_CHANNELS:

  1. ip+appname: the comment here appears to be written the wrong way round; it should be appname+ip, i.e., the first key of the TM_CHANNELS map is appname+ip;
  2. port: the port number of the client.

The following is the RM Client registration logic:

io.seata.core.rpc.ChannelManager#registerRMChannel

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(resourceManagerRequest.getVersion());
    // Put the ResourceIds database connection information into a set
    Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
    RpcContext rpcContext;
    // Determine whether the channel information is already in the cache
    if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
        // Build the rpcContext from the registration request information
        rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
            resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
            resourceManagerRequest.getResourceIds(), channel);
        // Put the rpcContext into the cache
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    } else {
        rpcContext = IDENTIFIED_CHANNELS.get(channel);
        rpcContext.addResources(dbkeySet);
    }
    if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
    for (String resourceId : dbkeySet) {
        String clientIp;
        // Store the request information into RM_CHANNELS, using Java 8's computeIfAbsent
        ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
            .computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
            .computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
        // Put the current rpcContext into the portMap
        rpcContext.holdInResourceManagerChannels(resourceId, portMap);
        updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    }
}

From the above code logic, we can see that registering an RM client mainly means putting the registration request information into the RM_CHANNELS cache. At the same time, IDENTIFIED_CHANNELS is checked to determine whether this request's channel has already been verified. The structure of IDENTIFIED_CHANNELS is as follows:

private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS
= new ConcurrentHashMap<>();

IDENTIFIED_CHANNELS contains all TM and RM registered channels.

The following is the TM registration logic:

io.seata.core.rpc.ChannelManager#registerTMChannel

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(request.getVersion());
    // Build the RpcContext from the registration request information
    RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
        request.getApplicationId(), request.getTransactionServiceGroup(),
        null, channel);
    // Put the RpcContext into the IDENTIFIED_CHANNELS cache
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    // account-service:127.0.0.1:63353
    String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
        + getClientIpFromChannel(channel);
    // Store the request information in the TM_CHANNELS cache
    TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
    // Get the map created in the previous step, then put the rpcContext into its value map
    ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
    rpcContext.holdInClientChannels(clientIdentifiedMap);
}

TM client registration is similar: the registration information is put into the corresponding cache. Its logic is simpler than the RM client's, mainly because an RM client also carries branch transaction resource information, so it has more information to register.

The above source code analysis is based on version 0.9.0.

About the Author

Zhang Chenghui, currently working in the Information Centre of China Communication Technology, Technology Platform Department, as a Java engineer, mainly responsible for the development of China Communication messaging platform and the whole link pressure test project, love to share technology, WeChat public number "back-end advanced" author, technology blog (https://objcoding.com/) Blogger, Seata Contributor, GitHub ID: objcoding.

· 20 min read

Seata, short for Simple Extensible Autonomous Transaction Architecture, is an all-in-one distributed transaction solution. It provides AT, TCC, Saga, and XA transaction modes. This article provides a detailed explanation of the Saga mode within Seata, with the project hosted on GitHub.

Author: Yiyuan (Chen Long), Core Developer of Distributed Transactions at Ant Financial.

Pain Points in Financial Distributed Application Development

Distributed systems face a prominent challenge where a business process requires a composition of various services. This challenge becomes even more pronounced in a microservices architecture, as it necessitates consistency guarantees at the business level. In other words, if a step fails, it either needs to roll back to the previous service invocation or continuously retry to ensure the success of all steps. - From "Left Ear Wind - Resilient Design: Compensation Transaction"

In the domain of financial microservices architecture, business processes are often more complex. Processes are lengthy, such as a typical internet microloan business process involving calls to more than ten services. When combined with exception handling processes, the complexity increases further. Developers with experience in financial business development can relate to these challenges.

During the development of financial distributed applications, we encounter several pain points:

  • Difficulty Ensuring Business Consistency

    In many of the systems we encounter (e.g., in channel layers, product layers, and integration layers), ensuring eventual business consistency often involves adopting a "compensation" approach. Without a coordinator to support this, the development difficulty is significant. Each step requires handling "rollback" operations in catch blocks, resulting in a code structure resembling an "arrow," with poor readability and maintainability. Alternatively, retrying exceptional operations, if unsuccessful, might lead to asynchronous retries or even manual intervention. These challenges impose a significant burden on developers, reducing development efficiency and increasing the likelihood of errors.

  • Difficulty Managing Business State

    With numerous business entities and their corresponding states, developers often update the entity's state in the database after completing a business activity. Lack of a state machine to manage the entire state transition process results in a lack of intuitiveness, increases the likelihood of errors, and causes the business to enter an incorrect state.

  • Difficulty Ensuring Idempotence

    Idempotence of services is a fundamental requirement in a distributed environment. Ensuring the idempotence of services often requires developers to design each service individually, using unique keys in databases or distributed caches. There is no unified solution, creating a significant burden on developers and increasing the chances of oversight, leading to financial losses.

  • Challenges in Business Monitoring and Operations; Lack of Unified Error Guardian Capability

    Monitoring the execution of business operations is usually done by logging, and monitoring platforms are based on log analysis. While this is generally sufficient, in the case of business errors, these monitors lack immediate access to the business context and require additional database queries. Additionally, the reliance on developers for log printing makes it prone to omissions. For compensatory transactions, there is often a need for "error guardian triggering compensation" and "worker-triggered compensation" operations. The lack of a unified error guardian and processing standard requires developers to implement these individually, resulting in a heavy development burden.

Theoretical Foundation

In certain scenarios where strong consistency is required for data, we may adopt distributed transaction schemes like "Two-Phase Commit" at the business layer. However, in other scenarios, where such strong consistency is not necessary, ensuring eventual consistency is sufficient.

For example, Ant Financial currently employs the TCC (Try, Confirm, Cancel) pattern in its financial core systems. The characteristics of financial core systems include high consistency requirements (business isolation), short processes, and high concurrency.

On the other hand, in many business systems above the financial core (e.g., systems in the channel layer, product layer, and integration layer), the emphasis is on achieving eventual consistency. These systems typically have complex processes, long flows, and may need to call services from other companies (such as financial networks). Developing Try, Confirm, Cancel methods for each service in these scenarios incurs high costs. Additionally, when there are services from other companies in the transaction, it is impractical to require those services to follow the TCC development model. Long processes can negatively impact performance if transaction boundaries are too extensive.

When it comes to transactions, we are familiar with ACID, and we are also acquainted with the CAP theorem, which states that at most two out of three—Consistency (C), Availability (A), and Partition Tolerance (P)—can be achieved simultaneously. To enhance performance, a variant of ACID known as BASE emerged. While ACID emphasizes consistency (C in CAP), BASE emphasizes availability (A in CAP). Achieving strong consistency (ACID) is often challenging, especially when dealing with multiple systems that are not provided by a single company. BASE systems are designed to create more resilient systems. In many situations, particularly when dealing with multiple systems and providers, BASE systems acknowledge the risk of data inconsistency in the short term. This allows new transactions to occur, with potentially problematic transactions addressed later through compensatory means to ensure eventual consistency.

Therefore, in practical development, we make trade-offs. For many business systems above the financial core, compensatory transactions can be adopted. The concept of compensatory transactions has been proposed for about 30 years, with the Saga theory emerging as a solution for long transactions. With the recent rise of microservices, Saga has gradually gained attention in recent years. Currently, the industry generally recognizes Saga as a solution for handling long transactions.

https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf[1]
http://microservices.io/patterns/data/saga.html[2]

Community and Industry Solutions

Apache Camel Saga

Camel is an open-source product that implements Enterprise Integration Patterns (EIP). It is based on an event-driven architecture and offers good performance and throughput. In version 2.21, Camel introduced the Saga EIP.

The Saga EIP provides a way to define a series of related actions through Camel routes. These actions either all succeed or all roll back. Saga can coordinate distributed services or local services using any communication protocol, achieving global eventual consistency. Saga does not require the entire process to be completed in a short time because it does not occupy any database locks. It can support requests that require long processing times, ranging from seconds to days. Camel's Saga EIP is based on MicroProfile's LRA[3] (Long Running Action). It also supports the coordination of distributed services implemented in any language using any communication protocol.

The implementation of Saga does not lock data. Instead, it defines "compensating operations" for each operation. When an error occurs during the normal process execution, the "compensating operations" for the operations that have already been executed are triggered to roll back the process. "Compensating operations" can be defined on Camel routes using Java or XML DSL (Definition Specific Language).

Here is an example of Java DSL:


// action
from("direct:reserveCredit")
.bean(idService, "generateCustomId") // generate a custom Id and set it in the body
.to("direct:creditReservation")

// delegate action
from("direct:creditReservation")
.saga()
.propagation(SagaPropagation.SUPPORTS)
.option("CreditId", body()) // mark the current body as needed in the compensating action
.compensation("direct:creditRefund")
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved. Custom Id used is ${body}");

// called only if the saga is cancelled
from("direct:creditRefund")
.transform(header("CreditId")) // retrieve the CreditId option from headers
.bean(creditService, "refundCredit")
.log("Credit for Custom Id ${body} refunded");

XML DSL sample:

<route>
<from uri="direct:start"/>
<saga>
<compensation uri="direct:compensation" />
<completion uri="direct:completion" />
<option optionName="myOptionKey">
<constant>myOptionValue</constant>
</option>
<option optionName="myOptionKey2">
<constant>myOptionValue2</constant>
</option>
</saga>
<to uri="direct:action1" />
<to uri="direct:action2" />
</route>

Eventuate Tram Saga

Eventuate Tram Saga[4] is a Saga framework for Java microservices using JDBC/JPA. Similar to Camel Saga, it also adopts a Java DSL to define compensating operations:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.withCompensation(this::reject)
.step()
.invokeParticipant(this::reserveCredit)
.step()
.invokeParticipant(this::approve)
.build();


@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}


private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
.to("customerService")
.build();

...

Apache ServiceComb Saga

ServiceComb Saga[5] is also a solution for achieving data eventual consistency in microservices applications. In contrast to TCC, Saga directly commits transactions in the try phase, and the subsequent rollback phase is completed through compensating operations in reverse. What sets it apart is the use of Java annotations and interceptors to define "compensating" services.

Architecture:

Saga consists of alpha and omega, where:

  • Alpha acts as the coordinator, primarily responsible for managing and coordinating transactions;
  • Omega is an embedded agent in microservices, responsible for intercepting network requests and reporting transaction events to alpha;

The diagram below illustrates the relationship between alpha, omega, and microservices:

ServiceComb Saga

sample:

public class ServiceA extends AbsService implements IServiceA {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Autowired
private IServiceB serviceB;

@Autowired
private IServiceC serviceC;

@Override
public String getServiceName() {
return "servicea";
}

@Override
public String getTableName() {
return "testa";
}

@Override
@SagaStart
@Compensable(compensationMethod = "cancelRun")
@Transactional(rollbackFor = Exception.class)
public Object run(InvokeContext invokeContext) throws Exception {
LOG.info("A.run called");
doRunBusi();
if (invokeContext.isInvokeB(getServiceName())) {
serviceB.run(invokeContext);
}
if (invokeContext.isInvokeC(getServiceName())) {
serviceC.run(invokeContext);
}
if (invokeContext.isException(getServiceName())) {
LOG.info("A.run exception");
throw new Exception("A.run exception");
}
return null;
}

public void cancelRun(InvokeContext invokeContext) {
LOG.info("A.cancel called");
doCancelBusi();
}

Ant Financial's Practice

Ant Financial extensively uses the TCC mode for distributed transactions, mainly in scenarios where high consistency and performance are required, such as in financial core systems. In upper-level business systems with complex and lengthy processes, developing TCC can be costly. In such cases, most businesses opt for the Saga mode to achieve eventual business consistency. Due to historical reasons, different business units have their own set of "compensating" transaction solutions, basically falling into two categories:

  1. When a service needs to "retry" or "compensate" in case of failure, a record is inserted into the database with the status before executing the service. When an exception occurs, a scheduled task queries the database record and performs "retry" or "compensation." If the business process is successful, the record is deleted.

  2. Designing a state machine engine and a simple DSL to orchestrate business processes and record business states. The state machine engine can define "compensating services." In case of an exception, the state machine engine invokes "compensating services" in reverse. There is also an "error guardian" platform that monitors failed or uncompensated business transactions and continuously performs "compensation" or "retry."

Solution Comparison

Generally, there are two common solutions in the community and industry: one is based on a state machine or a process engine that orchestrates processes and defines compensation through DSL; the other is based on Java annotations and interceptors to implement compensation. What are the advantages and disadvantages of these two approaches?

Approach 1: State Machine + DSL

Pros:

- Business processes can be defined using visual tools, are standardized and readable, and can provide service orchestration
- Improves communication efficiency between business analysts and developers
- Business state management: processes are essentially state machines, reflecting the flow of business states
- Flexible exception handling: "forward retry" or "backward compensation" can be performed after recovery from a crash
- Naturally supports asynchronous processing engines such as the Actor model or SEDA architecture, improving overall throughput

Cons:

- Business processes are composed of Java programs plus DSL configuration, making development relatively cumbersome
- High intrusiveness into existing business if it is a retrofit
- High implementation cost of the engine

Approach 2: Interceptor + Java Annotation

Pros:

- Program and annotations are integrated; development is simple with a low learning curve
- Easy to integrate into existing businesses
- Low framework implementation cost

Cons:

- The framework cannot provide asynchronous processing modes such as the Actor model or SEDA architecture to improve system throughput
- The framework cannot provide business state management
- "Forward retry" after crash recovery is difficult because the thread context cannot be restored

Seata Saga Approach

The introduction of Seata Saga can be found in Seata Saga Official Documentation[6].

Seata Saga adopts the state machine + DSL approach for the following reasons:

  • The state machine + DSL approach is more widely used in practical production scenarios.
  • Can use asynchronous processing engines such as the Actor model or SEDA architecture to improve overall throughput.
  • Typically, business systems above the core system have "service orchestration" requirements, and service orchestration has transactional eventual consistency requirements. These two are challenging to separate. The state machine + DSL approach can simultaneously meet these two requirements.
  • Because Saga mode theoretically does not guarantee isolation, in extreme cases, it may not complete the rollback operation due to dirty writing. For example, in a distributed transaction, if you recharge user A first and then deduct the balance from user B, if A user consumes the balance before the transaction is committed, and the transaction is rolled back, there is no way to compensate. Some business scenarios may allow the business to eventually succeed, and in cases where rollback is impossible, it can continue to retry the subsequent process. The state machine + DSL approach can achieve the ability to "forward" recover context and continue execution, making the business eventually successful and achieving eventual consistency.

In cases where isolation is not guaranteed: when designing business processes, follow the principle of "prefer surplus (长款) over shortfall (短款)." A surplus means the customer holds less money and the institution holds more; the institution can refund the customer based on its credibility. A shortfall means the institution holds less funding, and those funds may never be recovered. Therefore, in business process design, deductions should always come first.

State Definition Language (Seata State Language)

  1. Define the service call process through a state diagram and generate a JSON state language definition file.

  2. In the state diagram, a node can be a service call, and the node can configure its compensating node.

  3. The JSON state diagram is driven by the state machine engine. When an exception occurs, the state engine executes the compensating node corresponding to the successfully executed node to roll back the transaction.

    Note: Whether to compensate when an exception occurs can also be user-defined.

  4. It can meet service orchestration requirements, supporting one-way selection, concurrency, asynchronous, sub-state machine, parameter conversion, parameter mapping, service execution status judgment, exception capture, and other functions.

Assuming a business process calls two services, deducting inventory (InventoryService) and deducting balance (BalanceService), to ensure that in a distributed scenario, either both succeed or both roll back. Both participant services have a reduce method for inventory deduction or balance deduction, and a compensateReduce method for compensating deduction operations. Let's take a look at the interface definition of InventoryService:

public interface InventoryService {

/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}
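By analogy, the balance participant exposes the same pair of forward and compensating methods. This is a sketch only, since the article shows just InventoryService; the exact signature in the sample project may differ:

import java.math.BigDecimal;
import java.util.Map;

public interface BalanceService {

    /**
     * reduce balance
     */
    boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

    /**
     * compensate the balance reduction
     */
    boolean compensateReduce(String businessKey, Map<String, Object> params);
}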

This is the state diagram corresponding to the business process:

Example State Diagram
Corresponding JSON

{
  "Name": "reduceInventoryAndBalance",
  "Comment": "reduce inventory then reduce balance in a transaction",
  "StartState": "ReduceInventory",
  "Version": "0.0.1",
  "States": {
    "ReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceInventory",
      "Next": "ChoiceState",
      "Input": ["$.[businessKey]", "$.[count]"],
      "Output": {
        "reduceInventoryResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      }
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[reduceInventoryResult] == true",
          "Next": "ReduceBalance"
        }
      ],
      "Default": "Fail"
    },
    "ReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceBalance",
      "Input": [
        "$.[businessKey]",
        "$.[amount]",
        {
          "throwException": "$.[mockReduceBalanceFail]"
        }
      ],
      "Output": {
        "compensateReduceBalanceResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": ["java.lang.Throwable"],
          "Next": "CompensationTrigger"
        }
      ],
      "Next": "Succeed"
    },
    "CompensateReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "compensateReduce",
      "Input": ["$.[businessKey]"]
    },
    "CompensateReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "compensateReduce",
      "Input": ["$.[businessKey]"]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_FAILED",
      "Message": "purchase failed"
    }
  }
}

This state language is, to some extent, modeled on AWS Step Functions[7].

Introduction to "State Machine" Attributes:

  • Name: Represents the name of the state machine, must be unique;
  • Comment: Description of the state machine;
  • Version: Version of the state machine definition;
  • StartState: The first "state" to run when starting;
  • States: List of states, a map structure, where the key is the name of the "state," which must be unique within the state machine;

Introduction to "State" Attributes:

  • Type: The type of the "state," such as:
    • ServiceTask: Executes the service task;
    • Choice: Single conditional choice route;
    • CompensationTrigger: Triggers the compensation process;
    • Succeed: Normal end of the state machine;
    • Fail: Exceptional end of the state machine;
    • SubStateMachine: Calls a sub-state machine;
  • ServiceName: Service name, usually the beanId of the service;
  • ServiceMethod: Service method name;
  • CompensateState: Compensatory "state" for this state;
  • Input: List of input parameters for the service call, an array corresponding to the parameter list of the service method. "$." means an expression is used to fetch the parameter from the state machine context; expressions use SpringEL[8], and constants are written as literal values;
  • Output: Maps the return value of the service into the state machine context. A map structure: the key is the key under which the value is placed in the state machine context (the context is itself a map), and the value is a SpringEL expression starting with "$." that is evaluated against the return value of the service; "#root" refers to the entire return value;
  • Status: Maps the execution result of the service to one of three framework-defined statuses: SU (success), FA (failure) and UN (unknown). This mapping helps the framework judge the overall consistency of the transaction. It is a map structure whose keys are condition expressions, usually judged against the return value of the service or the exception thrown; by default a key is a SpringEL expression over the return value, while keys starting with "$Exception{" match the exception type. When a condition expression evaluates to true, the result is mapped to the corresponding value;
  • Catch: Route after catching an exception;
  • Next: The next "state" to execute after the service is completed;
  • Choices: List of optional branches in the Choice type "state," where Expression is a SpringEL expression, and Next is the next "state" to execute when the expression is true;
  • ErrorCode: Error code for the Fail type "state";
  • Message: Error message for the Fail type "state";

For more detailed explanations of the state language, please refer to the Seata Saga official documentation[6].
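
To make the definition above concrete, here is a minimal sketch of starting this state machine through Seata Saga's StateMachineEngine API, assuming the JSON above has been registered with an engine obtained from Spring; the caller class and parameter values are illustrative:

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;

public class PurchaseCaller {

    private final StateMachineEngine stateMachineEngine; // typically injected from Spring

    public PurchaseCaller(StateMachineEngine stateMachineEngine) {
        this.stateMachineEngine = stateMachineEngine;
    }

    public boolean purchase(String businessKey) {
        // The start parameters become the state machine context; the keys must match
        // the $.[...] expressions used in the Input sections of the JSON definition.
        Map<String, Object> startParams = new HashMap<>();
        startParams.put("businessKey", businessKey);
        startParams.put("count", 10);
        startParams.put("amount", new BigDecimal("100"));

        // Synchronously start the state machine named in the JSON definition.
        StateMachineInstance instance =
                stateMachineEngine.start("reduceInventoryAndBalance", null, startParams);
        return ExecutionStatus.SU.equals(instance.getStatus());
    }
}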

State Machine Engine Principle:

State Machine Engine Principle

  • The state diagram in the image first executes stateA, then executes stateB, and then executes stateC;
  • The execution of "states" is based on an event-driven model. After stateA is executed, a routing message is generated and placed in the EventQueue. The event consumer takes the message from the EventQueue and executes stateB;
  • When the entire state machine is started, Seata Server is called to start a distributed transaction, and the xid is generated. Then, the start event of the "state machine instance" is recorded in the local database;
  • When a "state" is executed, Seata Server is called to register a branch transaction, and the branchId is generated. Then, the start event of the "state instance" is recorded in the local database;
  • After a "state" is executed, the end event of the "state instance" is recorded in the local database, and Seata Server is called to report the status of the branch transaction;
  • When the entire state machine is executed, the completion event of the "state machine instance" is recorded in the local database, and Seata Server is called to commit or roll back the distributed transaction;

Design of State Machine Engine:

Design of State Machine Engine

The design of the state machine engine is mainly divided into three layers, with the upper layer depending on the lower layer. From bottom to top, they are:

  • Eventing Layer:

    • Implements an event-driven architecture: events can be pushed and consumed by consumers. This layer does not care what an event is or what its consumer does; that is implemented by the upper layers.
  • ProcessController Layer:

    • Driven by the Eventing layer below, it executes an "empty" process; the behavior and routing of "states" are not implemented here and are left to the upper layer.

      Based on these two layers, any "process" engine can in theory be custom-extended. The design of these two layers draws on the internal design of the financial network platform.

  • StateMachineEngine Layer:

    • Implements the behavior and routing logic of each type of state in the state machine engine;
    • Provides API and state machine language repository;

Practical Experience in Service Design under Saga Mode

Below are some practical experiences summarized from designing microservices under Saga mode. These are recommended practices rather than hard requirements; even where they are not followed, "workaround" solutions exist.

Good news: Seata Saga mode has no specific requirements for the interface parameters of microservices, making Saga mode suitable for integrating legacy systems or services from external institutions.

Allow Empty Compensation

  • Empty Compensation: The original service was not executed, but the compensation service was executed;
  • Reasons:
    • Timeout (packet loss) of the original service;
    • Saga transaction triggers a rollback;
    • The request of the original service is not received, but the compensation request is received first;

Therefore, when designing services, empty compensation must be allowed: if the business primary key to be compensated cannot be found, return compensation success and record the original business primary key, as in the sketch below.
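
A minimal sketch of such a compensation method, with hypothetical InventoryDao and CompensationDao interfaces standing in for real persistence code:

import java.util.Map;

public class InventoryCompensation {

    /** Hypothetical persistence interfaces standing in for real DAOs. */
    public interface InventoryDao {
        Object findDeduction(String businessKey);
        void restoreDeduction(String businessKey);
    }

    public interface CompensationDao {
        void markCompensated(String businessKey);
        boolean isCompensated(String businessKey);
    }

    private final InventoryDao inventoryDao;
    private final CompensationDao compensationDao;

    public InventoryCompensation(InventoryDao inventoryDao, CompensationDao compensationDao) {
        this.inventoryDao = inventoryDao;
        this.compensationDao = compensationDao;
    }

    public boolean compensateReduce(String businessKey, Map<String, Object> params) {
        if (inventoryDao.findDeduction(businessKey) == null) {
            // Empty compensation: the forward reduce never executed (lost or late),
            // so record the business key and report success instead of failing forever.
            compensationDao.markCompensated(businessKey);
            return true;
        }
        inventoryDao.restoreDeduction(businessKey);
        compensationDao.markCompensated(businessKey);
        return true;
    }
}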

Hang Prevention Control

  • Hang: The compensation service executes before the original service;
  • Reasons:
    • Timeout (congestion) of the original service;
    • The Saga transaction triggers a rollback;
    • The congested original request then arrives;

Therefore, check whether the current business primary key already exists among the business primary keys recorded by empty compensation; if it does, reject executing the service (see the combined sketch after the next section).

Idempotent Control

  • Both the original service and the compensation service need to guarantee idempotence. Since network timeouts may trigger a retry strategy, idempotence control is needed so that retries do not apply duplicate updates to business data.
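
Hang prevention and idempotence can then both be enforced in the forward service. A minimal sketch reusing the hypothetical DAO interfaces from the compensation example above:

import java.math.BigDecimal;
import java.util.Map;

public class InventoryForwardService {

    private final InventoryCompensation.InventoryDao inventoryDao;
    private final InventoryCompensation.CompensationDao compensationDao;

    public InventoryForwardService(InventoryCompensation.InventoryDao inventoryDao,
                                   InventoryCompensation.CompensationDao compensationDao) {
        this.inventoryDao = inventoryDao;
        this.compensationDao = compensationDao;
    }

    public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
        // Hang prevention: if a (possibly empty) compensation has already run for this
        // business key, the late-arriving forward request must be rejected.
        if (compensationDao.isCompensated(businessKey)) {
            return false;
        }
        // Idempotence: if this business key has already been deducted, a retry must
        // not deduct it again; simply report the earlier success.
        if (inventoryDao.findDeduction(businessKey) != null) {
            return true;
        }
        // ... perform the actual deduction here and record it under businessKey ...
        return true;
    }
}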

Summary

Much of the time we do not need to insist on strong consistency. Based on the BASE theory and the Saga paper, we design more resilient systems that achieve better performance and fault tolerance in a distributed architecture. There is no silver bullet in distributed architecture, only solutions suitable for specific scenarios. Seata Saga is in fact a product combining "service orchestration" and "Saga distributed transaction" capabilities. In summary, its applicable scenarios are:

  • Suitable for handling "long transactions" in a microservices architecture;
  • Suitable for "service orchestration" requirements in a microservices architecture;
  • Suitable for business systems with a large number of composite services above the financial core system (such as systems in the channel layer, product layer, integration layer);
  • Suitable for scenarios where integration with services provided by legacy systems or external institutions is required (these services are immutable and cannot be required to be modified).

Related Links Mentioned in the Article

[1] Sagas paper: https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf
[2] Saga pattern: http://microservices.io/patterns/data/saga.html
[3] Microprofile LRA: https://github.com/eclipse/microprofile-sandbox/tree/master/proposals/0009-LRA
[4] Eventuate Tram Saga: https://github.com/eventuate-tram/eventuate-tram-sagas
[5] ServiceComb Saga: https://github.com/apache/servicecomb-pack
[6] Seata Saga official documentation: http://seata.io/zh-cn/docs/user/saga.html
[7] AWS Step Functions: https://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/tutorial-creating-lambda-state-machine.html
[8] SpringEL: https://docs.spring.io/spring/docs/4.3.10.RELEASE/spring-framework-reference/html/expressions.html

· 19 min read

Author: Yi Yuan (Chen Long), core developer of Ant Financial's distributed transaction framework.
This article is based on the talk "Distributed Transaction Seata and its Three Patterns" given at SOFA Meetup#3 in Guangzhou on 11 August, focusing on the background and theoretical foundations of distributed transactions, the principles of Seata, and the implementation of its three patterns (AT, TCC, and Saga).

The video and PPT are at the end of this article.

3 Distributed Transaction Seata Three Modes Explained-Eiyuan.jpg

I. Background of the emergence of distributed transactions

1.1 Distributed Architecture Evolution - Horizontal Splitting of Database

Ant Financial's business database was initially a single database with a single table, but with the rapid growth of the business, the data volume became bigger and bigger and the single database and table gradually became a bottleneck. So the database was split horizontally, turning the original single database and table into database shards.

As shown in the figure below, after splitting the database and tables, a write operation that could originally be completed in one database may now span multiple databases, which gives rise to the cross-database transaction problem.

image.png

1.2 Distributed Architecture Evolution - Business Service Splitting

In the early stage of business development, a monolithic business system architecture can meet basic business needs. However, with rapid business growth, system traffic and business complexity grow quickly, the monolithic architecture gradually becomes a bottleneck, and the demand to reduce coupling and improve the scalability of the business system becomes stronger and stronger.

As shown in the figure below, Ant Financial split the monolithic business system into multiple business systems following the design principles of Service Oriented Architecture (SOA). This reduces coupling between systems and lets each system focus on its own business, which is more conducive to business development and to scaling system capacity.

image.png

After the business system is split into services, a complete piece of business often needs to call multiple services; how to ensure data consistency across multiple services becomes a difficult problem.

II. Theoretical foundation of distributed transaction

2.1 Two-phase commit protocol

16_16_18__08_13_2019.jpg

Two-phase commit protocol: the transaction manager coordinates the resource managers in two phases. In the first phase it prepares resources, i.e., reserves the resources the transaction needs; if every resource manager reserves its resources successfully, the second phase commits the resources, otherwise the transaction manager coordinates the resource managers to roll the resources back.

2.2 TCC

16_16_51__08_13_2019.jpg

TCC (Try-Confirm-Cancel) is essentially a service-oriented two-phase commit protocol. Business developers need to implement these three service interfaces. In the first phase, business code orchestrates calls to the Try interface to reserve resources; if Try succeeds for all participants, the transaction manager commits the transaction and calls each participant's Confirm interface to actually execute the business operation; otherwise it calls each participant's Cancel interface to roll the transaction back.

2.3 Saga

3 Distributed Transactions Seata Three Patterns Explained - Yi Yuan-9.jpg

Saga is a compensation protocol. In Saga mode, a distributed transaction has multiple participants, each of which is a compensable service; users implement its forward operation and reverse rollback operation according to the business scenario.

During the execution of a distributed transaction, the forward operations of each participant are executed sequentially, and if all forward operations are executed successfully, the distributed transaction commits. If any of the forward operations fails, the distributed transaction backs out and performs a reverse rollback on the previous participants, rolling back the committed participants and returning the distributed transaction to its initial state.

Saga theory is from the paper Sagas published by Hector & Kenneth in 1987.

Saga forward services and compensation services also need to be implemented by business developers.

III. Seata and its three patterns explained in detail

3.1 Distributed transaction Seata introduction

Seata (Simple Extensible Autonomous Transaction Architecture) is a distributed transaction solution jointly open-sourced by Ant Financial and Alibaba in January 2019. Seata has been open source for about half a year, already has more than 11,000 stars, and its community is very active. We warmly welcome you to take part in building the Seata community and make Seata the benchmark open-source distributed transaction product together.

Seata: https://github.com/apache/incubator-seata

image.png

3.2 Distributed Transactions Seata Product Module

As shown in the figure below, Seata has three major modules: TM, RM and TC. TM and RM are integrated with the business system as Seata clients, while TC is deployed independently as the Seata server.

image.png

The execution flow of a distributed transaction in Seata:

  • TM opens distributed transaction (TM registers global transaction record with TC);
  • According to the business scenario, arrange the resources in the transaction such as database and service (RM reports the resource readiness status to TC);
  • TM ends the distributed transaction, and phase one of the transaction ends (TM notifies TC to commit/rollback the distributed transaction);
  • TC aggregates the transaction information and decides whether the distributed transaction should be committed or rolled back;
  • TC notifies all RMs to commit/rollback resources, transaction phase 2 ends;

3.3 Distributed Transactions Seata Solution

Seata has four distributed transaction solutions, AT mode, TCC mode, Saga mode and XA mode.

15_49_23__08_13_2019.jpg

3.3.1 AT Mode

In January, Seata open-sourced AT mode, a non-intrusive distributed transaction solution. In AT mode, users only need to focus on their own "business SQL", which serves as phase one; the Seata framework automatically generates the transaction's phase-two commit and rollback operations.

image.png

How the AT mode is non-intrusive to the business:
  • Phase one:

In phase one, Seata intercepts the "business SQL", first parses the SQL semantics to find the business data that the "business SQL" will update, and saves that data as a "before image" before it is updated. It then executes the "business SQL" to update the business data and saves the updated data as an "after image"; finally it generates the row locks. All of these operations are completed within a single database transaction, which ensures the atomicity of the phase-one operations.

image3.png

  • Phase two commit:

If the second phase is a commit, since the "business SQL" has already been committed to the database in the first phase, the Seata framework only needs to delete the snapshot data and row locks saved in the first phase to complete the data cleanup.

image 4.png

  • Phase two rollback:

If the second phase is a rollback, Seata needs to roll back the "business SQL" already executed in phase one to restore the business data. The rollback restores the data using the "before image"; however, before restoring, it must first check for dirty writes by comparing the "current business data in the database" with the "after image". If the two are completely consistent, there is no dirty write and the business data can be restored; if they are inconsistent, a dirty write has occurred, and the dirty write must be handed over to manual processing.

image 5.png

In AT mode, the phase-one and phase-two commit and rollback operations are all generated automatically by the Seata framework. The user only needs to write the "business SQL" to easily use distributed transactions; AT mode is a distributed transaction solution with no intrusion into the business at all.

3.3.2 TCC Mode

In March 2019, Seata open-sourced the TCC mode, contributed by Ant Financial. The TCC mode requires users to implement Try, Confirm and Cancel operations according to their business scenarios: the transaction initiator executes the Try method in phase one, the Confirm method on phase-two commit, and the Cancel method on phase-two rollback.

image 6.png

Descriptions of the three TCC methods:

  • Try: detection and reservation of resources;
  • Confirm: execution of the submitted business operation; if Try succeeds, Confirm must succeed;
  • Cancel: release of the reserved resources;

Ant Financial's practical experience with TCC
16_48_02__08_13_2019.jpg

1 TCC Design - Business model is designed in 2 phases:

The most important thing for users to consider when accessing TCC is how to split their business model into two phases.

Take the "debit" scenario as an example, before accessing TCC, the debit of account A can be completed with a single SQL for updating the account balance; however, after accessing TCC, the user needs to consider how to split the original one-step debit operation into two phases and implement it into three methods, and to ensure that the first-phase Try will be successful and the second-phase Confirm will be successful if Try is successful. If Try succeeds in the first stage, Confirm will definitely succeed in the second stage.

image 7.png

As shown above, the Try method, as the phase-one preparation method, needs to perform resource checking and reservation. In the debit scenario, Try checks whether the account balance is sufficient and reserves the funds for the transfer, the reservation being done by freezing the transfer amount in account A. After the Try method executes, although the balance of account A is still $100, $30 of it is frozen and cannot be used by other transactions.

The second stage, the Confirm method, performs the real debit operation; Confirm will use the funds frozen in the Try stage to perform the debit operation; after the Confirm method is executed, the $30 frozen in the first stage has been deducted from account A, and the balance of account A becomes $70.

If the second stage is a rollback, you need to release the $30 frozen in the first stage of Try in the Cancel method, so that account A is back to the initial state, and all $100 is available.

The most important thing when adopting TCC mode is to consider how to split the business model into two phases, implement it as TCC's three methods, and guarantee that if Try succeeds, Confirm succeeds. Compared with AT mode, TCC mode is somewhat intrusive to the business code, but TCC mode has no global row locks like AT mode, so its performance is much higher than AT mode's.
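
As a hedged sketch of what the two-phase split can look like in code, using the TCC annotations from Seata's io.seata.rm.tcc.api package (the action name, parameters and method contracts are illustrative, not code from this talk):

import java.math.BigDecimal;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

public interface DeductTccAction {

    /** Phase one: check that the balance is sufficient and freeze the transfer amount. */
    @TwoPhaseBusinessAction(name = "deductAction", commitMethod = "confirm", rollbackMethod = "cancel")
    boolean tryDeduct(BusinessActionContext context,
                      @BusinessActionContextParameter(paramName = "accountId") String accountId,
                      @BusinessActionContextParameter(paramName = "amount") BigDecimal amount);

    /** Phase-two commit: really deduct the funds frozen by Try. */
    boolean confirm(BusinessActionContext context);

    /** Phase-two rollback: release the frozen funds (must allow null rollback). */
    boolean cancel(BusinessActionContext context);
}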

2 TCC Design - Allow Null Rollback:
16_51_44__08_13_2019.jpg

The Cancel interface needs to be designed to allow null rollback. If the Try request is never received due to packet loss and the transaction manager then triggers a rollback, the Cancel interface is invoked although there is no corresponding business data to roll back. In that case, when Cancel finds no matching transaction xid or business primary key, it must still return rollback success; otherwise the transaction manager will consider the rollback unfinished and keep retrying.

3 TCC Design - Anti-Suspension Control:
16_51_56__08_13_2019.jpg

Suspension means that Cancel is executed before the Try interface. This happens when Try times out due to network congestion, the transaction manager triggers a rollback that invokes Cancel, and the delayed Try call is delivered only after Cancel has run. By the null-rollback logic above, Cancel returns success and the transaction manager considers the transaction rolled back; if Try were executed at this point, it would create data inconsistency. Therefore, before a null-rollback Cancel returns success, it records the transaction xid or business primary key, marking the record as rolled back; the Try interface first checks for this mark, and if the record has already been marked as rolled back, it skips its business operation.

4 TCC Design - Idempotence Control:
16_52_07__08_13_2019.jpg

Idempotence means that, for the same system and under the same conditions, a single request and repeated requests have the same impact on system resources. Because network jitter or congestion can cause timeouts, the transaction manager retries operations on resources, so a business operation is very likely to be called repeatedly. To avoid occupying resources multiple times because of repeated calls, idempotence must be controlled in service design, usually by deduplicating on the transaction xid or the business primary key.

3.3.3 Saga Mode

Saga mode is Seata's upcoming open-source solution for long transactions, contributed mainly by Ant Financial. In Saga mode, a distributed transaction has multiple participants, each of which is a compensable service; users implement its forward operation and reverse rollback operation according to the business scenario.

During the execution of a distributed transaction, the forward operations of each participant are executed sequentially, and if all forward operations are executed successfully, the distributed transaction commits. If any of the forward operations fails, the distributed transaction will go back and execute the reverse rollback operations of the previous participants to roll back the committed participants and bring the distributed transaction back to the initial state.

image 8.png

Saga-mode distributed transactions are usually event-driven and executed asynchronously among the participants; the Saga pattern is a solution for long transactions.

1 Saga pattern usage scenario
16_44_58__08_13_2019.jpg

The Saga pattern is suitable for business systems with long business processes that need to guarantee the eventual consistency of transactions. The Saga pattern commits local transactions in phase one; being lock-free, it can guarantee performance even for long processes.

When transaction participants are services from other companies or legacy systems that cannot be modified to provide the interfaces required by TCC, the Saga pattern can be used.

The advantages of the Saga pattern are:

  • One-stage commit of local database transactions, lock-free, high performance;
  • Participants can execute asynchronously in an event-driven manner, giving high throughput;
  • The compensation service is the "reverse" of the forward service, which is easy to understand and implement;

Disadvantages: the Saga pattern does not guarantee isolation, because the local database transaction has already been committed in phase one and no "reservation" action is performed. We will discuss countermeasures for the lack of isolation later.

2 Saga implementation based on a state machine engine

17_13_19__08_13_2019.jpg

Currently there are generally two types of Saga implementations: one is based on an event-driven architecture, and the other intercepts forward services using annotations plus interceptors. Seata currently uses the event-driven approach: Seata implements a state machine that can orchestrate the call flow of services and of the forward services' compensation services, generating a state diagram defined in a JSON file, and the state machine engine drives the execution of this diagram. When an exception occurs, the state machine triggers a rollback and executes the compensation services one by one; it is up to the user to decide when to trigger the rollback. The state machine can fulfill service orchestration needs and supports single choice, concurrency, asynchrony, sub-state machine calls, parameter conversion, parameter mapping, service execution status judgment, exception catching and other functions.

3 State Machine Engine Principles

16_45_32__08_13_2019.jpg

The basic principle of the state machine engine is an event-driven architecture: each step executes asynchronously, and steps flow between one another through an event queue, greatly improving system throughput. A transaction log is recorded as each step executes, for use when rolling back after an exception; transaction logs are recorded in the same database as the business tables to improve performance.

4 State Machine Engine Design

16_45_46__08_13_2019.jpg

The state machine engine has a three-tier architecture. The bottom layer is the "event-driven" layer, which implements an EventBus and a thread pool for consuming events; it is a Pub-Sub architecture. The middle layer is the "process controller" layer, which implements a minimalist process engine framework that drives the execution of an "empty" process: it does not care what each node does, it simply calls each node's process method and then its route method to flow to the next node. This is a generic framework; based on these two layers, developers can implement any process engine. The top layer is the "state machine engine" layer, which implements the "behaviour" and "route" logic of each state node, provides APIs and a statechart repository, and includes a number of other components such as expression languages, logic calculators, flow generators, interceptors, configuration management, and transaction logging.

5 The Saga Service Design Experience

Similar to TCC, Saga's forward and reverse services need to follow the following design principles:

1) Saga Service Design - Allow Null Compensation
16_52_22__08_13_2019.jpg

2) Saga Service Design - Anti-Suspension Control
16_52_52__08_13_2019.jpg

3) Saga Service Design - Idempotence Control
3 Distributed Transactions Seata Three Patterns Explained - Yi Yuan-31.jpg

4) Saga Design - Custom Transaction Recovery Strategies
16_53_07__08_13_2019.jpg

As mentioned earlier, the Saga pattern does not guarantee transaction isolation, and dirty writes can occur in extreme cases. For example, before a distributed transaction is committed, the data written by an earlier service may be modified by another transaction; if a later service then fails and the transaction needs to roll back, the compensation may be impossible because that data has changed. One way to handle this situation is to "retry" forwards and drive the distributed transaction to completion. Since the entire business process is orchestrated by the state machine, retries can continue even after recovery. You can therefore configure the process's transaction recovery policy according to business characteristics: whether to prefer "rollback" or "retry". When a transaction times out, the server keeps retrying according to this policy.

Since Saga does not guarantee isolation, business design needs to follow the principle of "prefer long payments over short payments". A long payment means that, when something goes wrong, we hold too much money; a short payment means we hold too little. A long payment can be refunded to the customer, but a short payment may never be recovered. Concretely, the business must debit the customer's account before crediting, so that even if an isolation problem causes an overwritten update, we never end up short of money.

6 Annotation and Interceptor Based Saga Implementation
17_13_37__08_13_2019.jpg

There is another Saga implementation based on annotations plus interceptors, which Seata has not yet implemented. The pseudo-code above illustrates the idea: the @SagaCompensable annotation is defined on the one method and declares compensateOne as its compensation method. The @SagaTransactional annotation is then defined on the processA method of the business process code, which starts a Saga distributed transaction; an interceptor intercepts each forward method, and when an exception occurs it triggers a rollback that calls each forward method's compensation method.

7 Comparison of Advantages and Disadvantages of the Two Saga Implementations

The following table compares the advantages and disadvantages of the two Saga implementations:

17_13_49__08_13_2019.jpg

The biggest advantages of the state machine engine are that it executes asynchronously through an event-driven approach to improve system throughput, that it can fulfill service orchestration needs, and that, given Saga's lack of isolation, it offers the additional "retry forward" strategy for transaction recovery. The biggest advantages of annotations plus interceptors are ease of development and a low learning cost.

Summary

This article first reviewed the background and theoretical basis of distributed transactions, and then focused on the principles of Seata distributed transactions and three patterns (AT, TCC, Saga) of distributed transaction implementation.

Seata is positioned as a full-scenario distributed transaction solution, and an XA-mode implementation will also come in the future. Each mode has its own application scenarios. AT mode is a non-intrusive distributed transaction solution for scenarios where you do not want to modify the business, with almost zero learning cost. TCC mode is a high-performance solution for core systems and other scenarios with high performance requirements. Saga mode is a long-transaction solution for business systems with long processes that need to guarantee the eventual consistency of transactions; it commits local transactions in phase one, is lock-free, can guarantee performance for long processes, and is mostly used in the channel and integration layers of business systems. When transaction participants are services from other companies or legacy systems that cannot be modified to provide the interfaces required by TCC, Saga mode can also be used.

The video review and PPT of this talk can be viewed at: https://tech.antfin.com/community/activities/779/review

· 12 min read

Under a microservices architecture, we can design in layers by business module and deploy modules separately, which reduces deployment pressure and decouples business logic, preventing the application from gradually turning into a monster: the system can be scaled easily, and the failure of some services does not affect the normal operation of others. In short, microservices bring us more and more advantages as the business develops rapidly, but they are not perfect and must not be blindly overused: they add a certain complexity to the system, and with them come distributed transactions, a pain point that any microservices architecture inevitably has to deal with and a field the industry has long been concerned with; this is why theories such as CAP and BASE emerged.

At the beginning of this year, Alibaba open-sourced a distributed transaction middleware, initially named Fescar and later renamed Seata. When it was open-sourced I knew it would take off, because it is an open-source project that addresses real pain points: Seata set out in the direction of non-intrusiveness to the business and high performance, which is exactly what we urgently need to solve our distributed transaction problems. Having worked at several companies that used a microservices architecture but did not handle distributed transactions very elegantly, I have been following Seata's development. Today I will briefly describe some of its design principles; afterwards I will analyze the source code of each module in depth. If you are interested, you can follow my public account or blog and stay tuned.

What solutions are there for distributed transactions?

Current distributed transaction solutions fall into non-intrusive and intrusive ones. The non-intrusive solutions are mainly the two-phase commit (2PC) schemes based on the database XA protocol. Their advantage is that they do not intrude into the business code, but their shortcomings are also obvious: the database must support the XA protocol, and, due to the characteristics of the protocol itself, transactional resources are locked for long periods and the application layer cannot intervene, so performance is very poor; it is like the Seven Injuries Fist, "injure the enemy by seven parts, harm yourself by three". So this solution is not very popular in Internet projects.

To make up for the low performance of that solution, experts have come up with a variety of schemes, all of which invariably have to be handled at the application layer, i.e., by intruding into the business: for example the well-known TCC scheme, on which many mature frameworks are based, such as ByteTCC and tcc-transaction, as well as eventual-consistency schemes based on reliable messages, such as RocketMQ transactional messages.

Intrusive code solutions are "last resort" schemes under the current circumstances, and they are actually quite inelegant to implement: a transaction call is usually accompanied by adding a series of reverse operations to the transaction interfaces; for example, alongside TCC's commit logic there must inevitably be rollback logic, which makes the project very bloated and expensive to maintain.

Relationships between Seata modules

Given the above pain points, our ideal distributed transaction solution must clearly be high-performance and non-intrusive to the business: the business layer should not need to care about the constraints of the distributed transaction mechanism. Seata heads precisely in this direction, so it is well worth looking forward to; it will bring a qualitative improvement to our microservices architecture.

So how does Seata do it? Here's how its modules relate to each other.

Seata's design idea is that a distributed transaction can be understood as a global transaction under which a number of branch transactions hang, and a branch transaction is a local transaction that satisfies ACID; we can therefore operate a distributed transaction much as we would a local transaction.

Seata internally defines three modules to deal with the relationship and processing of global and branch transactions, these three components are:

  • Transaction Coordinator (TC): The transaction coordinator maintains the state of the global transaction and is responsible for coordinating and driving the commit or rollback of the global transaction.
  • Transaction Manager (TM): Controls the boundaries of the global transaction and is responsible for opening a global transaction and ultimately initiating a global commit or global rollback resolution.
  • Resource Manager (RM): Controls branch transactions and is responsible for branch registration, status reporting, and receiving instructions from the Transaction Coordinator to drive the commit and rollback of branch (local) transactions.

Briefly describe the execution steps of the whole global transaction:

  1. TM requests TC to open a global transaction; TC creates the global transaction and returns a globally unique XID, which is propagated in the context of the global transaction;
  2. RM registers a branch transaction with TC, attributing it to the global transaction with the same XID;
  3. TM initiates a global commit or rollback resolution to TC;
  4. TC schedules the branch transactions under the XID to complete the commit or rollback.
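
On the business side, step 1 is typically a single annotation. A minimal sketch using Seata's @GlobalTransactional annotation (the two downstream clients are hypothetical; any exception thrown in the method leads to a global rollback):

import io.seata.spring.annotation.GlobalTransactional;

public class PurchaseService {

    /** Hypothetical clients for two downstream services (each becomes an RM branch). */
    public interface StorageClient { void deduct(String commodityCode, int count); }
    public interface OrderClient { void create(String userId, String commodityCode, int count); }

    private final StorageClient storageClient;
    private final OrderClient orderClient;

    public PurchaseService(StorageClient storageClient, OrderClient orderClient) {
        this.storageClient = storageClient;
        this.orderClient = orderClient;
    }

    // TM asks TC to open the global transaction here; the returned XID is propagated
    // to every downstream call, and an exception triggers a global rollback.
    @GlobalTransactional(name = "purchase-tx", timeoutMills = 300000)
    public void purchase(String userId, String commodityCode, int count) {
        storageClient.deduct(commodityCode, count);
        orderClient.create(userId, commodityCode, count);
    }
}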

How is it different from the XA scheme?

Seata's transaction commit method is broadly the same as the XA protocol's two-phase commit, so what is the difference between them?

We all know that the XA protocol relies on the database level to guarantee transaction consistency, i.e., XA branch transactions are driven at the database level and require XA drivers. On the one hand this couples the database to the XA driver; on the other hand it causes the resources of each branch transaction to be locked for a long period, which is an important factor in its unpopularity among Internet companies.

Given these problems of the XA protocol, Seata takes another path: since relying on the database layer causes so many problems, it does the work at the application layer instead. This starts with Seata's RM module: as mentioned earlier, RM in fact acts as a proxy layer over database operations, as follows:

Seata adds a proxy layer over the data source, so when we use Seata, the data source we actually use is Seata's own data source proxy, DataSourceProxy. In this proxy layer Seata adds a lot of logic, mainly parsing SQL, organising the before and after images of the updated business data into a rollback log, and inserting the undo log into the undo_log table within the same local transaction, ensuring that every business SQL that updates data has a corresponding rollback log.
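
A minimal sketch of this proxying, assuming a Druid pool and Seata's io.seata.rm.datasource.DataSourceProxy (the connection settings are illustrative):

import javax.sql.DataSource;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;

public class DataSourceProxyDemo {

    public static DataSource seataDataSource() {
        DruidDataSource origin = new DruidDataSource();
        origin.setUrl("jdbc:mysql://127.0.0.1:3306/demo"); // illustrative settings
        origin.setUsername("demo");
        origin.setPassword("demo");
        // Wrapping the pool is the integration point: every update statement executed
        // through the proxy is parsed, its before/after images are captured, and a
        // matching rollback record is written to the undo_log table in the same
        // local transaction.
        return new DataSourceProxy(origin);
    }
}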

The advantage of doing this is that, after the local transaction executes, the resources it locked can be released immediately, and the branch status is then reported to the TC. When the TM decides on a global commit, no synchronous coordination is needed: the TC asynchronously schedules each RM branch transaction to delete the corresponding undo log, a very fast step. When the TM decides on a global rollback, the RM receives the rollback request from the TC, finds the corresponding undo log by XID, and executes the rollback log to complete the rollback operation.

As shown in the above figure, the RM of the XA scheme is placed in the database layer, and it relies on the XA driver of the database.


As shown above, Seata's RM is actually placed in the application layer as middleware and does not rely on the database for protocol support, completely stripping the requirement for database protocol support from distributed transaction scenarios.

How are branching transactions committed and rolled back?

Here is a detailed description of how branching transactions are committed and rolled back:

  • Phase one:

In the RM module, branch transactions use the JDBC data source proxy to add several steps: interpreting the business SQL, organising the before and after images of the business data into a rollback log, generating the undo log, checking global transaction locks, and registering the branch transaction. The ACID properties of the local transaction are exploited to write the business SQL and the undo log into the same local transaction and commit them to the database together, ensuring that the business SQL always has a corresponding rollback log; finally, the branch transaction status is reported to the TC.

  • Phase two:

TM resolves to commit globally:

When the TM resolves to commit, no synchronous coordination is needed: the TC asynchronously schedules each RM branch transaction to delete the corresponding undo logs, and this step completes very quickly. This mechanism is critical for performance. We know that during normal business operation the transaction success rate is very high, so being able to commit directly in the local transaction makes this a very significant step for performance.

TM resolves to roll back globally:

When the TM resolves to roll back, the RM receives the rollback request from the TC, finds the corresponding undo log by XID, then uses the ACID properties of the local transaction to execute the rollback log, complete the rollback operation, and delete the undo log; finally it reports the rollback result to the TC.

The business is unaware of all the processes above; it does not care about the specific global commit and rollback. Most importantly, Seata decomposes the synchronous coordination of two-phase commit into the individual branch transactions, and a branch transaction is no different from an ordinary local transaction. This means that once we use Seata, distributed transactions are used like local transactions, with the transaction coordination mechanism moved from the database layer to the middleware layer, Seata. Although coordination moves to the application layer, it remains zero-intrusive to the business, stripping the distributed transaction scheme of its protocol-support requirements on the database; and because Seata releases resources as soon as a branch transaction completes, it greatly reduces the time a branch transaction locks resources, perfectly avoiding the long resource locking caused by the XA protocol's need for synchronous coordination.

Supplementary solutions

What we have described above is actually Seata's default mode, known as AT mode. Like the XA scheme it is a two-phase commit scheme and is non-intrusive to the business, but this mechanism still relies on the ACID properties of the database's local transactions; notice that the diagrams above stress relational databases that support ACID. So what about non-relational databases, or databases that do not support ACID, which cannot use Seata? Don't panic: Seata now provides us with another mode, called MT mode, an intrusive solution in which commit, rollback and other operations are defined by ourselves. The business logic is decomposed into Prepare/Commit/Rollback parts, forming an MT branch that joins the global transaction; the purpose of MT mode is to let Seata reach more scenarios.

However, MT is not Seata's "main" mode; it exists only as a complementary solution, and from the official development vision above we can see that Seata's goal is always to be a solution that is non-intrusive to the business.

Note: The design of the pictures in this article refers to the official Seata diagram.

Author Bio:

Zhang Chenghui currently works in the technology platform department of the Zhongtong Technology information centre as a Java engineer, mainly responsible for the development of the Zhongtong messaging platform and the full-link stress-testing project. He loves to share technology, is the author of the WeChat public account "Back-end Progression" and of a technical blog (https://objcoding.com/), a Seata Contributor, GitHub ID: objcoding.

· 5 min read

Preface

TaaS is a high-availability implementation of the Seata server (TC, Transaction Coordinator), written in Golang. TaaS has been contributed to the Seata open-source community by InfiniVision (http://infinivision.cn) and is now officially open source.

Before Seata was open-sourced, we began to reference GTS and some open-source projects to implement the distributed transaction solution TaaS (Transaction as a Service).

After we completed the development of the TaaS server, Seata (then called Fescar) was open-sourced and attracted widespread attention from the open-source community. With Alibaba's platform influence and community activity, we believe that Seata will become the standard for open-source distributed transactions in the future. Therefore, we decided to make TaaS compatible with Seata.

Upon discovering that Seata's server implementation was single-node and lacked high availability, we contacted the Seata community leaders and decided to open-source TaaS to contribute to the open-source community. We will also maintain it in the long term and keep it synchronized with Seata versions.

Currently, the official Java high-availability version of Seata is also under development. TaaS and this high-availability version have different design philosophies and will coexist in the future.

TaaS has been open-sourced on GitHub (https://github.com/apache/incubator-seata-go-server). We welcome everyone to try it out.

Design Principles

  1. High Performance: Performance scales linearly with the number of machines. Adding new machines to the cluster can improve performance.
  2. High Availability: If a machine fails, the system can still provide services externally, or service can be restored within a short time (the time needed for a leader switch).
  3. Auto-Rebalance: When new machines are added to the cluster or machines are offline, the system can automatically perform load balancing.
  4. Strong Consistency: The system's metadata is stored consistently in multiple replicas.

Design

TaaS Design

High Performance

TaaS's performance scales linearly with the number of machines. To support this, TaaS handles global transactions in units called Fragments: on startup the system sets the maximum number of concurrent active global transactions each Fragment supports, and it samples each Fragment, generating new Fragments to handle more concurrency when one becomes overloaded.

High Availability

Each Fragment has multiple replicas and one leader that handles requests. When the leader fails, the system elects a new leader to handle requests; during the election of the new leader, the Fragment does not provide services externally, typically for a few seconds.

Strong Consistency

TaaS itself does not store the metadata of global transactions. The metadata is stored in Elasticell (https://github.com/deepfabric/elasticell), a distributed KV storage compatible with the Redis protocol. Elasticell ensures data consistency based on the Raft protocol.

Auto-Rebalance

As the system runs, there will be many Fragments and their replicas, resulting in uneven distribution of Fragments on each machine, especially when old machines are offline or new machines come online. When TaaS starts, it selects three nodes as schedulers, responsible for scheduling these Fragments to ensure that the number of Fragments and the number of leaders on each machine are roughly equal. It also ensures that the number of replicas for each Fragment remains at the specified number.

Fragment Replication Creation

Fragment Replication Creation

  1. At time t0, Fragment1 is created on machine Seata-TC1.
  2. At time t1, a replica of Fragment1, Fragment1', is created on machine Seata-TC2.
  3. At time t2, another replica of Fragment1, Fragment1", is created on machine Seata-TC3.

By time t2, all three replicas of Fragment1 are created.

Fragment Replication Migration

Fragment Replication Migration

  1. At time t0, the system has four Fragments, each existing on machines Seata-TC1, Seata-TC2, and Seata-TC3.
  2. At time t1, a new machine, Seata-TC4, is added.
  3. At time t2, replicas of three Fragments are migrated to machine Seata-TC4.

Online Quick Experience

We have set up an experience environment on the public network:

Local Quick Experience

Quickly experience TaaS functionality using docker-compose.

git clone https://github.com/seata/taas.git
docker-compose up -d

Because of the many component dependencies, docker-compose takes about 30 seconds to start before the services become externally available.

Seata Server Address

The service listens on the default port 8091. Modify the Seata server address in your application accordingly to try it out.

Seata UI

Access the WEB UI at http://127.0.0.1:8084/ui/index.html

About InfiniVision

InfiniVision is a technology-driven enterprise service provider dedicated to assisting traditional enterprises in digital transformation and upgrading using technologies such as artificial intelligence, cloud computing, blockchain, big data, and IoT edge computing. InfiniVision actively embraces open source culture and open sources core algorithms and architectures. Notable open-source products include the facial recognition software InsightFace (https://github.com/deepinsight/insightface), which has repeatedly won large-scale facial recognition challenges, and the distributed storage engine Elasticell (https://github.com/deepfabric/elasticell).

About the Author

The author, Zhang Xu, is the creator of the open-source Gateway (https://github.com/fagongzi/gateway) and currently works at InfiniVision, focusing on infrastructure-related development.

· 17 min read

Fescar

Common distributed transaction approaches include XA based on 2PC (e.g., Atomikos), TCC (e.g., ByteTCC) focusing on the business layer, and transactional messaging (e.g., RocketMQ half messages). XA is a protocol for distributed transactions that requires support from the local database; its resource locking at the database level leads to poor performance. TCC, which Alibaba has evangelized, requires a significant amount of business code to ensure transactional consistency, resulting in higher development and maintenance costs.

Distributed transactions are a widely discussed topic in the industry, and this is one of the reasons why Fescar has gained 6k stars in a short period of time. The name "Fescar" stands for Fast & Easy Commit And Rollback. In simple terms, Fescar drives global transactions by coordinating local RDBMS branch transactions. It is a middleware that operates at the application layer. The main advantages of Fescar are better performance compared to XA, as it does not occupy connection resources for a long time, and lower development cost and business invasiveness compared to TCC.

Similar to XA, Fescar divides roles into TC (Transaction Coordinator), RM (Resource Manager), and TM (Transaction Manager). The overall transaction process model of Fescar is as follows:

Fescar事务过程

1. The TM (Transaction Manager) requests the TC (Transaction Coordinator) to start a global transaction. The global transaction is created successfully, and a globally unique XID (Transaction ID) is generated.
2. The XID is propagated in the context of the microservice invocation chain.
3. The RM (Resource Manager) registers the branch transaction with the TC, bringing it under the jurisdiction of the global transaction corresponding to the XID.
4. The TM initiates a global commit or rollback resolution for the XID with the TC.
5. The TC schedules the completion of commit or rollback requests for all branch transactions under the jurisdiction of the XID.

In the current implementation version, the TC (Transaction Coordinator) is deployed as a separate process. It is responsible for maintaining the operation records and global lock records of the global transaction, as well as coordinating and driving the global transaction's commit or rollback. On the other hand, the TM (Transaction Manager) and RM (Resource Manager) work in the same application process as the application.

The RM manages the underlying database through proxying the JDBC data source. It uses syntax parsing to retain snapshots and generate undo logs during transaction execution. This ensures that the transaction can be rolled back to its previous state if needed.

This covers the general flow and model division of Fescar. Now, let's proceed with the analysis of Fescar's transaction propagation mechanism.

Fescar Transaction Propagation Mechanism

The transaction propagation in Fescar includes both nested transaction calls within an application and transaction propagation across different services. So, how does Fescar propagate transactions in a microservices call chain? Fescar provides a transaction API that allows users to manually bind a transaction's XID and join it to the global transaction. Therefore, depending on the specific service framework mechanism, we can propagate the XID in the call chain to achieve transaction propagation.

The RPC request process consists of two parts: the caller and the callee. We need to handle the XID during the request and response. The general process is as follows: the caller (or the requester) retrieves the XID from the current transaction context and passes it to the callee through the RPC protocol. The callee extracts the XID from the request and binds it to its own transaction context, thereby participating in the global transaction. Common microservices frameworks usually provide corresponding Filter and Interceptor mechanisms. Now, let's analyze the integration process of Spring Cloud and Fescar in more detail.

Partial Source Code Analysis of Fescar Integration with Spring Cloud Alibaba

This section's source code is entirely from spring-cloud-alibaba-fescar. The analysis covers three parts: AutoConfiguration, the microservice provider, and the microservice consumer. The consumer side is further divided into two request approaches, RestTemplate and Feign, and the Feign approach is further categorized into usage patterns that integrate with Hystrix and Sentinel.

Fescar AutoConfiguration

For the AutoConfiguration analysis, this section will only cover the parts related to the startup of Fescar. The analysis of other parts will be interspersed in the 'Microservice Provider' and 'Microservice Consumer' sections.

The startup of Fescar requires the configuration of GlobalTransactionScanner. The GlobalTransactionScanner is responsible for initializing Fescar's RM client, TM client, and automatically proxying classes annotated with the GlobalTransactional annotation. The startup of the GlobalTransactionScanner bean is loaded and injected through GlobalTransactionAutoConfiguration, which also injects FescarProperties.

FescarProperties contains important properties of Fescar, such as txServiceGroup. The value of this property can be read from the application.properties file using the key 'spring.cloud.alibaba.fescar.txServiceGroup', with a default value of '${spring.application.name}-fescar-service-group'. txServiceGroup represents the logical transaction group name in Fescar. This group name is obtained from the configuration center (currently supporting file and Apollo) to retrieve the TC cluster name corresponding to the logical transaction group name. The TC cluster's service name is then constructed based on the cluster name. The RM client, TM client, and TC interact through RPC by using the registry center (currently supporting Nacos, Redis, ZooKeeper, and Eureka) and the service name to find available TC service nodes.
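
For example, the group name could be overridden in application.properties as follows (the values are illustrative):

spring.application.name=account-service
# defaults to ${spring.application.name}-fescar-service-group when omitted
spring.cloud.alibaba.fescar.txServiceGroup=account-service-tx-group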

Microservice Provider

Since the logic of the consumer is a bit more complex, let's first analyze the logic of the provider. For Spring Cloud projects, the default RPC transport protocol is HTTP, so the HandlerInterceptor mechanism is used to intercept HTTP requests.

HandlerInterceptor is an interface provided by Spring, and it has three methods that can be overridden.

    /**
     * Intercept the execution of a handler. Called after HandlerMapping determined
     * an appropriate handler object, but before HandlerAdapter invokes the handler.
     */
    default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception {
        return true;
    }

    /**
     * Intercept the execution of a handler. Called after HandlerAdapter actually
     * invoked the handler, but before the DispatcherServlet renders the view.
     * Can expose additional model objects to the view via the given ModelAndView.
     */
    default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
            @Nullable ModelAndView modelAndView) throws Exception {
    }

    /**
     * Callback after completion of request processing, that is, after rendering
     * the view. Will be called on any outcome of handler execution, thus allows
     * for proper resource cleanup.
     */
    default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
            @Nullable Exception ex) throws Exception {
    }

According to the comments, we can clearly see the timing and common use cases of each method. For Fescar integration, it overrides the preHandle and afterCompletion methods as needed.

The purpose of FescarHandlerInterceptor is to bind the XID passed from the service chain to the transaction context of the service node and clean up related resources after the request is completed. FescarHandlerInterceptorConfiguration is responsible for configuring the interception of all URLs. This interceptor will be executed for all incoming requests to perform XID conversion and transaction binding.
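
The registration itself is ordinary Spring MVC. Below is a minimal sketch of such a configuration, assuming the standard WebMvcConfigurer mechanism (illustrative, not the actual spring-cloud-alibaba-fescar source):

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class InterceptorRegistrationSketch implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // intercept every URL so each incoming request gets XID binding
        registry.addInterceptor(new FescarHandlerInterceptor()).addPathPatterns("/**");
    }
}

The interceptor itself, taken from the actual source, is shown below.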

/**
 * @author xiaojing
 *
 * Fescar HandlerInterceptor, Convert Fescar information into
 * @see com.alibaba.fescar.core.context.RootContext from http request's header in
 * {@link org.springframework.web.servlet.HandlerInterceptor#preHandle(HttpServletRequest, HttpServletResponse, Object)},
 * And clean up Fescar information after servlet method invocation in
 * {@link org.springframework.web.servlet.HandlerInterceptor#afterCompletion(HttpServletRequest, HttpServletResponse, Object, Exception)}
 */
public class FescarHandlerInterceptor implements HandlerInterceptor {

    private static final Logger log = LoggerFactory.getLogger(FescarHandlerInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
            Object handler) throws Exception {

        String xid = RootContext.getXID();
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }

        if (xid == null && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
            Object handler, Exception e) throws Exception {

        String rpcXid = request.getHeader(RootContext.KEY_XID);

        if (StringUtils.isEmpty(rpcXid)) {
            return;
        }

        String unbindXid = RootContext.unbind();
        if (log.isDebugEnabled()) {
            log.debug("unbind {} from RootContext", unbindXid);
        }
        if (!rpcXid.equalsIgnoreCase(unbindXid)) {
            log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
            if (unbindXid != null) {
                RootContext.bind(unbindXid);
                log.warn("bind {} back to RootContext", unbindXid);
            }
        }
    }

}

The preHandle method is called before the request is executed. The xid parameter represents the unique identifier of the global transaction already bound to the current transaction context, while rpcXid represents the global transaction identifier that needs to be bound to the request and is passed through the HTTP header. In the preHandle method, it checks if there is no XID in the current transaction context and if rpcXid is not empty. If so, it binds rpcXid to the current transaction context.

The afterCompletion method is called after the request is completed and is used to perform resource cleanup actions. Fescar uses the RootContext.unbind() method to unbind the XID involved in the transaction context. The logic in the if statement is for code robustness. If rpcXid and unbindXid are not equal, it rebinds unbindXid.

For Spring Cloud, the default RPC method is HTTP. Therefore, for the provider, there is no need to differentiate the request interception method. It only needs to extract the XID from the header and bind it to its own transaction context. However, for the consumer, due to the variety of request components, including circuit breakers and isolation mechanisms, different situations need to be distinguished and handled. We will analyze this in more detail later.

Microservice Consumer

Fescar categorizes the request methods into RestTemplate, Feign, Feign+Hystrix, and Feign+Sentinel. Different components are automatically configured through Spring Boot's Auto Configuration. The specific configuration classes can be found in the spring.factories file, and we will also discuss the relevant configuration classes later in this document.

RestTemplate

Let's take a look at how Fescar passes XID if the consumer is using RestTemplate for requests.

public class FescarRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
            ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);

        String xid = RootContext.getXID();

        if (!StringUtils.isEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }
        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}

The FescarRestTemplateInterceptor implements the intercept method of the ClientHttpRequestInterceptor interface. It wraps the outgoing request and, if there is an existing Fescar transaction context XID, retrieves it and adds it to the HTTP headers of the request.

FescarRestTemplateInterceptor is configured in RestTemplate through FescarRestTemplateAutoConfiguration.

@Configuration
public class FescarRestTemplateAutoConfiguration {

    @Bean
    public FescarRestTemplateInterceptor fescarRestTemplateInterceptor() {
        return new FescarRestTemplateInterceptor();
    }

    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private FescarRestTemplateInterceptor fescarRestTemplateInterceptor;

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            for (RestTemplate restTemplate : restTemplates) {
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
                        restTemplate.getInterceptors());
                interceptors.add(this.fescarRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }
    }

}

The init method iterates over all RestTemplate instances in the container, copies each one's existing interceptors, appends the fescarRestTemplateInterceptor, and then resets the interceptor list on the RestTemplate.
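
A practical consequence: only RestTemplate instances registered as Spring beans are picked up by the autowired collection and enhanced; an instance created with new inside business code is not. A minimal sketch:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfigurationSketch {

    // Declared as a bean, so FescarRestTemplateAutoConfiguration#init()
    // appends the XID-propagating interceptor after construction.
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}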

Feign

(Figure: Feign class relationship diagram)

Next, let's take a look at the code related to Feign. There are quite a few classes in this package, so let's start with its AutoConfiguration.

@Configuration
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FescarFeignClientAutoConfiguration {

    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
    @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
    Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
        return FescarHystrixFeignBuilder.builder(beanFactory);
    }

    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
    @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
    Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
        return FescarSentinelFeignBuilder.builder(beanFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    @Scope("prototype")
    Feign.Builder feignBuilder(BeanFactory beanFactory) {
        return FescarFeignBuilder.builder(beanFactory);
    }

    @Configuration
    protected static class FeignBeanPostProcessorConfiguration {

        @Bean
        FescarBeanPostProcessor fescarBeanPostProcessor(
                FescarFeignObjectWrapper fescarFeignObjectWrapper) {
            return new FescarBeanPostProcessor(fescarFeignObjectWrapper);
        }

        @Bean
        FescarContextBeanPostProcessor fescarContextBeanPostProcessor(
                BeanFactory beanFactory) {
            return new FescarContextBeanPostProcessor(beanFactory);
        }

        @Bean
        FescarFeignObjectWrapper fescarFeignObjectWrapper(BeanFactory beanFactory) {
            return new FescarFeignObjectWrapper(beanFactory);
        }
    }

}

The FescarFeignClientAutoConfiguration is enabled when the Client.class exists and requires it to be applied before FeignAutoConfiguration. Since FeignClientsConfiguration is responsible for generating the FeignContext and is enabled by FeignAutoConfiguration, based on the dependency relationship, FescarFeignClientAutoConfiguration is also applied before FeignClientsConfiguration.

FescarFeignClientAutoConfiguration customizes the Feign.Builder and adapts it for feign.sentinel, feign.hystrix, and regular feign cases. The purpose is to customize the actual implementation of the Client in Feign to be FescarFeignClient.

HystrixFeign.builder().retryer(Retryer.NEVER_RETRY)
        .client(new FescarFeignClient(beanFactory));
SentinelFeign.builder().retryer(Retryer.NEVER_RETRY)
        .client(new FescarFeignClient(beanFactory));
Feign.builder().client(new FescarFeignClient(beanFactory));

FescarFeignClient is an enhancement of the original Feign client proxy.

public class FescarFeignClient implements Client {

    private final Client delegate;
    private final BeanFactory beanFactory;

    FescarFeignClient(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        this.delegate = new Client.Default(null, null);
    }

    FescarFeignClient(BeanFactory beanFactory, Client delegate) {
        this.delegate = delegate;
        this.beanFactory = beanFactory;
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {

        Request modifiedRequest = getModifyRequest(request);

        try {
            return this.delegate.execute(modifiedRequest, options);
        }
        finally {

        }
    }

    private Request getModifyRequest(Request request) {

        String xid = RootContext.getXID();

        if (StringUtils.isEmpty(xid)) {
            return request;
        }

        Map<String, Collection<String>> headers = new HashMap<>();
        headers.putAll(request.headers());

        List<String> fescarXid = new ArrayList<>();
        fescarXid.add(xid);
        headers.put(RootContext.KEY_XID, fescarXid);

        return Request.create(request.method(), request.url(), headers, request.body(),
                request.charset());
    }
}

In the above process, we can see that FescarFeignClient modifies the original Request. It first retrieves the XID from the current transaction context and, if the XID is not empty, adds it to the request's header.

FeignBeanPostProcessorConfiguration defines three beans: FescarContextBeanPostProcessor, FescarBeanPostProcessor, and FescarFeignObjectWrapper. FescarContextBeanPostProcessor and FescarBeanPostProcessor both implement the Spring BeanPostProcessor interface.

Here is the implementation of FescarContextBeanPostProcessor:

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
            throws BeansException {
        if (bean instanceof FeignContext && !(bean instanceof FescarFeignContext)) {
            return new FescarFeignContext(getFescarFeignObjectWrapper(),
                    (FeignContext) bean);
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName)
            throws BeansException {
        return bean;
    }

The two methods in BeanPostProcessor allow for pre- and post-processing of beans in the Spring container. The postProcessBeforeInitialization method is called before initialization, while the postProcessAfterInitialization method is called after initialization. The return value of these methods can be the original bean instance or a wrapped instance using a wrapper.

FescarContextBeanPostProcessor wraps FeignContext into FescarFeignContext. FescarBeanPostProcessor wraps FeignClient into FescarLoadBalancerFeignClient and FescarFeignClient, depending on whether it inherits from LoadBalancerFeignClient.
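
A plausible sketch of FescarBeanPostProcessor, assuming it simply delegates to the FescarFeignObjectWrapper#wrap method shown later in this article (the actual source may differ in details):

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class FescarBeanPostProcessorSketch implements BeanPostProcessor {

    private final FescarFeignObjectWrapper fescarFeignObjectWrapper;

    FescarBeanPostProcessorSketch(FescarFeignObjectWrapper fescarFeignObjectWrapper) {
        this.fescarFeignObjectWrapper = fescarFeignObjectWrapper;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
            throws BeansException {
        // wrap Client beans into FescarFeignClient / FescarLoadBalancerFeignClient
        return this.fescarFeignObjectWrapper.wrap(bean);
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName)
            throws BeansException {
        return bean;
    }
}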

In FeignAutoConfiguration, the FeignContext does not have any ConditionalOnXXX conditions. Therefore, Fescar uses a pre-processing approach to wrap FeignContext into FescarFeignContext.

    @Bean
    public FeignContext feignContext() {
        FeignContext context = new FeignContext();
        context.setConfigurations(this.configurations);
        return context;
    }

For Feign clients, FeignClientFactoryBean retrieves an instance of FeignContext. If a developer customizes the Feign client objects via @Configuration, those custom objects are configured into the builder, which would render the enhanced FescarFeignClient in FescarFeignBuilder ineffective. The key code in FeignClientFactoryBean is as follows:

    /**
     * @param <T> the target type of the Feign client
     * @return a {@link Feign} client created with the specified data and the context information
     */
    <T> T getTarget() {
        FeignContext context = applicationContext.getBean(FeignContext.class);
        Feign.Builder builder = feign(context);

        if (!StringUtils.hasText(this.url)) {
            String url;
            if (!this.name.startsWith("http")) {
                url = "http://" + this.name;
            }
            else {
                url = this.name;
            }
            url += cleanPath();
            return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                    this.name, url));
        }
        if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
            this.url = "http://" + this.url;
        }
        String url = this.url + cleanPath();
        Client client = getOptional(context, Client.class);
        if (client != null) {
            if (client instanceof LoadBalancerFeignClient) {
                // not load balancing because we have a url,
                // but ribbon is on the classpath, so unwrap
                client = ((LoadBalancerFeignClient) client).getDelegate();
            }
            builder.client(client);
        }
        Targeter targeter = get(context, Targeter.class);
        return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
                this.type, this.name, url));
    }

The above code determines whether to make a direct call to the specified URL or use load balancing based on whether the URL parameter is specified in the annotation. The targeter.target method creates the object through dynamic proxy. The general process is as follows: the parsed Feign methods are stored in a map, and then passed as a parameter to generate the InvocationHandler, which in turn generates the dynamic proxy object.

The presence of FescarContextBeanPostProcessor ensures that even if developers customize operations on FeignClient, the enhancement of global transactions required by Fescar can still be achieved.

As for FescarFeignObjectWrapper, let's focus on the Wrapper method:

    Object wrap(Object bean) {
        if (bean instanceof Client && !(bean instanceof FescarFeignClient)) {
            if (bean instanceof LoadBalancerFeignClient) {
                LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
                return new FescarLoadBalancerFeignClient(client.getDelegate(), factory(),
                        clientFactory(), this.beanFactory);
            }
            return new FescarFeignClient(this.beanFactory, (Client) bean);
        }
        return bean;
    }

In the wrap method, if the bean is an instance of LoadBalancerFeignClient, it first retrieves the actual Client object that the LoadBalancerFeignClient proxies using the client.getDelegate() method. It then wraps the Client object into FescarFeignClient and generates a subclass of LoadBalancerFeignClient called FescarLoadBalancerFeignClient. If the bean is an instance of Client and not FescarFeignClient or LoadBalancerFeignClient, it is directly wrapped and transformed into FescarFeignClient.

The above process design is quite clever. It controls the order of configuration based on Spring Boot's Auto Configuration and customizes the Feign Builder bean to ensure that all Clients are enhanced with FescarFeignClient. It also wraps the beans in the Spring container using BeanPostProcessor, ensuring that all beans in the container are enhanced with FescarFeignClient, thus avoiding the replacement action in the getTarget method of FeignClientFactoryBean.

Hystrix Isolation

Now let's look at the Hystrix part. Why does Fescar implement a separate concurrency strategy class for Hystrix? The current default implementation of the transaction context RootContext is based on ThreadLocal, which means the context is bound to a thread. Hystrix itself has two isolation modes: semaphore-based isolation and thread-pool-based isolation. Hystrix officially recommends thread pool isolation for stronger separation, and it is the commonly used mode:

Thread or Semaphore
The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

Under thread pool isolation, the service layer's business code and the thread that actually sends the request are not the same, so the ThreadLocal-based approach cannot pass the XID to the Hystrix thread and on to the callee. To address this, Hystrix allows developers to customize the concurrency strategy by extending the HystrixConcurrencyStrategy class and overriding the wrapCallable method:

public class FescarHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private HystrixConcurrencyStrategy delegate;

    public FescarHystrixConcurrencyStrategy() {
        this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
    }

    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
        if (c instanceof FescarContextCallable) {
            return c;
        }

        Callable<K> wrappedCallable;
        if (this.delegate != null) {
            wrappedCallable = this.delegate.wrapCallable(c);
        }
        else {
            wrappedCallable = c;
        }
        if (wrappedCallable instanceof FescarContextCallable) {
            return wrappedCallable;
        }

        return new FescarContextCallable<>(wrappedCallable);
    }

    private static class FescarContextCallable<K> implements Callable<K> {

        private final Callable<K> actual;
        private final String xid;

        FescarContextCallable(Callable<K> actual) {
            this.actual = actual;
            this.xid = RootContext.getXID();
        }

        @Override
        public K call() throws Exception {
            try {
                RootContext.bind(xid);
                return actual.call();
            }
            finally {
                RootContext.unbind();
            }
        }

    }
}

Fescar also provides a FescarHystrixAutoConfiguration, which generates the FescarHystrixConcurrencyStrategy when HystrixCommand is present.

@Configuration
@ConditionalOnClass(HystrixCommand.class)
public class FescarHystrixAutoConfiguration {

    @Bean
    FescarHystrixConcurrencyStrategy fescarHystrixConcurrencyStrategy() {
        return new FescarHystrixConcurrencyStrategy();
    }

}

Authors

kangshu.guo, community nickname ywind, formerly employed at Huawei Terminal Cloud, currently a Java engineer at Sohu Intelligent Media Center, mainly responsible for development related to Sohu accounts, with a strong interest in distributed transactions, distributed systems, and microservices architecture.

min.ji (qinming), community nickname slievrly, Fescar project leader, core developer of Alibaba middleware TXC/GTS, long engaged in core R&D of distributed middleware, with extensive technical expertise in the field of distributed transactions.

· 6 min read

Many developers are already familiar with Fescar. However, Fescar has now transformed into Seata. If you're not aware of Seata, please check the following link.

SEATA GitHub: https://github.com/apache/incubator-seata

We extend our sincere thanks and greetings to the Alibaba team for their contributions in bringing numerous open-source software to developers.

Today, I will share my insights on integrating Seata with Spring Cloud, aiming to help more developers avoid common pitfalls and streamline their setup process.

2. Project Overview

The request path is as follows: client -> gateway -> service consumer -> service provider.

Technical framework:

- spring cloud gateway
- spring cloud feign
- nacos 1.0.RC2
- fescar-server 0.4.1 (Seata)

For instructions on starting Nacos, please refer to: Nacos Startup Guide

Seata supports various service registration methods. In the fescar-server-0.4.1\conf directory, you will find:

file.conf
logback.xml
nacos-config.sh
nacos-config.text
registry.conf

There are five files in total. Of these, file.conf and registry.conf are needed by the services' code: both the service consumer and the service provider applications must include them. If you are using a configuration center such as Nacos or ZooKeeper, file.conf can be ignored; however, if type="file" is specified, file.conf is required.

Below is the configuration information in the registry.conf file. The registry section is for the service registration center configuration, and the config section is for the configuration center.

As shown below, Seata currently supports nacos, file, eureka, redis, zookeeper, etc., for registration and configuration; the default type in the downloaded configuration is file. Choose the method that fits your project's actual requirements. Here I chose nacos, but eureka also works; both have been tested and work fine.

Note: If you are integrating with eureka, please use the latest official version.

3. Core Configuration

registry {
  # file, nacos, eureka, redis, zk
  type = "nacos"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:1001/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

config {
  # file, nacos, apollo, zk
  type = "nacos"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  apollo {
    app.id = "fescar-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

Note that nacos-config.sh is a script that needs to be executed if using Nacos as the configuration center. It initializes some default settings for Nacos.
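
Assuming the script takes the Nacos server address as its argument, a typical invocation looks like the following (check the script itself before running):

sh nacos-config.sh localhost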

For starting SEATA, refer to the official guide. Note that the official startup command separates parameters with spaces, so be careful. The IP is an optional parameter; due to DNS resolution, Fescar sometimes registers with Nacos under the computer name, which requires you either to specify the IP explicitly or to map the hostname to the IP in the hosts file. This issue has been fixed in more recent SEATA versions.

sh fescar-server.sh 8091 /home/admin/fescar/data/ IP (optional)

As mentioned earlier, file.conf and registry.conf are needed in our code. The focus here is file.conf, which is loaded only when the registry is configured with type="file"; if you use ZK, Nacos, or another configuration center, it can be ignored. However, service.localRgroup.grouplist and service.vgroupMapping must then be specified in the configuration center, so that on startup your client can automatically obtain the corresponding SEATA service and address from it; failing to configure them will result in an error because the client cannot connect to the server. If you use eureka, the config section should specify type="file", since SEATA's config currently does not support eureka.

transport {
  # tcp, udt, unix-domain-socket
  type = "TCP"
  # NIO, NATIVE
  server = "NIO"
  # enable heartbeat
  heartbeat = true
  # thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size, will not be used for UDT
    boss-thread-size = 1
    # auto default pin or 8
    worker-thread-size = 8
  }
}
service {
  # vgroup -> rgroup
  vgroup_mapping.service-provider-fescar-service-group = "default"
  # only support single node
  localRgroup.grouplist = "127.0.0.1:8091"
  # degrade current not support
  enableDegrade = false
  # disable
  disable = false
}

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
}

4. Service Details

Two key points need attention:

- grouplist IP: the IP and port of the current Fescar server.
- The vgroup_mapping configuration.

vgroup_mapping.service-provider-fescar-service-group: the service name in this key is actually the application name configured in the application.properties of your consumer or provider, e.g., spring.application.name=service-provider. In the source code, the application name is concatenated with the suffix -fescar-service-group to form the key. The value is the name of the current Fescar service as registered: cluster = "default" for nacos, or application = "default" for eureka.

vgroup_mapping.service-provider-fescar-service-group = "default"
# only support single node
localRgroup.grouplist = "127.0.0.1:8091"

Both provider and consumer need to configure these two files.

If you use Nacos as the configuration center, you need to add these entries to Nacos manually.

5. Transaction Usage

In my code, the request is load-balanced through the gateway to the consumer. The consumer then requests the provider through Feign. The official example uses Feign, but here, the request is forwarded directly through the gateway, so the global transaction is handled in the controller layer, similar to the official demo.

@RestController
public class DemoController {

    @Autowired
    private DemoFeignClient demoFeignClient;

    @Autowired
    private DemoFeignClient2 demoFeignClient2;

    @GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
    @GetMapping("/getdemo")
    public String demo() {

        // Call service A and simply save
        ResponseData<Integer> result = demoFeignClient.insertService("test", 1);
        if (result.getStatus() == 400) {
            System.out.println(result + "+++++++++++++++++++++++++++++++++++++++");
            throw new RuntimeException("this is error1");
        }

        // Call service B and test rollback of service A upon error
        ResponseData<Integer> result2 = demoFeignClient2.saveService();
        if (result2.getStatus() == 400) {
            System.out.println(result2 + "+++++++++++++++++++++++++++++++++++++++");
            throw new RuntimeException("this is error2");
        }

        return "SUCCESS";
    }
}

This concludes the core integration of transactions. Here, service A and B are both providers. When service B encounters an error, the global transaction rolls back. Each transaction can handle its local transactions independently.
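
On the provider side, nothing Fescar-specific is needed in the business code: a plain local transaction suffices, because the proxied data source records undo logs under the propagated XID. A hypothetical provider service (DemoMapper and the method names are made up for illustration):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class DemoService {

    @Autowired
    private DemoMapper demoMapper; // hypothetical MyBatis mapper

    // An ordinary local transaction; the DataSourceProxy registers this
    // branch under the propagated XID and writes undo_log records.
    @Transactional
    public int insertUser(String name, int age) {
        return demoMapper.insertUser(name, age);
    }
}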

SEATA uses a global XID to uniformly identify transactions. I will not list the database tables needed for SEATA here. For details, refer to: spring-cloud-fescar official DEMO

6. Data Proxy

Another important point: in a distributed setup, every business database involved needs an undo_log table, which stores the rollback records associated with each XID.

Additionally, each service project needs a database connection pool proxy. Currently, only Druid connection pool is supported. More will be supported in the future.

@Configuration
public class DatabaseConfiguration {

    @Bean(destroyMethod = "close", initMethod = "init")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSourceProxy);
        return factoryBean.getObject();
    }
}

Pay attention to the configuration file and the data source proxy. Without the proxy, no records will be written to undo_log, and rollback by XID becomes impossible.

Author: Da Fei