Skip to content

Commit

Permalink
transaction init
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhuqing666 committed Dec 6, 2016
1 parent 4dc5580 commit 2bb3092
Show file tree
Hide file tree
Showing 65 changed files with 1,107 additions and 4,636 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ hs_err_pid*
.DS_Store
/target/
/tmlogs/
/txlog/
/txlogs/
src/main/resources/server.xml
src/main/resources/schema.xml
src/main/resources/rule.xml
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/mycat/backend/mysql/nio/MySQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class MySQLConnection extends BackendAIOConnection {
private volatile String oldSchema;
private volatile boolean borrowed = false;
private volatile boolean modifiedSQLExecuted = false;
private volatile int batchCmdCount = 0;
private AtomicInteger batchCmdCount;

private static long initClientFlags() {
int flag = 0;
Expand Down Expand Up @@ -148,7 +148,7 @@ private static long initClientFlags() {
private final AtomicBoolean isQuit;
private volatile StatusSync statusSync;
private volatile boolean metaDataSyned = true;
private volatile int xaStatus = 0;
private volatile TxState xaStatus = TxState.TX_INITIALIZE_STATE;

public MySQLConnection(NetworkChannel channel, boolean fromSlaveDB) {
super(channel);
Expand All @@ -161,11 +161,11 @@ public MySQLConnection(NetworkChannel channel, boolean fromSlaveDB) {
this.txIsolation = MycatServer.getInstance().getConfig().getSystem().getTxIsolation();
}

public int getXaStatus() {
public TxState getXaStatus() {
return xaStatus;
}

public void setXaStatus(int xaStatus) {
public void setXaStatus(TxState xaStatus) {
this.xaStatus = xaStatus;
}

Expand Down Expand Up @@ -324,17 +324,18 @@ private static class StatusSync {
private final Integer txtIsolation;
private final Boolean autocommit;
private final AtomicInteger synCmdCount;
private final boolean xaStarted;

public StatusSync(boolean xaStarted, String schema,
Integer charsetIndex, Integer txtIsolation, Boolean autocommit,
int synCount) {
super();
this.xaStarted = xaStarted;
this.schema = schema;
this.charsetIndex = charsetIndex;
this.txtIsolation = txtIsolation;
this.autocommit = autocommit;
if (xaStarted) {
synCount++;
}
this.synCmdCount = new AtomicInteger(synCount);
}

Expand Down Expand Up @@ -527,8 +528,7 @@ public void commit() {
}

public boolean batchCmdFinished() {
batchCmdCount--;
return (batchCmdCount == 0);
return (batchCmdCount.decrementAndGet() == 0);
}

public void execCmd(String cmd) {
Expand All @@ -537,7 +537,7 @@ public void execCmd(String cmd) {

public void execBatchCmd(String[] batchCmds) {
// "XA END "+xaID+";"+"XA PREPARE "+xaID
this.batchCmdCount = batchCmds.length;
this.batchCmdCount = new AtomicInteger(batchCmds.length);
StringBuilder sb = new StringBuilder();
for (String sql : batchCmds) {
sb.append(sql).append(';');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,13 @@ public void commit(BackendConnection conn) {
if(conn instanceof MySQLConnection)
{
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == 1)
{
String xaTxId = session.getXaTXID();
String[] cmds = new String[]{"XA END " + xaTxId,
"XA PREPARE " + xaTxId};
mysqlCon.execBatchCmd(cmds);
} else
{
conn.commit();
}
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
String xaTxId = session.getXaTXID();
String[] cmds = new String[] { "XA END " + xaTxId, "XA PREPARE " + xaTxId };
mysqlCon.execBatchCmd(cmds);
} else {
conn.commit();
}
}else
{
conn.commit();
Expand All @@ -91,15 +88,15 @@ public void okResponse(byte[] ok, BackendConnection conn) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus())
{
case 1:
case TX_STARTED_STATE:
if (mysqlCon.batchCmdFinished())
{
String xaTxId = session.getXaTXID();
mysqlCon.execCmd("XA COMMIT " + xaTxId);
mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
}
return;
case 2:
case TX_PREPARED_STATE:
{
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,7 @@ public void errorResponse(byte[] err, BackendConnection conn) {
if (this.cmdHandler.releaseConOnErr()) {
session.releaseConnection(conn);
} else {



session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(),
false);
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
}
if (this.finished()) {
cmdHandler.errorResponse(session, err, this.nodeCount,
Expand All @@ -145,7 +141,7 @@ public void okResponse(byte[] ok, BackendConnection conn) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus())
{
case TxState.TX_STARTED_STATE:
case TX_STARTED_STATE:
//if there have many SQL execute wait the okResponse,will come to here one by one
//should be wait all nodes ready ,then send xa commit to all nodes.
if (mysqlCon.batchCmdFinished())
Expand All @@ -172,7 +168,7 @@ public void okResponse(byte[] ok, BackendConnection conn) {
mysqlCon.execCmd(cmd);
}
return;
case TxState.TX_PREPARED_STATE:
case TX_PREPARED_STATE:
{
//recovery log
String xaTxId = session.getXaTXID();
Expand All @@ -197,8 +193,7 @@ public void okResponse(byte[] ok, BackendConnection conn) {
if (this.cmdHandler.relaseConOnOK()) {
session.releaseConnection(conn);
} else {
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(),
false);
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
}
if (this.finished()) {
cmdHandler.okResponse(session, ok);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* @author mycat
*/
abstract class MultiNodeHandler implements ResponseHandler {
public abstract class MultiNodeHandler implements ResponseHandler {
private static final Logger LOGGER = LoggerFactory
.getLogger(MultiNodeHandler.class);
protected final ReentrantLock lock = new ReentrantLock();
Expand Down Expand Up @@ -66,12 +66,9 @@ public boolean isFail() {

protected int nodeCount;

private Runnable terminateCallBack;



protected boolean canClose(BackendConnection conn, boolean tryErrorFinish) {

// realse this connection if safe
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
boolean allFinished = false;
Expand All @@ -84,43 +81,27 @@ protected boolean canClose(BackendConnection conn, boolean tryErrorFinish) {
}

protected void decrementCountToZero() {
Runnable callback;
lock.lock();
try {
nodeCount = 0;
callback = this.terminateCallBack;
this.terminateCallBack = null;
} finally {
lock.unlock();
}
if (callback != null) {
callback.run();
}
}

public void connectionError(Throwable e, BackendConnection conn) {
final boolean canClose = decrementCountBy(1);
// 需要把Throwable e的错误信息保存下来(setFail()), 否则会导致响应
//null信息,结果mysql命令行等客户端查询结果是"Query OK"!!
// @author Uncle-pan
// @since 2016-03-26
if(canClose){
setFail("backend connect: "+e);
}
this.setFail("backend connect: "+e);
LOGGER.warn("backend connect", e);
this.tryErrorFinished(canClose);
this.tryErrorFinished(decrementCountBy(1));
}

public void errorResponse(byte[] data, BackendConnection conn) {
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
ErrorPacket err = new ErrorPacket();
err.read(data);

String errmsg = new String(err.message);
this.setFail(errmsg);

LOGGER.warn("error response from " + conn + " err " + errmsg + " code:" + err.errno);

this.tryErrorFinished(this.decrementCountBy(1));
}

Expand All @@ -141,19 +122,13 @@ public boolean clearIfSessionClosed(NonBlockingSession session) {

protected boolean decrementCountBy(int finished) {
boolean zeroReached = false;
Runnable callback = null;
lock.lock();
try {
if (zeroReached = --nodeCount == 0) {
callback = this.terminateCallBack;
this.terminateCallBack = null;
}
nodeCount -= finished;
zeroReached = nodeCount == 0;
} finally {
lock.unlock();
}
if (zeroReached && callback != null) {
callback.run();
}
return zeroReached;
}

Expand All @@ -179,7 +154,6 @@ protected ErrorPacket createErrPkg(String errmgs) {

protected void tryErrorFinished(boolean allEnd) {
if (allEnd && !session.closed()) {

if (errorRepsponsed.compareAndSet(false, true)) {
createErrPkg(this.error).write(session.getSource());
}
Expand All @@ -194,28 +168,27 @@ protected void tryErrorFinished(boolean allEnd) {
// clear resouces
clearResources();
}

}

}

public void connectionClose(BackendConnection conn, String reason) {
this.setFail("closed connection:" + reason + " con:" + conn);
boolean finished = false;
lock.lock();
try {
finished = (this.nodeCount == 0);

} finally {
lock.unlock();
}
if (finished == false) {
finished = this.decrementCountBy(1);
}
if (error == null) {
error = "back connection closed ";
}
tryErrorFinished(finished);
// boolean finished = false;
// lock.lock();
// try {
// finished = (this.nodeCount == 0);
//
// } finally {
// lock.unlock();
// }
// if (finished == false) {
// finished = this.decrementCountBy(1);
// }
// if (error == null) {
// error = "back connection closed ";
// }
// tryErrorFinished(finished);
tryErrorFinished(this.decrementCountBy(1));
}

public void clearResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import io.mycat.backend.BackendConnection;
import io.mycat.backend.datasource.PhysicalDBNode;
import io.mycat.backend.mysql.LoadDataUtil;
import io.mycat.backend.mysql.nio.handler.TransactionHandler.TxOperation;
import io.mycat.backend.mysql.nio.handler.transaction.AutoTxHandler;
import io.mycat.backend.mysql.nio.handler.transaction.AutoTxHandler.TxOperation;
import io.mycat.cache.LayerCachePool;
import io.mycat.config.ErrorCode;
import io.mycat.config.MycatConfig;
Expand Down Expand Up @@ -174,6 +175,11 @@ public void execute() throws Exception {
if(!autocommit||session.getSource().isTxstart()||node.isModifySQL()){
sb.append("["+node.getName()+"]"+node.getStatement()).append(";\n");
}
}
if(sb.length()>0){
TxnLogHelper.putTxnLog(session.getSource(), sb.toString());
}
for (final RouteResultsetNode node : rrs.getNodes()) {
BackendConnection conn = session.getTarget(node);
if (session.tryExistsCon(conn, node)) {
LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave());
Expand All @@ -193,10 +199,6 @@ public void execute() throws Exception {
// session.bindConnection(node, conn);
// _execute(conn, node);
}

}
if(sb.length()>0){
TxnLogHelper.putTxnLog(session.getSource(), sb.toString());
}
}

Expand Down Expand Up @@ -790,7 +792,7 @@ protected void handleEndPacket(byte[] data, TxOperation txOperation, BackendConn
ServerConnection source = session.getSource();
if (source.isAutocommit() &&!source.isTxstart()&& conn.isModifiedSQLExecuted()) {
//隐式分布式事务
TransactionHandler txHandler = new TransactionHandler(rrs.getNodes(), errConnection, session, txOperation, closeReason);
AutoTxHandler txHandler = new AutoTxHandler(rrs.getNodes(), errConnection, session, txOperation, closeReason);
txHandler.execute(data);
} else {
boolean inTransaction = !source.isAutocommit() || source.isTxstart();
Expand Down
Loading

0 comments on commit 2bb3092

Please sign in to comment.