Skip to content

Commit

Permalink
#1335 change sqlExecuteTimeout into what the parameter means
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsun314 committed Nov 5, 2019
1 parent bee9a91 commit 13668fd
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ void execute(RouteResultsetNode node, ServerConnection source,
String compactInfo();

void setOldTimestamp(long oldTimestamp);

void setExecuting(boolean executing);

boolean isExecuting();
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ private void returnCon(BackendConnection c) {
if (c.isClosed()) {
return;
}

c.setAttachment(null);
c.setBorrowed(false);
c.setLastTime(TimeUtil.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static void requestFileDataResponse(byte[] data, BackendConnection conn)
RouteResultsetNode rrn = (RouteResultsetNode) conn.getAttachment();
LoadData loadData = rrn.getLoadData();
List<String> loadDataData = loadData.getData();

conn.setExecuting(false);
BufferedInputStream in = null;
try {
if (loadDataData != null && loadDataData.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class MySQLConnection extends AbstractConnection implements
private volatile String oldSchema;
private volatile boolean borrowed = false;
private volatile boolean isDDL = false;
private volatile boolean isRunning = false;
private volatile boolean isRowDataFlowing = false;
private volatile boolean isExecuting = false;
private volatile StatusSync statusSync;
private volatile boolean metaDataSynced = true;
private volatile TxState xaStatus = TxState.TX_INITIALIZE_STATE;
Expand Down Expand Up @@ -150,12 +151,12 @@ public void setPort(int port) {
this.port = port;
}

public void setRunning(boolean running) {
isRunning = running;
public void setRowDataFlowing(boolean rowDataFlowing) {
isRowDataFlowing = rowDataFlowing;
}

public boolean isRunning() {
return isRunning;
public boolean isRowDataFlowing() {
return isRowDataFlowing;
}

public TxState getXaStatus() {
Expand Down Expand Up @@ -298,6 +299,7 @@ public void sendQueryCmd(String query, CharsetNames clientCharset) {
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
isExecuting = true;
lastTime = TimeUtil.currentTimeMillis();
packet.write(this);
}
Expand All @@ -311,6 +313,7 @@ private WriteToBackendTask sendQueryCmdTask(String query, CharsetNames clientCha
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
isExecuting = true;
lastTime = TimeUtil.currentTimeMillis();
return new WriteToBackendTask(this, packet);
}
Expand Down Expand Up @@ -645,7 +648,8 @@ public synchronized void close(final String reason) {
} else {
closeInner(reason);
}
this.setRunning(false);
this.setExecuting(false);
this.setRowDataFlowing(false);
this.signal();
} else {
this.cleanup();
Expand Down Expand Up @@ -680,7 +684,8 @@ private void closeResponseHandler(final String reason) {
@Override
public void run() {
try {
conn.setRunning(false);
conn.setExecuting(false);
conn.setRowDataFlowing(false);
conn.signal();
handler.connectionClose(conn, reason);
respHandler = null;
Expand Down Expand Up @@ -749,7 +754,7 @@ public void release() {
this.close("close for clear usrVariables");
return;
}
if (this.isRunning()) {
if (this.isRowDataFlowing()) {
if (logResponse.compareAndSet(false, true)) {
session.setBackendResponseEndTime(this);
}
Expand All @@ -768,6 +773,15 @@ public void release() {
pool.releaseChannel(this);
}


public boolean isExecuting() {
return isExecuting;
}

public void setExecuting(boolean executing) {
isExecuting = executing;
}

public boolean setResponseHandler(ResponseHandler queryHandler) {
if (handler instanceof MySQLConnectionHandler) {
((MySQLConnectionHandler) handler).setResponseHandler(queryHandler);
Expand Down Expand Up @@ -897,10 +911,12 @@ public int getTxIsolation() {
public boolean syncAndExecute() {
StatusSync sync = this.statusSync;
if (sync == null) {
isExecuting = false;
return true;
} else {
boolean executed = sync.synAndExecuted(this);
if (executed) {
isExecuting = false;
statusSync = null;
}
return executed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ private void handleOkPacket(byte[] data) {
* execute ERROR packet
*/
private void handleErrorPacket(byte[] data) {
ResponseHandler respHand = responseHandler;
this.source.setRunning(false);
final ResponseHandler respHand = responseHandler;
this.source.setExecuting(false);
this.source.setRowDataFlowing(false);
this.source.signal();
if (respHand != null) {
respHand.errorResponse(data, source);
Expand All @@ -190,7 +191,7 @@ private void handleRequestPacket(byte[] data) {
*/
private void handleFieldEofPacket(byte[] data) {
ResponseHandler respHand = responseHandler;
this.source.setRunning(true);
this.source.setRowDataFlowing(true);
if (respHand != null) {
respHand.fieldEofResponse(header, fields, null, data, false, source);
} else {
Expand Down Expand Up @@ -225,7 +226,8 @@ private void handleRowEofPacket(byte[] data) {
if (session != null && !source.isTesting() && this.source.getLogResponse().compareAndSet(false, true)) {
session.setBackendResponseEndTime(this.source);
}
this.source.setRunning(false);
this.source.setExecuting(false);
this.source.setRowDataFlowing(false);
this.source.signal();
if (responseHandler != null) {
responseHandler.rowEofResponse(data, false, source);
Expand All @@ -243,7 +245,8 @@ protected void handleDataError(Exception e) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1000));
}
resultStatus = RESULT_STATUS_INIT;
this.source.setRunning(false);
this.source.setExecuting(false);
this.source.setRowDataFlowing(false);
this.source.signal();
ResponseHandler handler = this.responseHandler;
if (handler != null)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/actiontech/dble/net/NIOProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private void backendCheck() {
}
}
// close the conn which executeTimeOut
if (!c.isDDL() && c.isBorrowed() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
if (!c.isDDL() && c.isBorrowed() && c.isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
LOGGER.info("found backend connection SQL timeout ,close it " + c);
c.close("sql timeout");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public BackEndDataCleaner(MySQLConnection backendConnection) {
public void waitUntilDataFinish() {
lock.lock();
try {
while (backendConnection.isRunning() && !backendConnection.isClosed()) {
while (backendConnection.isRowDataFlowing() && !backendConnection.isClosed()) {
LOGGER.info("await for the row data get to a end");
condRelease.await();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void run() {
try {
lock.lock();
try {
if (backendConnection.isRunning()) {
if (backendConnection.isRowDataFlowing()) {

if (!condRelease.await(10, TimeUnit.MILLISECONDS)) {
backendConnection.close("recycle time out");
Expand Down

0 comments on commit 13668fd

Please sign in to comment.