· 3 min read

Author: FUNKYE (Chen Jianbin), Principal Engineer at a certain Internet company in Hangzhou.

Preface

  1. Let's start with the package structure. Both seata-dubbo and seata-dubbo-alibaba contain a class named TransactionPropagationFilter, one for Apache Dubbo and one for Alibaba Dubbo.

[Figure: package structure of seata-dubbo and seata-dubbo-alibaba]

Source Code Analysis

package io.seata.integration.dubbo;

import io.seata.core.context.RootContext;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // get local XID
        String xid = RootContext.getXID();
        String xidInterceptorType = RootContext.getXIDInterceptorType();
        // get XID from dubbo param
        String rpcXid = getRpcXid();
        String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
        }
        boolean bind = false;
        if (xid != null) {
            // transfer xid
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
            RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
        } else {
            if (rpcXid != null) {
                // bind XID
                RootContext.bind(rpcXid);
                RootContext.bindInterceptorType(rpcXidInterceptorType);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
                }
            }
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            if (bind) {
                // remove xid which has finished
                String unbindInterceptorType = RootContext.unbindInterceptorType();
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
                }
                // if unbind xid is not current rpc xid
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
                    if (unbindXid != null) {
                        // bind xid
                        RootContext.bind(unbindXid);
                        RootContext.bindInterceptorType(unbindInterceptorType);
                        LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
                    }
                }
            }
        }
    }

    /**
     * get rpc xid
     * @return
     */
    private String getRpcXid() {
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (rpcXid == null) {
            rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
        }
        return rpcXid;
    }

}

  2. From the source code we can derive the following processing logic.

[Figure: flowchart of the XID propagation logic in TransactionPropagationFilter]

Key Points

  1. Dubbo @Activate Annotation:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {

    String[] group() default {};

    String[] value() default {};

    String[] before() default {};

    String[] after() default {};

    int order() default 0;
}

The annotation on Seata's Dubbo filter, @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100), means that the filter is triggered on both the Dubbo service provider and the consumer. As a result, the Seata transaction initiator passes its XID along with each call, as the flowchart and code above show.

  2. Dubbo implicit parameter passing: setAttachment and getAttachment on RpcContext pass implicit parameters between service consumers and providers.

Fetching: RpcContext.getContext().getAttachment(RootContext.KEY_XID);

Passing: RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
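
The propagation logic above can be illustrated without Dubbo or Seata on the classpath. The following is a minimal sketch under stated assumptions: XidPropagationSketch, LOCAL_XID, and the plain Map used as attachments are hypothetical stand-ins for RootContext and RpcContext, not the real APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for RootContext and the RPC attachments; not the Seata or Dubbo API. */
class XidPropagationSketch {
    static final String KEY_XID = "TX_XID";

    // Plays the role of RootContext: one XID per thread.
    static final ThreadLocal<String> LOCAL_XID = new ThreadLocal<>();

    /** Consumer side: copy the local XID into the attachments that travel with the call. */
    static Map<String, String> consumerSide() {
        Map<String, String> attachments = new HashMap<>();
        String xid = LOCAL_XID.get();
        if (xid != null) {
            attachments.put(KEY_XID, xid);
        }
        return attachments;
    }

    /** Provider side: bind the incoming XID for the duration of the call, then unbind it. */
    static String providerSide(Map<String, String> attachments) {
        String rpcXid = attachments.get(KEY_XID);
        boolean bound = false;
        if (LOCAL_XID.get() == null && rpcXid != null) {
            LOCAL_XID.set(rpcXid);
            bound = true;
        }
        try {
            // Business code would run here and could read the XID from the thread-local.
            return LOCAL_XID.get();
        } finally {
            if (bound) {
                LOCAL_XID.remove();
            }
        }
    }
}
```

The provider only unbinds what it bound itself, which mirrors the bind flag in the filter.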

Conclusion

For further source code reading, please visit the Seata official website

· 8 min read

One. Introduction

The analysis of the Spring module noted that Seata's Spring module processes the beans involved in distributed transactions. At project startup, when GlobalTransactionScanner detects a reference to a TCC service (that is, a TCC transaction participant), it creates a dynamic proxy for it and weaves in the TCC-mode implementation of MethodInterceptor. The initiator of a TCC transaction still uses the @GlobalTransactional annotation, for which the generic MethodInterceptor implementation is woven in.

The implementation class of MethodInterceptor under the TCC mode is referred to as TccActionInterceptor (in the Spring module). This class invokes ActionInterceptorHandler (in the TCC module) to handle the transaction process under the TCC mode.

The primary functions of TCC dynamic proxy are: generating the TCC runtime context, propagating business parameters, and registering branch transaction records.

Two. Introduction to TCC Mode

In the Two-Phase Commit (2PC) protocol, the transaction manager coordinates resource management in two phases. The resource manager provides three operations: the prepare operation in the first phase, and the commit operation and rollback operation in the second phase.

public interface TccAction {

    @TwoPhaseBusinessAction(name = "tccActionForTest", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
                           @BusinessActionContextParameter(paramName = "a") int a,
                           @BusinessActionContextParameter(paramName = "b", index = 0) List b,
                           @BusinessActionContextParameter(isParamInProperty = true) TccParam tccParam);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}

This is an example of a TCC participant. A participant implements three methods whose first parameter must be BusinessActionContext and whose return type is fixed. These methods are exposed as microservices so that the transaction manager can invoke them.

  • prepare: Checks and reserves resources. For example, deducting an amount from the account balance and adding the same amount to the frozen balance.
  • commit: Uses the reserved resources to complete the actual business operation. For example, reducing the frozen balance to complete the fund deduction.
  • cancel (rollback in the interface above): Releases the reserved resources. For example, adding the frozen amount back to the account balance.
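
The three operations can be sketched on an in-memory account. This is only an illustration of the reserve, confirm, and release pattern from the bullets above; TccAccountSketch and its fields are hypothetical and do not use the Seata API.

```java
/** Hypothetical in-memory account; illustrates the reserve/confirm/release pattern only. */
class TccAccountSketch {
    private long balance;
    private long frozen;

    TccAccountSketch(long balance) {
        this.balance = balance;
    }

    /** prepare: check the balance and move the amount into the frozen balance. */
    synchronized boolean prepare(long amount) {
        if (amount <= 0 || balance < amount) {
            return false;
        }
        balance -= amount;
        frozen += amount;
        return true;
    }

    /** commit: consume the frozen amount, completing the deduction. */
    synchronized void commit(long amount) {
        frozen -= amount;
    }

    /** cancel: return the frozen amount to the available balance. */
    synchronized void cancel(long amount) {
        frozen -= amount;
        balance += amount;
    }

    synchronized long balance() {
        return balance;
    }

    synchronized long frozen() {
        return frozen;
    }
}
```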

The BusinessActionContext encapsulates the context of the current transaction: xid, branchId, actionName, and the parameters annotated with @BusinessActionContextParameter.

There are several points to note in participant business:

  1. Ensure business idempotence: the same transaction may be committed or rolled back more than once.
  2. Prevent hanging, that is, the case where the second-phase rollback arrives before the first-phase try.
  3. Accept a relaxed consistency protocol: TCC is eventually consistent, so data should be read only after it has been written.
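
One common way to address the first two points is to record a state per branch and consult it before doing any work. The sketch below assumes that approach; TccGuardSketch, its State enum, and the txKey format are hypothetical, and a real participant would persist this state rather than keep it in memory.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical guard keyed by xid + branchId; a real participant would persist this state. */
class TccGuardSketch {
    enum State { TRIED, COMMITTED, ROLLED_BACK }

    private final ConcurrentMap<String, State> states = new ConcurrentHashMap<>();

    /** Returns true only if the try phase may run; refuses it after a rollback (anti-hanging). */
    boolean beginTry(String txKey) {
        return states.putIfAbsent(txKey, State.TRIED) == null;
    }

    /** Returns true only the first time, so a repeated commit does no work twice. */
    boolean beginCommit(String txKey) {
        return states.replace(txKey, State.TRIED, State.COMMITTED);
    }

    /** Returns true if reserved resources must be released; records an empty rollback otherwise. */
    boolean beginRollback(String txKey) {
        State previous = states.putIfAbsent(txKey, State.ROLLED_BACK);
        if (previous == null) {
            // Rollback arrived before try: nothing to release, and a late try is now blocked.
            return false;
        }
        return states.replace(txKey, State.TRIED, State.ROLLED_BACK);
    }
}
```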

Three. Remoting package analysis

All classes in the package serve DefaultRemotingParser. The Dubbo, LocalTCC, and SofaRpc parsers each handle the classes of their own RPC protocol.

Main methods of DefaultRemotingParser:

  1. Determine if the bean is a remoting bean, code:
    @Override
    public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
        // check whether the bean is a service consumer (reference) or a service provider
        return isReference(bean, beanName) || isService(bean, beanName);
    }

  2. Remote bean parsing, which parses RPC classes into RemotingDesc.

Remote beans are parsed with allRemotingParsers. This list is loaded dynamically in initRemotingParser() by calling EnhancedServiceLoader.loadAll(RemotingParser.class), which uses the SPI mechanism to load the implementations of RemotingParser.

To extend this, for example with a parser for Feign remote calls, write the RemotingParser implementation and list it in the SPI configuration. This makes the design highly extensible.

RemotingDesc contains specific information about remote beans required for the transaction process, such as targetBean, interfaceClass, interfaceClassName, protocol, isReference, and so on.
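
The delegation to a list of parsers can be sketched as follows. This is a simplified illustration, not Seata's code: ParserSketch and CompositeParserSketch are hypothetical, the description is reduced to a String, and parsers are registered by hand where Seata would discover them through SPI.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified parser contract; the real RemotingParser has more methods. */
interface ParserSketch {
    /** Returns a description of the bean, or null if this parser does not recognize it. */
    String describe(Object bean);
}

/** Delegates to every registered parser, in the spirit of DefaultRemotingParser. */
class CompositeParserSketch implements ParserSketch {
    private final List<ParserSketch> parsers = new ArrayList<>();

    // Stands in for SPI discovery: here parsers are registered by hand.
    void register(ParserSketch parser) {
        parsers.add(parser);
    }

    @Override
    public String describe(Object bean) {
        for (ParserSketch parser : parsers) {
            String desc = parser.describe(bean);
            if (desc != null) {
                return desc;
            }
        }
        return null;
    }
}
```

Adding support for another RPC framework then means registering one more parser; the composite itself does not change.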

  3. TCC Resource Registration
    public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName) {
        RemotingDesc remotingBeanDesc = getServiceDesc(bean, beanName);
        if (remotingBeanDesc == null) {
            return null;
        }
        remotingServiceMap.put(beanName, remotingBeanDesc);

        Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
        Method[] methods = interfaceClass.getMethods();
        if (isService(bean, beanName)) {
            try {
                // service bean, registry resource
                Object targetBean = remotingBeanDesc.getTargetBean();
                for (Method m : methods) {
                    TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                    if (twoPhaseBusinessAction != null) {
                        TCCResource tccResource = new TCCResource();
                        tccResource.setActionName(twoPhaseBusinessAction.name());
                        tccResource.setTargetBean(targetBean);
                        tccResource.setPrepareMethod(m);
                        tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                        tccResource.setCommitMethod(ReflectionUtil
                            .getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                                new Class[] {BusinessActionContext.class}));
                        tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                        tccResource.setRollbackMethod(ReflectionUtil
                            .getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                                new Class[] {BusinessActionContext.class}));
                        // registry tcc resource
                        DefaultResourceManager.get().registerResource(tccResource);
                    }
                }
            } catch (Throwable t) {
                throw new FrameworkException(t, "parser remoting service error");
            }
        }
        if (isReference(bean, beanName)) {
            // reference bean, TCC proxy
            remotingBeanDesc.setReference(true);
        }
        return remotingBeanDesc;
    }

First, the method checks whether the bean is a transaction participant (a service bean). If so, it obtains the interfaceClass from RemotingDesc, iterates over the interface's methods, and checks each one for the @TwoPhaseBusinessAction annotation. When the annotation is present, it packs the relevant attributes into a TCCResource and registers the resource through DefaultResourceManager.

Here, DefaultResourceManager will search for the corresponding resource manager based on the BranchType of the Resource. The resource management class under the TCC mode is in the tcc module.
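
The lookup by branch type can be pictured as a map from type to manager. The sketch below only illustrates that idea; ResourceManagerDispatchSketch, BranchKind, and Manager are hypothetical names, not Seata's classes.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical names throughout; only the lookup-by-branch-type idea comes from the text. */
class ResourceManagerDispatchSketch {
    enum BranchKind { AT, TCC, SAGA }

    interface Manager {
        String register(String resourceId);
    }

    private final Map<BranchKind, Manager> managers = new EnumMap<>(BranchKind.class);

    void addManager(BranchKind kind, Manager manager) {
        managers.put(kind, manager);
    }

    /** Picks the manager that matches the resource's branch type and delegates to it. */
    String register(BranchKind kind, String resourceId) {
        Manager manager = managers.get(kind);
        if (manager == null) {
            throw new IllegalStateException("No resource manager for branch type " + kind);
        }
        return manager.register(resourceId);
    }
}
```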

This RPC parsing class is mainly used by the spring module. parserRemotingServiceInfo() is wrapped by the TCCBeanParserUtils utility class in the spring module. During project startup, GlobalTransactionScanner in the spring module parses TCC beans through this utility class, and TCCBeanParserUtils calls TCCResourceManager to register resources. If the bean is a global transaction service provider, the TccActionInterceptor proxy is woven in. These steps belong to the spring module; the tcc module supplies the functional classes that the spring module uses.

Three. TCC Resource Manager

TCCResourceManager is responsible for resource registration, branch registration, commit, and rollback under the TCC mode.

  1. During project startup, when the GlobalTransactionScanner in the spring module detects that a bean is a tcc bean, it caches resources locally and registers them with the server:
    @Override
    public void registerResource(Resource resource) {
        TCCResource tccResource = (TCCResource)resource;
        tccResourceCache.put(tccResource.getResourceId(), tccResource);
        super.registerResource(tccResource);
    }

The logic for communicating with the server is encapsulated in the parent class AbstractResourceManager. Here, TCCResource is cached by resourceId. When the parent class AbstractResourceManager registers the resource, it uses resourceGroupId + actionName, where actionName is the name specified in the @TwoPhaseBusinessAction annotation and resourceGroupId defaults to DEFAULT.

  2. Transaction branch registration is handled in the rm-datasource package under AbstractResourceManager. During registration, the parameter lockKeys is null, which differs from the transaction branch registration under the AT mode.

  3. Committing or rolling back branches:

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
        }
        Object targetTCCBean = tccResource.getTargetBean();
        Method commitMethod = tccResource.getCommitMethod();
        if (targetTCCBean == null || commitMethod == null) {
            throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
        }
        try {
            boolean result = false;
            // BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            }
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            // msg was undefined in the excerpt; it is built here from the values in scope
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            throw new FrameworkException(t, msg);
        }
    }

Restore the business context using parameters xid, branchId, resourceId, and applicationData.

Execute the commit method through reflection based on the retrieved context and return the execution result. The rollback method follows a similar approach.

Here, branchCommit() and branchRollback() are provided for AbstractRMHandler, an abstract class for resource processing in the rm module. This handler is a further implementation of the template method defined in the core module. This differs from registerResource(), which registers resources actively during Spring scanning.
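
The reflective call and the normalization of its return value can be sketched in isolation. The types below are hypothetical: Result stands in for TwoPhaseResult, a String stands in for BusinessActionContext, and Participant is an invented bean.

```java
import java.lang.reflect.Method;

/** Hypothetical participant and result types; only the reflective call pattern mirrors the text. */
class ReflectiveCommitSketch {
    static class Result {
        final boolean success;

        Result(boolean success) {
            this.success = success;
        }
    }

    public static class Participant {
        public boolean commit(String context) {
            return context != null;
        }

        public Result commitWithResult(String context) {
            return new Result(context != null);
        }
    }

    /** Invokes the named single-argument method and normalizes its return value to a boolean. */
    static boolean invokeCommit(Object target, String methodName, String context) {
        try {
            Method method = target.getClass().getMethod(methodName, String.class);
            Object ret = method.invoke(target, context);
            if (ret instanceof Result) {
                return ((Result) ret).success;
            }
            return ret instanceof Boolean && (Boolean) ret;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("commit failed for method " + methodName, e);
        }
    }
}
```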

Four. Transaction Processing in TCC Mode

The invoke() method of TccActionInterceptor in the spring module runs when the proxied RPC bean is called. It first retrieves the global transaction xid passed by the RPC interceptor, and then hands the TCC-mode processing for the participant over to ActionInterceptorHandler in the tcc module.

In other words, transaction participants are proxied during project startup. The actual business methods are executed through callbacks in ActionInterceptorHandler.

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
        Map<String, Object> ret = new HashMap<String, Object>(4);

        // TCC name
        String actionName = businessAction.name();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        // set action name
        actionContext.setActionName(actionName);

        // Creating Branch Record
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);

        // set the parameter whose type is BusinessActionContext
        Class<?>[] types = method.getParameterTypes();
        int argIndex = 0;
        for (Class<?> cls : types) {
            if (cls.getName().equals(BusinessActionContext.class.getName())) {
                arguments[argIndex] = actionContext;
                break;
            }
            argIndex++;
        }
        // the final parameters of the try method
        ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
        // the final result
        ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
        return ret;
    }

Here are two important operations:

  1. In the doTccActionLogStore() method, two crucial methods are called:
  • fetchActionRequestContext(method, arguments): This method retrieves the parameters annotated with @BusinessActionContextParameter and, in the init method below, inserts them into BusinessActionContext along with the transaction-related parameters.
  • DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null): This method registers the transaction branch for the participant under TCC mode.
  2. Callback execution of targetCallback.execute(), which runs the specific business logic of the proxied bean, i.e., the prepare() method.
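
Collecting annotated parameters, as fetchActionRequestContext() is described to do, can be sketched with plain reflection. This is an illustration under assumptions: @CtxParam is a hypothetical stand-in for @BusinessActionContextParameter, and Action is an invented interface.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;

class ActionContextSketch {
    /** Hypothetical stand-in for @BusinessActionContextParameter. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface CtxParam {
        String value();
    }

    interface Action {
        boolean prepare(Map<String, Object> context, @CtxParam("a") int a, @CtxParam("b") String b);
    }

    /** Collects the arguments whose parameters carry @CtxParam, keyed by the annotation value. */
    static Map<String, Object> fetchContext(Method method, Object[] arguments) {
        Map<String, Object> context = new HashMap<>();
        Parameter[] parameters = method.getParameters();
        for (int i = 0; i < parameters.length; i++) {
            CtxParam param = parameters[i].getAnnotation(CtxParam.class);
            if (param != null) {
                context.put(param.value(), arguments[i]);
            }
        }
        return context;
    }

    /** Looks up the prepare method of the example interface. */
    static Method prepareMethod() {
        try {
            return Action.class.getMethod("prepare", Map.class, int.class, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The first argument has no annotation and is skipped, which matches the role of the context parameter that the handler fills in itself.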

Five. Summary

The tcc module primarily provides the following functionalities:

  1. Defines annotations for two-phase protocols, providing attributes needed for transaction processes under TCC mode.
  2. Provides implementations of RemotingParser for parsing remoting beans of different RPC frameworks, to be invoked by the spring module.
  3. Provides the TCC ResourceManager for resource registration, transaction branch registration, commit, and rollback under TCC mode.
  4. Provides classes for handling transaction processes under TCC mode, allowing MethodInterceptor proxy classes to delegate the execution of specific mode transaction processes to the tcc module.

Author: Zhao Runze, Series Link.

· 6 min read

1. Introduction

The core module defines the types and states of transactions, common behaviors, the protocols and message models for communication between clients and servers, exception handling, serialization and compression types, configuration property names, the environment context, and so on. It also wraps Netty-based RPC for use by both clients and servers.

Let's analyze the main functional classes of the core module according to the package order:

[Figure: package list of the core module]

codec: Defines a codec factory class, which provides a method to find the corresponding processing class based on the serialization type. It also provides an interface class Codec with two abstract methods:

<T> byte[] encode(T t);
<T> T decode(byte[] bytes);

In version 1.0, the codec module has three serialization implementations: SEATA, PROTOBUF, and KRYO.

compressor: Similar to classes under the codec package, there are three classes here: a compression type class, a factory class, and an abstract class for compression and decompression operations. In version 1.0, there is only one compression method: Gzip.
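
A Gzip compress and decompress pair can be written with the JDK alone. The sketch below uses java.util.zip and is not Seata's compressor class; the name GzipSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper built on java.util.zip; not the Seata compressor implementation. */
class GzipSketch {
    static byte[] compress(byte[] bytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the result only afterwards.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] bytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```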

constants: Consists of two classes, ClientTableColumnsName and ServerTableColumnsName, representing the models for transaction tables stored on the client and server sides respectively. It also includes classes defining supported database types and prefixes for configuration information attributes.

context: The environment class RootContext holds a ThreadLocalContextCore to store transaction identification information. For example, TX_XID uniquely identifies a transaction, and TX_LOCK indicates the need for global lock control for local transactions on update/delete/insert/selectForUpdate SQL operations.

event: Utilizes the Guava EventBus event bus for registration and notification, implementing the listener pattern. In the server module's metrics package, MetricsManager registers monitoring events for changes in GlobalStatus, which represents several states of transaction processing in the server module. When the server processes transactions, the callback methods registered for monitoring events are invoked, primarily for statistical purposes.

lock: When the server receives a registerBranch message for branch registration, it acquires a lock. In version 1.0, there are two lock implementations: DataBaseLocker and MemoryLocker, representing database locks and in-memory locks respectively. Database locks are acquired based on the rowKey = resourceId + tableName + pk, while memory locks are based directly on the primary key.
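
A row lock keyed by rowKey can be sketched with a concurrent map. The class below is a hypothetical illustration, not DataBaseLocker or MemoryLocker, and the "^^^" separator in rowKey() is an arbitrary choice made for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory locker; the key layout follows the rowKey described above. */
class RowLockSketch {
    // rowKey -> xid of the global transaction that holds the lock
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    /** Builds a key from resourceId, tableName, and pk; the separator is an assumption. */
    static String rowKey(String resourceId, String tableName, String pk) {
        return resourceId + "^^^" + tableName + "^^^" + pk;
    }

    /** Succeeds if the row is free or already held by the same transaction. */
    boolean acquire(String rowKey, String xid) {
        String holder = locks.putIfAbsent(rowKey, xid);
        return holder == null || holder.equals(xid);
    }

    /** Releases the row only if the given transaction holds it. */
    boolean release(String rowKey, String xid) {
        return locks.remove(rowKey, xid);
    }
}
```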

model: BranchStatus, GlobalStatus, and BranchType define the transaction types and the global and branch states. Additionally, TransactionManager and ResourceManager are abstractions representing transaction managers (TMs) and resource managers (RMs) respectively. Specific implementations of TMs and RMs are not provided here because they vary with the transaction type.

protocol: Defines entity classes used for transmission in the RPC module, representing models for requests and responses under different transaction status scenarios.

store: Defines data models for interacting with databases and the SQL statements used for database interactions.

2. Analysis of Exception Handling in the exception Package

This is the UML diagram of AbstractExceptionHandler. Callback and AbstractCallback are an internal interface and class of AbstractExceptionHandler. AbstractCallback implements three methods of the Callback interface but leaves the execute() method unimplemented. AbstractExceptionHandler uses AbstractCallback as a parameter of the template method and relies on its implemented methods, while execute() is left for subclasses to implement.

[Figure: UML diagram of AbstractExceptionHandler]

From an external perspective, AbstractExceptionHandler defines a template method with exception handling. The template includes four behaviors, three of which are already implemented, while execute() is delegated to subclasses:

    public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request,
                                        AbstractTransactionResponse response) {
        try {
            callback.execute(request, response); // run the transactional business method
            callback.onSuccess(request, response); // set the response result code
        } catch (TransactionException tex) {
            LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
            callback.onTransactionException(request, response, tex); // set the response result code and msg
        } catch (RuntimeException rex) {
            LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
            callback.onException(request, response, rex); // set the response result code and msg
        }
    }
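
The template method pattern described in this section can be reduced to a small stand-alone sketch. The names below are hypothetical and the response is simplified to a StringBuilder; only the division of work between the template, the abstract callback, and the subclass follows the text.

```java
/** Hypothetical, simplified version of the callback template; names are illustrative. */
class TemplateSketch {
    interface Callback {
        void execute(StringBuilder response) throws Exception;

        void onSuccess(StringBuilder response);

        void onException(StringBuilder response, Exception e);
    }

    /** Implements the fixed behaviors; subclasses supply only execute(). */
    abstract static class AbstractCallback implements Callback {
        @Override
        public void onSuccess(StringBuilder response) {
            response.append("Success");
        }

        @Override
        public void onException(StringBuilder response, Exception e) {
            response.append("Failed: ").append(e.getMessage());
        }
    }

    /** The template: run the business step, then record success or failure in the response. */
    static String handle(Callback callback) {
        StringBuilder response = new StringBuilder();
        try {
            callback.execute(response);
            callback.onSuccess(response);
        } catch (Exception e) {
            callback.onException(response, e);
        }
        return response.toString();
    }
}
```

A caller passes an anonymous subclass of AbstractCallback that overrides execute() and never writes its own try/catch.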

3. Analysis of the rpc Package

When it comes to the encapsulation of RPC by Seata, one need not delve into the details. However, it's worth studying how transaction business is handled.

The client-side RPC class is AbstractRpcRemotingClient:

[Figure: class diagram of AbstractRpcRemotingClient]

The important attributes and methods are depicted in the class diagram. The methods for message sending and initialization are not shown in the diagram. Let's analyze the class diagram in detail:

clientBootstrap: This is a wrapper class for the netty startup class Bootstrap. It holds an instance of Bootstrap and customizes the properties as desired.

clientChannelManager: Manages the correspondence between server addresses and channels using a ConcurrentHashMap<serverAddress,channel> container.

clientMessageListener: Handles messages. Depending on the message type, there are three specific processing methods.

    public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
        Object msg = request.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("onMessage:" + msg);
        }
        if (msg instanceof BranchCommitRequest) {
            handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
        } else if (msg instanceof BranchRollbackRequest) {
            handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
        } else if (msg instanceof UndoLogDeleteRequest) {
            handleUndoLogDelete((UndoLogDeleteRequest)msg);
        }
    }

4. Analysis of the rpc Package (Continued)

Within the message class, TransactionMessageHandler handles the different message types. Eventually, depending on the transaction type (AT, TCC, SAGA), the specific handling class is invoked through exceptionHandleTemplate(), as described in the second part.

mergeSendExecutorService: This is a thread pool with only one thread responsible for merging and sending messages from different addresses. In the sendAsyncRequest() method, messages are offered to the queue LinkedBlockingQueue of the thread pool. The thread is then responsible for polling and processing messages.

channelRead(): Handles server-side HeartbeatMessage.PONG heartbeat messages. Additionally, it processes MergeResultMessage, which are response messages for asynchronous messages. It retrieves the corresponding MessageFuture based on the msgId and sets the result of the asynchronous message.

dispatch(): Invokes the clientMessageListener to handle messages sent by the server. Different types of requests have different handling classes.
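
Matching an asynchronous response to its request by msgId, as channelRead() is described to do, can be sketched with a map of futures. This is an illustration only: PendingRequestsSketch is a hypothetical name and CompletableFuture stands in for Seata's MessageFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical names; CompletableFuture stands in for Seata's MessageFuture. */
class PendingRequestsSketch {
    private final ConcurrentMap<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    /** Called when a request is sent: remember a future under the message id. */
    CompletableFuture<Object> register(int msgId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(msgId, future);
        return future;
    }

    /** Called when a response arrives: complete and forget the matching future, if any. */
    boolean complete(int msgId, Object result) {
        CompletableFuture<Object> future = futures.remove(msgId);
        return future != null && future.complete(result);
    }
}
```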

In summary, when reading Netty-based code, focus on the serialization method and the message handlers. Seata's RPC serialization finds the Codec implementation through the factory class, and the handler is the TransactionMessageHandler mentioned earlier.

5. Conclusion

The core module covers a wide range of functionalities, with most classes serving as abstract classes for other modules. Business models are abstracted out, and specific implementations are distributed across different modules. The code in the core module is of high quality, with many classic design patterns such as the template pattern discussed earlier. It is very practical and well-crafted, deserving careful study.

Series Links

· One min read

Event Introduction

Highlight Interpretation

Guest Speakers

  • Ji Min (Qing Ming) "Seata Past, Present, and Future" slides

  • Wu Jiangke "My Open Source Journey with SEATA and SEATA's Application in Internet Healthcare Systems" slides

  • Shen Haiqiang (Xuan Yi) "Essence of Seata AT Mode" slides

  • Zhang Sen "Detailed Explanation of TCC Mode in Distributed Transaction Seata"

  • Chen Long (Yiyuan) "Seata Long Transaction Solution Saga Mode"

  • Chen Pengzhi "Seata Practice in Didi Chuxing's Motorcycle Business" slides

Special Awards