· 17 min read

Author | Liu Xiaomin Yu Yu

I. Introduction

In the Java world, netty is a widely used high-performance network communication framework, and many RPC frameworks are built on it. In the Go world, getty is a similar high-performance network communication library. getty was originally developed by Yu Yu, the leader of the dubbogo project, and serves as the underlying communication library of dubbo-go (github.com/apache/dubbo-go). When dubbo-go was donated to the Apache Foundation, getty, thanks to the efforts of the community, also joined the Apache family and was renamed dubbo-getty.

In 2018, I was practicing microservices at my company, and the biggest problem I ran into was distributed transactions. That same year, Alibaba open-sourced its distributed transaction solution. I quickly noticed the project, which was initially called fescar and later renamed seata. Because I was very interested in open source technology, I joined many community groups; I also followed the dubbo-go project at the time and quietly lurked in its group. As I learned more about seata, the idea of building a Go version of a distributed transaction framework gradually took shape.

To build a distributed transaction framework in Go, one of the first problems is how to implement RPC communication. dubbo-go was a very good example right in front of us, so we started studying getty, its underlying network library.

II. How to implement RPC communication based on getty

The overall model of the getty framework is as follows:

![image.png](https://img.alicdn.com/imgextra/i1/O1CN011TIcL01jY4JaweOfV_!!6000000004559-2-tps-954-853.png)

The following is a detailed description of the RPC communication process of seata-golang with related code.

1. Establish Connection

To implement RPC communication, we first need to establish a network connection. Let's start from client.go.

func (c *client) connect() {
var (
err error
ss Session
)

for {
// Create a session connection
ss = c.dial()
if ss == nil {
// client has been closed
break
}
err = c.newSession(ss)
if err == nil {
// send and receive messages
ss.(*session).run()
// Omit some code here

break
}
// don't distinguish between tcp connection and websocket connection. because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
}
}

The connect() method obtains a session through the dial() method. Let's step into dial():

func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
case UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}

return nil
}

We're concerned with TCP connections, so we continue into the c.dialTCP() method:

func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
)

for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
// Establish an encrypted connection
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// Establish a tcp connection
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// Return a TCPSession
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}

At this point, we know how getty establishes a TCP connection and returns a TCPSession.

2. Sending and Receiving Messages

How does getty send and receive messages? Let's go back to the connect() method and look at the line that follows session creation, ss.(*session).run(). The code after this line is trivial, so we guess that run() must contain the logic for sending and receiving messages. Let's step into run():

func (s *session) run() {
// Omit some of the code

go s.handleLoop()
go s.handlePackage()
}

Two goroutines are started here, handleLoop and handlePackage, and their names match our guess. Let's step into handleLoop():

func (s *session) handleLoop() {
// Omit some of the code

for {
// A select blocks until one of its cases is ready to run.
// It chooses one at random if multiple are ready. Otherwise it chooses the default branch if none is ready.
select {
case outPkg, ok = <-s.wQ:
// Omit some of the code

iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// Encode interface{} type outPkg into binary bits via s.writer
pkgBytes, err = s.writer.Write(s, outPkg)
// Omit some of the code

iovec = append(iovec, pkgBytes)

// omit some code
}
// Send these binary bits out
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}

case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// Logic for timed execution, heartbeat, etc.
s.listener.OnCron(s)
}
}
}
}

From the code above, it is easy to see that handleLoop() handles sending: each message is encoded into bytes by s.writer and then written to the established TCP connection. s.writer is of type Writer, an interface that the RPC framework must implement.
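To make the Writer contract more concrete, here is a minimal sketch of an encoder. The `Session` placeholder, the `lengthPrefixWriter` type, and the framing (a 4-byte big-endian length followed by the payload) are assumptions made for illustration only; they are not getty's types or the seata wire format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Session is a hypothetical stand-in for getty's Session interface.
type Session interface{}

// lengthPrefixWriter is an illustrative Writer. It frames a []byte payload
// with a 4-byte big-endian length header (an assumed format).
type lengthPrefixWriter struct{}

// Write has the same shape as the Writer interface method discussed above:
// it takes a package and returns the bytes to put on the wire.
func (lengthPrefixWriter) Write(_ Session, pkg interface{}) ([]byte, error) {
	payload, ok := pkg.([]byte)
	if !ok {
		return nil, errors.New("unsupported package type")
	}
	frame := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
	copy(frame[4:], payload)
	return frame, nil
}

func main() {
	frame, err := lengthPrefixWriter{}.Write(nil, []byte("ping"))
	fmt.Println(frame, err)
}
```

In a real RPC framework the payload would be a serialized message rather than a raw byte slice, but the division of labour is the same: the session loop only moves bytes, and the Writer decides what those bytes are.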

Moving on to the handlePackage() method:

func (s *session) handlePackage() {
// Omit some of the code

if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}

err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}

Go to the handleTCPPackage() method:

func (s *session) handleTCPPackage() error {
// Omit some of the code

conn = s.Connection.(*gettyTCPConn)
for {
// omit some code

bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// Receive a message from the TCP connection
bufLen, err = conn.recv(buf)
// Omit some of the code

break
}
// Omit part of the code

// Write the received bytes to pktBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
break
}
// Decode the received bytes into an RPC message via s.reader
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// Omit some of the code

s.UpdateActive()
// Put the received message into a TaskQueue for consumption by the RPC consumer.
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
if exit {
break
}
}

return perrors.WithStack(err)
}

From the code above, we can see that the receiving side must decode the bytes read from the TCP connection into messages that the RPC layer can consume. This work is done by s.reader, so to build the RPC communication layer we also need to implement the Reader interface that s.reader corresponds to.
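The decoding side can be sketched in the same spirit. The example below assumes a hypothetical frame format (4-byte big-endian length plus payload) and a placeholder `Session` type; `lengthPrefixReader` and `decodeAll` are illustrative names, not getty or seata-golang code. It shows the contract implied by the loop above: Read returns the decoded package and the number of bytes consumed, and signals "need more data" by returning a nil package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Session is a hypothetical stand-in for getty's Session interface.
type Session interface{}

// lengthPrefixReader decodes frames made of a 4-byte big-endian length
// header followed by the payload (an assumed format).
type lengthPrefixReader struct{}

// Read returns (pkg, consumedBytes, err). A nil pkg with a nil error means
// the buffer does not yet hold a complete frame.
func (lengthPrefixReader) Read(_ Session, data []byte) (interface{}, int, error) {
	if len(data) < 4 {
		return nil, 0, nil
	}
	n := int(binary.BigEndian.Uint32(data[:4]))
	if len(data) < 4+n {
		return nil, 0, nil
	}
	payload := make([]byte, n)
	copy(payload, data[4:4+n])
	return payload, 4 + n, nil
}

// decodeAll drains every complete frame from buf, similar to the inner loop
// of handleTCPPackage, and returns the packages plus the unread remainder.
func decodeAll(r lengthPrefixReader, buf []byte) ([][]byte, []byte) {
	var pkgs [][]byte
	for len(buf) > 0 {
		pkg, n, err := r.Read(nil, buf)
		if err != nil || pkg == nil {
			break
		}
		pkgs = append(pkgs, pkg.([]byte))
		buf = buf[n:]
	}
	return pkgs, buf
}

func main() {
	// Two complete frames followed by a truncated third frame.
	stream := []byte{0, 0, 0, 2, 'h', 'i', 0, 0, 0, 1, '!', 0, 0, 0, 5, 'x'}
	pkgs, rest := decodeAll(lengthPrefixReader{}, stream)
	fmt.Printf("%q, %d bytes left\n", pkgs, len(rest))
}
```

Because TCP is a byte stream, one read may contain several messages or only part of one; keeping the leftover bytes for the next read is what makes the decoder robust to both cases.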

3. How to decouple the underlying network message processing logic from the business logic

We all know that netty decouples the underlying network logic from the business logic through the boss thread and the worker thread. So how does getty do it?

Near the end of handleTCPPackage(), we saw that each decoded message is passed to s.addTask(pkg), so let's look at that method:

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}

The pkg argument is captured by a closure, which is eventually handed to the taskPool. This method is critical: I ran into a pitfall with it later when writing seata-golang, which is analysed below.

Next we look at the definition of taskPool:

// NewTaskPoolSimple builds a simple task pool.
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}

This builds a channel sem whose buffer holds size entries (the default is runtime.NumCPU() * 100). Next, look at the AddTaskAlways(t task) method:

func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}

select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}

When a task is added, it is consumed by one of at most cap(p.sem) pooled goroutines; if none of them is free, a temporary goroutine is started to run t(). In effect, cap(p.sem) goroutines form a goroutine pool, and the business logic runs on the goroutines in the pool rather than on the goroutine that handles network messages, which achieves the decoupling. One pitfall I hit when writing seata-golang was forgetting to set the taskPool, so the same goroutine handled both the business logic and the underlying network messages. When the business logic blocked waiting for a task to complete, the whole goroutine was blocked, and no messages could be received in the meantime.
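The idea can be tried out with a simplified pool. This is only a sketch under assumptions: `simplePool`, `newSimplePool`, `Close`, and `runTasks` are hypothetical names, and the code omits the done channel and panic recovery of the real taskPoolSimple. What it keeps is the three-way select: hand the task to an idle worker, start a new pooled worker while capacity remains, or fall back to a temporary goroutine so the caller never blocks.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// simplePool is a simplified, hypothetical cousin of taskPoolSimple:
// sem bounds the number of pooled workers, work hands tasks to idle ones.
type simplePool struct {
	work chan func()
	sem  chan struct{}
	wg   sync.WaitGroup // counts tasks that have not finished yet
}

func newSimplePool(size int) *simplePool {
	return &simplePool{work: make(chan func()), sem: make(chan struct{}, size)}
}

// AddTaskAlways never blocks the caller, so a network goroutine that calls it
// can go straight back to reading from the connection.
func (p *simplePool) AddTaskAlways(t func()) {
	p.wg.Add(1)
	task := func() {
		defer p.wg.Done()
		t()
	}
	select {
	case p.work <- task: // an idle pooled worker picked it up
	case p.sem <- struct{}{}: // capacity left: start one more pooled worker
		go p.worker(task)
	default: // pool saturated: run on a temporary goroutine
		go task()
	}
}

// worker runs its first task, then keeps serving tasks until work is closed.
func (p *simplePool) worker(t func()) {
	for t != nil {
		t()
		t = <-p.work // yields nil once the channel is closed
	}
}

// Close waits for all submitted tasks and then stops the pooled workers.
func (p *simplePool) Close() {
	p.wg.Wait()
	close(p.work)
}

// runTasks submits n trivial tasks and reports how many were executed.
func runTasks(n, size int) int64 {
	pool := newSimplePool(size)
	var handled int64
	for i := 0; i < n; i++ {
		pool.AddTaskAlways(func() { atomic.AddInt64(&handled, 1) })
	}
	pool.Close()
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println("handled:", runTasks(100, 4))
}
```

If the pool were missing and tasks ran inline, a task that waits for a response from the same connection would stall the very goroutine that has to read that response, which is the pitfall described above.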

4. Implementation

The following code is available at getty.go:

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal a pkg and write to session.
type Writer interface {
// If @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}

// ReadWriter is the interface used to handle application packages.
type ReadWriter interface {
Reader
Writer
}

// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error

// invoked when session closed.
OnClose(Session)

// invoked when got error.
OnError(Session, error)

// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)

// invoked when getty received a package. Please do not run long-running
// logic in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, you should return err in
// Reader{Read} and getty will close this connection soon.
//
// If your logic processing in this func will take a long time, you should start a goroutine
// pool (like a working thread pool in cpp) to handle the processing asynchronously. Or you
// can do the logic processing in another asynchronous way.
// In short, your OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}

From this analysis of the getty code, we only need to implement ReadWriter to encode and decode RPC messages, implement EventListener to handle the RPC messages, and then inject both implementations into the RPC Client and Server to achieve RPC communication.

4.1 Codec Protocol Implementation

The following is the definition of the seata protocol:

![image-20201205214556457.png](https://cdn.nlark.com/yuque/0/2020/png/737378/1607180799872-5f96afb6-680d-4e69-8c95-b8fd1ac4c3a7.png#align=left&display=inline&height=209&margin=%5Bobject%20Object%5D&name=image-20201205214556457.png&originHeight=209&originWidth=690&size=18407&status=done&style=none&width=690)

RpcPackageHandler, the implementation of the ReadWriter interface, calls the codec functions to encode and decode the message body in the format above:

// Encode the message into binary bits
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
// codecType is a byte, so print it with %d rather than %s
log.Errorf("not support codecType, %d", codecType)
return nil
}
}

// Decode the binary bits into the message body
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
// codecType is a byte, so print it with %d rather than %s
log.Errorf("not support codecType, %d", codecType)
return nil, 0
}
}

4.2 Client Side Implementation

Let's look at the client-side implementation of EventListener, [RpcRemotingClient](https://github.com/opentrx/seata-golang/blob/dev/pkg/client/rpc_remoting_client.go):

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func() {
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
ApplicationId: client.conf.ApplicationId,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
// Once the connection is established, make a request to the Transaction Coordinator to register the TransactionManager.
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// Save the connection to the Transaction Coordinator in the connection pool for future use.
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Infof("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)

// Process the transaction message, commit or rollback
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// Send a heartbeat
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

The logic of clientSessionManager.RegisterGettySession(session) is analysed in subsection 4.4.
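The response branch of OnMessage relies on a map of pending futures keyed by message ID. The following sketch shows how such request/response correlation can work. It is an illustration under assumptions: `messageFuture`, `rpcClient`, `sendAndWait`, `onResponse`, and `shortTimeout` are hypothetical names, and the real sendAsyncRequestWithResponse in seata-golang may differ in its details.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// shortTimeout is an arbitrary value used only by this sketch.
const shortTimeout = 50 * time.Millisecond

// messageFuture is a hypothetical stand-in for seata-golang's MessageFuture.
type messageFuture struct {
	Response interface{}
	Done     chan bool
}

// rpcClient keeps one pending future per in-flight message ID.
type rpcClient struct {
	futures sync.Map // message ID (int32) -> *messageFuture
}

// sendAndWait registers a future, sends the request through the send
// callback, then blocks until the response arrives or the timeout expires.
func (c *rpcClient) sendAndWait(id int32, send func(), timeout time.Duration) (interface{}, error) {
	// Buffered so that a late response never blocks the receiving goroutine.
	f := &messageFuture{Done: make(chan bool, 1)}
	c.futures.Store(id, f)
	defer c.futures.Delete(id)

	send()
	select {
	case <-f.Done:
		return f.Response, nil
	case <-time.After(timeout):
		return nil, errors.New("request timed out")
	}
}

// onResponse is what an OnMessage callback would do for a response message:
// look up the waiting future by ID and complete it.
func (c *rpcClient) onResponse(id int32, body interface{}) {
	if v, ok := c.futures.Load(id); ok {
		f := v.(*messageFuture)
		f.Response = body
		f.Done <- true
	}
}

func main() {
	c := &rpcClient{}
	// Simulate the peer answering message 1 from another goroutine.
	resp, err := c.sendAndWait(1, func() { go c.onResponse(1, "PONG") }, time.Second)
	fmt.Println(resp, err)
}
```

The caller blocks in sendAndWait while the response is delivered by the goroutine running OnMessage, which is another reason the message callback must not share a goroutine with blocking business logic.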

4.3 Server-side Transaction Coordinator Implementation

See DefaultCoordinator for code:

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
// Release the TCP connection
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Infof("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// Map the TransactionManager information to the TCP connection.
coordinator.OnRegTmMessage(rpcMessage, session)
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
return
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// Map the ResourceManager information to the TCP connection.
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Catch Exception while do RPC, request: %v,err: %v", rpcMessage, err)
}
}()
// Handle transaction messages, global transaction registration, branch transaction registration, branch transaction commit, global transaction rollback, etc.
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("Close an unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}

coordinator.OnRegTmMessage(rpcMessage, session) registers the Transaction Manager, and coordinator.OnRegRmMessage(rpcMessage, session) registers the Resource Manager; the logic of both is analysed in Section 4.4. Other messages enter coordinator.OnTrxMessage(rpcMessage, session), which routes them to the specific logic according to the message type code:

switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus:
req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport:
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit:
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback:
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister:
req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport:
req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default:
return nil
}

4.4 Session Manager Analysis

After the Client establishes a connection with the Transaction Coordinator, it saves the connection in the map serverSessions = sync.Map{} by calling clientSessionManager.RegisterGettySession(session). The key of the map is the RemoteAddress of the Transaction Coordinator obtained from the session, and the value is the session. This allows the Client to register the Transaction Manager and Resource Manager with the Transaction Coordinator through a session in the map. See [getty_client_session_manager.go](https://github.com/opentrx/seata-golang/blob/dev/pkg/client/getty_client_session_manager.go).

After the Transaction Manager and Resource Manager are registered with the Transaction Coordinator, a single connection may be used to send either TM messages or RM messages. We identify a connection with an RpcContext:

type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.Set
Session getty.Session
}

When a transaction message is received, we need to construct such an RpcContext to be used by the subsequent transaction logic. So, we will construct the following map to cache the mapping relationships:

var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered,
// it will be the RM registered
session_transactionroles = sync.Map{}

// session -> applicationId
identified_sessions = sync.Map{}

// applicationId -> ip -> port -> session
client_sessions = sync.Map{}

// applicationId -> resourceIds
client_resources = sync.Map{}
)

In this way, when the Transaction Manager and Resource Manager register with the Transaction Coordinator via coordinator.OnRegTmMessage(rpcMessage, session) and coordinator.OnRegRmMessage(rpcMessage, session) respectively, the relationship between applicationId, ip, port and session is cached in the client_sessions map, and the relationship between applicationId and resourceIds (an application may have multiple Resource Managers) is cached in the client_resources map. When needed, we can construct an RpcContext from these mappings. This part differs considerably from the java version of seata, so dig deeper if you are interested. See [getty_session_manager.go](https://github.com/opentrx/seata-golang/blob/dev/tc/server/getty_session_manager.go).

At this point, we have analysed the entire mechanism of seata-golang's RPC communication model.
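As a rough illustration of how an RpcContext can be rebuilt from cached mappings, consider the sketch below. It is deliberately reduced: `session`, `rpcContext`, `sessionManager`, and its methods are hypothetical, and only two of the four maps are modelled (session to applicationId, and applicationId to resourceIds), without the ip and port levels.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for getty.Session.
type session struct{ remoteAddr string }

// rpcContext carries only the fields needed for this sketch.
type rpcContext struct {
	ApplicationId string
	ResourceIds   []string
	Session       *session
}

// sessionManager caches the registration data received from TM and RM.
type sessionManager struct {
	identified sync.Map // *session -> applicationId
	resources  sync.Map // applicationId -> []string of resourceIds
}

// registerTM records which application a TM session belongs to.
func (m *sessionManager) registerTM(s *session, appID string) {
	m.identified.Store(s, appID)
}

// registerRM records the application and the resources it manages.
func (m *sessionManager) registerRM(s *session, appID string, resourceIDs []string) {
	m.identified.Store(s, appID)
	m.resources.Store(appID, resourceIDs)
}

// contextOf rebuilds a context for a session from the cached mappings.
// It reports false for sessions that never registered.
func (m *sessionManager) contextOf(s *session) (*rpcContext, bool) {
	v, ok := m.identified.Load(s)
	if !ok {
		return nil, false
	}
	ctx := &rpcContext{ApplicationId: v.(string), Session: s}
	if r, ok := m.resources.Load(ctx.ApplicationId); ok {
		ctx.ResourceIds = r.([]string)
	}
	return ctx, true
}

func main() {
	m := &sessionManager{}
	tm := &session{remoteAddr: "10.0.0.1:5000"}
	rm := &session{remoteAddr: "10.0.0.1:5001"}
	m.registerTM(tm, "order-svc")
	m.registerRM(rm, "order-svc", []string{"jdbc:mysql://db/order"})

	ctx, ok := m.contextOf(rm)
	fmt.Println(ctx.ApplicationId, ctx.ResourceIds, ok)
}
```

Looking up an unregistered session returns false, which corresponds to the branch in OnMessage where the coordinator closes a connection that has not registered.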

III. The Future of seata-golang

The development of seata-golang started in April this year. By August it had basically achieved protocol interoperability with the java version of seata 1.2, implemented AT mode for mysql (automatically coordinating the commit and rollback of distributed transactions), implemented TCC mode, and used mysql to store data on the TC side, which makes TC a stateless application that supports high-availability deployment. The following figure shows the principle of AT mode:

![image20201205-232516.png](https://img.alicdn.com/imgextra/i3/O1CN01alqsQS1G2oQecFYIs_!!6000000000565-2-tps-1025-573.png)

There is still a lot of work to be done, such as support for registries, support for configuration centres, protocol interoperability with the java version of seata 1.4, support for other databases, and the implementation of a raft-based transaction coordinator. We hope that developers interested in distributed transactions will join us in building a complete distributed transaction framework for Go.

If you have any questions, please feel free to join the group (group number 33069364).

Author Bio

Xiaomin Liu (GitHub ID dk-lockdown) currently works at h3c Chengdu. He is proficient in Java and Go, has worked with cloud-native and microservices technologies, and currently specialises in distributed transactions.

Yu Yu (GitHub @AlexStocks) is the leader of the dubbo-go project and community. A programmer with more than 10 years of frontline experience in server-side infrastructure R&D, he has contributed to well-known projects such as Muduo, Pika, Dubbo and Sentinel-go, and currently works on container orchestration and service mesh in the Trusted Native Department of Ant Financial.

References

seata official: https://seata.apache.org

java version seata: https://github.com/apache/incubator-seata

seata-golang project address: https://github.com/apache/incubator-seata-go

seata-golang talk at Go Night Reading (bilibili): https://www.bilibili.com/video/BV1oz411e72T

· 30 min read

In Seata version 1.3.0, automatic and manual data source proxying must not be mixed. Mixing them produces multiple layers of proxies, which leads to the following problems:

  1. Single data source: when a branch transaction commits, undo_log itself is also proxied, i.e. an undo_log is generated for undo_log (call it undo_log2), so undo_log is treated as a branch transaction. When the branch transaction rolls back, because undo_log2 was generated incorrectly, rolling back the transaction branch that corresponds to undo_log also deletes the undo_log associated with the business table. As a result, when the transaction branch of the business table is rolled back, it finds that its undo_log does not exist and generates an extra undo_log with status 1. At this point the overall logic is already in disarray, which is a very serious problem.
  2. Multiple data sources with the logical data source also proxied: besides the problems of the single data source case, this can also cause deadlocks. The reason is that the select for update and delete operations on undo_log, which should run in one transaction, are spread across multiple transactions. One transaction never commits after executing select for update, while another waits for the lock when executing delete until it times out.

Proxy description

DataSourceProxy is a layer of proxying over DataSource that overrides some methods. For example, its getConnection method returns a ConnectionProxy rather than a plain Connection.

// DataSourceProxy

public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this);
}

public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}

@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}

Manual Proxy

That is, manually inject a DataSourceProxy, as follows:

@Bean
public DataSource druidDataSource() {
return new DruidDataSource();
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

AutoProxy

Create a proxy class for DataSource, get DataSourceProxy (create it if it doesn't exist) based on DataSource inside the proxy class, and then call the relevant methods of DataSourceProxy. The core logic is in SeataAutoDataSourceProxyCreator.

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}

@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}

Data Source Multi-Level Proxy

@Bean
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
// do xx (initialization that uses strangeAdapter, omitted)
return new DruidDataSource();
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

  1. First, we inject two DataSources in the configuration class: DruidDataSource and DataSourceProxy. DruidDataSource serves as the targetDataSource attribute of DataSourceProxy, and DataSourceProxy is declared with the @Primary annotation.
  2. The application has automatic data source proxying enabled by default, so when methods of DruidDataSource are called, a corresponding data source proxy, DataSourceProxy2, is created for DruidDataSource as well.
  3. What happens when we want to get a Connection in our application?
     1. First we get a DataSource. Because DataSourceProxy is Primary, what we get is the DataSourceProxy.
     2. Then we get a Connection from that DataSource, i.e. through the DataSourceProxy. It first calls the getConnection method of its targetDataSource, i.e. DruidDataSource. But because the aspect intercepts DruidDataSource, the interception logic in step 2 automatically creates a DataSourceProxy2, then calls DataSourceProxy2#getConnection, which in turn calls DruidDataSource#getConnection. The result is a two-layer proxy, and the returned Connection is also a two-layer ConnectionProxy.

The flow above is actually the proxy logic after modification. Seata's default auto-proxy would additionally proxy the DataSourceProxy itself, which adds yet another layer of proxying.

The two problems that can result from multiple layers of proxies for a data source are summarised at the beginning of the article, with case studies below.

Branch Transaction Commit

What happens when a method is executed through the ConnectionProxy? Let's take the commit of a branch transaction involving an update operation as an example:

  1. Execute ConnectionProxy#prepareStatement, which returns a PreparedStatementProxy.
  2. Execute PreparedStatementProxy#executeUpdate, PreparedStatementProxy#executeUpdate will probably do two things: execute the business SQL and commit the undo_log.

Commit business SQL

// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                new Object[]{statementProxy, statementCallback, sqlRecognizer});
            break;
        case UPDATE:
            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<>(statementProxy, statementCallback);
            break;
    }
} else {
    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}

The main process is: first execute the business SQL, then execute the commit method of the ConnectionProxy, in which the corresponding undo_log SQL will be executed for us, and then commit the transaction.

PreparedStatementProxy#executeUpdate =>
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => In this step, the statementCallback#execute method will be triggered, i.e. the native PreparedStatement#executeUpdate method will be called.
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit

UNDO_LOG insert

// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
    try {
        // Register the branch transaction. Put simply, a request is sent to the server, and the server inserts a record into the branch_table table.
        register();
    } catch (TransactionException e) {
        // Without a "for update" SQL, registration happens directly before commit. It inserts not only a branch record
        // but also lock information to compete for the lock. The exception below is generally thrown when registration
        // fails to obtain the lock, typically when plain update statements run concurrently and lose the lock competition. @FUNKYE
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // undo_log handling, expected to be done through targetConnection @1
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);

        // Commit the local transaction, expected to be done through targetConnection @2
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}
  1. undo_log processing @1: the undo_log involved in the current transaction branch is parsed and written to the database using the TargetConnection.

public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }

    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();

    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
    }

    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent, cp.getTargetConnection());
}

  2. Commit local transaction @2: the transaction is committed via the TargetConnection. That is, the same TargetConnection is used for business SQL execution, the undo_log write, and the transaction commit.

For comparison, consider lcn's built-in database approach: lcn writes the undo log to its embedded h2 database (I forget whether it is exactly this one), which turns the work into two local transactions, one inserting the undo log into h2 and one on the business database. If the business database fails after the h2 insert, lcn's approach leaves redundant data behind. The same holds when rolling back data: deleting the undo log and rolling back the business data are not in one local transaction. The advantage of lcn, however, is its low intrusiveness, since no additional undo_log table is needed. Thanks to @FUNKYE for the advice; I don't know much about lcn and will look into it when I get a chance!

Branch Transaction Rollback

  1. The Server sends a rollback request to the Client.
  2. The Client receives the request from the Server and, after a series of processing steps, ends up in the DataSourceManager#branchRollback method.
  3. First, the corresponding DataSourceProxy is fetched from DataSourceManager.dataSourceCache by resourceId; in this case it is masterSlaveProxy. (For the rollback phase we do not look into the proxy data source issue, to keep things simple and direct; either way we end up with the TargetConnection.)
  4. The corresponding undo_log is found by the xid and branchId sent from the Server, and its rollback_info attribute is parsed. Each undo_log may yield more than one SQLUndoLog, and each SQLUndoLog can be understood as one operation. For example, if a branch transaction updates table A and then table B, the undo_log generated for that branch transaction contains two SQLUndoLogs: the first corresponds to the snapshots before and after the update of table A, the second to the snapshots before and after the update of table B.
  5. The corresponding rollback operation is executed for each SQLUndoLog. For example, if a SQLUndoLog corresponds to an INSERT, its rollback operation is a DELETE.
  6. The undo_log is deleted by xid and branchId.
// AbstractUndoLogManager#undo (some non-critical code removed)

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            // Get the connection of the native data source. In the rollback phase we don't care about the proxy data source; we always end up with the TargetConnection.
            conn = dataSourceProxy.getPlainConnection();

            // Put the rollback in one local transaction and commit it manually, so that the business SQL operations are committed together with the undo_log delete.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Query undo_log by xid and branchId. Note the SQL statement: SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;
                // An undo_log with status == 1 is not processed; this is related to anti-suspension
                if (!canUndo(state)) {
                    return;
                }

                // Parse the undo_log
                byte[] rollbackInfo = getRollbackInfo(rs);
                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance(serializer).decode(rollbackInfo);
                try {
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        // Execute the corresponding rollback operation
                        undoExecutor.executeOn(conn);
                    }
                }
                // (finally block omitted)
            }

            if (exists) {
                LOGGER.error("\n delete from undo_log where xid={} AND branchId={} \n", xid, branchId);
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            // Related to anti-suspension: if no undo_log is found by xid and branchId, the branch transaction hit an exception.
            // For example, the business processing timed out and caused a global rollback, but the business undo_log had not been inserted yet.
            } else {
                LOGGER.error("\n insert into undo_log xid={},branchId={} \n", xid, branchId);
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
            }
            return;
        } catch (Throwable e) {
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e);
        }
    }
}

There are several notes:

  1. Rollback does not take data source proxying into account; it ends up using the TargetConnection.
  2. autoCommit is set to false, i.e. the transaction has to be committed manually.
  3. for update is added when querying the undo_log by xid and branchId, i.e. the transaction holds the lock on this undo_log until all rollback operations are complete, because the transaction is only committed after they are done.
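
The replay order used in the rollback loop (reverse the recorded operations, then compensate each one) can be sketched in isolation. This is a toy model under stated assumptions: `UndoItem`, `SqlType` and `rollbackPlan` are hypothetical names, not Seata's classes, and only the INSERT to DELETE mapping is taken from the text above; the other two mappings are assumptions added for completeness.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of replaying undo items during rollback; the names are illustrative, not Seata's classes.
class UndoReplayDemo {
    enum SqlType { INSERT, UPDATE, DELETE }

    // One recorded operation of a branch transaction (hypothetical stand-in for an SQLUndoLog).
    static final class UndoItem {
        final SqlType type;
        final String table;

        UndoItem(SqlType type, String table) {
            this.type = type;
            this.table = table;
        }
    }

    // Picks the statement type that compensates the original operation.
    static SqlType compensationFor(SqlType original) {
        switch (original) {
            case INSERT:
                return SqlType.DELETE; // an inserted row is removed again
            case DELETE:
                return SqlType.INSERT; // assumption: a deleted row is re-inserted from its before image
            default:
                return SqlType.UPDATE; // assumption: an updated row is restored from its before image
        }
    }

    // Builds the rollback plan: the most recent operation is compensated first.
    static List<String> rollbackPlan(List<UndoItem> recorded) {
        List<UndoItem> items = new ArrayList<>(recorded);
        if (items.size() > 1) {
            Collections.reverse(items);
        }
        List<String> plan = new ArrayList<>();
        for (UndoItem item : items) {
            plan.add(compensationFor(item.type) + " " + item.table);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<UndoItem> recorded = Arrays.asList(
                new UndoItem(SqlType.UPDATE, "table_a"),
                new UndoItem(SqlType.INSERT, "table_b"));
        // prints "[DELETE table_b, UPDATE table_a]"
        System.out.println(rollbackPlan(recorded));
    }
}
```

Reversing first matters because later operations may depend on rows touched by earlier ones, so they have to be undone in the opposite order.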

Multi-Tier Proxy Issues

The beginning of the article mentioned several issues that multi-tier proxying of data sources can cause. This section analyses why they occur:

Impact on branch transaction commits

Let's start by analysing what happens if we use a two-tier proxy. Let's analyse it from two aspects: business SQL and undo_log

  1. business SQL

PreparedStatementProxy1#executeUpdate =>
statementCallback#executeUpdate(PreparedStatementProxy2#executeUpdate) =>
PreparedStatement#executeUpdate

It doesn't seem to matter much: there is just one extra hop, and the SQL is still executed through the native PreparedStatement in the end.

  2. undo_log

ConnectionProxy1#getTargetConnection ->
ConnectionProxy2#prepareStatement ->
PreparedStatementProxy2#executeUpdate ->
PreparedStatement#executeUpdate (native undo_log write; before this, undo_log2, i.e. the undo_log of the undo_log, is generated for that undo_log) ->
ConnectionProxy2#commit ->
ConnectionProxy2#processGlobalTransactionCommit (write undo_log2) ->
ConnectionProxy2#getTargetConnection ->
TargetConnection#prepareStatement ->
PreparedStatement#executeUpdate

Impact on branch transaction rollback

Why isn't the undo_log deleted after a transaction rollback?

It is not that the undo_log is not deleted. As mentioned before, the two-tier proxy causes the undo_log to be treated as a branch transaction, so an undo_log is generated for that undo_log as well (call it undo_log2). The way undo_log2 is generated is problematic (strictly speaking it is not wrong, since that is how it should be generated), and as a result the undo_log associated with the business table is deleted too during rollback. When the transaction branch of the business table then rolls back, it finds that its undo_log does not exist and generates one more undo_log with status 1.

Before the rollback

// undo_log
84 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.1KB 0
85 59734075254222849 172.16.120.59:23004:59734061438185472 serializer=jackson 4.0KB 0

// branch_table
59734070967644161 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
59734075254222849 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// lock_table
jdbc:mysql://xx^^^seata_storage^^^1 59734070967644161 jdbc:mysql://172.16.248.10:3306/tuya_middleware seata_storage 1
jdbc:mysql://xx^^^^undo_log^^^^84 59734075254222849 jdbc:mysql://172.16.248.10:3306/tuya_middleware undo_log 84

After the rollback

// An undo_log with status 1 was generated, corresponding to the log: undo_log added with GlobalFinished
86 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.0Byte 1

Problem analysis

  1. find the corresponding undo_log log based on xid and branchId
  2. parse the undo_log, mainly its rollback_info field, rollback_info is a SQLUndoLog collection, each SQLUndoLog corresponds to an operation, which contains a snapshot before and after the operation, and then perform a corresponding rollback
  3. Delete undo_log logs based on xid and branchId.

Because of the two-tier proxy problem, the undo_log itself becomes a branch transaction, so when a rollback occurs, the undo_log branch transaction has to be rolled back as well:

  1. First, find the corresponding undo_log by xid and branchId and parse its rollback_info attribute. Here the parsed rollback_info contains two SQLUndoLogs. Why two?

Thinking it through, the reason becomes clear: the first-tier proxy's operation on seata_storage is put into the cache, which should be cleared after execution, but because of the two-tier proxy the process has not finished at that point. When it is the second-tier proxy's turn to operate on undo_log, that operation is put into the cache as well, so the cache now holds two operations: the UPDATE on seata_storage and the INSERT on undo_log. This also explains why this undo_log is so large (4KB): its rollback_info contains two operations.

One thing to note is that the after-image snapshot of the first SQLUndoLog has branchId=59734070967644161 and pk=84, i.e. the branchId of the seata_storage branch and the PK of the undo_log that belongs to seata_storage. In other words, rolling back the undo_log branch deletes the undo_log that belongs to seata_storage. How, then, is the undo_log of the undo_log itself deleted? In the subsequent logic, it is deleted by xid and branchId.

  2. Parse the first SQLUndoLog. It corresponds to the INSERT operation on undo_log, so its rollback operation is a DELETE, because undo_log is treated as a business table at this point. So this step, executed for branch 59734075254222849, deletes the undo_log row recorded by that INSERT, **but that row is actually the undo_log that belongs to the business table**.

  3. Parse the second SQLUndoLog. It corresponds to the UPDATE operation on seata_storage, so the seata_storage record is restored from its snapshot.

  4. Delete the undo_log by xid and branchId. What is deleted here is the undo_log of the undo_log, i.e. undo_log2. By this point, both undo_logs have been deleted.

  5. Next, roll back seata_storage. Its undo_log was already deleted in step 2, so no undo_log can be found, and a new undo_log with status == 1 is generated.

Case Study

Background

  1. Three data sources are configured: two physical data sources and one logical data source, but the corresponding connection addresses of the two physical data sources are the same. Is this interesting?
@Bean("dsMaster")
DynamicDataSource dsMaster() {
    return new DynamicDataSource(masterDsRoute);
}

@Bean("dsSlave")
DynamicDataSource dsSlave() {
    return new DynamicDataSource(slaveDsRoute);
}

@Primary
@Bean("masterSlave")
DataSource masterSlave(@Qualifier("dsMaster") DataSource dataSourceMaster,
                       @Qualifier("dsSlave") DataSource dataSourceSlave) throws SQLException {
    Map<String, DataSource> dataSourceMap = new HashMap<>(2);
    // Master database
    dataSourceMap.put("dsMaster", dataSourceMaster);
    // Slave database
    dataSourceMap.put("dsSlave", dataSourceSlave);
    // Configure read/write separation rules
    MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
        "masterSlave", "dsMaster", Lists.newArrayList("dsSlave")
    );
    Properties shardingProperties = new Properties();
    shardingProperties.setProperty("sql.show", "true");
    shardingProperties.setProperty("sql.simple", "true");
    // Get the data source object
    DataSource dataSource = MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, shardingProperties);
    log.info("datasource initialised!");
    return dataSource;
}

  2. Seata's dynamic data source proxying is enabled. According to Seata's data source proxying logic, three proxy data sources are eventually generated. The mapping between native data sources and proxy data sources is cached in DataSourceProxyHolder.dataSourceProxyMap; suppose the mapping is as follows:

dsMaster(DynamicDataSource) => dsMasterProxy(DataSourceProxy)
dsSlave(DynamicDataSource) => dsSlaveProxy(DataSourceProxy)
masterSlave(MasterSlaveDataSource) => masterSlaveProxy(DataSourceProxy)

So, ultimately, the three data sources that exist in the IOC container are: dsMasterProxy, dsSlaveProxy, masterSlaveProxy. According to the @Primary feature, when we get a DataSource from the container, the default data source returned is the proxy masterSlaveProxy.

I haven't studied shardingjdbc specifically, but just guessed its working mechanism based on the code I saw during the debug.

masterSlaveProxy can be seen as MasterSlaveDataSource wrapped by DataSourceProxy. We can venture to guess that MasterSlaveDataSource is not a physical data source, but a logical data source, which can simply be thought of as containing routing logic. When we get a connection, we will use the routing rules inside to select a specific physical data source, and then get a real connection through that physical data source. The routing rules should be able to be defined by yourself. According to the phenomenon observed when debugging, the default routing rules should be:

     1. select (read) operations are routed to the slave library, i.e. our dsSlave

     2. update (write) operations are routed to the master library, i.e. our dsMaster

  3. When each DataSourceProxy is initialised, it parses the connection address of its real DataSource and stores that address together with the DataSourceProxy itself in DataSourceManager.dataSourceCache. This cache is used for rollback: when rolling back, the corresponding DataSourceProxy is looked up by connection address, and the rollback is performed through that DataSourceProxy. The problem is that all three data sources resolve to the same connection address, so the key is duplicated: when the connection address is the same, a data source registered later overwrites the existing entry. As a result, DataSourceManager.dataSourceCache ultimately holds masterSlaveProxy, which means the rollback is ultimately performed through masterSlaveProxy. This point is very important.

  4. The tables involved: very simple. We expect only one business table, seata_account, but because of the duplicate proxy problem, Seata treats undo_log as a business table as well.

  1. seata_account
  2. undo_log
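
The key collision in DataSourceManager.dataSourceCache described in item 3 of the background can be sketched with a plain map. This is a toy model, not Seata's DataSourceManager: the class `ResourceCacheDemo`, its methods, and the registration order (masterSlaveProxy last) are assumptions chosen to match the outcome stated in the text.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of a resource cache keyed by connection address; not Seata's actual DataSourceManager.
class ResourceCacheDemo {
    private final Map<String, String> dataSourceCache = new ConcurrentHashMap<>();

    // Registers a proxy under its resource id (the connection address).
    // A later registration with the same key replaces the earlier one.
    void register(String resourceId, String proxyName) {
        dataSourceCache.put(resourceId, proxyName);
    }

    // Looks up the proxy that a rollback for this resource id would use.
    String lookup(String resourceId) {
        return dataSourceCache.get(resourceId);
    }

    static String lastRegisteredWins() {
        ResourceCacheDemo cache = new ResourceCacheDemo();
        String url = "jdbc:mysql://172.16.248.10:3306/tuya_middleware";
        // Assumed registration order; all three proxies resolve to the same address.
        cache.register(url, "dsMasterProxy");
        cache.register(url, "dsSlaveProxy");
        cache.register(url, "masterSlaveProxy");
        return cache.lookup(url);
    }

    public static void main(String[] args) {
        // prints "masterSlaveProxy"
        System.out.println(lastRegisteredWins());
    }
}
```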

OK, that's the brief background; now on to the Seata part.

Requirements

We have a simple requirement: perform a simple update inside a branch transaction to update the count value of seata_account. After the update, manually throw an exception to trigger a rollback of the global transaction. To make troubleshooting easier and reduce interference, the global transaction contains only this one branch transaction and no others. The SQL is as follows.

update seata_account set count = count - 1 where id = 100;

Problems

Client: In the console log, the following logs are printed over and over again

  1. These logs are printed at 20s intervals. I checked the value of the database's innodb_lock_wait_timeout property, and it happens to be 20, which means that every time a rollback request comes through, the rollback fails because acquiring the lock times out (20s).
  2. Why is it not printed just once after 20s? Because the server side has a timer that keeps processing the rollback request.
// Branch rollback starts
Branch rollback start: 172.16.120.59:23004:59991911632711680 59991915571163137 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// undo_log transaction branch The original action corresponds to insert, so it rolls back to delete.
undoSQL undoSQL=DELETE FROM undo_log WHERE id = ? and PK=[[id,139]]
// Since the first-tier proxy's operation is also in the context, the undo_log written when the undo_log branch transaction commits contains two operations
undoSQL undoSQL=UPDATE seata_account SET money = ? WHERE id = ? and PK=[[id,1]]

// After the branch transaction has been rolled back, delete the corresponding undo_log for that branch transaction
delete from undo_log where xid=172.16.120.59:23004:59991911632711680 AND branchId=59991915571163137

// Threw an exception indicating that the rollback failed because `Lock wait timeout exceeded`, and failed when deleting the undo_log based on the xid and branchId because a lock acquisition timeout occurred, indicating that there was another operation that held a lock on the record that was not released.
branchRollback failed. branchType:[AT], xid:[172.16.120.59:23004:59991911632711680], branchId:[59991915571163137], resourceId:[jdbc:mysql://172.16.248.10:3306/tuya_middleware], applicationData:[null]. reason:[Branch session rollback failed and try again later xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137 Lock wait timeout exceeded; try restarting transaction]

Server: the following log is printed every 20s, indicating that the server is constantly retrying to send a rollback request

Rollback branch transaction failed and will retry, xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137

The SQL involved in the process is roughly as follows:

1. SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE slaveDS
2. SELECT * FROM undo_log WHERE (id ) in ( (?) ) slaveDS
3. DELETE FROM undo_log WHERE id = ? masterDS
4. SELECT * FROM seata_account WHERE (id ) in ( (?) ) masterDS
5. UPDATE seata_account SET money = ? WHERE id = ? masterDS
6. DELETE FROM undo_log WHERE branch_id = ? AND xid = ? masterDS

At this point, check the database's transaction status, lock status, and lock wait relationships.

  1. Check the transactions currently being executed

SELECT * FROM information_schema.INNODB_TRX;

  2. Check the current lock status

SELECT * FROM information_schema.INNODB_LOCKS;

  3. Check the current lock wait relationships

SELECT * FROM information_schema.INNODB_LOCK_WAITS;

SELECT
    block_trx.trx_mysql_thread_id AS blocking_session_id,     -- session ID that already holds the lock
    request_trx.trx_mysql_thread_id AS requesting_session_id, -- session ID that is requesting the lock
    block_trx.trx_query AS blocking_sql,                      -- SQL statement that already holds the lock
    request_trx.trx_query AS requesting_sql,                  -- SQL statement that is requesting the lock
    waits.blocking_trx_id AS blocking_trx_id,                 -- transaction ID that already holds the lock
    waits.requesting_trx_id AS requesting_trx_id,             -- transaction ID that is requesting the lock
    waits.requested_lock_id AS requested_lock_id,             -- ID of the lock object
    locks.lock_table AS lock_table,                           -- table locked by the lock object
    locks.lock_type AS lock_type,                             -- lock type
    locks.lock_mode AS lock_mode                              -- lock mode
FROM
    information_schema.innodb_lock_waits AS waits
    INNER JOIN information_schema.innodb_trx AS block_trx ON waits.blocking_trx_id = block_trx.trx_id
    INNER JOIN information_schema.innodb_trx AS request_trx ON waits.requesting_trx_id = request_trx.trx_id
    INNER JOIN information_schema.innodb_locks AS locks ON waits.requested_lock_id = locks.lock_id;

  1. the record involved is branch_id = 59991915571163137 AND xid = 172.16.120.59:23004:59991911632711680.
  2. transaction ID 1539483284 holds the lock for this record, but its corresponding SQL is empty, so it should be waiting for a commit.
  3. transaction ID 1539483286 is trying to acquire a lock on this record, but the logs show that it is waiting for a lock timeout.

Probably a good guess is that select for update and delete from undo ... are in conflict. According to the logic in the code, these two operations should have been committed in a single transaction, so why have they been separated into two transactions?

Problem Analysis

In conjunction with the rollback process described above, let's look at what happens during the rollback of our example.

  1. First get the data source: what dataSourceProxy.getPlainConnection() works with at this point is the MasterSlaveDataSource.
  2. During the select for update operation, a Connection is obtained from the MasterSlaveDataSource. As mentioned before, MasterSlaveDataSource is a logical data source with routing logic; according to the routing rules above, what we get here is dsSlave's Connection.
  3. When executing the delete from undo ... operation, the Connection obtained is dsMaster's.
  4. Although dsSlave and dsMaster point to the same address, the connections obtained are certainly different, so the two operations end up in two separate transactions.
  5. The transaction that executes select for update waits until the deletion of the undo_log is complete before committing.
  6. The transaction that executes delete from undo ... waits for the select for update transaction to release the lock.
  7. A typical deadlock.
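
The wait cycle in the steps above can be reproduced in miniature without a database. The sketch below is a toy model, not JDBC or InnoDB: a `Semaphore` with one permit stands in for the row lock on the undo_log record, and the class and method names are hypothetical. It only shows why a delete issued on a second connection times out while the first connection still holds the lock taken by select for update.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of one row lock contended by two connections; it is not real JDBC or InnoDB behaviour.
class RowLockWaitDemo {
    // A single permit models the exclusive lock on one undo_log row.
    private final Semaphore rowLock = new Semaphore(1);

    // Connection A (assumed to come from dsSlave): SELECT ... FOR UPDATE takes the row lock
    // and keeps it until connection A commits.
    void selectForUpdate() {
        rowLock.acquireUninterruptibly();
    }

    // Connection B (assumed to come from dsMaster): DELETE needs the same row lock
    // and gives up once the lock wait timeout has elapsed.
    boolean delete(long lockWaitTimeoutMillis) {
        try {
            boolean locked = rowLock.tryAcquire(lockWaitTimeoutMillis, TimeUnit.MILLISECONDS);
            if (locked) {
                rowLock.release(); // simplification: release at once instead of at commit
            }
            return locked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Connection A commits and releases the row lock.
    void commitSelect() {
        rowLock.release();
    }

    // Replays the rollback order: A only commits after the delete returns,
    // so the delete on B cannot obtain the lock and times out.
    static boolean deleteSucceedsAcrossTwoConnections() {
        RowLockWaitDemo db = new RowLockWaitDemo();
        db.selectForUpdate();
        boolean deleted = db.delete(50);
        db.commitSelect();
        return deleted;
    }

    public static void main(String[] args) {
        // prints "delete succeeded: false"
        System.out.println("delete succeeded: " + deleteSucceedsAcrossTwoConnections());
    }
}
```

If both statements ran on the same connection, the delete would be inside the transaction that already owns the lock and no wait would occur, which is what the rollback code expects.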

Verify the conjecture

I tried to verify this problem in two ways:

  1. Change the Seata code from select for update to select. The query on undo_log then no longer needs to hold a lock on the record, and no deadlock occurs.

  2. Change the data source proxy logic. This is the key to the problem; the main cause is not select for update. The multi-layer proxy problem already exists before that point, and only then does it lead to the deadlock. We should never have proxied the masterSlave data source in the first place. It is just a logical data source, so why proxy it? If we do not proxy masterSlave, there are no multiple layers of proxies, and no deadlock when deleting the undo_log!

Final implementation

masterSlave is also a DataSource type, how to proxy just dsMaster and dsSlave but not masterSlave? Observing the SeataAutoDataSourceProxyCreator#shouldSkip method, we can solve this problem with the excludes attribute of the EnableAutoDataSourceProxy annotation

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
    return SeataProxy.class.isAssignableFrom(beanClass) ||
        DataSourceProxy.class.isAssignableFrom(beanClass) ||
        !DataSource.class.isAssignableFrom(beanClass) ||
        Arrays.asList(excludes).contains(beanClass.getName());
}

i.e.: turn off the data source autoproxy, then add this annotation to the startup class

@EnableAutoDataSourceProxy(excludes = {"org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource"})
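
One detail of the shouldSkip code worth spelling out is that the excludes check compares fully qualified class names for equality, so listing a parent class does not exclude its subclasses. The following is a minimal sketch of just that last condition; `ExcludeCheckDemo` and `isExcluded` are hypothetical names, and JDK collection classes are used as stand-ins for data source classes.

```java
import java.util.Arrays;

// Toy version of the exclusion test; it mirrors only the last condition of shouldSkip.
class ExcludeCheckDemo {
    // True when the bean's fully qualified class name is listed in excludes.
    static boolean isExcluded(String[] excludes, Class<?> beanClass) {
        return Arrays.asList(excludes).contains(beanClass.getName());
    }

    public static void main(String[] args) {
        String[] excludes = {"java.util.AbstractList"};
        // Exact name match: the listed class itself is excluded.
        System.out.println(isExcluded(excludes, java.util.AbstractList.class));
        // A subclass of the listed class is not excluded.
        System.out.println(isExcluded(excludes, java.util.ArrayList.class));
    }
}
```

This is why the annotation above names the concrete MasterSlaveDataSource class rather than an interface or base type.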

Autoproxy optimisation in new releases

Since Seata 1.4.0 has not been officially released yet, I'm looking at the 1.4.0-SNAPSHOT version of the code, which is the latest code on the develop branch at the time of writing.

Code changes

The main changes are as follows, but I won't go into too much detail on the minor ones:

  1. DataSourceProxyHolder adjustment
  2. DataSourceProxy adjustment
  3. SeataDataSourceBeanPostProcessor is added.

DataSourceProxyHolder

The most significant of the changes to this class are to its putDataSource method

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
    DataSource originalDataSource;
    if (dataSource instanceof SeataDataSourceProxy) {
        SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
        // If this is a proxy data source and its mode (AT/XA) matches the data source proxy mode configured for the application, return it directly
        if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
            return (SeataDataSourceProxy)dataSource;
        }

        // If this is a proxy data source but its mode (AT/XA) differs from the one configured for the application, get its TargetDataSource and create a proxy data source for that
        originalDataSource = dataSourceProxy.getTargetDataSource();
    } else {
        originalDataSource = dataSource;
    }

    // If necessary, create a proxy data source based on the TargetDataSource
    return this.dataSourceProxyMap.computeIfAbsent(originalDataSource,
        BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}

The DataSourceProxyHolder#putDataSource method is used in two main places: in the SeataAutoDataSourceProxyAdvice aspect, and in the SeataDataSourceBeanPostProcessor. What problem does this check solve for us? The problem of multi-tier proxying of data sources. Consider the following scenarios with automatic data source proxying turned on:

  1. If we manually inject a DataSourceProxy into the project, a call to DataSourceProxyHolder#putDataSource from the aspect returns that DataSourceProxy itself directly, without creating another DataSourceProxy.
  2. If we manually inject a DruidDataSource into the project, a call to DataSourceProxyHolder#putDataSource from the aspect creates a DataSourceProxy for it and returns that.
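
The two scenarios can be checked against a stripped-down model of the holder. This is a sketch under stated assumptions: `ToyDataSource`, `ToyRawDataSource`, `ToyProxy` and `ToyProxyHolder` are hypothetical stand-ins, and the AT/XA mode comparison from the real method is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the holder logic; these types are stand-ins, not Seata's classes,
// and the AT/XA mode check is intentionally omitted.
interface ToyDataSource { }

// Stand-in for a physical data source.
class ToyRawDataSource implements ToyDataSource { }

// Stand-in for a data source proxy that remembers what it wraps.
class ToyProxy implements ToyDataSource {
    final ToyDataSource target;

    ToyProxy(ToyDataSource target) {
        this.target = target;
    }
}

class ToyProxyHolder {
    private final Map<ToyDataSource, ToyProxy> proxyMap = new ConcurrentHashMap<>();

    ToyProxy putDataSource(ToyDataSource dataSource) {
        if (dataSource instanceof ToyProxy) {
            // Scenario 1: already a proxy, so hand it back instead of wrapping it again.
            return (ToyProxy) dataSource;
        }
        // Scenario 2: a raw source gets exactly one proxy, created on first request.
        return proxyMap.computeIfAbsent(dataSource, ToyProxy::new);
    }
}
```

Calling `putDataSource` twice with the same raw source returns the same proxy instance, and calling it with that proxy returns the proxy unchanged.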

It looks like the problem is solved, but is it possible that there are other problems? Take a look at the following code

@Bean
public DataSourceProxy dsA(){
    return new DataSourceProxy(druidA);
}

@Bean
public DataSourceProxy dsB(DataSourceProxy dsA){
    return new DataSourceProxy(dsA);
}

  1. This is definitely wrong, but there is nothing you can do if someone insists on writing it this way.
  2. There is nothing wrong with dsA, but dsB still has the double proxy problem, because the TargetDataSource of dsB is dsA.
  3. This brings us to the DataSourceProxy change.

DataSourceProxy

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
    // The following check ensures that there is no two-tier proxy problem even when a DataSourceProxy is passed in
    if (targetDataSource instanceof SeataDataSourceProxy) {
        LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
        targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
    }
    this.targetDataSource = targetDataSource;
    init(targetDataSource, resourceGroupId);
}
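
The effect of this constructor check on the dsA/dsB case can be shown with a small stand-alone model. `PlainSource`, `PhysicalSource` and `UnwrappingProxy` are hypothetical names used only for this sketch; it assumes nothing about Seata beyond the unwrap-in-constructor idea shown in the code above.

```java
// Toy model of constructor-level unwrapping; these types are stand-ins, not Seata's classes.
interface PlainSource { }

// Stand-in for a physical data source such as druidA.
class PhysicalSource implements PlainSource { }

class UnwrappingProxy implements PlainSource {
    private final PlainSource target;

    UnwrappingProxy(PlainSource target) {
        // If the argument is itself a proxy, wrap its target instead of the proxy.
        if (target instanceof UnwrappingProxy) {
            target = ((UnwrappingProxy) target).getTarget();
        }
        this.target = target;
    }

    PlainSource getTarget() {
        return target;
    }
}
```

With this check, a proxy built from another proxy (dsB built from dsA) ends up pointing at the same physical source as dsA, so only one proxy layer sits in front of it.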

SeataDataSourceBeanPostProcessor

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

    ......

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource) {
            //When not in the excludes, put and init proxy.
            if (!excludes.contains(bean.getClass().getName())) {
                //Only put and init proxy, not return proxy.
                DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
            }

            //If is SeataDataSourceProxy, return the original data source.
            if (bean instanceof SeataDataSourceProxy) {
                LOGGER.info("Unwrap the bean of the data source," +
                    " and return the original data source to replace the data source proxy.");
                return ((SeataDataSourceProxy) bean).getTargetDataSource();
            }
        }
        return bean;
    }
}
  1. SeataDataSourceBeanPostProcessor implements the BeanPostProcessor interface, which executes the BeanPostProcessor#postProcessAfterInitialization method after a bean is initialised. That is, in the postProcessAfterInitialization method, the bean is already available at this point.
  2. Why provide such a class? From its code, it is just to initialise the corresponding DataSourceProxy for the data source after the bean has been initialised, but why is this necessary?

Because some data sources may not be used (i.e. none of their methods are called) after the application starts. Without the SeataDataSourceBeanPostProcessor class, the DataSourceProxyHolder#putDataSource method would only be triggered in the SeataAutoDataSourceProxyAdvice aspect. Suppose a client goes down during a rollback. After it restarts, the Server sends it a rollback request via a timed task, at which point the client first needs to find the corresponding DataSourceProxy based on the resourceId (connection address). However, if the client hasn't called any of the data source's methods by then, it never enters the SeataAutoDataSourceProxyAdvice aspect logic and never initialises the DataSourceProxy for that data source, so the rollback fails.
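
The difference between lazy and eager registration can be sketched with a tiny registry. Everything here is a hypothetical simplification: the registry, the resourceId string and the method names are illustrative and are not Seata's API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: resourceId -> proxy label, consulted when a rollback request arrives.
class EagerRegistrationSketch {
    private final Map<String, String> registeredResources = new ConcurrentHashMap<>();

    // Called either at bean initialisation (eager) or on the first intercepted call (lazy).
    void register(String resourceId) {
        registeredResources.putIfAbsent(resourceId, "proxy-for-" + resourceId);
    }

    Optional<String> findForRollback(String resourceId) {
        return Optional.ofNullable(registeredResources.get(resourceId));
    }

    // Lazy only: the restarted client has not touched the data source yet.
    static boolean lazyClientCanRollback() {
        EagerRegistrationSketch registry = new EagerRegistrationSketch();
        return registry.findForRollback("jdbc:mysql://localhost:3306/seata").isPresent();
    }

    // Eager: a post-processor registered the resource during startup.
    static boolean eagerClientCanRollback() {
        EagerRegistrationSketch registry = new EagerRegistrationSketch();
        registry.register("jdbc:mysql://localhost:3306/seata");
        return registry.findForRollback("jdbc:mysql://localhost:3306/seata").isPresent();
    }

    public static void main(String[] args) {
        System.out.println(lazyClientCanRollback());
        System.out.println(eagerClientCanRollback());
    }
}
```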

Multi-Layer Proxy Summary

Through the above analysis, we now know some of the optimisations Seata makes to avoid multi-layer proxies, but there is one more issue to pay attention to: **logical data source proxies**!

When the application uses a logical data source (for example a master/slave data source that routes to physical data sources), automatic proxying wraps both the logical data source and the physical ones. The calling relationship at this point is: masterSlaveProxy -> masterSlave -> masterProxy/slaveProxy -> master/slave

In this case you can exclude the logical data source via the excludes attribute so that no data source proxy is created for it.

To summarise:

  1. When initialising the DataSourceProxy for a DataSource, first determine whether a DataSourceProxy needs to be created for it; if it is a DataSourceProxy itself, return it directly.
  2. For manually injected data sources, to avoid multi-layer proxies caused by human error, we add a check in the constructor of DataSourceProxy: if the input parameter targetDataSource is itself a DataSourceProxy, we take its target data source as the targetDataSource of the new DataSourceProxy.
  3. For other cases, such as logical data source proxies, add exclusions to the excludes attribute to avoid creating a DataSourceProxy for the logical data source.

Suggestions for using global and local transactions

A question arises: if a method involves multiple DB operations, say 3 update operations, do we need Spring's @Transactional annotation on that method? Consider the question from two perspectives: without the @Transactional annotation and with it.

Not using the @Transactional annotation

  1. In the commit phase, since the method has 3 update operations, each update registers a branch transaction with the TC through the data source proxy and generates a corresponding undo_log, so the 3 update operations are treated as 3 branch transactions.
  2. In the rollback phase, the three branch transactions need to be rolled back.
  3. Data consistency is ensured by the Seata global transaction.

Using the @Transactional annotation

  1. In the commit phase, the three update operations are committed as one branch transaction, so only one branch transaction is registered in the end.
  2. In the rollback phase, 1 branch transaction needs to be rolled back.
  3. Data consistency: the consistency of the 3 update operations is guaranteed by the local transaction; global consistency is guaranteed by the Seata global transaction. At this point, the 3 updates are just one branch transaction.
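
The branch counts in the two cases can be sketched with a toy connection. This is a simplified model under an explicit assumption: FakeProxyConnection is hypothetical and registers one "branch" per local commit, which mirrors the description above rather than Seata's actual connection proxy.

```java
import java.util.ArrayList;
import java.util.List;

class BranchCountSketch {
    // Hypothetical stand-in for a proxied connection: each local commit registers one branch.
    static class FakeProxyConnection {
        private final List<String> pendingUpdates = new ArrayList<>();
        private boolean autoCommit = true;
        private int registeredBranches = 0;

        void setAutoCommit(boolean autoCommit) {
            this.autoCommit = autoCommit;
        }

        void executeUpdate(String sql) {
            pendingUpdates.add(sql);
            if (autoCommit) {
                commit(); // without a local transaction every statement commits on its own
            }
        }

        void commit() {
            if (pendingUpdates.isEmpty()) {
                return;
            }
            registeredBranches++; // one branch (and one undo log) per local commit
            pendingUpdates.clear();
        }

        int getRegisteredBranches() {
            return registeredBranches;
        }
    }

    static int runThreeUpdates(boolean useLocalTransaction) {
        FakeProxyConnection connection = new FakeProxyConnection();
        connection.setAutoCommit(!useLocalTransaction);
        for (int i = 1; i <= 3; i++) {
            connection.executeUpdate("update t set v = " + i);
        }
        if (useLocalTransaction) {
            connection.commit();
        }
        return connection.getRegisteredBranches();
    }

    public static void main(String[] args) {
        System.out.println(runThreeUpdates(false)); // prints 3
        System.out.println(runThreeUpdates(true));  // prints 1
    }
}
```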

Conclusion

From the above comparison the answer is obvious: reasonable use of local transactions can greatly improve the processing speed of global transactions. The example above has only 3 DB operations; if a method involves more DB operations, the difference between the two approaches becomes even greater.

Finally, thanks to @FUNKYE for answering a lot of questions and providing valuable suggestions!

· 11 min read

【Distributed Transaction Seata source code interpretation II】 Client-side startup process

This article analyses the Client-side startup process in AT mode from the source code point of view. The Client side is the business application side. A distributed transaction involves three modules: TC, TM and RM. TC runs in seata-server, while TM and RM run on the client side via the SDK.

The following figure shows a distributed transaction scenario of Seata's official demo, divided into the following several microservices, which together implement a distributed transaction of placing an order, deducting inventory, and deducting balance.

  • BusinessService: Business service, the entry point of the order placing flow
  • StorageService: Inventory microservice, used to deduct the inventory of goods
  • OrderService: Order microservice, to create orders
  • AccountService: Account microservice, debits the balance of the user's account

![Insert image description here](https://img-blog.csdnimg.cn/20200820184156758.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,color_FFFFFF,t_70#pic_center)

It can also be seen from the above figure that in AT mode Seata Client side implements distributed transactions mainly through the following three modules:

  • GlobalTransactionScanner: Responsible for initialising the TM and RM modules and for adding interceptors to methods that carry distributed transaction annotations. The interceptors are responsible for opening, committing or rolling back the global transaction.
  • DataSourceProxy: Adds interception to the DataSource. It intercepts all SQL execution and takes part in the distributed transaction in the RM (transaction participant) role.
  • Rpc Interceptor: The previous article, Distributed Transaction Seata Source Code Interpretation I, mentioned a few core points of distributed transactions, one of which is the cross-service-instance propagation of distributed transactions. The Rpc Interceptor is responsible for propagating transactions across multiple microservices.

seata-spring-boot-starter

There are two ways to depend on the Seata distributed transaction SDK: seata-all or seata-spring-boot-starter. The seata-spring-boot-starter is recommended because the starter automatically injects the three modules mentioned above; the user only needs to add the corresponding configuration and put a global distributed transaction annotation on the business code. Let's start with the code in the seata-spring-boot-starter project.

The following figure shows the project structure of seata-spring-boot-starter:

![Insert image description here](https://img-blog.csdnimg.cn/20200810204518853.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,color_FFFFFF,t_70)

It is mainly divided into the following modules:

  • properties: The properties directory contains the configuration classes that adapt Springboot to Seata, i.e. you can use SpringBoot's configuration to set Seata's parameters.
  • provider: The classes in the provider directory are responsible for adapting Springboot and SpringCloud configurations to the Seata configuration.
  • resources: There are two main files in the resources directory: spring.factories, which registers the Springboot auto-configuration classes, and ExtConfigurationProvider, which registers the SpringbootConfigurationProvider class. The Provider class is responsible for adapting SpringBoot-related configuration classes to Seata.

For the springboot-starter project, let's first look at the resources/META-INF/spring.factories file:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration

You can see that spring.factories registers the auto-configuration class SeataAutoConfiguration, which injects two beans: GlobalTransactionScanner and SeataAutoDataSourceProxyCreator. The code is as follows:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled",
    havingValue = "true",
    matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {

    ...

    // GlobalTransactionScanner is responsible for adding interceptors to methods that carry the GlobalTransactional annotation,
    // and for initialising the RM and TM.
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
                                                             FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(),
            seataProperties.getTxServiceGroup(),
            failureHandler);
    }

    // The SeataAutoDataSourceProxyCreator is responsible for generating proxies for all DataSources in Spring.
    // This enables the interception of all SQL execution.
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {
        "enableAutoDataSourceProxy", "enable-auto" +
        "-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying());
    }
}

GlobalTransactionScanner

GlobalTransactionScanner inherits from AbstractAutoProxyCreator, which is one of Spring's ways of implementing AOP: it intercepts all bean instances in Spring and determines whether each needs to be proxied. Below are some of the more important fields in GlobalTransactionScanner and the core method that creates the proxies:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean {
    ...
    // The interceptor field is the interceptor for the bean currently being proxied.
    // It can be thought of as a temporary variable whose lifetime is one proxied object.
    private MethodInterceptor interceptor;

    // globalTransactionalInterceptor is the generic interceptor.
    // It is used by all non-TCC transactional methods.
    private MethodInterceptor globalTransactionalInterceptor;

    // PROXYED_SET stores the names of beans that have already been proxied to prevent duplicate processing.
    private static final Set<String> PROXYED_SET = new HashSet<>();

    // applicationId is a unique identifier for a service.
    // It corresponds to spring.application.name in a Spring Cloud project.
    private final String applicationId;
    // Grouping identifier for the transaction, refer to the wiki article: https://seata.apache.org/zh-cn/docs/user/txgroup/transaction-group/
    private final String txServiceGroup;

    ...

    // Determine whether the target object needs to be proxied, and if so, generate an interceptor and assign it to the class variable interceptor.
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // Determine if distributed transactions are disabled
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }

                // The interceptor is reset to null for every bean that is processed, so its
                // lifetime is one proxied object. Because the interceptor is consumed in a separate
                // method, getAdvicesAndAdvisorsForBean, it is defined as a class variable.
                interceptor = null;

                // Determine whether this is TCC transaction mode, primarily based on the presence of the TwoPhaseBusinessAction annotation on the method
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // Create an interceptor for the TCC transaction
                    interceptor =
                        new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    // Get the class type of the object to be processed
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    // Get all interfaces implemented by the object to be processed
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    // existsAnnotation returns true, i.e. the bean needs to be proxied, if the class or
                    // its interfaces carry the GlobalTransactional annotation, or if any of their methods
                    // carries a GlobalTransactional or GlobalLock annotation. Otherwise return the bean as-is.
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }

                    // If the interceptor is null, i.e. not in TCC mode,
                    // then use globalTransactionalInterceptor as the interceptor
                    if (interceptor == null) {
                        // globalTransactionalInterceptor will only be created once
                        if (globalTransactionalInterceptor == null) {
                            globalTransactionalInterceptor =
                                new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener) globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }

                if (!AopUtils.isAopProxy(bean)) {
                    // If the bean itself is not a proxy object, call the parent class wrapIfNecessary to generate the proxy object.
                    // The parent class calls getAdvicesAndAdvisorsForBean to get the interceptor defined above.
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // If the bean is already a proxy, get the proxy's interceptor call chain (AdvisedSupport)
                    // and add the new interceptor directly to it.
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName,
                        getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                // Mark that the beanName has been processed
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    // Return the interceptor object computed in the wrapIfNecessary method.
    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,
                                                    TargetSource customTargetSource)
        throws BeansException {
        return new Object[]{interceptor};
    }
}

The above describes how GlobalTransactionScanner intercepts global transactions via annotations. The concrete interceptor implementations are TccActionInterceptor and GlobalTransactionalInterceptor. For AT mode we are mainly concerned with GlobalTransactionalInterceptor, whose implementation will be introduced in subsequent articles.
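
The annotation check that decides whether a bean needs a proxy can be sketched with plain reflection. This is an illustration only: DemoGlobalTransactional, OrderFacade and PlainService are hypothetical names, and the existsAnnotation method below is a simplified stand-in for the one in GlobalTransactionScanner, not a copy of it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationScanSketch {
    // Hypothetical marker annotation standing in for a global transaction annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface DemoGlobalTransactional {}

    static class OrderFacade {
        @DemoGlobalTransactional
        public void purchase() {}
    }

    static class PlainService {
        public void ping() {}
    }

    // Returns true if any class, or any public method of any class, carries the annotation.
    static boolean existsAnnotation(Class<?>... classes) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            if (clazz.isAnnotationPresent(DemoGlobalTransactional.class)) {
                return true;
            }
            for (Method method : clazz.getMethods()) {
                if (method.isAnnotationPresent(DemoGlobalTransactional.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(existsAnnotation(OrderFacade.class));  // needs a proxy
        System.out.println(existsAnnotation(PlainService.class)); // left untouched
    }
}
```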

In addition, GlobalTransactionScanner is also responsible for initialising the TM and RM, which is implemented in the initClient method:

private void initClient() {
    ...

    // Initialise the TM
    TMClient.init(applicationId, txServiceGroup);
    ...

    // Initialise the RM
    RMClient.init(applicationId, txServiceGroup);
    ...

    // Register the Spring shutdown callback to release resources.
    registerSpringShutdownHook();

}

TMClient and RMClient are both client classes of the Netty-based Rpc framework that Seata implements; only their business logic differs. Since TMClient is relatively simple, we take RMClient as an example and look at its source code:

public class RMClient {
    // RMClient's init is a static method that creates an instance of RmNettyRemotingClient and calls its init method.
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient =
            RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

RmNettyRemotingClient is implemented as follows:

@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    // ResourceManager is responsible for handling transaction participants, supports AT, TCC and Saga modes.
    private ResourceManager resourceManager;
    // RmNettyRemotingClient singleton.
    private static volatile RmNettyRemotingClient instance;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Unique identifier of the microservice
    private String applicationId;
    // Distributed transaction group name
    private String transactionServiceGroup;

    // The init method is called by the init method in RMClient.
    public void init() {
        // Register the Processors for Seata's custom Rpc.
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            // Call the init method of the parent class, which is responsible for initialising Netty and establishing a connection to the Seata-Server
            super.init();
        }
    }

    // Register the Processors for the Seata custom Rpc.
    private void registerProcessor() {
        // 1. Register the Processor for branchCommit requests initiated by the Seata-Server.
        RmBranchCommitProcessor rmBranchCommitProcessor =
            new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor,
            messageExecutor);

        // 2. Register the Processor for branchRollback requests initiated by the Seata-Server.
        RmBranchRollbackProcessor rmBranchRollbackProcessor =
            new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor,
            messageExecutor);

        // 3. Register the Processor for undoLog deletion requests initiated by the Seata-Server.
        RmUndoLogProcessor rmUndoLogProcessor =
            new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor,
            messageExecutor);

        // 4. Register the Processor for responses returned by the Seata-Server, ClientOnResponseProcessor.
        // It handles the Response that the Seata-Server returns for a Request initiated by the Client,
        // thus completing the Rpc round trip.
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
                getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor,
            null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT,
            onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT,
            onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT,
            onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

        // 5. Register the Processor for Pong messages returned by the Seata-Server.
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor,
            null);
    }
}

The above logic looks quite complex and involves many classes, such as Processor, MessageType, TransactionMessageHandler and ResourceManager. In essence it is just Rpc calls, which can be divided into calls initiated by the RM and calls initiated by the Seata-Server.

  • Calls initiated by the RM: for example registering branches, reporting branch status and applying for global locks. For these calls, the ClientOnResponseProcessor handles the Response returned by the Seata-Server.
  • Calls initiated by the Seata-Server: for example committing branch transactions, rolling back branch transactions and deleting undo logs. For these calls, a different Processor on the Client side handles each request and returns the processing result to the Seata-Server as a Response when it finishes. The core logic of transaction commit and rollback lives in TransactionMessageHandler and ResourceManager.
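
The registration and dispatch pattern described above can be sketched as a map from message type to handler. This is a simplified model under stated assumptions: the type codes, the String-based handlers and the connectCount field are hypothetical, and Seata's real MessageType constants and Processor interfaces differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class ProcessorDispatchSketch {
    // Hypothetical message type codes, chosen for this sketch only.
    static final int BRANCH_COMMIT = 1;          // server-initiated request
    static final int BRANCH_ROLLBACK = 2;        // server-initiated request
    static final int BRANCH_REGISTER_RESULT = 3; // response to a client-initiated request

    private final Map<Integer, Function<String, String>> processors = new HashMap<>();
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private int connectCount = 0;

    void init() {
        registerProcessors();
        // compareAndSet guarantees the connection step runs once, even if init is called again.
        if (initialized.compareAndSet(false, true)) {
            connectCount++;
        }
    }

    private void registerProcessors() {
        processors.put(BRANCH_COMMIT, xid -> "committed " + xid);
        processors.put(BRANCH_ROLLBACK, xid -> "rolled back " + xid);
        processors.put(BRANCH_REGISTER_RESULT, body -> "response " + body);
    }

    String dispatch(int messageType, String body) {
        Function<String, String> processor = processors.get(messageType);
        if (processor == null) {
            throw new IllegalArgumentException("no processor for message type " + messageType);
        }
        return processor.apply(body);
    }

    static String demo() {
        ProcessorDispatchSketch client = new ProcessorDispatchSketch();
        client.init();
        client.init(); // second call does not connect again
        return client.connectCount
            + "|" + client.dispatch(BRANCH_COMMIT, "xid-1")
            + "|" + client.dispatch(BRANCH_REGISTER_RESULT, "branch-7");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```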

The implementations of TransactionMessageHandler and ResourceManager will be described in detail in subsequent chapters.

The next article will introduce how the SeataAutoDataSourceProxyCreator and the Rpc Interceptor are initialised and how they intercept calls.

· 8 min read

Seata Demo environment build under Mac (AT mode)

Preface

Recently, because of work needs, I have been researching and studying the Seata distributed transaction framework. This article records what I have learnt.

Seata overview

cloc code statistics

First look at the seata project cloc code statistics (as of 2020-07-20)

! cloc-seata

The number of Java code lines is about 97K

Code quality

Unit test coverage 50%

! cloc-seata

Demo code

The demo code in this article is the seata-samples-dubbo module under the seata-samples project at the following address:

https://github.com/apache/incubator-seata-samples/tree/master/dubbo

Core problem solved

The AT pattern Demo example gives a typical distributed transaction scenario:

  • In a purchase transaction, it is necessary to:
  1. deduct the inventory of a product
  2. deduct the user account balance
  3. generate a purchase order
  • Obviously, all three steps must either succeed or fail, otherwise the system's data will be messed up.
  • With the popular microservices architecture, generally speaking, inventory, account balance, and purchase order are three separate systems.
  • Each microservice has its own database and is independent of each other.

Here is the scenario for distributed transactions.

! Design diagram

Solution

The idea of the AT pattern to solve this problem is actually quite simple and is summarised in one sentence:

During a distributed transaction, record the values of the data before and after modification in the undo_log table; if the transaction hits an exception, use that data to roll back.

Of course, the actual implementation has many details and is far from this simple.
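
The one-sentence idea can be sketched with an in-memory table. This is a toy model under stated assumptions: UndoLogSketch, UndoRecord and the "only roll back if the row still holds the value we wrote" check are illustrative choices for this sketch and are not taken from Seata's source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class UndoLogSketch {
    // Hypothetical undo record: the value before and after one update.
    static final class UndoRecord {
        final String rowKey;
        final Integer beforeImage;
        final Integer afterImage;

        UndoRecord(String rowKey, Integer beforeImage, Integer afterImage) {
            this.rowKey = rowKey;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    private final Map<String, Integer> table = new HashMap<>();

    void insert(String rowKey, int value) {
        table.put(rowKey, value);
    }

    Integer read(String rowKey) {
        return table.get(rowKey);
    }

    // Apply the update and return the undo record describing it.
    UndoRecord update(String rowKey, int newValue) {
        Integer before = table.get(rowKey);
        table.put(rowKey, newValue);
        return new UndoRecord(rowKey, before, newValue);
    }

    // Restore the before image, but only if nobody changed the row in the meantime.
    boolean rollback(UndoRecord undo) {
        if (!Objects.equals(table.get(undo.rowKey), undo.afterImage)) {
            return false;
        }
        if (undo.beforeImage == null) {
            table.remove(undo.rowKey);
        } else {
            table.put(undo.rowKey, undo.beforeImage);
        }
        return true;
    }

    static int demoRollback() {
        UndoLogSketch db = new UndoLogSketch();
        db.insert("U100001", 999);
        UndoRecord undo = db.update("U100001", 599);
        db.rollback(undo);
        return db.read("U100001");
    }

    static boolean demoDirtyWrite() {
        UndoLogSketch db = new UndoLogSketch();
        db.insert("U100001", 999);
        UndoRecord undo = db.update("U100001", 599);
        db.update("U100001", 100); // another writer changes the row first
        return db.rollback(undo);
    }

    public static void main(String[] args) {
        System.out.println(demoRollback());
        System.out.println(demoDirtyWrite());
    }
}
```

In the sketch, the second demo refuses to roll back because the row no longer matches the after image, which is the kind of situation raised in the questions at the end of this article.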

Demo code structure

Clone the latest code from github.

git clone git@github.com:apache/incubator-seata-samples.git

Read the Demo code structure

$ cd seata-samples/dubbo/
$ tree -C -I 'target' .
.
├── README.md
├── pom.xml
├── seata-samples-dubbo.iml
└── src
    └── main
        ├── java
        │   └── io
        │       └── seata
        │           └── samples
        │               └── dubbo
        │                   ├── ApplicationKeeper.java
        │                   ├── Order.java
        │                   ├── service
        │                   │   ├── AccountService.java
        │                   │   ├── BusinessService.java
        │                   │   ├── OrderService.java
        │                   │   ├── StorageService.java
        │                   │   └── impl
        │                   │       ├── AccountServiceImpl.java
        │                   │       ├── BusinessServiceImpl.java
        │                   │       ├── OrderServiceImpl.java
        │                   │       └── StorageServiceImpl.java
        │                   └── starter
        │                       ├── DubboAccountServiceStarter.java
        │                       ├── DubboBusinessTester.java
        │                       ├── DubboOrderServiceStarter.java
        │                       └── DubboStorageServiceStarter.java
        └── resources
            ├── file.conf
            ├── jdbc.properties
            ├── log4j.properties
            ├── registry.conf
            ├── spring
            │   ├── dubbo-account-service.xml
            │   ├── dubbo-business.xml
            │   ├── dubbo-order-service.xml
            │   └── dubbo-storage-service.xml
            └── sql
                ├── dubbo_biz.sql
                └── undo_log.sql

13 directories, 27 files

  • The four *Starter classes under the io.seata.samples.dubbo.starter package simulate the four microservices described above:
    • Account
    • Business
    • Order
    • Storage
  • All 4 services are standard dubbo services; their configuration files are in the seata-samples/dubbo/src/main/resources/spring directory.
  • To run the demo, you need to start all four services, with Business started last.
  • The main logic is in io.seata.samples.dubbo.service, and the four implementation classes correspond to the business logic of the four microservices.
  • The database configuration file is src/main/resources/jdbc.properties.

Timing diagram

! cloc-seata

Ok, get going, Make It Happen!

Run the demo

MySQL

Create a table

Execute the scripts dubbo_biz.sql and undo_log.sql in seata-samples/dubbo/src/main/resources/sql.

mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| account_tbl     |
| order_tbl       |
| storage_tbl     |
| undo_log        |
+-----------------+
4 rows in set (0.01 sec)

After execution, there should be 4 tables in the database

Modify the seata-samples/dubbo/src/main/resources/jdbc.properties file

Modify the values of the variables according to the environment in which you are running MySQL

jdbc.account.url=jdbc:mysql://localhost:3306/seata
jdbc.account.username=your_username
jdbc.account.password=your_password
jdbc.account.driver=com.mysql.jdbc.Driver
# storage db config
jdbc.storage.url=jdbc:mysql://localhost:3306/seata
jdbc.storage.username=your_username
jdbc.storage.password=your_password
jdbc.storage.driver=com.mysql.jdbc.Driver
# order db config
jdbc.order.url=jdbc:mysql://localhost:3306/seata
jdbc.order.username=your_username
jdbc.order.password=your_password
jdbc.order.driver=com.mysql.jdbc.Driver

ZooKeeper

Start ZooKeeper. On my local Mac it is installed via Homebrew and started as follows:

$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.mxcl.zookeeper)

$ brew services list
Name           Status  User    Plist
docker-machine stopped
elasticsearch  stopped
kafka          stopped
kibana         stopped
mysql          started portman /Users/portman/Library/LaunchAgents/homebrew.mxcl.mysql.plist
nginx          stopped
postgresql     stopped
postgresql     stopped
zookeeper      started portman /Users/portman/Library/LaunchAgents/homebrew.mxcl.zookeeper.plist

Start the TC transaction coordinator

From the download page, download the version of seata-server that matches your setup; I downloaded version 1.2.0 locally.

  1. Go to the directory where the file is located and extract the file.
  2. Enter the seata directory
  3. Execute the startup script
$ tar -zxvf seata-server-1.2.0.tar.gz
$ cd seata
$ bin/seata-server.sh

Observe the startup log for error messages, if everything is fine and you see the following Server started message, the startup was successful.

2020-07-23 13:45:13.810 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...

Starting a simulated microservice in the IDE

  1. First import the seata-samples project into your local IDE, I'm using IntelliJ IDEA here.
  2. Refresh the Maven project dependencies.
  3. Start the Account, Order and Storage services first, so that Business can invoke them. The corresponding startup classes are:

io.seata.samples.dubbo.starter.DubboAccountServiceStarter
io.seata.samples.dubbo.starter.DubboOrderServiceStarter
io.seata.samples.dubbo.starter.DubboStorageServiceStarter

After each service is started, you see this message indicating that the service was started successfully

Application is keep running ...

! cloc-seata

After a successful startup, the account_tbl and storage_tbl tables each contain one initial row: the account balance and the product inventory respectively.

mysql> SELECT * FROM account_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
|  1 | U100001 |   999 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
|  1 | C00321         |   100 |
+----+----------------+-------+
1 row in set (0.00 sec)

Use Business to verify results

Normal

Next, execute the main function of the DubboBusinessTester class in the IDE; the programme exits automatically after running.

If everything is working properly, everything should be committed for each microservice, and the data should be consistent.

Let's take a look at the data changes in MySQL

mysql> SELECT * FROM account_tbl; SELECT * FROM order_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
|  1 | U100001 |   599 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+---------+----------------+-------+-------+
| id | user_id | commodity_code | count | money |
+----+---------+----------------+-------+-------+
|  1 | U100001 | C00321         |     2 |   400 |
+----+---------+----------------+-------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
|  1 | C00321         |    98 |
+----+----------------+-------+
1 row in set (0.00 sec)

From the data of the 3 tables, we can see: account balance is deducted by 400; the order table is increased by 1 record; the product inventory is deducted by 2

This result is consistent with the logic of the programme, which means that there is no problem with the transaction.
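
The numbers in the three tables can be checked with a few lines of arithmetic. This is only a sketch: the unit price of 200 is an assumption inferred from the order row (2 items for 400), and PurchaseArithmeticSketch is a hypothetical class, not part of the demo project.

```java
class PurchaseArithmeticSketch {
    // Assumed unit price, inferred from the order row: 400 / 2 = 200.
    static final int UNIT_PRICE = 200;

    // Returns {new balance, new stock, order amount} for one purchase.
    static int[] purchase(int balance, int stock, int orderCount) {
        int orderMoney = UNIT_PRICE * orderCount;
        return new int[] {balance - orderMoney, stock - orderCount, orderMoney};
    }

    public static void main(String[] args) {
        int[] result = purchase(999, 100, 2);
        System.out.println("balance=" + result[0] + ", stock=" + result[1] + ", order=" + result[2]);
    }
}
```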

Exception

In fact, when everything runs normally, the result is correct even without distributed transaction control.

So let's focus on what happens when an exception occurs.

Now I'm going to uncomment the exception-throwing code in BusinessServiceImpl and execute DubboBusinessTester once more to see what happens.

@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);

    // Uncomment this throw statement to simulate an exception in the application.
    throw new RuntimeException("portman's foooooobar error.");
}

Next, I executed DubboBusinessTester once again, and during the execution I could see the exception message on the console

Exception in thread "main" java.lang.RuntimeException: portman's foooooobar error.

Looking at the data in MySQL again, we see that nothing has changed, which shows that the distributed transaction control has worked.

Questions to ponder

The steps above only demonstrate Seata's simplest demo programme; more complex cases can be discussed and verified later.

Some questions and doubts remain from the learning process, which I will study further:

  • How much the global lock affects performance
  • The undo_log can roll data back to its original state, but what should be done if the data has since changed (for example, points credited to a user have already been spent by another local transaction)?

References

  • [What is Seata?](/docs/overview/what-is-seata)
  • [Quickstart](/docs/user/quickstart)

Author information

Xu Xiaoga, Software Architect, Kingdee


· 9 min read

The RPC module is where I first started studying the Seata source code, so I have looked into it in some depth. In doing so, I found that the code in the RPC module needed to be optimised to make it more elegant and to make the interaction logic clearer and easier to understand. In the spirit of "let there be no RPC communication code in the world that is difficult to understand", I started refactoring the RPC module.

If you want to understand the details of Seata's interactions, I suggest starting with the source code of the RPC module. The RPC module is the hub of Seata, and it is where Seata's interaction logic is most fully on display.

This refactoring of the RPC module makes the Seata hub more robust and easier to read.

Refactoring Inheritance

In the old version of Seata, the overall structure of the RPC module was a bit confusing, especially in terms of the inheritance relationships between classes:

  1. The Remoting class directly inherits the Netty Handler, which couples the Remoting class with the Netty Handler processing logic;
  2. The inheritance relationship between the Remoting class on the client side and the Remoting class on the server side is not unified;
  3. RemotingClient is implemented by RpcClientBootstrap, while RemotingServer is implemented by RpcServer without an independent ServerBootstrap, which is a very confusing relationship;
  4. Some interfaces do not need to be extracted, such as ClientMessageSender, ClientMessageListener, ServerMessageSender and so on, because these interfaces increase the complexity of the overall inheritance structure.

In response to the problems identified above, I did the following during the refactoring process:

  1. Abstract the Netty Handler into an inner class and put it inside the Remoting class;
  2. Make RemotingClient the top-level client interface, defining the basic methods of client-server interaction, and abstract a layer of AbstractNettyRemotingClient, below which sit RmNettyRemotingClient and TmNettyRemotingClient; make RemotingServer the top-level server interface, defining the basic methods of interaction between the server and the client, with NettyRemotingServer as its implementation class;
  3. At the same time, fold the methods of ClientMessageSender, ClientMessageListener, ServerMessageSender and the other such interfaces into RemotingClient and RemotingServer, and let the Remoting classes implement RemotingClient and RemotingServer, which unifies the inheritance relationship of the Remoting classes;
  4. Create a new RemotingBootstrap interface and implement NettyClientBootstrap and NettyServerBootstrap on the client and server side respectively, so as to extract the bootstrap logic from the Remoting classes.

The inheritance relationship in the latest RPC module is simple and clear, represented by the following class relationship diagram:

  1. AbstractNettyRemoting: the top-level abstraction of the Remoting classes; it contains the member variables and methods common to both client and server, including the common request methods and the Processor invocation logic (both are covered later in the article);
  2. RemotingClient: the client's top-level interface, defining the basic methods of client-server interaction;
  3. RemotingServer: the top-level interface of the server side, defining the basic methods of interaction between the server side and the client side;
  4. AbstractNettyRemotingClient: client-side abstract class, inherits AbstractNettyRemoting class and implements RemotingClient interface;
  5. NettyRemotingServer: server implementation class, inherits AbstractNettyRemoting class and implements RemotingServer interface;
  6. RmNettyRemotingClient: Rm client implementation class, inherits AbstractNettyRemotingClient class;
  7. TmNettyRemotingClient: Tm client implementation class, inherits AbstractNettyRemotingClient class.
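The hierarchy listed above can be sketched as a bare Java skeleton. This is only an illustration under stated assumptions: the type names come from the list, but the method names, signatures and bodies are simplified placeholders and are not Seata's actual API.

```java
// Top-level client interface. The method below is a simplified placeholder.
interface RemotingClient {
    Object sendSyncRequest(Object msg);
}

// Top-level server interface. The method below is a simplified placeholder.
interface RemotingServer {
    Object sendSyncRequest(String clientId, Object msg);
}

// Shared by client and server: common state and the Processor invocation logic.
abstract class AbstractNettyRemoting {
    protected String processMessage(Object msg) {
        // Placeholder for "look up the Processor and hand the message over".
        return "processed:" + msg;
    }
}

// Client-side abstract class: extends the common base and implements the client interface.
abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    @Override
    public Object sendSyncRequest(Object msg) {
        return processMessage(msg);
    }
}

// RM and TM clients only differ in details that are omitted here.
class RmNettyRemotingClient extends AbstractNettyRemotingClient { }

class TmNettyRemotingClient extends AbstractNettyRemotingClient { }

// Server implementation: extends the common base and implements the server interface.
class NettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    @Override
    public Object sendSyncRequest(String clientId, Object msg) {
        return processMessage(clientId + "/" + msg);
    }
}
```

Code written against RemotingClient or RemotingServer then does not need to know which concrete Netty class sits behind the interface.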

At the same time, the client-side and server-side bootstrap class logic is abstracted out, as shown in the following class relationship diagram:

  1. RemotingBootstrap: bootstrap class interface with two abstract methods: start and stop;
  2. NettyClientBootstrap: client-side bootstrap implementation class;
  3. NettyServerBootstrap: server-side bootstrap implementation class.
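As a minimal sketch of what extracting the bootstrap logic buys, the example below lets a remoting class delegate its lifecycle to a RemotingBootstrap. Only the interface name and its start and stop methods come from the text; RecordingServerBootstrap and SketchRemotingServer are hypothetical stand-ins that record calls instead of starting Netty.

```java
import java.util.ArrayList;
import java.util.List;

// Bootstrap interface with the two methods named in the article.
interface RemotingBootstrap {
    void start();

    void stop();
}

// Hypothetical stand-in for a server bootstrap: records calls instead of binding a port.
class RecordingServerBootstrap implements RemotingBootstrap {
    final List<String> events = new ArrayList<>();

    @Override
    public void start() {
        events.add("start");
    }

    @Override
    public void stop() {
        events.add("stop");
    }
}

// Hypothetical remoting class: it keeps the message logic and delegates
// startup and shutdown to whichever bootstrap it was given.
class SketchRemotingServer {
    private final RemotingBootstrap bootstrap;

    SketchRemotingServer(RemotingBootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    void init() {
        bootstrap.start();
    }

    void destroy() {
        bootstrap.stop();
    }
}
```

With this split, the remoting class no longer contains any startup code of its own, and the bootstrap can be replaced or tested in isolation.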

Decoupled processing logic

Decoupling the processing logic means extracting the handling of RPC interactions from the Netty Handler and abstracting it into individual Processors. Why do this? Let me describe the problems that existed:

  1. The Netty Handler and the processing logic are mixed together. Because the client and the server share one set of processing logic, a lot of hard-to-understand conditional logic was needed to stay compatible with more interactions;
  2. In Seata's interactions, some requests are processed asynchronously and some synchronously, but the old processing code expresses the difference between synchronous and asynchronous handling in a very obscure way;
  3. The code cannot clearly express the relationship between a request message type and its corresponding processing logic;
  4. In later iterations of Seata, it would be very difficult to add new interaction logic to this part of the code if the processing logic were not extracted.

Before extracting the processing logic from the Netty Handler, let's take a look at Seata's existing interaction logic:

  • RM client-server interaction logic:

RM client request server interaction logic:

  • TM client-server interaction logic:

TM client request server interaction logic:

  • Interaction logic for a server requesting an RM client:

The interaction logic of Seata can be clearly seen in the above interaction diagram.

In total, the client receives the following messages from the server:

  1. Server-side request messages:
     1. BranchCommitRequest, BranchRollbackRequest, UndoLogDeleteRequest
  2. Server-side response messages:
     1. RegisterRMResponse, BranchRegisterResponse, BranchReportResponse, GlobalLockQueryResponse
     2. RegisterTMResponse, GlobalBeginResponse, GlobalCommitResponse, GlobalRollbackResponse, GlobalStatusResponse, GlobalReportResponse
     3. HeartbeatMessage(PONG)

In total, the server receives the following messages from the client:

  1. Client request messages:
     1. RegisterRMRequest, BranchRegisterRequest, BranchReportRequest, GlobalLockQueryRequest
     2. RegisterTMRequest, GlobalBeginRequest, GlobalCommitRequest, GlobalRollbackRequest, GlobalStatusRequest, GlobalReportRequest
     3. HeartbeatMessage(PING)
  2. Client response messages:
     1. BranchCommitResponse, BranchRollbackResponse

Based on the above analysis of the interaction logic, we can abstract the message-handling logic into a number of Processors. A Processor can handle messages of one or more message types; the message types only need to be registered in the ProcessorTable when Seata starts up, forming a mapping so that the corresponding Processor can be called according to the message type, as shown in the following diagram:

In the abstract Remoting class, there is a processMessage method, the logic of the method is to get the corresponding Processor from the ProcessorTable according to the message type.

In this way, the processing logic is completely removed from the Netty Handler: the Handler#channelRead method only needs to call processMessage, and Processors can be registered dynamically into the ProcessorTable by message type, which greatly improves the extensibility of the processing logic.
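The registration and lookup described above can be sketched roughly as follows. This is a simplified illustration, not Seata's implementation: the Processor interface shape, the use of String message types, and the names SketchRemoting and registerProcessor are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified Processor: handles a message of a given type.
interface Processor {
    String process(String messageType, String body);
}

// Hypothetical remoting class holding the processor table.
class SketchRemoting {
    // Message type -> Processor. One Processor may be registered under several types.
    private final Map<String, Processor> processorTable = new HashMap<>();

    // Called at startup to build the mapping.
    void registerProcessor(String messageType, Processor processor) {
        processorTable.put(messageType, processor);
    }

    // What a Handler#channelRead implementation would delegate to.
    String processMessage(String messageType, String body) {
        Processor processor = processorTable.get(messageType);
        if (processor == null) {
            return "no processor for " + messageType;
        }
        return processor.process(messageType, body);
    }
}
```

Supporting a new message type then only requires one more registerProcessor call at startup; the handler itself stays unchanged.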

The following is the invocation flow of Processor:

  1. Client:
     1. RmBranchCommitProcessor: handles server-side global commit requests;
     2. RmBranchRollbackProcessor: handles server-side global rollback requests;
     3. RmUndoLogProcessor: handles server-side undo log deletion requests;
     4. ClientOnResponseProcessor: handles server-side responses on the client, such as BranchRegisterResponse, GlobalBeginResponse, GlobalCommitResponse and so on;
     5. ClientHeartbeatProcessor: handles server-side heartbeat responses.
  2. Server:
     1. RegRmProcessor: handles RM client registration requests;
     2. RegTmProcessor: handles TM client registration requests;
     3. ServerOnRequestProcessor: handles client requests, such as BranchRegisterRequest, GlobalBeginRequest, GlobalLockQueryRequest, etc.;
     4. ServerOnResponseProcessor: handles client responses, such as BranchCommitResponse, BranchRollbackResponse and so on;
     5. ServerHeartbeatProcessor: handles client heartbeat responses.

Below is an example of a TM initiating a global transaction commit request to give you a sense of where the Processor sits in the entire interaction:

Refactoring the request method

In older versions of Seata, the request methods for RPC also lacked elegance:

  1. The request methods are too cluttered and not hierarchical;
  2. The sendAsyncRequest method is coupled with too much code and its logic is too confusing: the client and the server share one set of request logic, whether to send in batches is decided by whether the address parameter is null, and whether to make a synchronous request is decided by whether the timeout is greater than 0, which is extremely unreasonable;
  3. The request method names are not uniform in style: for example, the client uses sendMsgWithResponse, but the server uses sendSyncRequest.

To address the above shortcomings of the old RPC request methods, I have made the following changes:

  1. Put the request methods into the RemotingClient and RemotingServer interfaces as the top-level interfaces;
  2. Separate the client-side and server-side request logic, and move the batch request logic into the client-side request method, so that the decision whether to send in batches no longer depends on whether the address parameter is null;
  3. Because of Seata's own logic, the parameters of the client-side and server-side request methods cannot be unified. So common synchronous/asynchronous request methods are extracted: the client and the server each implement their own synchronous/asynchronous request logic according to their own characteristics and finally call the common methods. Synchronous and asynchronous requests thus have clearly separate methods, and the choice no longer depends on whether the timeout is greater than 0;
  4. Unify the request name style.
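One way to picture the separation of synchronous and asynchronous requests is the sketch below. It is not Seata's code: SketchRequestSender, the wire callback (a stand-in for writing to a Netty channel) and the integer message ids are all hypothetical, and the pending-future map only illustrates how a synchronous call can wait for its reply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sender with one method per request style instead of a timeout flag.
class SketchRequestSender {
    private final AtomicInteger nextId = new AtomicInteger();
    // Pending synchronous requests, keyed by message id.
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    // Stand-in for the channel write: receives (message id, message).
    private final BiConsumer<Integer, Object> wire;

    SketchRequestSender(BiConsumer<Integer, Object> wire) {
        this.wire = wire;
    }

    // Synchronous: register a future, send, then block until the response or the timeout.
    Object sendSync(Object msg, long timeoutMillis) throws TimeoutException {
        int id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future);
        try {
            wire.accept(id, msg);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for response", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            futures.remove(id);
        }
    }

    // Asynchronous: send and return immediately; no future is registered.
    void sendAsync(Object msg) {
        wire.accept(nextId.incrementAndGet(), msg);
    }

    // Called by the response-handling side when a reply arrives.
    void onResponse(int id, Object response) {
        CompletableFuture<Object> future = futures.get(id);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

The caller picks sendSync or sendAsync explicitly, so the request style is visible at the call site rather than being inferred from a parameter value.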

Finally, Seata RPC request methods look more elegant and hierarchical.

Synchronous requests:

Asynchronous request:

Other

  1. Class directory adjustment: the RPC module directory contains a netty directory, and the directory structure suggests that Seata's original intention was to be compatible with multiple RPC frameworks, although only netty is implemented at present. However, some classes in the netty directory are not netty-specific, and some classes in the RPC root directory are not generic, so the locations of the relevant classes need to be adjusted;
  2. Some classes are renamed, e.g. the names of netty-related classes now contain "netty".

The final RPC module looks like this:

Author Bio

Zhang Chenghui currently works at Ant Group and loves sharing technology. He is the author of the WeChat official account "Backend Advanced", the blogger behind the technical blog (https://objcoding.com/), and a Seata contributor. GitHub ID: objcoding.

· 18 min read

[Distributed Transaction Seata Source Code Interpretation I] Server-side startup process

Core points for implementing distributed transactions:

  1. Transaction persistence: the state of the transaction and of each transaction participant must be persisted, so that after an instance goes down the transaction can be rolled back or committed from the persisted data to reach eventual consistency.
  2. Scheduled handling of timed-out transactions (keep trying to commit or roll back), i.e. a retry mechanism that drives the transaction to eventual consistency.
  3. Propagation of distributed transactions across service instances: when a distributed transaction spans several instances, the transaction must be propagated, which generally requires adapting to different RPC frameworks.
  4. Transaction isolation level: for performance, most distributed transaction solutions default to the read uncommitted isolation level.
  5. Idempotency: for distributed transactions such as XA or Seata's AT mode, idempotency is already implemented by default, whereas for interface-level modes such as TCC and Saga, business developers still have to implement idempotency themselves.

This article introduces the source code of seata-server from the point of view of the startup process of seata-server, the startup flow chart is as follows:

![Insert image description here](https://img-blog.csdnimg.cn/20200726213919467.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,color_FFFFFF,t_70)

1. Startup class Server

The entry point of seata-server is the Server class; its source code is as follows:

public static void main(String[] args) throws IOException {
    // Get the listening port from an environment variable or a startup argument; the default port is 8091.
    int port = PortHelper.getPort(args);

    // Write the listening port to a system property. SystemPropertyLoggerContextListener, an implementation
    // of Logback's LoggerContextListener, writes the port into Logback's Context.
    // The port variable is used in logback.xml to construct the log file name.
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

    // Create the Logger
    final Logger logger = LoggerFactory.getLogger(Server.class);
    if (ContainerHelper.isRunningInContainer()) {
        logger.info("The server is running in container.");
    }

    // Parse the startup arguments and the configuration files
    ParameterParser parameterParser = new ParameterParser(args);

    // Metrics related; the Registry instance is obtained through the SPI mechanism
    MetricsManager.get().init();

    // Write the storeMode read from the config file into a system property for use by other classes.
    System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

    // Create an instance of NettyRemotingServer, an RPC framework implemented on top of Netty.
    // It is not initialised at this point. NettyRemotingServer is responsible for network communication
    // with the TM and RM in the client SDK.
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);

    // Set the listening port
    nettyRemotingServer.setListenPort(parameterParser.getPort());

    // Initialise UUIDGenerator, which is implemented based on the snowflake algorithm.
    // It is used to generate ids for global transactions and branch transactions.
    // Multiple Server instances are configured with different ServerNodes to ensure uniqueness of the ids.
    UUIDGenerator.init(parameterParser.getServerNode());

    // SessionHolder is responsible for persistent storage of transaction logs (state).
    // Currently supports three storage modes: file, db, and redis; for cluster deployment, use db or redis mode.
    SessionHolder.init(parameterParser.getStoreMode());

    // Create and initialise DefaultCoordinator, the core transaction logic processing class of the TC.
    // Underneath, it contains the handling logic for the different transaction types such as AT, TCC and SAGA.
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

    // 127.0.0.1 and 0.0.0.0 are not valid here.
    if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
        XID.setIpAddress(parameterParser.getHost());
    } else {
        XID.setIpAddress(NetUtil.getLocalIp());
    }
    XID.setPort(nettyRemotingServer.getListenPort());

    try {
        // Initialise Netty, start listening on the port and block here, waiting for the application to shut down.
        nettyRemotingServer.init();
    } catch (Throwable e) {
        logger.error("nettyServer init error:{}", e.getMessage(), e);
        System.exit(-1);
    }

    System.exit(0);
}

2. Parsing Configuration

The implementation code for parameter parsing is in the ParameterParser class, the init method source code is as follows:

private void init(String[] args) {
    try {
        // Determine whether we are running in a container; if so, get the configuration from environment variables.
        if (ContainerHelper.isRunningInContainer()) {
            this.seataEnv = ContainerHelper.getEnv();
            this.host = ContainerHelper.getHost();
            this.port = ContainerHelper.getPort();
            this.serverNode = ContainerHelper.getServerNode();
            this.storeMode = ContainerHelper.getStoreMode();
        } else {
            // Use JCommander to read the parameters passed when starting the application.
            // JCommander assigns the parameters to the fields of the current class via annotations and reflection.
            JCommander jCommander = JCommander.newBuilder().addObject(this).build();
            jCommander.parse(args);
            if (help) {
                jCommander.setProgramName(PROGRAM_NAME);
                jCommander.usage();
                System.exit(0);
            }
        }
        // serverNode is the unique identifier of the instance in the snowflake algorithm and must be unique.
        // If it is not specified, one is generated based on the current server's IP.
        if (this.serverNode == null) {
            this.serverNode = IdWorker.initWorkerId();
        }
        if (StringUtils.isNotBlank(seataEnv)) {
            System.setProperty(ENV_PROPERTY_KEY, seataEnv);
        }
        if (StringUtils.isBlank(storeMode)) {
            // An important Configuration class is involved here. ParameterParser is only responsible for
            // the core parameters such as ip, port and storeMode; all other parameters are taken from the
            // Configuration. If no storeMode is specified as a startup parameter, it is taken from the
            // Configuration class as well.
            storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
                SERVER_DEFAULT_STORE_MODE);
        }
    } catch (ParameterException e) {
        printError(e);
    }

}

The first call to ConfigurationFactory.getInstance() in ParameterParser's init method initialises a singleton Configuration object, which is responsible for loading all the other configuration parameters. In the Seata Server source code we can see two configuration files, file.conf and registry.conf. What is the difference between them, and are both required? Let's continue with the code.

ConfigurationFactory.getInstance simply returns a singleton object; the core logic is in the buildConfiguration method. Before buildConfiguration runs, however, a static code block of the ConfigurationFactory class is executed first.

// Get the singleton Configuration object.
public static Configuration getInstance() {
    if (instance == null) {
        synchronized (Configuration.class) {
            if (instance == null) {
                instance = buildConfiguration();
            }
        }
    }
    return instance;
}

// ConfigurationFactory static code block
static {
    // Get the name of the configuration file; it defaults to registry.conf.
    String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
    if (seataConfigName == null) {
        seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
    }
    if (seataConfigName == null) {
        seataConfigName = REGISTRY_CONF_PREFIX;
    }
    String envValue = System.getProperty(ENV_PROPERTY_KEY);
    if (envValue == null) {
        envValue = System.getenv(ENV_SYSTEM_KEY);
    }

    // Read the configuration from the registry.conf file to build the base configuration object
    Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,
        false) : new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX, false);
    Configuration extConfiguration = null;
    try {
        // ExtConfigurationProvider currently has only one implementation class, SpringBootConfigurationProvider.
        // It supports the SpringBoot configuration file approach of the client-side SDK; this logic can be ignored for the Server side.
        extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
                : extConfiguration.getClass().getSimpleName());
        }
    } catch (EnhancedServiceNotFoundException ignore) {

    } catch (Exception e) {
        LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
    }
    CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

The static block in ConfigurationFactory reads its configuration from registry.conf, so the registry.conf file is mandatory. registry.conf specifies the source of the other, detailed configuration; the currently supported sources include file, zk, apollo, nacos, etcd3, etc. file.conf is therefore not required: its contents are read only when the configuration source is set to the file type.
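To make the file-name resolution in the static block easier to follow, here is a small standalone sketch of the same fallback order. RegistryFileNameSketch and its method are hypothetical, and the two constant values are assumptions inferred from the "defaults to registry.conf" comment rather than copied from Seata.

```java
// Hypothetical helper mirroring the fallback order of the static block above.
class RegistryFileNameSketch {
    static final String DEFAULT_PREFIX = "registry"; // assumed value of REGISTRY_CONF_PREFIX
    static final String SUFFIX = ".conf";            // assumed value of REGISTRY_CONF_SUFFIX

    /**
     * @param fromProperty config name from the system property, or null if unset
     * @param fromEnv      config name from the environment variable, or null if unset
     * @param envValue     environment name (for example "test"), or null if unset
     */
    static String resolve(String fromProperty, String fromEnv, String envValue) {
        // System property wins, then the environment variable, then the default prefix.
        String name = fromProperty != null ? fromProperty : (fromEnv != null ? fromEnv : DEFAULT_PREFIX);
        // With an environment name, the file becomes <name>-<env>.conf.
        return envValue == null ? name + SUFFIX : name + "-" + envValue + SUFFIX;
    }
}
```

Under these assumptions, a server started without any of the three settings reads registry.conf, while setting the environment name to test makes it read registry-test.conf.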

Next, buildConfiguration in ConfigurationFactory loads more configuration items based on the configuration source set in registry.conf.

private static Configuration buildConfiguration() {
    ConfigType configType;
    String configTypeName;
    try {
        // Read the config.type field from the registry.conf configuration file and parse it into the ConfigType enumeration.
        configTypeName = CURRENT_FILE_INSTANCE.getConfig(
            ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                + ConfigurationKeys.FILE_ROOT_TYPE);

        if (StringUtils.isBlank(configTypeName)) {
            throw new NotSupportYetException("Configuration type cannot be blank");
        }

        configType = ConfigType.getType(configTypeName);
    } catch (Exception e) {
        throw e;
    }
    Configuration extConfiguration = null;
    Configuration configuration;
    if (ConfigType.File == configType) {
        // If the configuration source is of type file, read the config.file.name entry from registry.conf,
        // i.e. the path to the file-type configuration file; in the example it defaults to file.conf.
        String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
            ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
        String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);

        // Build the FileConfiguration object based on the path to the file configuration file
        configuration = new FileConfiguration(name);
        try {
            // Additional extension of the configuration, also used only by the client SpringBoot SDK.
            extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("load Configuration:{}", extConfiguration == null
                    ? configuration.getClass().getSimpleName() : extConfiguration.getClass().getSimpleName());
            }
        } catch (EnhancedServiceNotFoundException ignore) {

        } catch (Exception e) {
            LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
        }
    } else {
        // If the configuration source is of a type other than file, e.g. nacos, zk, etc.,
        // obtain the Configuration from the corresponding ConfigurationProvider, which is loaded by way of SPI.
        configuration = EnhancedServiceLoader
            .load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
    }
    try {
        // ConfigurationCache wraps the configuration in a proxy that caches values in memory, to improve the performance of fetching the configuration.
        Configuration configurationCache;
        if (null != extConfiguration) {
            configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
        } else {
            configurationCache = ConfigurationCache.getInstance().proxy(configuration);
        }
        if (null != configurationCache) {
            extConfiguration = configurationCache;
        }
    } catch (EnhancedServiceNotFoundException ignore) {

    } catch (Exception e) {
        LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
    }
    return null == extConfiguration ? configuration : extConfiguration;
}

3. Initialisation of UUIDGenerator

UUIDGenerator initialisation receives a serverNode parameter. UUIDGenerator currently uses the snowflake algorithm to generate unique ids, and serverNode ensures that the ids generated by multiple seata-server instances are not duplicated.

public class UUIDGenerator {

    /**
     * Generate uuid long.
     *
     * @return the long
     */
    public static long generateUUID() {
        return IdWorker.getInstance().nextId();
    }

    /**
     * Init.
     *
     * @param serverNode the server node id
     */
    public static void init(Long serverNode) {
        IdWorker.init(serverNode);
    }
}

UUIDGenerator is a wrapper around IdWorker: the core logic for generating the unique id is in the IdWorker class, which is an implementation of the snowflake algorithm. IdWorker here is again a singleton.

public class IdWorker {
    /**
     * Constructor
     *
     * @param workerId the ServerNode mentioned above, with a value in the range 0-1023, i.e. the 10 workerId bits of the 64-bit id
     */
    public IdWorker(long workerId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(
                String.format("Worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        this.workerId = workerId;
    }

    /**
     * Get the next ID (the method is thread-safe)
     *
     * @return SnowflakeId
     */
    public long nextId() {
        long timestamp = timeGen();

        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format(
                "clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        synchronized (this) {
            if (lastTimestamp == timestamp) {
                sequence = (sequence + 1) & sequenceMask;
                if (sequence == 0) {
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } else {
                sequence = 0L;
            }
            lastTimestamp = timestamp;
        }
        // Snowflake 64-bit unique id layout: leading 0 + 41-bit timestamp + 10-bit workerId + 12-bit sequence (incremented within the same timestamp)
        return ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
    }
}
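The bit layout in the final comment of nextId can be checked with a small standalone sketch. The class and method names below are hypothetical; the shift widths follow directly from the 41/10/12-bit layout stated in that comment.

```java
// Hypothetical helper that packs and unpacks the 64-bit layout described above.
class SnowflakeLayoutSketch {
    static final int SEQUENCE_BITS = 12;
    static final int WORKER_ID_BITS = 10;
    static final int WORKER_ID_SHIFT = SEQUENCE_BITS;                  // 12
    static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS; // 22
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;       // 4095
    static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1;      // 1023

    // Pack the three fields; millisSinceEpoch plays the role of (timestamp - twepoch).
    static long compose(long millisSinceEpoch, long workerId, long sequence) {
        return (millisSinceEpoch << TIMESTAMP_SHIFT) | (workerId << WORKER_ID_SHIFT) | sequence;
    }

    static long timestampOf(long id) {
        return id >>> TIMESTAMP_SHIFT;
    }

    static long workerIdOf(long id) {
        return (id >>> WORKER_ID_SHIFT) & MAX_WORKER_ID;
    }

    static long sequenceOf(long id) {
        return id & SEQUENCE_MASK;
    }

    // Same wrap-around rule as in nextId: after 4095 the sequence returns to 0.
    static long nextSequence(long sequence) {
        return (sequence + 1) & SEQUENCE_MASK;
    }
}
```

Because the workerId occupies its own 10 bits, two servers with different ServerNode values can never produce the same id, even for the same millisecond and sequence number.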

4. SessionHolder initialisation

SessionHolder is responsible for session persistence. A session object corresponds to a transaction, and there are two kinds: the global transaction (GlobalSession) and the branch transaction (BranchSession). SessionHolder supports two types of persistence, file and db; db supports cluster mode and is the recommended choice. The four most important fields in SessionHolder are as follows:

// ROOT_SESSION_MANAGER is used to get all sessions, and to create, update and delete sessions.
private static SessionManager ROOT_SESSION_MANAGER;
// Used to get and update all sessions that need to be committed asynchronously.
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// Used to get and update all sessions that need to retry a commit.
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// Used to get and update all sessions that need to retry a rollback.
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

SessionHolder init method

public static void init(String mode) throws IOException {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        // The SessionManager is again loaded through SPI here.
        // The four SessionManager instances obtained below are in fact different instances of the same class,
        // DataBaseSessionManager; they only pass different parameters to the DataBaseSessionManager constructor.
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } else if (StoreMode.FILE.equals(storeMode)) {
        // file mode can be left alone for now
        ...
    } else {
        throw new IllegalArgumentException("unknown store mode:" + mode);
    }
    // The reload method can be ignored for db mode
    reload();
}

The four SessionManagers in the SessionHolder are all instances of the class DataBaseSessionManager, but they pass different parameters to the constructor, so take a look at the definition of DataBaseSessionManager:

public DataBaseSessionManager(String name) {
    super();
    this.taskName = name;
}

// The list of transactions returned by allSessions is determined by the instance's taskName.
// If taskName is equal to ASYNC_COMMITTING_SESSION_MANAGER_NAME,
// all transactions with a status of AsyncCommitting are returned, and so on.
public Collection<GlobalSession> allSessions() {
    // get by taskName
    if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
    } else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying}));
    } else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
            GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
    } else {
        // A taskName of null corresponds to ROOT_SESSION_MANAGER.
        return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
            GlobalStatus.UnKnown, GlobalStatus.Begin,
            GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
            GlobalStatus.RollbackRetrying,
            GlobalStatus.TimeoutRollbacking,
            GlobalStatus.TimeoutRollbackRetrying,
            GlobalStatus.AsyncCommitting}));
    }
}
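
The idea of one class whose instances differ only by a name that selects a status filter can be sketched in isolation. The following is a minimal, self-contained illustration and not Seata's code: the class NamedSessionManagerSketch, its nested Status enum, the task-name strings, and the in-memory list standing in for the session table are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

class NamedSessionManagerSketch {
    // Hypothetical, simplified stand-in for Seata's GlobalStatus.
    enum Status { BEGIN, COMMIT_RETRYING, ROLLBACK_RETRYING, ASYNC_COMMITTING }

    private final String taskName;      // null plays the role of the root manager
    private final List<Status> store;   // stands in for the shared session table

    NamedSessionManagerSketch(String taskName, List<Status> store) {
        this.taskName = taskName;
        this.store = store;
    }

    // Pick the status filter from the task name, then query the shared store.
    List<Status> allSessions() {
        Set<Status> wanted;
        if ("asyncCommitting".equalsIgnoreCase(taskName)) {
            wanted = EnumSet.of(Status.ASYNC_COMMITTING);
        } else if ("retryCommitting".equalsIgnoreCase(taskName)) {
            wanted = EnumSet.of(Status.COMMIT_RETRYING);
        } else if ("retryRollbacking".equalsIgnoreCase(taskName)) {
            wanted = EnumSet.of(Status.ROLLBACK_RETRYING);
        } else {
            // No (or unknown) task name: behave like the root manager and return everything.
            wanted = EnumSet.allOf(Status.class);
        }
        List<Status> result = new ArrayList<>();
        for (Status status : store) {
            if (wanted.contains(status)) {
                result.add(status);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Status> store = Arrays.asList(Status.BEGIN, Status.ASYNC_COMMITTING);
        System.out.println(new NamedSessionManagerSketch("asyncCommitting", store).allSessions());
        System.out.println(new NamedSessionManagerSketch(null, store).allSessions());
    }
}
```

All instances query the same store, so the four managers never hold diverging copies of the data; the name only narrows the view.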

5. Initialise DefaultCoordinator

The DefaultCoordinator is the core of the transaction coordinator: opening, committing, and rolling back global transactions, as well as registering, committing, and rolling back branch transactions, are all coordinated by the DefaultCoordinator. The DefaultCoordinator communicates with remote TMs and RMs through the RpcServer to carry out operations such as committing and rolling back branch transactions.

public DefaultCoordinator(ServerMessageSender messageSender) {
    // The implementation class of the messageSender interface is the RpcServer mentioned above.
    this.messageSender = messageSender;

    // DefaultCore encapsulates the implementation classes for AT, TCC, Saga, and other distributed transaction modes.
    this.core = new DefaultCore(messageSender);
}

// The init method initialises five timers, which are mainly used for the retry mechanism of distributed transactions.
// Because the instability of a distributed environment can leave transactions in an intermediate state,
// the eventual consistency of transactions is achieved through a constant retry mechanism.
// The following timers, except for undoLogDelete, are executed once every 1 second by default.
public void init() {
    // Handle transactions that are in a rollback state and can be retried
    retryRollbacking.scheduleAtFixedRate(() -> {
        try {
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Handle transactions whose second-phase commit can be retried
    retryCommitting.scheduleAtFixedRate(() -> {
        try {
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Handle asynchronously committing transactions
    asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Check for transactions whose first phase has timed out and set their state to TimeoutRollbacking;
    // the transaction will then be rolled back by another timed task
    timeoutCheck.scheduleAtFixedRate(() -> {
        try {
            timeoutCheck();
        } catch (Exception e) {
            LOGGER.info("Exception timeout checking ... ", e);
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Call the RM to delete undo logs based on the number of days the undo log is kept
    undoLogDelete.scheduleAtFixedRate(() -> {
        try {
            undoLogDelete();
        } catch (Exception e) {
            LOGGER.info("Exception undoLog deleting ... ", e);
        }
    }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
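
Each timer above wraps its handler in try/catch. With the JDK's ScheduledExecutorService this matters, because an exception that escapes a task scheduled with scheduleAtFixedRate suppresses all later executions. The sketch below shows the pattern on its own; the class name RetryTimerSketch, the simulated handler that fails on every odd call, and the chosen period are assumptions for illustration, not Seata code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryTimerSketch {
    // Runs a flaky task `attempts` times at a fixed rate and returns how many attempts failed.
    static int countFailures(int attempts, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(attempts);
        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() == 0) {
                return; // enough attempts were made; ignore late runs
            }
            try {
                // Hypothetical handler: every odd call fails, simulating an unstable environment.
                if (calls.incrementAndGet() % 2 == 1) {
                    throw new IllegalStateException("simulated failure");
                }
            } catch (Exception e) {
                // Catch inside the task: an exception escaping it would cancel all later runs.
                failures.incrementAndGet();
            } finally {
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed attempts: " + countFailures(4, 10));
    }
}
```

Because the failure is swallowed and counted, the task keeps firing after a bad run, which is the behaviour a retry timer needs.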

6. Initialising NettyRemotingServer

NettyRemotingServer is a simplified RPC server implemented on top of Netty. Its initialisation does two main things:

  1. registerProcessor: registers the Processors that handle communication with the Client.
  2. super.init(): initialises Netty and registers the IP and port of the current instance with the registry.
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

private void registerProcessor() {
    // 1. Register the core ServerOnRequestProcessor, i.e. the Processor for transaction-related requests,
    // e.g. global transaction begin and commit, branch transaction registration, status reports, etc.
    // ServerOnRequestProcessor's constructor takes the instance returned by getHandler(); this handler
    // is the aforementioned DefaultCoordinator, the core processing class for distributed transactions.
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

    // 2. Register the ResponseProcessor, which handles the messages the Client replies with
    // after the Server initiates a request, e.g. when the Server asks the Client to commit or
    // roll back a branch transaction, the Client returns the commit/rollback result.
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

    // 3. The Processor for RM registration requests initiated by the Client.
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

    // 4. The Processor for TM registration requests initiated by the Client.
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

    // 5. The Processor for heartbeat requests sent by the Client.
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
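
The registration calls above amount to building a table from message type to processor, with one processor instance shared by several types. A stripped-down sketch of such a table follows. It is only an illustration under stated assumptions: the class ProcessorTableSketch, the numeric type codes, and the use of Function<String, String> as a processor are invented for the example and do not reflect Seata's actual interfaces or constant values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ProcessorTableSketch {
    // Hypothetical message type codes; Seata's real values live in MessageType.
    static final int TYPE_GLOBAL_BEGIN = 1;
    static final int TYPE_GLOBAL_COMMIT = 2;
    static final int TYPE_HEARTBEAT = 3;

    private final Map<Integer, Function<String, String>> processors = new HashMap<>();

    void registerProcessor(int messageType, Function<String, String> processor) {
        processors.put(messageType, processor);
    }

    // Look up the processor by message type and hand the message body to it.
    String dispatch(int messageType, String body) {
        Function<String, String> processor = processors.get(messageType);
        if (processor == null) {
            throw new IllegalArgumentException("no processor for message type " + messageType);
        }
        return processor.apply(body);
    }

    static ProcessorTableSketch defaultTable() {
        ProcessorTableSketch table = new ProcessorTableSketch();
        // One processor instance serves several transaction-related message types.
        Function<String, String> onRequest = body -> "handled:" + body;
        table.registerProcessor(TYPE_GLOBAL_BEGIN, onRequest);
        table.registerProcessor(TYPE_GLOBAL_COMMIT, onRequest);
        table.registerProcessor(TYPE_HEARTBEAT, body -> "pong");
        return table;
    }

    public static void main(String[] args) {
        ProcessorTableSketch table = defaultTable();
        System.out.println(table.dispatch(TYPE_GLOBAL_BEGIN, "begin"));
        System.out.println(table.dispatch(TYPE_HEARTBEAT, "ping"));
    }
}
```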

In NettyRemotingServer there is a call to the init method of the base class AbstractNettyRemotingServer with the following code:

public void init() {
    // super.init() starts a timed task that cleans up timed-out Rpc requests once every 3 seconds.
    super.init();
    // Configure the Netty server side and start listening on the port.
    serverBootstrap.start();
}

// The serverBootstrap.start() method called above:
public void start() {
    // General configuration of the Netty server side, in which two ChannelHandlers are added:
    // ProtocolV1Decoder and ProtocolV1Encoder,
    // the Decoder and Encoder of Seata's custom RPC protocol, respectively.
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });

    try {
        // Start listening on the configured port
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
        LOGGER.info("Server started, listen port: {}", listenPort);
        // After Netty starts successfully, register the current instance with the registry configured in registry.conf
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

· 15 min read

Author profile: Xuan Yi, GitHub ID: sharajava, responsible for the GTS development team of Alibaba Middleware, initiator of the SEATA open source project, worked for many years at Oracle Beijing R&D Center, and was engaged in WebLogic core development. He has long been focused on middleware, especially technical practices in the field of distributed transactions.

The 1.2.0 version of Seata has released a new transaction mode: XA mode, which supports the XA protocol.

Here, we will interpret this new feature in depth from three aspects:

  • What: What is XA mode?
  • Why: Why support XA?
  • How: How is XA mode implemented and how to use it?

1. What is XA mode?

There are two basic preliminary concepts here:

  1. What is XA?
  2. What is the so-called transaction mode defined by Seata?

Based on these two points, understanding XA mode becomes quite natural.

1.1 What is XA?

The XA specification is a standard for distributed transaction processing (DTP) defined by the X/Open organization.

The XA specification describes the interface between the global transaction manager (TM) and the local resource manager (RM). Its purpose is to allow multiple resources (such as databases, application servers, message queues, etc.) to participate in the same transaction, thus maintaining ACID properties across applications.

The XA specification uses the Two-Phase Commit (2PC) to ensure that all resources are committed or rolled back at the same time for any specific transaction.
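
The two-phase commit mentioned above can be sketched with in-memory participants. This is a deliberately simplified illustration, not an XA implementation: the Participant class, its string states, and the rule of stopping at the first "no" vote are assumptions chosen to keep the example short.

```java
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitSketch {
    // Hypothetical participant; a real XA resource exposes these steps through its driver.
    static class Participant {
        final boolean voteYes;
        String state = "ACTIVE";

        Participant(boolean voteYes) {
            this.voteYes = voteYes;
        }

        boolean prepare() {
            if (voteYes) {
                state = "PREPARED";
            }
            return voteYes;
        }

        void commit() {
            state = "COMMITTED";
        }

        void rollback() {
            state = "ROLLED_BACK";
        }
    }

    // Phase 1: ask every participant to prepare.
    // Phase 2: commit only if all of them voted yes, otherwise roll everyone back.
    static boolean run(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        List<Participant> ok = Arrays.asList(new Participant(true), new Participant(true));
        System.out.println(run(ok) + " " + ok.get(0).state);
        List<Participant> mixed = Arrays.asList(new Participant(true), new Participant(false));
        System.out.println(run(mixed) + " " + mixed.get(0).state);
    }
}
```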

The XA specification was proposed in the early 1990s. Currently, almost all mainstream databases support the XA specification.

1.2 What is Seata's transaction mode?

Seata defines the framework for global transactions.

A global transaction is defined as the overall coordination of several branch transactions:

  1. The Transaction Manager (TM) requests the Transaction Coordinator (TC) to initiate (Begin), commit (Commit), or rollback (Rollback) the global transaction.
  2. The TM binds the XID representing the global transaction to the branch transaction.
  3. The Resource Manager (RM) registers with the TC, associating the branch transaction with the global transaction represented by XID.
  4. The RM reports the execution result of the branch transaction to the TC. (optional)
  5. The TC sends a branch commit or branch rollback command to the RM.
seata-mod

Seata's global transaction processing process is divided into two phases:

  • Execution phase: Execute branch transactions and ensure that the execution results are rollbackable and durable.
  • Completion phase: Based on the resolution of the execution phase, the application sends a request for global commit or rollback to the TC through the TM, and the TC commands the RM to drive the branch transaction to commit or rollback.

Seata's so-called transaction mode refers to the behavior mode of branch transactions running under the Seata global transaction framework. More precisely, it should be called the branch transaction mode.

The difference between different transaction modes lies in the different ways branch transactions achieve the goals of the two phases of the global transaction. That is, answering the following two questions:

  • Execution phase: How to execute and ensure that the execution results are rollbackable and durable.
  • Completion phase: After receiving the command from the TC, how to commit or roll back the branch transaction?

Taking our Seata AT mode and TCC mode as examples:

AT mode

at-mod
  • Execution phase:

    • Rollbackable: Record rollback logs according to the SQL parsing result
    • Durable: Rollback logs and business SQL are committed to the database in the same local transaction
  • Completion phase:

    • Branch commit: Asynchronously delete rollback log records
    • Branch rollback: Compensate and update according to the rollback log
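
The AT mode steps listed above can be mimicked with two in-memory maps. The sketch is an assumption-laden toy, not Seata's implementation: AtBranchSketch, its table and undoLog maps, and the single-update-per-row simplification are invented here, and a real undo log is written to the database in the same local transaction as the business SQL.

```java
import java.util.HashMap;
import java.util.Map;

class AtBranchSketch {
    final Map<String, Integer> table = new HashMap<>();   // business data, keyed by row id
    final Map<String, Integer> undoLog = new HashMap<>(); // before-images, keyed by row id

    // Execution phase: record the before-image together with the business update.
    // Assumes the row exists and is updated at most once per branch.
    void update(String rowId, int newValue) {
        undoLog.put(rowId, table.get(rowId));
        table.put(rowId, newValue);
    }

    // Branch commit: the data is already in place, so only the rollback log is deleted.
    void branchCommit() {
        undoLog.clear();
    }

    // Branch rollback: compensate by writing the before-images back.
    void branchRollback() {
        undoLog.forEach(table::put);
        undoLog.clear();
    }

    public static void main(String[] args) {
        AtBranchSketch branch = new AtBranchSketch();
        branch.table.put("stock", 100);
        branch.update("stock", 50);
        System.out.println("after update: " + branch.table.get("stock"));
        branch.branchRollback();
        System.out.println("after rollback: " + branch.table.get("stock"));
    }
}
```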

TCC mode

tcc-mod
  • Execution Phase:

    • Call the Try method defined by the business (rollbackability and durability are guaranteed entirely by the business layer)
  • Completion Phase:

    • Branch Commit: Call the Confirm method defined for each transaction branch
    • Branch Rollback: Call the Cancel method defined for each transaction branch
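
A Try/Confirm/Cancel trio is usually written by reserving resources in Try and then either consuming or releasing the reservation. The account below is a hypothetical business example (the class TccAccountSketch and its balance and frozen fields are assumptions); it only shows the shape of the three methods, not how Seata invokes them.

```java
class TccAccountSketch {
    int balance;
    int frozen; // amount reserved by Try and not yet confirmed or cancelled

    TccAccountSketch(int balance) {
        this.balance = balance;
    }

    // Try (execution phase): check and reserve, but do not deduct yet.
    boolean tryDebit(int amount) {
        if (balance - frozen < amount) {
            return false;
        }
        frozen += amount;
        return true;
    }

    // Confirm (branch commit): turn the reservation into a real deduction.
    void confirm(int amount) {
        frozen -= amount;
        balance -= amount;
    }

    // Cancel (branch rollback): release the reservation.
    void cancel(int amount) {
        frozen -= amount;
    }

    public static void main(String[] args) {
        TccAccountSketch account = new TccAccountSketch(100);
        if (account.tryDebit(30)) {
            account.confirm(30);
        }
        System.out.println("balance after confirm: " + account.balance);
    }
}
```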

1.3 What is XA mode in Seata?

XA mode:

Within the distributed transaction framework defined by Seata, it is a transaction mode that manages branch transactions through XA protocol mechanisms, relying on the XA protocol support provided by transaction resources (databases, message services, etc.).

xa-mod
  • Execution Phase:

    • Rollbackable: Business SQL operations are performed in an XA branch, and the resource's support for the XA protocol ensures that they can be rolled back
    • Durable: After the XA branch is completed, XA prepare is executed, and again the resource's support for the XA protocol ensures durability (i.e., no unexpected event can lead to a situation where rollback is impossible)
  • Completion Phase:

    • Branch Commit: Perform commit for XA branch
    • Branch Rollback: Perform rollback for XA branch

2. Why support XA?

Why add XA mode in Seata? What is the significance of supporting XA?

2.1 Problems with Compensatory Transaction Mode

Essentially, the 3 major transaction modes that Seata already supports: AT, TCC, and Saga, are all compensatory in nature.

Compensatory transaction processing mechanisms are built on top of transaction resources (either in the middleware layer or in the application layer), and the transaction resources themselves are unaware of distributed transactions.

img

The fundamental problem with transaction resources being unaware of distributed transactions is the inability to achieve true global consistency.

For example, in a compensatory transaction processing process, a stock record is reduced from 100 to 50. At this point, the warehouse administrator connects to the database and sees the current quantity as 50. Later, the transaction is rolled back due to an unexpected occurrence, and the stock is compensated back to 100. Clearly, the warehouse administrator's query finding 50 is dirty data.

It can be seen that because compensatory distributed transaction mechanisms do not require the participation of the transaction resources themselves (such as a database), they cannot guarantee data consistency from a global perspective outside the transaction framework.

2.2 Value of XA

Unlike compensatory transaction modes, the XA protocol requires transaction resources to provide support for standards and protocols.

nct

Because transaction resources are aware of and participate in the distributed transaction processing process, they (such as databases) can guarantee effective isolation of data from any perspective and satisfy global data consistency.

For example, in the scenario of stock updates mentioned in the previous section, during the XA transaction processing process, the intermediate state of the database holding 50 is guaranteed by the database itself and will not be seen in the warehouse administrator's query statistics. (Of course, the isolation level needs to be READ_COMMITTED or higher.)

In addition to the fundamental value of global consistency, supporting XA also has the following benefits:

  1. Non-invasive to business code: Like AT, XA mode is non-invasive to business code and brings no additional burden to application design and development.
  2. Wide support for databases: XA protocol is widely supported by mainstream relational databases and can be used without additional adaptation.
  3. Easy multi-language support: Because it does not involve SQL parsing, the XA mode has lower requirements for Seata's RM, making it easier for different language development SDKs compared to the AT mode.
  4. Migration of traditional XA-based applications: Traditional applications based on the XA protocol can be smoothly migrated to the Seata platform using the XA mode.

2.3 Widely Questioned Issues of XA

There is no distributed transaction mechanism that can perfectly adapt to all scenarios and meet all requirements.

The XA specification was proposed as early as the early 1990s to solve the problems in the field of distributed transaction processing.

Now, whether it is the AT mode, the TCC mode, or the Saga mode, each was proposed essentially because the XA specification cannot meet the requirements of certain scenarios.

The distributed transaction processing mechanism defined by the XA specification has some widely questioned issues. What is our thinking regarding these issues?

  1. Data Locking: Data is locked throughout the entire transaction processing until it is finished, and reads and writes are constrained according to the definition of isolation levels.

Thinking:

Data locking is the cost to obtain higher isolation and global consistency.

In compensatory transaction processing mechanisms, the completion of branch (local) transactions is done during the execution stage, and data is not locked at the resource level. However, this is done at the cost of sacrificing isolation.

Additionally, the AT mode uses global locks to ensure basic write isolation, effectively locking data, but the lock is managed centrally on the TC side, with high unlock efficiency and no blocking issues.

  2. Protocol Blocking: After XA prepare, the branch transaction enters a blocking stage and must wait for XA commit or XA rollback.

Thinking:

The blocking mechanism of the protocol itself is not the problem. The key issue is the combination of protocol blocking and data locking.

If a resource participating in the global transaction is "offline" (does not receive commands to end branch transactions), the data it locks will remain locked. This may even lead to deadlocks.

This is the core pain point of the XA protocol and is the key problem that Seata aims to solve by introducing the XA mode.

The basic idea is twofold: avoiding "loss of connection" and adding a "self-release" mechanism. (This involves a lot of technical details, which will not be discussed at the moment. They will be specifically discussed in the subsequent evolution of the XA mode.)

  3. Poor Performance: Performance loss mainly comes from two aspects: on one hand, the transaction coordination process increases the RT of individual transactions; on the other hand, concurrent transaction data lock conflicts reduce throughput.

Thinking:

Compared to running scenarios without distributed transaction support, performance will certainly decline, there is no doubt about that.

Essentially, the transaction mechanism (whether local or distributed) sacrifices some performance to achieve a simple programming model.

Compared to the AT mode, which is also non-invasive for businesses:

Firstly, because XA mode also runs under Seata's defined distributed transaction framework, it does not generate additional transaction coordination communication overhead.

Secondly, in concurrent transactions, if data has hotspots and lock conflicts occur, this situation also exists in the AT mode (which defaults to using a global lock).

Therefore, in the two main aspects affecting performance, the XA mode has no significant disadvantage compared to the AT mode.

The performance advantage of the AT mode mainly lies in: centralized management of global data locks, where the release of locks does not require RM involvement and is very fast; in addition, the asynchronous completion of the global commit stage.
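
A centrally managed global lock of the kind described for the AT mode can be pictured as a single map from row key to owning XID on the TC side. The following is a minimal sketch under that assumption; GlobalLockTableSketch, the row-key format, and the XID strings are hypothetical and do not mirror Seata's lock store.

```java
import java.util.HashMap;
import java.util.Map;

class GlobalLockTableSketch {
    // rowKey -> xid of the global transaction that currently holds the lock
    private final Map<String, String> lockOwner = new HashMap<>();

    // Returns true if the lock is free or already held by the same global transaction.
    synchronized boolean acquire(String xid, String rowKey) {
        String owner = lockOwner.putIfAbsent(rowKey, xid);
        return owner == null || owner.equals(xid);
    }

    // Releasing is a purely local operation on the coordinator: no call to any RM is needed.
    synchronized void releaseAll(String xid) {
        lockOwner.values().removeIf(xid::equals);
    }

    public static void main(String[] args) {
        GlobalLockTableSketch locks = new GlobalLockTableSketch();
        System.out.println(locks.acquire("tx1", "stock:1")); // tx1 takes the lock
        System.out.println(locks.acquire("tx2", "stock:1")); // tx2 conflicts
        locks.releaseAll("tx1");
        System.out.println(locks.acquire("tx2", "stock:1")); // free again
    }
}
```

Since the table lives in one place, releasing all locks of a finished global transaction is a single in-memory (or single-store) operation, which is the property the paragraph above credits for fast unlocking.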

3. How Does XA Mode Work and How to Use It?

3.1 Design of XA Mode

3.1.1 Design Objectives

The basic design objectives of XA mode focus on three aspects:

  1. From the perspective of scenarios, it meets the requirement of global consistency.
  2. From the perspective of applications, it maintains the non-invasive nature consistent with the AT mode.
  3. From the perspective of mechanisms, it adapts to the characteristics of distributed microservice architecture.

Overall idea:

  1. Same as the AT mode: Construct branch transactions from local transactions in the application program.
  2. Through data source proxy, wrap the interaction mechanism of the XA protocol at the framework level outside the scope of local transactions in the application program, making the XA programming model transparent.
  3. Split the 2PC of XA and perform XA prepare at the end of the execution stage of branch transactions, seamlessly integrating the XA protocol into Seata's transaction framework, reducing one round of RPC interaction.

3.1.2 Core Design

1. Overall Operating Mechanism

XA mode runs within the transaction framework defined by Seata:

xa-fw
  • Execution phase (Execute):

    • XA start/XA end/XA prepare + SQL + Branch registration
  • Completion phase (Finish):

    • XA commit/XA rollback
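
The order of XA calls in the two phases can be written against the JDK's javax.transaction.xa.XAResource interface. The sketch below is an assumption-based illustration, not Seata's code: the class and method names are invented, branch registration is left out, and a recording dynamic proxy stands in for a real driver so that the call order can be observed without a database (which is also why a null Xid is enough here).

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaBranchSequenceSketch {
    // Execution phase of one branch: everything up to and including XA prepare.
    static void executeBranch(XAResource xa, Xid xid, Runnable businessSql) throws XAException {
        xa.start(xid, XAResource.TMNOFLAGS);
        businessSql.run();
        xa.end(xid, XAResource.TMSUCCESS);
        xa.prepare(xid);
    }

    // Completion phase, driven by the coordinator's decision.
    static void finishBranch(XAResource xa, Xid xid, boolean commit) throws XAException {
        if (commit) {
            xa.commit(xid, false); // false: not one-phase, the branch is already prepared
        } else {
            xa.rollback(xid);
        }
    }

    // Test double that only records the names of the XAResource methods being called.
    static XAResource recording(List<String> calls) {
        return (XAResource) Proxy.newProxyInstance(
            XaBranchSequenceSketch.class.getClassLoader(),
            new Class<?>[] {XAResource.class},
            (proxy, method, args) -> {
                calls.add(method.getName());
                // prepare returns an int vote; the void methods used here return nothing.
                return method.getReturnType() == int.class ? Integer.valueOf(XAResource.XA_OK) : null;
            });
    }

    static List<String> demo(boolean commit) {
        List<String> calls = new ArrayList<>();
        XAResource xa = recording(calls);
        try {
            executeBranch(xa, null, () -> calls.add("business SQL"));
            finishBranch(xa, null, commit);
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```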

2. Data Source Proxy

XA mode requires XAConnection.

There are two ways to obtain XAConnection:

  • Method 1: Requires developers to configure XADataSource
  • Method 2: Creation based on the developer's normal DataSource

The first method adds cognitive burden to developers, as they need to learn and use XA data sources specifically for XA mode, which contradicts the design goal of transparent XA programming model.

The second method is more user-friendly, similar to the AT mode, where developers do not need to worry about any XA-related issues and can maintain a local programming model.

We prioritize the implementation of the second method: the data source proxy creates the corresponding XAConnection based on the normal JDBC connection obtained from the normal data source.

Comparison with the data source proxy mechanism of the AT mode:

img

However, the second method has a limitation: its compatibility and correctness cannot be guaranteed.

In fact, this is work that the database driver itself should do. The implementation mechanisms differ across vendors and driver versions, so we can only guarantee correctness on fully tested driver versions; differences in the driver version used by developers can cause the mechanism to fail.

This is particularly evident in Oracle. See Druid issue: https://github.com/alibaba/druid/issues/3707

Taking everything into account, the data source proxy design for XA mode needs to support the first method: proxy based on XA data source.

Comparison with the data source proxy mechanism of the AT mode:

img

3. Branch Registration

XA start requires the Xid parameter.

This Xid needs to be associated with the XID and BranchId of the Seata global transaction, so that the TC can drive the XA branch to commit or rollback.

Currently, the BranchId in Seata is generated uniformly by the TC during the branch registration process, so the timing of the XA mode branch registration needs to be before XA start.
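
One way to picture the association is an XA Xid whose global transaction id carries the Seata XID and whose branch qualifier carries the BranchId. The encoding below is an assumption made for illustration (the class BranchXidSketch, the UTF-8 encoding, and the sample XID string are hypothetical); the only external fact relied on is that the XA specification limits each of the two parts to 64 bytes.

```java
import java.nio.charset.StandardCharsets;

class BranchXidSketch {
    static final int MAX_PART_BYTES = 64; // XA limits gtrid and bqual to 64 bytes each

    final byte[] globalTransactionId;
    final byte[] branchQualifier;

    BranchXidSketch(String seataXid, long branchId) {
        globalTransactionId = seataXid.getBytes(StandardCharsets.UTF_8);
        branchQualifier = Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
        if (globalTransactionId.length > MAX_PART_BYTES || branchQualifier.length > MAX_PART_BYTES) {
            throw new IllegalArgumentException("XID or BranchId does not fit into an XA Xid");
        }
    }

    // Both values can be recovered from the Xid, which lets a coordinator command
    // "commit branch B of transaction X" be mapped back to the right XA branch.
    String seataXid() {
        return new String(globalTransactionId, StandardCharsets.UTF_8);
    }

    long branchId() {
        return Long.parseLong(new String(branchQualifier, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        BranchXidSketch xid = new BranchXidSketch("192.168.0.1:8091:2021", 7L);
        System.out.println(xid.seataXid() + " / " + xid.branchId());
    }
}
```

Because the BranchId is part of the Xid, it has to be known before XA start is issued, which is why registration currently comes first.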

A possible optimization in the future:

Delay branch registration as much as possible. Similar to the AT mode, register the branch before the local transaction commit to avoid meaningless branch registration in case of branch execution failure.

This optimization direction requires a change in the BranchId generation mechanism to cooperate. BranchId will not be generated through the branch registration process, but will be generated and then used to register the branch.

4. Summary

Here, only a few important core designs of the XA mode are explained to illustrate its basic operation mechanism.

In addition, aspects such as connection maintenance and exception handling are also important and can be further understood from the project code.

More details will be written up and shared with everyone in the future.

3.1.3 Evolution Plan

The overall evolution plan of the XA mode is as follows:

  1. Step 1 (already completed): The first version (1.2.0) runs the prototype mechanism of the XA mode. Ensure only addition, no modification, and no new issues introduced to other modes.
  2. Step 2 (planned to be completed in May): Necessary integration and refactoring with the AT mode.
  3. Step 3 (planned to be completed in July): Refine the exception handling mechanism and polish for production readiness.
  4. Step 4 (planned to be completed in August): Performance optimization.
  5. Step 5 (planned to be completed in 2020): Integrate with Seata project's ongoing design for cloud-native Transaction Mesh to create cloud-native capabilities.

3.2 Usage of XA Mode

From a programming model perspective, XA mode is exactly the same as the AT mode.

You can refer to the Seata official website sample: seata-xa

The example scenario is the classic Seata example, involving the product ordering business of three microservices: inventory, orders, and accounts.

In the example, the upper programming model is the same as the AT mode. By simply modifying the data source proxy, you can switch between XA mode and AT mode.

@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
    // DataSourceProxy for AT mode
    // return new DataSourceProxy(druidDataSource);

    // DataSourceProxyXA for XA mode
    return new DataSourceProxyXA(druidDataSource);
}

4. Summary

At the current stage of technological development, there is no distributed transaction processing mechanism that can perfectly meet all scenarios' requirements.

Consistency, reliability, ease of use, performance, and many other aspects of system design constraints require different transaction processing mechanisms to meet them.

The core value of the Seata project is to build a standardized platform that comprehensively addresses the distributed transaction problem.

Based on Seata, the upper application architecture can flexibly choose the appropriate distributed transaction solution according to the actual scenario's needs.

img

The addition of XA mode fills the gap in Seata in the global consistency scenario, forming a landscape of four major transaction modes: AT, TCC, Saga, and XA, which can basically meet all scenarios' demands for distributed transaction processing.

Of course, both XA mode and the Seata project itself are not yet perfect, and there are many areas that need improvement and enhancement. We warmly welcome everyone to participate in the project's development and contribute to building a standardized distributed transaction platform together.

· 11 min read

Seata is Alibaba's open-source **distributed transaction** solution, committed to providing high-performance and easy-to-use distributed transaction services.

1.1 Four transaction patterns

Seata aims to create a one-stop solution for distributed transactions, and will eventually provide four transaction modes:

  • AT mode: see the "Seata AT mode" document.
  • TCC mode: see the "Seata TCC mode" document (/docs/dev/mode/tcc-mode/).
  • Saga mode: see the "Seata Saga mode" document.
  • XA mode: under development...

In terms of current popularity, the order is AT > TCC > Saga. Therefore, when learning Seata, it is worth spending more energy on the AT mode, ideally understanding the principles behind its implementation. After all, distributed transactions affect the correctness of data, so problems need to be located and resolved quickly.

Friendly note: for specifics on popularity, see the companies registered in Wanted: who's using Seata.

1.2 Three roles

There are three roles in the architecture of Seata:

! Three Roles

  • TC (Transaction Coordinator) - Transaction Coordinator: maintains the state of global and branch transactions, drives global transactions commit or rollback.
  • TM (Transaction Manager) - Transaction Manager: defines the scope of a global transaction, starts the global transaction, commits or rolls back the global transaction.
  • RM (Resource Manager) - Resource Manager: manages the resources processed by the Branch Transaction, talks to the TC to register the branch transaction and report on the status of the branch transaction, and drives the Branch Transaction to commit or rollback.

The TC is a separately deployed server, while the TM and RM are clients embedded in the application.

In Seata, the Lifecycle of a distributed transaction is as follows:

! Architecture diagram

Friendly reminder: pay attention to the red check marks added to the diagram.

  • The TM asks the TC to begin a global transaction. The TC generates an XID that identifies this global transaction.

The XID is propagated along the microservice invocation chain, which ties the sub-transactions of multiple microservices together.

  • Each RM asks the TC to register its local transaction as a branch of the global transaction, associated through the global transaction's XID.
  • The TM asks the TC to commit or roll back the global transaction identified by the XID.
  • The TC drives the RMs to commit or roll back the local transactions that belong to the XID.
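The four steps above can be sketched as a toy, in-memory model. This is only an illustration under simplifying assumptions: the classes ToyCoordinator and LifecycleSketch and all of their methods are hypothetical and are not part of Seata's API, and there is no networking, persistence, or failure handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model of the TC/TM/RM lifecycle; every name here is hypothetical, not Seata's API.
final class ToyCoordinator {
    enum Decision { COMMIT, ROLLBACK }

    // A branch is just a callback that the TC drives in phase two.
    interface Branch {
        void finish(Decision decision);
    }

    private final Map<String, List<Branch>> branchesByXid = new LinkedHashMap<>();

    // Step 1: the TM asks the TC to begin a global transaction; the TC returns an XID.
    String begin() {
        String xid = UUID.randomUUID().toString();
        branchesByXid.put(xid, new ArrayList<>());
        return xid;
    }

    // Step 2: an RM registers its local transaction as a branch under the XID.
    void registerBranch(String xid, Branch branch) {
        List<Branch> branches = branchesByXid.get(xid);
        if (branches == null) {
            throw new IllegalStateException("unknown XID: " + xid);
        }
        branches.add(branch);
    }

    // Steps 3 and 4: the TM reports the outcome and the TC drives every branch.
    // Returns the number of branches that were driven.
    int end(String xid, Decision decision) {
        List<Branch> branches = branchesByXid.remove(xid);
        if (branches == null) {
            throw new IllegalStateException("unknown XID: " + xid);
        }
        for (Branch branch : branches) {
            branch.finish(decision);
        }
        return branches.size();
    }
}

final class LifecycleSketch {
    public static void main(String[] args) {
        ToyCoordinator tc = new ToyCoordinator();
        List<String> log = new ArrayList<>();

        String xid = tc.begin();                               // TM -> TC
        tc.registerBranch(xid, d -> log.add("order:" + d));    // RM in the order service
        tc.registerBranch(xid, d -> log.add("storage:" + d));  // RM in the storage service
        tc.end(xid, ToyCoordinator.Decision.ROLLBACK);         // TM -> TC -> RMs

        System.out.println(log);
    }
}
```

In the sketch, both branches receive the same decision because they were registered under the same XID, which is the role the XID plays in the description above.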

1.3 Framework Support

Seata currently provides support for the major microservices frameworks:

  • Dubbo: integrated via seata-dubbo
  • SOFA-RPC: integrated via seata-sofa-rpc
  • Motan: integrated via seata-motan
  • gRPC: integrated via seata-grpc
  • Apache HttpClient: integrated via seata-http

Seata also provides a Starter library for easy integration into Java projects:

Because Seata extends its behaviour by proxying the DataSource, it naturally works well with the mainstream ORM frameworks:

  • MyBatis, MyBatis-Plus
  • JPA, Hibernate

1.4 Case Scenarios

Judging from the registrations in Wanted: who's using Seata, Seata has been adopted by many teams in China, including large companies such as DiDi and Yunda. This can be summarised in the figure below:

! summary chart

In addition, the awesome-seata repository collects technical talks from companies such as DiDi about how they adopted Seata, and these are practical and credible. As shown in the picture below: ! awesome-seata DiDi

Judging from these cases, Seata is probably the most reliable distributed transaction solution known to date; at the very least, it is a sound choice to invest in technically.

2. Deploying a Standalone TC Server

In this subsection, we will learn to deploy a standalone Seata TC Server, which is commonly used for learning or testing purposes, and is not recommended to be deployed in a production environment.

Because TC needs to record global and branch transactions, it needs corresponding storage. Currently, TC has two storage modes (store.mode):

  • file mode: suitable for standalone mode, global transaction session information is read/written in memory and persisted to local file root.data, with high performance.
  • db mode: suitable for cluster mode, global transaction session information is shared via db, relatively low performance.

Naturally, we will use file mode here. The standalone TC Server we end up deploying is shown below: ! Standalone TC Server

With the background covered, let's formally deploy the standalone TC Server. The author uses macOS here; Linux and Windows are similar, so adapt the commands accordingly.

2.1 Download Seata Package

Open the Seata download page, and select the version of Seata you want. Here, we choose v1.1.0, the latest version.

# Create the directory
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# Download
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# Extract
$ tar -zxvf seata-server-1.1.0.tar.gz

# View directory
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # Executing scripts
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # configuration file
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + dependency library

2.2 Starting TC Server

Execute the nohup sh bin/seata-server.sh & command to start TC Server in the background. In the nohup.out file, we see the following log, which indicates that the startup was successful:

# Using File Storage
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[FILE] extension by class[io.seata.server.store.file.FileTransactionStoreManager]
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[FILE] extension by class [io.seata.server.session.file.FileBasedSessionManager]
# Started successfully
2020-04-02 08:36:01.597 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
  • In the default configuration, Seata TC Server listens on port 8091.

Since we are using file mode, we can see the local file root.data for persistence. The command to do this is as follows:

$ ls -ls sessionStore/
total 0
0 -rw-r--r-- 1 yunai staff 0 Apr 2 08:36 root.data

As a follow-up, you can read the "4. Accessing Java Applications" subsection to get started with distributed transactions using Seata.

3. Deploying a Clustered TC Server

In this subsection, we will learn to deploy Cluster Seata TC Server to achieve high availability, a must for production environments. In clustering, multiple Seata TC Servers share global transaction session information through the db database.

At the same time, each Seata TC Server can register itself to the registry so that applications can get them from the registry. Eventually we deploy the Clustered TC Server as shown below: ! Cluster TC Server

Seata TC Server provides integration with all major registries, as shown in the discovery directory. Considering the increasing popularity of using Nacos as a registry in China, we will use it here.

Friendly note: If you don't know anything about Nacos, you can refer to the "Nacos Installation and Deployment" article.

With the background covered, let's formally deploy the clustered TC Server. The author uses macOS here; Linux and Windows are similar, so adapt the commands accordingly.

3.1 Downloading the Seata package

Open the Seata download page (https://github.com/apache/incubator-seata/releases), and select the version of Seata you want. Here, we choose v1.1.0, the latest version.

# Create the directory
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# Download
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# Extract
$ tar -zxvf seata-server-1.1.0.tar.gz

# View directory
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # Executing scripts
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # configuration file
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + dependency library

3.2 Initialising the database

① Use the mysql.sql script to initialise the db database of Seata TC Server. The contents of the script are as follows:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

In MySQL, create a seata database and execute the script in it. The final result is as follows: ! seata Database - MySQL 5.X

② Modify the conf/file.conf configuration file to use the db database to share the global transaction session information of Seata TC Server. As shown in the following figure: ! conf/file.conf configuration file

③ MySQL8 support

If you are using MySQL 8.X, you need this step. Otherwise, you can skip it.

Firstly, you need to download the MySQL 8.X JDBC driver, the command line operation is as follows:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

Then, modify the conf/file.conf configuration file to use the MySQL 8.X JDBC driver. As shown below: ! seata database - MySQL 8.X

3.3 Setting up to use the Nacos Registry

Modify the conf/registry.conf configuration file to set up the Nacos registry. As shown in the following figure: ! conf/registry.conf configuration file

3.4 Starting TC Server

① Execute nohup sh bin/seata-server.sh -p 18091 -n 1 & command to start the first TC Server in the background.

  • -p: Port on which Seata TC Server listens.
  • -n: Server node. When there are multiple TC Servers, each needs a distinct node number so that the transactionId values they generate fall into different ranges and do not conflict.
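To picture why distinct node numbers avoid conflicts, here is a minimal sketch that assumes each node counts inside its own disjoint block of IDs. The class NodeIdGenerator and the block size are hypothetical choices for illustration; Seata's own generator may work differently.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical ID generator: each server node counts inside its own block of IDs.
final class NodeIdGenerator {
    // Assumed block size, chosen for illustration only.
    static final long BLOCK_SIZE = 2_000_000_000L;

    private final long upperBound;
    private final AtomicLong next;

    NodeIdGenerator(int serverNode) {
        if (serverNode < 1) {
            throw new IllegalArgumentException("serverNode must be >= 1");
        }
        long start = BLOCK_SIZE * serverNode;
        this.upperBound = start + BLOCK_SIZE;
        this.next = new AtomicLong(start);
    }

    // Hands out the next ID and refuses to cross into the neighbouring node's block.
    long nextTransactionId() {
        long id = next.getAndIncrement();
        if (id >= upperBound) {
            throw new IllegalStateException("ID block exhausted for this node");
        }
        return id;
    }
}

final class NodeIdSketch {
    public static void main(String[] args) {
        NodeIdGenerator node1 = new NodeIdGenerator(1); // a server started with -n 1
        NodeIdGenerator node2 = new NodeIdGenerator(2); // a server started with -n 2
        System.out.println(node1.nextTransactionId());
        System.out.println(node2.nextTransactionId());
    }
}
```

Under this assumption two servers can generate IDs independently without coordinating, as long as no two of them are started with the same node number.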

In the nohup.out file, we see the following log, indicating a successful startup:

# Using DB Stores
2020-04-05 16:54:12.793 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load DataSourceGenerator[dbcp] extension by class[io.seata.server.store.db.DbcpDataSourceGenerator]
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load LogStore[DB] extension by class[io.seata.core.store.db.LogStoreDataBaseDAO]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[DB] extension by class[io.seata.server.store.db.DatabaseTransactionStoreManager]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[DB] extension by class[io.seata.server.session.db.DataBaseSessionManager]
# Started successfully
2020-04-05 16:54:13.779 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
# Using the Nacos Registry
2020-04-05 16:54:13.788 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

② Execute the nohup sh bin/seata-server.sh -p 28091 -n 2 & command to start the second TC Server in the background.

③ Open the Nacos console, where we can see two Seata TC Server instances. As shown in the following figure: ! Nacos console

4. Accessing Java Applications

4.1 AT mode

① Spring Boot.

  1. The "2. AT Mode + Multiple Data Sources" subsection of "Getting Started with Yudao Spring Boot Distributed Transaction Seata" implements distributed transactions for a single Spring Boot project with multiple data sources.

! Overall diagram

  2. The "AT Mode + HttpClient Remote Call" subsection of "Getting Started with Yudao Spring Boot Distributed Transaction Seata" implements distributed transactions across multiple Spring Boot projects.

! Overall diagram

② Dubbo

The "2. AT Mode" subsection of "Getting Started with Dubbo Distributed Transaction Seata" implements distributed transactions across multiple Dubbo services.

! Overall Diagram

③ Spring Cloud

The "3. AT Mode + Feign" subsection of "Getting Started with Yudao Spring Cloud Alibaba Distributed Transaction Seata" implements distributed transactions across multiple Spring Cloud services.

! Overall diagram

4.2 TCC mode

4.3 Saga mode

4.4 XA mode

Still under development in Seata...

· 7 min read

To make Seata highly available using a configuration centre and a database, this post takes Nacos and MySQL as an example and deploys the [cloud-seata-nacos](https://github.com/helloworlde/spring-cloud-alibaba-component/blob/master/cloud-seata-nacos/) application to a Kubernetes cluster.

The application uses Nacos as the configuration and registration centre and consists of three services: order-service, pay-service, and storage-service. The order-service exposes an interface for placing an order. When the balance and inventory are sufficient, the order succeeds and the transaction is committed; when they are insufficient, an exception is thrown, the order fails, and the transaction is rolled back.

Preparation

You need a working registry and configuration centre (Nacos) and a MySQL database. Usually these are already available and need no special configuration. For simplicity, this walkthrough deploys only a stand-alone Nacos and a stand-alone database and assumes they are reliable.

  • Deploying Nacos

Deploy Nacos on a server with port 8848 open for seata-server registration at 192.168.199.2.

docker run --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server
  • Deploying MySQL

Deploy a MySQL database to hold transaction data at 192.168.199.2.

docker run --name mysql -p 30060:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

Deploying seata-server

  • Create the tables needed for seata-server.

Refer to script/server/db for the exact SQL, here we are using MySQL's script and the database name is seata.

You also need to create the undo_log table, see script/client/at/db/.

  • Modify the seata-server configuration

Add the following configuration to the Nacos Configuration Centre, as described in script/config-center

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true
store.db.user=root
store.db.password=123456

Deploying seata-server to Kubernetes

  • seata-server.yaml

You need to change the ConfigMap's Registry and Configuration Centre addresses to the appropriate addresses

apiVersion: v1
kind: Service
metadata:
  name: seata-ha-server
  namespace: default
  labels:
    app.kubernetes.io/name: seata-ha-server
spec:
  type: ClusterIP
  ports:
    - port: 8091
      protocol: TCP
      name: http
  selector:
    app.kubernetes.io/name: seata-ha-server

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: seata-ha-server
  namespace: default
  labels:
    app.kubernetes.io/name: seata-ha-server
spec:
  serviceName: seata-ha-server
  replicas: 3
  selector:
    matchLabels:
      app.kubernetes.io/name: seata-ha-server
  template:
    metadata:
      labels:
        app.kubernetes.io/name: seata-ha-server
    spec:
      containers:
        - name: seata-ha-server
          image: docker.io/seataio/seata-server:latest
          imagePullPolicy: IfNotPresent
          env:
            - name: SEATA_CONFIG_NAME
              value: file:/root/seata-config/registry
          ports:
            - name: http
              containerPort: 8091
              protocol: TCP
          volumeMounts:
            - name: seata-config
              mountPath: /root/seata-config
      volumes:
        - name: seata-config
          configMap:
            name: seata-ha-server-config

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: seata-ha-server-config
data:
  registry.conf: |
    registry {
      type = "nacos"
      nacos {
        application = "seata-server"
        serverAddr = "192.168.199.2"
      }
    }
    config {
      type = "nacos"
      nacos {
        serverAddr = "192.168.199.2"
        group = "SEATA_GROUP"
      }
    }
  • Deployment
kubectl apply -f seata-server.yaml

When the deployment is complete, there will be three pods

kubectl get pod | grep seata-ha-server

seata-ha-server-645844b8b6-9qh5j 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-pzczs 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-wkpw8 1/1 Running 0 3m14s

After the startup is complete, you can find three instances of seata-server in the Nacos service list, so you have completed the highly available deployment of seata-server.

  • Viewing the service logs
kubectl logs -f seata-ha-server-645844b8b6-9qh5j
[0.012s][info ][gc] Using Serial
2020-04-15 00:55:09.880 INFO [main]io.seata.server.ParameterParser.init:90 -The server is running in container.
2020-04-15 00:55:10.013 INFO [main]io.seata.config.FileConfiguration.<init>:110 -The configuration file used is file:/root/seata-config/registry.conf
2020-04-15 00:55:12.426 INFO [main]com.alibaba.druid.pool.DruidDataSource.init:947 -{dataSource-1} inited
2020-04-15 00:55:13.127 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started

where {dataSource-1} indicates that the database is used and initialised properly

  • Looking at the registry, there are three instances of the seata-server service at this point

! seata-ha-nacos-list.png

Deploying the business service

  • Create business tables and initialise data

You can refer to [cloud-seata-nacos/README.md](https://github.com/helloworlde/spring-cloud-alibaba-component/blob/master/cloud-seata-nacos/README.md).

  • Adding Nacos Configuration

Under the public namespace, create configurations with the data-ids order-service.properties, pay-service.properties, and storage-service.properties. All three have the same content; adjust the database address, username, and password to your environment.

# MySQL
spring.datasource.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Seata
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
  • Deploying the Service

Deploy the service via the application.yaml configuration file, and note that you need to change the NACOS_ADDR of the ConfigMap to your Nacos address.

apiVersion: v1
kind: Service
metadata:
  namespace: default
  name: seata-ha-service
  labels:
    app.kubernetes.io/name: seata-ha-service
spec:
  type: NodePort
  ports:
    - port: 8081
      nodePort: 30081
      protocol: TCP
      name: http
  selector:
    app.kubernetes.io/name: seata-ha-service

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: seata-ha-service-config
data:
  NACOS_ADDR: 192.168.199.2:8848

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: seata-ha-account
  namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: seata-ha-account
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: seata-ha-account
    namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: default
  name: seata-ha-service
  labels:
    app.kubernetes.io/name: seata-ha-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: seata-ha-service
  template:
    metadata:
      labels:
        app.kubernetes.io/name: seata-ha-service
    spec:
      serviceAccountName: seata-ha-account
      containers:
        - name: seata-ha-order-service
          image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-order-service:1.1"
          imagePullPolicy: IfNotPresent
          env:
            - name: NACOS_ADDR
              valueFrom:
                configMapKeyRef:
                  key: NACOS_ADDR
                  name: seata-ha-service-config
          ports:
            - name: http
              containerPort: 8081
              protocol: TCP
        - name: seata-ha-pay-service
          image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-pay-service:1.1"
          imagePullPolicy: IfNotPresent
          env:
            - name: NACOS_ADDR
              valueFrom:
                configMapKeyRef:
                  key: NACOS_ADDR
                  name: seata-ha-service-config
          ports:
            - name: http
              containerPort: 8082
              protocol: TCP
        - name: seata-ha-storage-service
          image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-storage-service:1.1"
          imagePullPolicy: IfNotPresent
          env:
            - name: NACOS_ADDR
              valueFrom:
                configMapKeyRef:
                  key: NACOS_ADDR
                  name: seata-ha-service-config
          ports:
            - name: http
              containerPort: 8083
              protocol: TCP

Deploy the application to the cluster with the following command

kubectl apply -f application.yaml

Then look at the pods that were created. The seata-ha-service pod runs three containers, one per service

kubectl get pod | grep seata-ha-service

seata-ha-service-7dbdc6894b-5r8q4 3/3 Running 0 12m

Once the application is up and running, in the Nacos service list, there will be the corresponding service

! seata-ha-service-list.png

At this point, if you look at the service's logs, you will see that the service has registered with each TC

kubectl logs -f seata-ha-service-7dbdc6894b-5r8q4 seata-ha-order-service

! seata-ha-service-register.png

Looking at any TC log, you'll see that each service is registered with the TC

kubectl logs -f seata-ha-server-645844b8b6-9qh5j

! seata-ha-tc-register.png

Test

Test Success Scenario

Call the order interface with the price set to 1. Because the initial balance is 10, the order is placed successfully.

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 1
}'

At this point the return result is:

{"success":true, "message":null, "data":null}

Checking the TC logs, the transaction was successfully committed:

! seata-ha-commit-tc-success.png

View the order-service service log ! seata-ha-commit-success.png

Test failure scenario

Set the price to 100. Because the balance is insufficient, the order fails with an exception and the transaction is rolled back.

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 100
}'

View the TC logs: ! seata-ha-commit-tc-rollback.png

View the service logs: ! seata-ha-commit-service-rollback.png

Calling the interface repeatedly and checking the service logs shows that each transaction is registered with a randomly chosen TC. When the TC cluster is scaled out or in, the corresponding TC joins or leaves, which confirms that the high-availability deployment is in effect.

· 6 min read

1. Introduction

According to the classification defined by experts, configurations can be categorized into three types: environment configuration, descriptive configuration, and extension configuration.

  • Environment configuration: Typically consists of discrete simple values like parameters for component startup, often in key-value pair format.
  • Descriptive configuration: Pertains to business logic, such as transaction initiators and participants, and is usually embedded within the lifecycle management of the business. Descriptive configuration contains more information, sometimes with hierarchical relationships.
  • Extension configuration: Products need to discover third-party implementations, requiring high aggregation of configurations. Examples include various configuration centers and registration centers. The common practice is to place the fully qualified class name files under the META-INF/services directory of the JAR file, with each line representing an implementation class name.

2. Environment Configuration

When the Seata server is loaded, it uses resources/registry.conf to determine the types of configuration centers and registration centers. Starting from version 1.0, Seata client not only loads configurations using the conf file but also allows configuration through YAML files in Spring Boot using seata.config.{type} for choosing the configuration center, similar to selecting the registration center. The source code for loading configurations via YAML is located in the io.seata.spring.boot.autoconfigure.properties.registry package.

If the user of the Seata client places both a conf configuration file under resources and configures via YAML files, the configuration in the YAML file will take precedence. Code example:

CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;

Here, extConfiguration is an instance of external configuration provided by the ExtConfigurationProvider#provide() external configuration provider class, while configuration is provided by another configuration provider class, ConfigurationProvider#provide(). These two configuration provider classes are loaded through SPI in the static block of the ConfigurationFactory in the config module.

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

The selection of configuration center types discussed above is related to determining the configuration environment. Once the type of configuration center to be used is determined, the environment configuration is loaded through the corresponding configuration center. File-based configuration, represented by File, is also considered a type of configuration center.
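The precedence rule can be sketched as follows. This is a simplified illustration, not Seata's code: SimpleConfiguration, MapConfiguration, and the wrap helper are hypothetical stand-ins, and the fallback from YAML-style properties to the file values is an assumption about how an external provider might behave.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration source; not Seata's real interface.
interface SimpleConfiguration {
    String getConfig(String key);
}

final class MapConfiguration implements SimpleConfiguration {
    private final Map<String, String> values = new HashMap<>();

    MapConfiguration put(String key, String value) {
        values.put(key, value);
        return this;
    }

    @Override
    public String getConfig(String key) {
        return values.get(key);
    }
}

final class ConfigPrecedenceSketch {
    // Mirrors the null check in the article: prefer the external configuration when present.
    static SimpleConfiguration select(SimpleConfiguration configuration,
                                      SimpleConfiguration extConfiguration) {
        return extConfiguration == null ? configuration : extConfiguration;
    }

    // Assumed behaviour of an external provider: answer from YAML-style
    // properties first, then fall back to the original file configuration.
    static SimpleConfiguration wrap(SimpleConfiguration original, SimpleConfiguration yaml) {
        return key -> {
            String value = yaml.getConfig(key);
            return value != null ? value : original.getConfig(key);
        };
    }

    public static void main(String[] args) {
        // Keys and values below are made up for the demonstration.
        SimpleConfiguration file = new MapConfiguration()
                .put("config.type", "file")
                .put("demo.port", "8091");
        SimpleConfiguration yaml = new MapConfiguration().put("config.type", "nacos");

        SimpleConfiguration current = select(file, wrap(file, yaml));
        System.out.println(current.getConfig("config.type")); // value from the YAML side
        System.out.println(current.getConfig("demo.port"));   // only defined in the file
    }
}
```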

Both the client and server obtain configuration parameters by using ConfigurationFactory#getInstance() to get an instance of the configuration class, and then retrieve configuration parameters using the instance. The constants defining configuration keys are mainly found in the config file under the core module.

The meanings of some important environment configuration properties are documented on the official website.

During instantiation, the configuration parameters obtained through ConfigurationFactory and injected into constructors require a restart to take effect. However, parameters obtained in real-time using ConfigurationFactory become effective immediately when the configuration changes.

The config module provides the ConfigurationChangeListener#onChangeEvent interface method to modify internal attributes of instances. In this method, dynamic changes to properties are monitored, and if the properties used by the instance are found to have changed from the initial injection, the attributes stored in the instance are modified to align with the configuration center. This enables dynamic configuration updates.

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener {
    private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, false);

    @Override
    public Object invoke(Param param) {
        if (!disable) {
            // Seata transaction business processing
        }
    }

    @Override
    public void onChangeEvent(Param param) {
        disable = param;
    }
}

The snippet above is pseudo-code for the GlobalTransactionalInterceptor in the Spring module and its degradation property. When the GlobalTransactionalScanner instantiates the interceptor, it registers the interceptor in the list of configuration change listeners, so that the listener is invoked whenever the configuration changes:

ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);

The term "degradation" refers to the scenario where a particular functionality of a service becomes unavailable. By changing a property dynamically, that functionality can be switched off to avoid repeated attempts and failures. The interceptor#invoke() method executes Seata transaction logic only while the disable attribute is false; once it is set to true, global transactions are skipped.
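The listener mechanism described above can be tried out with a small self-contained sketch. ToyConfigCenter and ToyInterceptor are hypothetical classes that only echo the names in the article, and the key string is used purely as an illustrative constant; none of this is Seata's actual implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical mini configuration centre with per-key change listeners.
final class ToyConfigCenter {
    interface ChangeListener {
        void onChange(String key, String newValue);
    }

    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, List<ChangeListener>> listeners = new ConcurrentHashMap<>();

    boolean getBoolean(String key, boolean defaultValue) {
        String value = values.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    void addListener(String key, ChangeListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Stores the new value and notifies every listener registered for the key.
    void publish(String key, String newValue) {
        values.put(key, newValue);
        for (ChangeListener listener : listeners.getOrDefault(key, Collections.emptyList())) {
            listener.onChange(key, newValue);
        }
    }
}

// Hypothetical interceptor that caches the flag and updates it on change events.
final class ToyInterceptor implements ToyConfigCenter.ChangeListener {
    static final String DISABLE_KEY = "service.disableGlobalTransaction"; // illustrative key

    private volatile boolean disable;

    ToyInterceptor(ToyConfigCenter center) {
        this.disable = center.getBoolean(DISABLE_KEY, false);
    }

    String invoke() {
        return disable ? "plain call" : "global transaction";
    }

    @Override
    public void onChange(String key, String newValue) {
        disable = Boolean.parseBoolean(newValue);
    }
}

final class DynamicConfigSketch {
    public static void main(String[] args) {
        ToyConfigCenter center = new ToyConfigCenter();
        ToyInterceptor interceptor = new ToyInterceptor(center);
        center.addListener(ToyInterceptor.DISABLE_KEY, interceptor);

        System.out.println(interceptor.invoke());            // flag still false
        center.publish(ToyInterceptor.DISABLE_KEY, "true");  // degrade at runtime
        System.out.println(interceptor.invoke());            // flag now true, no restart
    }
}
```

The flag is declared volatile so that a change published from the configuration thread is visible to request threads that call invoke().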

3. Descriptive Configuration

Descriptive configurations in general frameworks often contain abundant information, sometimes with hierarchical relationships. XML configuration is convenient for describing tree structures due to its strong descriptive capabilities. However, the current trend advocates for eliminating cumbersome prescriptive configurations in favor of using conventions.

In Seata's AT (Automatic Transaction) mode, transaction processing is achieved through proxying data sources, resulting in minimal intrusion on the business logic. Simply identifying which business components need to enable global transactions during Seata startup can be achieved using annotations, thus facilitating descriptive configuration.

@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
public String doBiz(String msg) {}

If using the TCC (Try-Confirm-Cancel) mode, transaction participants also need to annotate their involvement:

@TwoPhaseBusinessAction(name = "tccActionForSpringTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int i);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
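To make the three TCC phases concrete, here is a toy in-memory participant. It is only a sketch: ToyTccAccount is hypothetical, it does not use Seata's annotations or BusinessActionContext, and a real participant would also need to handle idempotency and empty rollbacks.

```java
// Hypothetical in-memory TCC participant; it does not use Seata's annotations or context.
final class ToyTccAccount {
    private int balance;
    private int frozen;

    ToyTccAccount(int balance) {
        this.balance = balance;
    }

    // Try: reserve the amount without spending it yet.
    synchronized boolean prepare(int amount) {
        if (amount <= 0 || balance - frozen < amount) {
            return false;
        }
        frozen += amount;
        return true;
    }

    // Confirm: turn the reservation into a real deduction.
    synchronized boolean commit(int amount) {
        if (frozen < amount) {
            return false;
        }
        frozen -= amount;
        balance -= amount;
        return true;
    }

    // Cancel: release the reservation and leave the balance untouched.
    synchronized boolean rollback(int amount) {
        if (frozen < amount) {
            return false;
        }
        frozen -= amount;
        return true;
    }

    synchronized int available() {
        return balance - frozen;
    }

    synchronized int balance() {
        return balance;
    }
}

final class TccSketch {
    public static void main(String[] args) {
        ToyTccAccount account = new ToyTccAccount(10);

        // Successful path: try, then confirm.
        if (account.prepare(1)) {
            account.commit(1);
        }
        System.out.println(account.balance());

        // Failing path: the reservation is refused, so nothing needs to be undone.
        System.out.println(account.prepare(100));
    }
}
```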

4. Extension Configuration

Extension configurations typically have high requirements for product aggregation because products need to discover third-party implementations and incorporate them into their internals.

Image Description

Here's an example of a custom configuration center provider class. Place a text file with the same name as the interface under META-INF/services, and the content of the file should be the implementation class of the interface. This follows the standard SPI (Service Provider Interface) approach. Then, modify the configuration file registry.conf to set config.type=test.

However, if you think that by doing so, Seata can recognize it and replace the configuration center, then you are mistaken. When Seata loads the configuration center, it encapsulates the value of the configuration center type specified in the configuration file using the enum ConfigType:

private static Configuration buildConfiguration() {
configTypeName = "test";//The 'config.type' configured in 'registry.conf'
configType = ConfigType.getType(configTypeName);//An exception will be thrown if ConfigType cannot be retrieved.
}

If a configuration center type like test is not defined in ConfigType, it will throw an exception. Therefore, merely modifying the configuration file without changing the source code will not enable the use of configuration center provider classes other than those defined in ConfigType.

Currently, in version 1.0, the configuration center types defined in ConfigType include: File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, and Custom. If a user wishes to use a custom configuration center type, they can use the Custom type.
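The enum lookup that rejects unknown types can be sketched like this. ToyConfigType is a hypothetical enum modeled on the behaviour described above; the case-insensitive match and the exception type are assumptions, not a copy of Seata's source.

```java
// Hypothetical enum modeled on the described behaviour; not Seata's source code.
enum ToyConfigType {
    File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, Custom;

    // Resolves a configured name to an enum constant, or fails for unknown names.
    static ToyConfigType getType(String name) {
        for (ToyConfigType type : values()) {
            if (type.name().equalsIgnoreCase(name)) {
                return type;
            }
        }
        throw new IllegalArgumentException("unsupported config type: " + name);
    }
}

final class ConfigTypeSketch {
    public static void main(String[] args) {
        System.out.println(ToyConfigType.getType("nacos"));
        try {
            ToyConfigType.getType("test"); // not declared in the enum
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the lookup happens before any provider is loaded, registering an implementation through SPI alone is not enough when its type name is missing from the enum.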

Image Description

One inelegant approach here is to provide an implementation class with a specified name ZK but with a higher priority level (order=3) than the default ZK implementation (which has order=1). This will make ConfigurationFactory use TestConfigurationProvider as the configuration center provider class.
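The order-based override can be illustrated with a small selection routine. This sketch assumes that, among implementations sharing a name, the one with the highest order wins; ToyProvider and the class names in the demo are hypothetical, and Seata's loader may resolve priorities differently in detail.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical description of a loadable implementation: a name plus a priority.
final class ToyProvider {
    final String name;
    final int order;
    final String implementation;

    ToyProvider(String name, int order, String implementation) {
        this.name = name;
        this.order = order;
        this.implementation = implementation;
    }
}

final class ProviderSelectionSketch {
    // Among providers with the requested name, pick the one with the highest order.
    static ToyProvider select(List<ToyProvider> candidates, String name) {
        return candidates.stream()
                .filter(p -> p.name.equalsIgnoreCase(name))
                .max(Comparator.comparingInt((ToyProvider p) -> p.order))
                .orElseThrow(() -> new IllegalArgumentException("no provider named " + name));
    }

    public static void main(String[] args) {
        List<ToyProvider> candidates = Arrays.asList(
                new ToyProvider("ZK", 1, "DefaultZkProvider"),       // stand-in for the built-in one
                new ToyProvider("ZK", 3, "TestConfigurationProvider"),
                new ToyProvider("Nacos", 1, "DefaultNacosProvider"));

        System.out.println(select(candidates, "ZK").implementation);
    }
}
```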

Through the above steps, Seata can be configured to use our own provided code. Modules in Seata such as codec, compressor, discovery, integration, etc., all use the SPI mechanism to load functional classes, adhering to the design philosophy of microkernel + plug-in, treating third parties equally.

5. Seata Source Code Analysis Series

Author: Zhao Runze, Series Address.