diff --git a/pom.xml b/pom.xml
index d0b7c15f0c..89d34ef35a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
4.0.0
com.actiontech
dble
- 2.19.03.3
+ 2.19.03.4
jar
dble-server
The project of dble-server
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java
index 037d0de2d8..dde700ebb4 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java
@@ -255,7 +255,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection c
}
finishedTest = true;
session.setTraceSimpleHandler(handler);
- session.setPreExecuteEnd();
+ session.setPreExecuteEnd(false);
handler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + oriRrs, e);
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
index d91835a94f..2b5b7e1ab5 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
@@ -112,7 +112,7 @@ private void execute(BackendConnection conn) {
TxnLogHelper.putTxnLog(session.getSource(), node.getStatement());
}
session.readyToDeliver();
- session.setPreExecuteEnd();
+ session.setPreExecuteEnd(false);
conn.execute(node, session.getSource(), isAutocommit);
}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java
index 0229107658..7d5b050009 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java
@@ -9,8 +9,8 @@
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.GlobalVisitor;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
-import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.InSubQueryHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SingleRowSubQueryHandler;
@@ -186,8 +186,8 @@ protected final void noShardBuild() {
}
}
- MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
- session, null);
+ MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
+ session);
addHandler(mh);
}
@@ -391,11 +391,15 @@ public static long getSequenceId() {
protected void buildMergeHandler(PlanNode planNode, RouteResultsetNode[] rrssArray) {
hBuilder.checkRRSs(rrssArray);
- MultiNodeMergeHandler mh = null;
List orderBys = planNode.getGroupBys().size() > 0 ? planNode.getGroupBys() : planNode.getOrderBys();
+ boolean isEasyMerge = rrssArray.length == 1 || (orderBys == null || orderBys.size() == 0);
- mh = new MultiNodeMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session,
- orderBys);
+ MultiNodeMergeHandler mh;
+ if (isEasyMerge) {
+ mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session);
+ } else {
+ mh = new MultiNodeMergeAndOrderHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session, orderBys);
+ }
addHandler(mh);
}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java
index f4f9dd3298..3ee368da0c 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java
@@ -69,8 +69,8 @@ public BaseHandlerBuilder build() throws Exception {
//set slave only into rrsNode
for (DMLResponseHandler startHandler : fh.getMerges()) {
MultiNodeMergeHandler mergeHandler = (MultiNodeMergeHandler) startHandler;
- for (BaseSelectHandler bshandler : mergeHandler.getExeHandlers()) {
- bshandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave());
+ for (BaseSelectHandler baseHandler : mergeHandler.getExeHandlers()) {
+ baseHandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave());
}
}
session.endComplexRoute();
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java
index 7512117e96..65095db31c 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java
@@ -7,6 +7,7 @@
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.PushDownVisitor;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeEasyMergeHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
import com.actiontech.dble.config.model.SchemaConfig;
import com.actiontech.dble.plan.node.NoNameNode;
@@ -54,8 +55,7 @@ public void buildOwn() {
String randomDatenode = getRandomNode(schemaConfig.getAllDataNodes());
RouteResultsetNode[] rrss = new RouteResultsetNode[]{new RouteResultsetNode(randomDatenode, ServerParse.SELECT, sql)};
hBuilder.checkRRSs(rrss);
- MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
- session, null);
+ MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session);
addHandler(mh);
}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java
index 8eafebd834..8bf7c8e7e0 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java
@@ -11,7 +11,7 @@
public interface DMLResponseHandler extends ResponseHandler {
enum HandlerType {
- DIRECT, TEMPTABLE, BASESEL, REFRESHFP, MERGE, JOIN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD
+ TEMPTABLE, BASESEL, EASY_MERGE, MERGE_AND_ORDER, JOIN, NOT_IN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD
}
HandlerType type();
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java
new file mode 100644
index 0000000000..27a229aec1
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2016-2019 ActionTech.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
+
+import com.actiontech.dble.backend.BackendConnection;
+import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
+import com.actiontech.dble.net.mysql.FieldPacket;
+import com.actiontech.dble.net.mysql.RowDataPacket;
+import com.actiontech.dble.route.RouteResultsetNode;
+import com.actiontech.dble.server.NonBlockingSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * mergeHandler will merge data,if contains aggregate function,use group by handler
+ *
+ * @author ActionTech
+ */
+public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeEasyMergeHandler.class);
+ private int rowEndConCount = 0;
+
+ public MultiNodeEasyMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) {
+ super(id, route, autocommit, session);
+ this.merges.add(this);
+ }
+
+ @Override
+ public void execute() throws Exception {
+ synchronized (exeHandlers) {
+ if (terminate.get())
+ return;
+ for (BaseSelectHandler exeHandler : exeHandlers) {
+ session.setHandlerStart(exeHandler); //base start execute
+ MySQLConnection exeConn = exeHandler.initConnection();
+ if (exeConn != null) {
+ exeConn.setComplexQuery(true);
+ exeHandler.execute(exeConn);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof,
+ boolean isLeft, BackendConnection conn) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(conn.toString() + "'s field is reached.");
+ }
+ session.setHandlerStart(this);
+ // if terminated
+ if (terminate.get()) {
+ return;
+ }
+ lock.lock(); // for combine
+ try {
+ if (this.fieldPackets.isEmpty()) {
+ this.fieldPackets = fieldPackets;
+ nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn);
+ }
+ startEasyMerge();
+ if (++reachedConCount == route.length) {
+ session.allBackendConnReceive();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
+ if (terminate.get())
+ return true;
+ return nextHandler.rowResponse(null, rowPacket, this.isLeft, conn);
+ }
+
+ @Override
+ public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(conn.toString() + " 's rowEof is reached.");
+ }
+
+ if (this.terminate.get())
+ return;
+ lock.lock();
+ try {
+ if (++rowEndConCount == route.length) {
+ session.setHandlerEnd(this);
+ nextHandler.rowEofResponse(null, this.isLeft, conn);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected void ownThreadJob(Object... objects) {
+ }
+
+ @Override
+ protected void terminateThread() throws Exception {
+ recycleConn();
+ }
+
+ @Override
+ protected void recycleResources() {
+ }
+
+ @Override
+ public HandlerType type() {
+ return HandlerType.EASY_MERGE;
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java
new file mode 100644
index 0000000000..462632151f
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright (C) 2016-2019 ActionTech.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
+
+import com.actiontech.dble.DbleServer;
+import com.actiontech.dble.backend.BackendConnection;
+import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
+import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
+import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem;
+import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
+import com.actiontech.dble.net.mysql.FieldPacket;
+import com.actiontech.dble.net.mysql.RowDataPacket;
+import com.actiontech.dble.plan.Order;
+import com.actiontech.dble.route.RouteResultsetNode;
+import com.actiontech.dble.server.NonBlockingSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * mergeHandler will merge data,if contains aggregate function,use group by handler
+ *
+ * @author ActionTech
+ */
+public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeMergeAndOrderHandler.class);
+
+ private final int queueSize;
+ // map;conn->blocking queue.if receive row packet, add to the queue,if receive rowEof packet, add NullHeapItem into queue;
+ private Map> queues;
+ private List orderBys;
+ private RowDataComparator rowComparator;
+ private volatile boolean noNeedRows = false;
+
+ public MultiNodeMergeAndOrderHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session,
+ List orderBys) {
+ super(id, route, autocommit, session);
+ this.orderBys = orderBys;
+ this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize();
+ this.queues = new ConcurrentHashMap<>();
+ this.merges.add(this);
+
+ }
+
+ @Override
+ public void execute() throws Exception {
+ synchronized (exeHandlers) {
+ if (terminate.get())
+ return;
+ for (BaseSelectHandler exeHandler : exeHandlers) {
+ session.setHandlerStart(exeHandler); //base start execute
+ MySQLConnection exeConn = exeHandler.initConnection();
+ if (exeConn != null) {
+ exeConn.setComplexQuery(true);
+ queues.put(exeConn, new LinkedBlockingQueue<>(queueSize));
+ exeHandler.execute(exeConn);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof,
+ boolean isLeft, BackendConnection conn) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(conn.toString() + "'s field is reached.");
+ }
+ session.setHandlerStart(this);
+ // if terminated
+ if (terminate.get()) {
+ return;
+ }
+ lock.lock(); // for combine
+ try {
+ if (this.fieldPackets.isEmpty()) {
+ this.fieldPackets = fieldPackets;
+ rowComparator = new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type());
+ nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn);
+ }
+ if (++reachedConCount == route.length) {
+ session.allBackendConnReceive();
+ startOwnThread();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
+ if (terminate.get() || noNeedRows)
+ return true;
+
+ MySQLConnection mySQLConn = (MySQLConnection) conn;
+ BlockingQueue queue = queues.get(mySQLConn);
+ if (queue == null)
+ return true;
+ HeapItem item = new HeapItem(row, rowPacket, mySQLConn);
+ try {
+ queue.put(item);
+ } catch (InterruptedException e) {
+ //ignore error
+ }
+ return false;
+ }
+
+ @Override
+ public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) {
+ MySQLConnection mySQLConn = (MySQLConnection) conn;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(mySQLConn.toString() + " 's rowEof is reached.");
+ }
+
+ if (this.terminate.get())
+ return;
+ BlockingQueue queue = queues.get(mySQLConn);
+ if (queue == null)
+ return;
+ try {
+ queue.put(HeapItem.nullItem());
+ } catch (InterruptedException e) {
+ //ignore error
+ }
+ }
+
+ @Override
+ protected void ownThreadJob(Object... objects) {
+ try {
+ ArrayMinHeap heap = new ArrayMinHeap<>(new Comparator() {
+ @Override
+ public int compare(HeapItem o1, HeapItem o2) {
+ RowDataPacket row1 = o1.getRowPacket();
+ RowDataPacket row2 = o2.getRowPacket();
+ if (row1 == null || row2 == null) {
+ if (row1 == row2)
+ return 0;
+ if (row1 == null)
+ return -1;
+ return 1;
+ }
+ return rowComparator.compare(row1, row2);
+ }
+ });
+ // init heap
+ for (Entry> entry : queues.entrySet()) {
+ HeapItem firstItem = entry.getValue().take();
+ heap.add(firstItem);
+ }
+ while (!heap.isEmpty()) {
+ if (terminate.get())
+ return;
+ HeapItem top = heap.peak();
+ if (top.isNullItem()) {
+ heap.poll();
+ } else {
+ BlockingQueue topItemQueue = queues.get(top.getIndex());
+ HeapItem item = topItemQueue.take();
+ heap.replaceTop(item);
+ if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) {
+ noNeedRows = true;
+ while (!heap.isEmpty()) {
+ HeapItem itemToDiscard = heap.poll();
+ if (!itemToDiscard.isNullItem()) {
+ BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex());
+ while (true) {
+ if (discardQueue.take().isNullItem() || terminate.get()) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ if (LOGGER.isDebugEnabled()) {
+ String executeQueries = getRoutesSql(route);
+ LOGGER.debug(executeQueries + " heap send eof: ");
+ }
+ session.setHandlerEnd(this);
+ nextHandler.rowEofResponse(null, this.isLeft, queues.keySet().iterator().next());
+ } catch (Exception e) {
+ String msg = "Merge thread error, " + e.getLocalizedMessage();
+ LOGGER.info(msg, e);
+ session.onQueryError(msg.getBytes());
+ }
+ }
+
+ @Override
+ protected void terminateThread() throws Exception {
+ for (Entry> entry : this.queues.entrySet()) {
+ // add EOF to signal atoMerge thread
+ entry.getValue().clear();
+ entry.getValue().put(new HeapItem(null, null, entry.getKey()));
+ }
+ recycleConn();
+ }
+
+ @Override
+ protected void recycleResources() {
+ Iterator>> iterator = this.queues.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry> entry = iterator.next();
+ // fair lock queue,poll for clear
+ while (true) {
+ if (entry.getValue().poll() == null) {
+ break;
+ }
+ }
+ iterator.remove();
+ }
+ }
+
+ private String getRoutesSql(RouteResultsetNode[] nodes) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('{');
+ Map> sqlMap = new HashMap<>();
+ for (RouteResultsetNode rrss : nodes) {
+ String sql = rrss.getStatement();
+ if (!sqlMap.containsKey(sql)) {
+ List rrssList = new ArrayList<>();
+ rrssList.add(rrss);
+ sqlMap.put(sql, rrssList);
+ } else {
+ List rrssList = sqlMap.get(sql);
+ rrssList.add(rrss);
+ }
+ }
+ for (Entry> entry : sqlMap.entrySet()) {
+ sb.append(entry.getKey()).append(entry.getValue()).append(';');
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public HandlerType type() {
+ return HandlerType.MERGE_AND_ORDER;
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java
index 839c2bfb11..57b1043ef7 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java
@@ -5,29 +5,15 @@
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
-import com.actiontech.dble.DbleServer;
-import com.actiontech.dble.backend.BackendConnection;
-import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler;
-import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
-import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem;
-import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
import com.actiontech.dble.config.ErrorCode;
-import com.actiontech.dble.net.mysql.FieldPacket;
-import com.actiontech.dble.net.mysql.RowDataPacket;
-import com.actiontech.dble.plan.Order;
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -35,23 +21,14 @@
*
* @author ActionTech
*/
-public class MultiNodeMergeHandler extends OwnThreadDMLHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeMergeHandler.class);
+public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler {
- private final int queueSize;
- private final ReentrantLock lock;
- private final List exeHandlers;
- // map;conn->blocking queue.if receive row packet, add to the queue,if receive rowEof packet, add NullHeapItem into queue;
- private Map> queues;
- private List orderBys;
- private RowDataComparator rowComparator;
- private RouteResultsetNode[] route;
- private int reachedConCount;
- private boolean isEasyMerge;
- private volatile boolean noNeedRows = false;
+ protected final ReentrantLock lock;
+ final List exeHandlers;
+ protected RouteResultsetNode[] route;
+ int reachedConCount = 0;
- public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session,
- List orderBys) {
+ public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) {
super(id, session);
this.exeHandlers = new ArrayList<>();
this.lock = new ReentrantLock();
@@ -63,204 +40,15 @@ public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autoco
this.exeHandlers.add(exeHandler);
}
this.route = route;
- this.orderBys = orderBys;
- this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize();
- this.isEasyMerge = route.length == 1 || (orderBys == null || orderBys.size() == 0);
- this.queues = new ConcurrentHashMap<>();
- this.merges.add(this);
+ session.setRouteResultToTrace(route);
}
+ public abstract void execute() throws Exception;
+
public List getExeHandlers() {
return exeHandlers;
}
- public void execute() throws Exception {
- synchronized (exeHandlers) {
- if (terminate.get())
- return;
- for (BaseSelectHandler exeHandler : exeHandlers) {
- session.setHandlerStart(exeHandler); //base start execute
- MySQLConnection exeConn = exeHandler.initConnection();
- if (exeConn != null) {
- exeConn.setComplexQuery(true);
- queues.put(exeConn, new LinkedBlockingQueue(queueSize));
- exeHandler.execute(exeConn);
- }
- }
- }
- }
-
- @Override
- public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof,
- boolean isLeft, BackendConnection conn) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(conn.toString() + "'s field is reached.");
- }
- session.setHandlerStart(this);
- // if terminated
- if (terminate.get()) {
- return;
- }
- lock.lock(); // for combine
- try {
- if (this.fieldPackets.isEmpty()) {
- this.fieldPackets = fieldPackets;
- rowComparator = makeRowDataSorter();
- nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn);
- }
- if (isEasyMerge) {
- startEasyMerge();
- } else {
- if (++reachedConCount == route.length) {
- session.allBackendConnReceive();
- startOwnThread();
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
- if (terminate.get() || noNeedRows)
- return true;
-
- if (isEasyMerge) {
- nextHandler.rowResponse(null, rowPacket, this.isLeft, conn);
- } else {
- MySQLConnection mySQLConn = (MySQLConnection) conn;
- BlockingQueue queue = queues.get(mySQLConn);
- if (queue == null)
- return true;
- HeapItem item = new HeapItem(row, rowPacket, mySQLConn);
- try {
- queue.put(item);
- } catch (InterruptedException e) {
- //ignore error
- }
- }
- return false;
- }
-
- @Override
- public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(conn.toString() + " 's rowEof is reached.");
- }
-
- if (this.terminate.get())
- return;
- if (isEasyMerge) {
- lock.lock();
- try {
- if (++reachedConCount == route.length) {
- session.setHandlerEnd(this);
- nextHandler.rowEofResponse(null, this.isLeft, conn);
- }
- } finally {
- lock.unlock();
- }
- } else {
- BlockingQueue queue = queues.get(conn);
- if (queue == null)
- return;
- try {
- queue.put(HeapItem.nullItem());
- } catch (InterruptedException e) {
- //ignore error
- }
- }
- }
-
- @Override
- protected void ownThreadJob(Object... objects) {
- try {
- ArrayMinHeap heap = new ArrayMinHeap<>(new Comparator() {
-
- @Override
- public int compare(HeapItem o1, HeapItem o2) {
- RowDataPacket row1 = o1.getRowPacket();
- RowDataPacket row2 = o2.getRowPacket();
- if (row1 == null || row2 == null) {
- if (row1 == row2)
- return 0;
- if (row1 == null)
- return -1;
- return 1;
- }
- return rowComparator.compare(row1, row2);
- }
- });
- // init heap
- for (Map.Entry> entry : queues.entrySet()) {
- HeapItem firstItem = entry.getValue().take();
- heap.add(firstItem);
- }
- while (!heap.isEmpty()) {
- if (terminate.get())
- return;
- HeapItem top = heap.peak();
- if (top.isNullItem()) {
- heap.poll();
- } else {
- BlockingQueue topItemQueue = queues.get(top.getIndex());
- HeapItem item = topItemQueue.take();
- heap.replaceTop(item);
- if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) {
- noNeedRows = true;
- while (!heap.isEmpty()) {
- HeapItem itemToDiscard = heap.poll();
- if (!itemToDiscard.isNullItem()) {
- BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex());
- while (true) {
- if (discardQueue.take().isNullItem() || terminate.get()) {
- break;
- }
- }
- }
- }
- }
- }
- }
- if (LOGGER.isDebugEnabled()) {
- String executeQueries = getRoutesSql(route);
- LOGGER.debug(executeQueries + " heap send eof: ");
- }
- session.setHandlerEnd(this);
- nextHandler.rowEofResponse(null, this.isLeft, queues.keySet().iterator().next());
- } catch (Exception e) {
- String msg = "Merge thread error, " + e.getLocalizedMessage();
- LOGGER.info(msg, e);
- session.onQueryError(msg.getBytes());
- }
- }
-
- @Override
- protected void terminateThread() throws Exception {
- for (Entry> entry : this.queues.entrySet()) {
- // add EOF to signal atoMerge thread
- entry.getValue().clear();
- entry.getValue().put(new HeapItem(null, null, entry.getKey()));
- }
- recycleConn();
- }
-
- @Override
- protected void recycleResources() {
- Iterator>> iterator = this.queues.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry> entry = iterator.next();
- // fair lock queue,poll for clear
- while (true) {
- if (entry.getValue().poll() == null) {
- break;
- }
- }
- iterator.remove();
- }
- }
-
protected void recycleConn() {
synchronized (exeHandlers) {
for (BaseSelectHandler exeHandler : exeHandlers) {
@@ -284,36 +72,4 @@ private void terminatePreHandler(DMLResponseHandler handler) {
}
}
- private RowDataComparator makeRowDataSorter() {
- if (!isEasyMerge)
- return new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type());
- return null;
- }
-
- @Override
- public HandlerType type() {
- return HandlerType.MERGE;
- }
-
- private String getRoutesSql(RouteResultsetNode[] nodes) {
- StringBuilder sb = new StringBuilder();
- sb.append('{');
- Map> sqlMap = new HashMap<>();
- for (RouteResultsetNode rrss : nodes) {
- String sql = rrss.getStatement();
- if (!sqlMap.containsKey(sql)) {
- List rrssList = new ArrayList<>();
- rrssList.add(rrss);
- sqlMap.put(sql, rrssList);
- } else {
- List rrssList = sqlMap.get(sql);
- rrssList.add(rrss);
- }
- }
- for (Map.Entry> entry : sqlMap.entrySet()) {
- sb.append(entry.getKey()).append(entry.getValue()).append(';');
- }
- sb.append('}');
- return sb.toString();
- }
}
diff --git a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
index eda04c993e..a12c0d0d05 100644
--- a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
+++ b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
@@ -41,6 +41,11 @@ public void startProcess() {
//do nothing
}
+ @Override
+ public void markFinished() {
+ //do nothing
+ }
+
@Override
public void handle(final byte[] data) {
handler.handle(data);
diff --git a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
index ab678a1879..68693ce6fa 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
@@ -223,6 +223,14 @@ public static void handle(String stmt, ManagerConnection c, int offset) {
case ManagerParseShow.DDL_STATE:
ShowDdlState.execute(c);
break;
+ case ManagerParseShow.CONNECTION_SQL_STATUS:
+ String id = stmt.substring(rs >>> 8).trim();
+ if (StringUtil.isEmpty(id)) {
+ c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
+ } else {
+ ShowConnectionSQLStatus.execute(c, id);
+ }
+ break;
default:
if (isSupportShow(stmt)) {
Iterator iterator = DbleServer.getInstance().getConfig().getDataHosts().values().iterator();
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java
index 1949034248..b3664a1dbf 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java
@@ -15,6 +15,7 @@
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.ResultSetHeaderPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
+import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.util.FormatUtil;
import com.actiontech.dble.util.LongUtil;
import com.actiontech.dble.util.StringUtil;
@@ -29,7 +30,7 @@ public final class ShowConnectionSQL {
private ShowConnectionSQL() {
}
- private static final int FIELD_COUNT = 7;
+ private static final int FIELD_COUNT = 8;
private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT);
private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT];
private static final EOFPacket EOF = new EOFPacket();
@@ -60,6 +61,9 @@ private ShowConnectionSQL() {
FIELDS[i] = PacketUtil.getField("SQL", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
+ FIELDS[i] = PacketUtil.getField("STAGE", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i].setPacketId(++packetId);
+
EOF.setPacketId(++packetId);
}
@@ -113,7 +117,12 @@ private static RowDataPacket getRow(FrontendConnection c, String charset) {
executeSql = c.getExecuteSql().length() <= 1024 ? c.getExecuteSql() : c.getExecuteSql().substring(0, 1024);
}
row.add(StringUtil.encode(executeSql, charset));
+ if (c instanceof ServerConnection) {
+ ServerConnection sc = (ServerConnection) c;
+ row.add(StringUtil.encode(sc.getSession2().getSessionStage().toString(), charset));
+ } else {
+ row.add(StringUtil.encode("Manager connection", charset));
+ }
return row;
}
-
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java
new file mode 100644
index 0000000000..1f47a73ef4
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2016-2019 ActionTech.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.manager.response;
+
+import com.actiontech.dble.DbleServer;
+import com.actiontech.dble.backend.mysql.PacketUtil;
+import com.actiontech.dble.config.ErrorCode;
+import com.actiontech.dble.config.Fields;
+import com.actiontech.dble.manager.ManagerConnection;
+import com.actiontech.dble.net.FrontendConnection;
+import com.actiontech.dble.net.NIOProcessor;
+import com.actiontech.dble.net.mysql.EOFPacket;
+import com.actiontech.dble.net.mysql.FieldPacket;
+import com.actiontech.dble.net.mysql.ResultSetHeaderPacket;
+import com.actiontech.dble.net.mysql.RowDataPacket;
+import com.actiontech.dble.server.ServerConnection;
+import com.actiontech.dble.server.status.SlowQueryLog;
+import com.actiontech.dble.util.StringUtil;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public final class ShowConnectionSQLStatus {
+ private ShowConnectionSQLStatus() {
+ }
+ private static final int FIELD_COUNT = 6;
+ private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT);
+ private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT];
+ private static final EOFPacket EOF = new EOFPacket();
+
+ static {
+ int i = 0;
+ byte packetId = 0;
+ HEADER.setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("OPERATION", Fields.FIELD_TYPE_LONG);
+ FIELDS[i++].setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("START(ms)", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i++].setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("END(ms)", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i++].setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("DURATION(ms)", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i++].setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("DATA_NODE", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i++].setPacketId(++packetId);
+
+ FIELDS[i] = PacketUtil.getField("SQL/REF", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i].setPacketId(++packetId);
+ EOF.setPacketId(++packetId);
+ }
+
+ public static void execute(ManagerConnection c, String id) {
+ if (!SlowQueryLog.getInstance().isEnableSlowLog()) {
+ c.writeErrMessage(ErrorCode.ER_YES, "please enable @@slow_query_log first");
+ return;
+ }
+ long realId = 0;
+ try {
+ realId = Long.parseLong(id);
+ } catch (NumberFormatException e) {
+ c.writeErrMessage(ErrorCode.ER_YES, "front_id must be a number");
+ return;
+ }
+ NIOProcessor[] processors = DbleServer.getInstance().getFrontProcessors();
+ FrontendConnection target = null;
+ for (NIOProcessor p : processors) {
+ for (FrontendConnection fc : p.getFrontends().values()) {
+ if (fc != null && fc.getId() == realId) {
+ target = fc;
+ break;
+ }
+ }
+ }
+ if (target == null) {
+ c.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " doesn't exist");
+ return;
+ }
+ if (target instanceof ManagerConnection) {
+ c.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a manager connection");
+ return;
+ }
+ ByteBuffer buffer = c.allocate();
+
+ // write header
+ buffer = HEADER.write(buffer, c, true);
+
+ // write fields
+ for (FieldPacket field : FIELDS) {
+ buffer = field.write(buffer, c, true);
+ }
+
+ // write eof
+ buffer = EOF.write(buffer, c, true);
+
+ // write rows
+ byte packetId = EOF.getPacketId();
+
+ List results = ((ServerConnection) target).getSession2().genRunningSQLStage();
+ if (results != null) {
+ for (String[] result : results) {
+ RowDataPacket row = new RowDataPacket(FIELD_COUNT);
+ for (int i = 0; i < FIELD_COUNT; i++) {
+ row.add(StringUtil.encode(result[i], c.getCharset().getResults()));
+ }
+ row.setPacketId(++packetId);
+ buffer = row.write(buffer, c, true);
+ }
+ }
+
+ // write last eof
+ EOFPacket lastEof = new EOFPacket();
+ lastEof.setPacketId(++packetId);
+ buffer = lastEof.write(buffer, c, true);
+
+ // write buffer
+ c.write(buffer);
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java b/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java
index 93c0a21bf7..c9c3fdb909 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java
@@ -114,6 +114,7 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) {
HELPS.put("show @@session", "Report front session details");
HELPS.put("show @@session.xa", "Report front session and associated xa transaction details");
HELPS.put("show @@connection.sql", "Report connection sql");
+ HELPS.put("show @@connection.sql.status where FRONT_ID= ?;", "Show current connection sql status and detail");
HELPS.put("show @@sql", "Report SQL list");
// helps.put("show @@sql where id = ?", "Report specify SQL");
HELPS.put("show @@sql.high", "Report Hight Frequency SQL");
diff --git a/src/main/java/com/actiontech/dble/net/FrontendConnection.java b/src/main/java/com/actiontech/dble/net/FrontendConnection.java
index 6aa731709c..342ffdd503 100755
--- a/src/main/java/com/actiontech/dble/net/FrontendConnection.java
+++ b/src/main/java/com/actiontech/dble/net/FrontendConnection.java
@@ -211,6 +211,7 @@ public void writeErrMessage(byte id, int vendorCode, String msg) {
}
protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) {
+ markFinished();
ErrorPacket err = new ErrorPacket();
err.setPacketId(id);
err.setErrNo(vendorCode);
@@ -223,6 +224,8 @@ protected void writeErrMessage(byte id, int vendorCode, String sqlState, String
public abstract void startProcess();
+ protected abstract void markFinished();
+
public void initDB(byte[] data) {
MySQLMessage mm = new MySQLMessage(data);
diff --git a/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java b/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java
index d1b5ed9dd2..ec5b45b242 100644
--- a/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java
+++ b/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java
@@ -8,8 +8,8 @@
import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
-import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.JoinHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.NotInHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler;
@@ -74,10 +74,16 @@ private static String buildHandlerTree(DMLResponseHandler endHandler, Map mergeList = new ArrayList<>();
mergeList.addAll(((MultiNodeMergeHandler) startHandler).getExeHandlers());
- String mergeNode = genHandlerName("MERGE", nameMap);
- ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, "MERGE", mergeHandler);
+ String mergeNode = genHandlerName(mergeName, nameMap);
+ ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, mergeName, mergeHandler);
handlerMap.put(mergeHandler, refInfo);
refMap.put(mergeNode, refInfo);
for (BaseSelectHandler exeHandler : mergeList) {
diff --git a/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java b/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java
index 5c6e51cd65..949643e06c 100644
--- a/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java
+++ b/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java
@@ -38,6 +38,7 @@ private ManagerParseShow() {
public static final int TIME_CURRENT = 22;
public static final int TIME_STARTUP = 23;
public static final int VERSION = 24;
+ public static final int CONNECTION_SQL_STATUS = 26;
public static final int CONNECTION_SQL = 27;
public static final int DATANODE_SCHEMA = 28;
public static final int DATASOURCE_WHERE = 29;
@@ -1604,8 +1605,14 @@ private static int show2ConnectonSQL(String stmt, int offset) {
char c1 = stmt.charAt(++offset);
char c2 = stmt.charAt(++offset);
if ((c1 == 'q' || c1 == 'Q') && (c2 == 'l' || c2 == 'L')) {
- if (ParseUtil.isErrorTail(++offset, stmt)) {
- return OTHER;
+ while (stmt.length() > ++offset) {
+ if (ParseUtil.isSpace(stmt.charAt(offset))) {
+ continue;
+ } else if ('.' == stmt.charAt(offset)) {
+ return show2ConnectonStatusCheck(stmt, offset);
+ } else {
+ return OTHER;
+ }
}
return CONNECTION_SQL;
}
@@ -1613,6 +1620,97 @@ private static int show2ConnectonSQL(String stmt, int offset) {
return OTHER;
}
+ private static int show2ConnectonStatusCheck(String stmt, int offset) {
+ if (stmt.length() > offset + "STATUS".length()) {
+ char c1 = stmt.charAt(++offset);
+ char c2 = stmt.charAt(++offset);
+ char c3 = stmt.charAt(++offset);
+ char c4 = stmt.charAt(++offset);
+ char c5 = stmt.charAt(++offset);
+ char c6 = stmt.charAt(++offset);
+ if ((c1 == 'S' || c1 == 's') && (c2 == 'T' || c2 == 't') && (c3 == 'A' || c3 == 'a') &&
+ (c4 == 'T' || c4 == 't') && (c5 == 'U' || c5 == 'u') && (c6 == 'S' || c6 == 's')) {
+ while (stmt.length() > ++offset) {
+ if (ParseUtil.isSpace(stmt.charAt(offset))) {
+ continue;
+ }
+ switch (stmt.charAt(offset)) {
+ case 'W':
+ case 'w':
+ if (!ParseUtil.isSpace(stmt.charAt(offset - 1))) {
+ return OTHER;
+ }
+ return show2ConnectonStatusWhereCheck(stmt, offset);
+ default:
+ return OTHER;
+ }
+ }
+ }
+ }
+ return OTHER;
+ }
+
+ private static int show2ConnectonStatusWhereCheck(String stmt, int offset) {
+ if (stmt.length() > offset + "HERE".length()) {
+ char c1 = stmt.charAt(++offset);
+ char c2 = stmt.charAt(++offset);
+ char c3 = stmt.charAt(++offset);
+ char c4 = stmt.charAt(++offset);
+ if ((c1 == 'H' || c1 == 'h') && (c2 == 'E' || c2 == 'e') && (c3 == 'R' || c3 == 'r') &&
+ (c4 == 'E' || c4 == 'e')) {
+ while (stmt.length() > ++offset) {
+ if (ParseUtil.isSpace(stmt.charAt(offset))) {
+ continue;
+ }
+ switch (stmt.charAt(offset)) {
+ case 'f':
+ case 'F':
+ if (!ParseUtil.isSpace(stmt.charAt(offset - 1))) {
+ return OTHER;
+ }
+ return show2ConnectonStatusFrontCheck(stmt, offset);
+ default:
+ return OTHER;
+ }
+ }
+ }
+ }
+ return OTHER;
+ }
+
+ private static int show2ConnectonStatusFrontCheck(String stmt, int offset) {
+ if (stmt.length() > offset + "RONT_ID".length()) {
+ char c1 = stmt.charAt(++offset);
+ char c2 = stmt.charAt(++offset);
+ char c3 = stmt.charAt(++offset);
+ char c4 = stmt.charAt(++offset);
+ char c5 = stmt.charAt(++offset);
+ char c6 = stmt.charAt(++offset);
+ char c7 = stmt.charAt(++offset);
+ if ((c1 == 'R' || c1 == 'r') && (c2 == 'O' || c2 == 'o') &&
+ (c3 == 'N' || c3 == 'n') && (c4 == 'T' || c4 == 't') && c5 == '_' &&
+ (c6 == 'I' || c6 == 'i') && (c7 == 'D' || c7 == 'd')) {
+ while (stmt.length() > ++offset) {
+ if (ParseUtil.isSpace(stmt.charAt(offset))) {
+ continue;
+ }
+ switch (stmt.charAt(offset)) {
+ case '=':
+ while (stmt.length() > ++offset) {
+ if (!ParseUtil.isSpace(stmt.charAt(offset))) {
+ return (offset << 8) | CONNECTION_SQL_STATUS;
+ }
+ }
+ return OTHER;
+ default:
+ return OTHER;
+ }
+ }
+ }
+ }
+ return OTHER;
+ }
+
// SHOW @@TIME.CURRENT
private static int show2TimeCCheck(String stmt, int offset) {
if (stmt.length() > offset + "URRENT".length()) {
diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
index 3be272f445..c5bf1edf1c 100644
--- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
+++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
@@ -108,6 +108,7 @@ public class NonBlockingSession implements Session {
private volatile boolean traceEnable = false;
private volatile TraceResult traceResult = new TraceResult();
private volatile RouteResultset complexRrs = null;
+ private volatile SessionStage sessionStage = SessionStage.Init;
public NonBlockingSession(ServerConnection source) {
this.source = source;
@@ -127,7 +128,9 @@ public ServerConnection getSource() {
}
void setRequestTime() {
+ sessionStage = SessionStage.Read_SQL;
long requestTime = 0;
+
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
requestTime = System.nanoTime();
traceResult.setVeryStartPrepare(requestTime);
@@ -158,6 +161,7 @@ void setRequestTime() {
}
void startProcess() {
+ sessionStage = SessionStage.Parse_SQL;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime()));
}
@@ -168,6 +172,7 @@ void startProcess() {
}
public void endParse() {
+ sessionStage = SessionStage.Route_Calculation;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.ready();
traceResult.setRouteStart(new TraceRecord(System.nanoTime()));
@@ -180,6 +185,7 @@ public void endParse() {
void endRoute(RouteResultset rrs) {
+ sessionStage = SessionStage.Prepare_to_Push;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setPreExecuteStart(new TraceRecord(System.nanoTime()));
}
@@ -211,15 +217,24 @@ public void readyToDeliver() {
provider.readyToDeliver(source.getId());
}
- public void setPreExecuteEnd() {
+ public void setPreExecuteEnd(boolean isComplexQuery) {
+ sessionStage = SessionStage.Execute_SQL;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
+ traceResult.setComplexQuery(isComplexQuery);
traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime()));
traceResult.clearConnReceivedMap();
traceResult.clearConnFlagMap();
}
}
+ public void setSubQuery() {
+ if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
+ traceResult.setSubQuery(true);
+ }
+ }
+
public void setBackendRequestTime(long backendID) {
+ sessionStage = SessionStage.First_Node_Fetching_Result;
if (!timeCost) {
return;
}
@@ -290,6 +305,7 @@ public void allBackendConnReceive() {
}
public void setResponseTime(boolean isSuccess) {
+ sessionStage = SessionStage.Finished;
long responseTime = 0;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
responseTime = System.nanoTime();
@@ -312,7 +328,12 @@ public void setResponseTime(boolean isSuccess) {
QueryTimeCostContainer.getInstance().add(queryTimeCost);
}
+ public void setStageFinished() {
+ sessionStage = SessionStage.Finished;
+ }
+
public void setBackendResponseEndTime(MySQLConnection conn) {
+ sessionStage = SessionStage.First_Node_Fetched_Result;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment();
ResponseHandler responseHandler = conn.getRespHandler();
@@ -331,6 +352,7 @@ public void setBackendResponseEndTime(MySQLConnection conn) {
}
public void setBeginCommitTime() {
+ sessionStage = SessionStage.Distributed_Transaction_Commit;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setAdtCommitBegin(new TraceRecord(System.nanoTime()));
}
@@ -349,6 +371,10 @@ public void setHandlerStart(DMLResponseHandler handler) {
}
public void setHandlerEnd(DMLResponseHandler handler) {
+ if (handler.getNextHandler() != null) {
+ DMLResponseHandler next = handler.getNextHandler();
+ sessionStage = SessionStage.changeFromHandlerType(next.type());
+ }
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.addToRecordEndMap(handler, new TraceRecord(System.nanoTime()));
}
@@ -362,6 +388,15 @@ public List genTraceResult() {
}
}
+ public List genRunningSQLStage() {
+ if (SlowQueryLog.getInstance().isEnableSlowLog()) {
+ TraceResult tmpResult = (TraceResult) traceResult.clone();
+ return tmpResult.genRunningSQLStage();
+ } else {
+ return null;
+ }
+ }
+
@Override
public int getTargetCount() {
return target.size();
@@ -391,6 +426,10 @@ public boolean isNeedWaitFinished() {
return needWaitFinished;
}
+ public SessionStage getSessionStage() {
+ return sessionStage;
+ }
+
/**
* SET CANCELABLE STATUS
*/
@@ -435,6 +474,8 @@ public void execute(RouteResultset rrs) {
}
return;
}
+
+ setRouteResultToTrace(rrs.getNodes());
if (this.getSessionXaID() != null && this.xaState == TxState.TX_INITIALIZE_STATE) {
this.xaState = TxState.TX_STARTED_STATE;
}
@@ -459,6 +500,12 @@ public void execute(RouteResultset rrs) {
}
}
+ public void setRouteResultToTrace(RouteResultsetNode[] nodes) {
+ if (SlowQueryLog.getInstance().isEnableSlowLog()) {
+ traceResult.setDataNodes(nodes);
+ }
+ }
+
private void executeMultiResultSet(RouteResultset rrs) {
if (rrs.getSqlType() == ServerParse.DDL) {
/*
@@ -482,7 +529,7 @@ private void executeMultiResultSet(RouteResultset rrs) {
} else if (ServerParse.SELECT == rrs.getSqlType() && rrs.getGroupByCols() != null) {
MultiNodeSelectHandler multiNodeSelectHandler = new MultiNodeSelectHandler(rrs, this);
setTraceSimpleHandler(multiNodeSelectHandler);
- setPreExecuteEnd();
+ setPreExecuteEnd(false);
readyToDeliver();
if (this.isPrepared()) {
multiNodeSelectHandler.setPrepared(true);
@@ -499,7 +546,7 @@ private void executeMultiResultSet(RouteResultset rrs) {
} else {
MultiNodeQueryHandler multiNodeHandler = new MultiNodeQueryHandler(rrs, this);
setTraceSimpleHandler(multiNodeHandler);
- setPreExecuteEnd();
+ setPreExecuteEnd(false);
readyToDeliver();
if (this.isPrepared()) {
multiNodeHandler.setPrepared(true);
@@ -570,8 +617,9 @@ public void executeMultiSelect(RouteResultset rrs) {
return;
}
}
- setPreExecuteEnd();
+ setPreExecuteEnd(true);
if (PlanUtil.containsSubQuery(node)) {
+ setSubQuery();
final PlanNode finalNode = node;
DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() {
//sub Query build will be blocked, so use ComplexQueryExecutor
diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java
index dacd4ac629..7a47803f5e 100644
--- a/src/main/java/com/actiontech/dble/server/ServerConnection.java
+++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java
@@ -180,6 +180,11 @@ public void startProcess() {
session.startProcess();
}
+ @Override
+ public void markFinished() {
+ session.setStageFinished();
+ }
+
public void executeTask() {
for (Pair> task : contextTask) {
switch (task.getKey()) {
diff --git a/src/main/java/com/actiontech/dble/server/SessionStage.java b/src/main/java/com/actiontech/dble/server/SessionStage.java
new file mode 100644
index 0000000000..59a2c9d2f5
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/server/SessionStage.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2016-2019 ActionTech.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.server;
+
+import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
+
+public enum SessionStage {
+ Init,
+ Read_SQL, Parse_SQL, Route_Calculation, Prepare_to_Push, Execute_SQL, First_Node_Fetching_Result,
+ First_Node_Fetched_Result, Distributed_Transaction_Commit, Finished, Generate_New_Query,
+ Nested_Loop, Easy_Merge, Merge_and_Order, Join, Not_In, Where_Filter, Aggregate, Having_filter,
+ Order, Limit, Union, Distinct, Send_Maker, Write_to_Client, Scalar_Sub_Query, In_Sub_Query, All_Any_Sub_Query, Renamed_Filed,;
+
+ public static SessionStage changeFromHandlerType(DMLResponseHandler.HandlerType handlerType) {
+ switch (handlerType) {
+ case TEMPTABLE:
+ return Nested_Loop;
+ case BASESEL:
+ return Generate_New_Query;
+ case EASY_MERGE:
+ return Easy_Merge;
+ case MERGE_AND_ORDER:
+ return Merge_and_Order;
+ case JOIN:
+ return Join;
+ case NOT_IN:
+ return Not_In;
+ case WHERE:
+ return Where_Filter;
+ case GROUPBY:
+ return Aggregate;
+ case HAVING:
+ return Having_filter;
+ case ORDERBY:
+ return Order;
+ case LIMIT:
+ return Limit;
+ case UNION:
+ return Union;
+ case DISTINCT:
+ return Distinct;
+ case SENDMAKER:
+ return Send_Maker;
+ case FINAL:
+ return Write_to_Client;
+ case SCALAR_SUB_QUERY:
+ return Scalar_Sub_Query;
+ case IN_SUB_QUERY:
+ return In_Sub_Query;
+ case ALL_ANY_SUB_QUERY:
+ return All_Any_Sub_Query;
+ case RENAME_FIELD:
+ return Renamed_Filed;
+ default:
+ //not happen
+ }
+ return Write_to_Client; //not happen
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java
index 7476f14d60..5bdd238ecf 100644
--- a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java
+++ b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java
@@ -13,17 +13,17 @@
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.plan.util.ComplexQueryPlanUtil;
import com.actiontech.dble.plan.util.ReferenceHandlerInfo;
+import com.actiontech.dble.route.RouteResultsetNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TraceResult implements Cloneable {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class);
+ private boolean prepareFinished = false;
private long veryStartPrepare;
private long veryStart;
private TraceRecord requestStartPrepare;
@@ -32,6 +32,7 @@ public class TraceResult implements Cloneable {
private TraceRecord parseStart; //requestEnd
private TraceRecord routeStart; //parseEnd
private TraceRecord preExecuteStart; //routeEnd
+ private RouteResultsetNode[] dataNodes;
private TraceRecord preExecuteEnd;
private TraceRecord adtCommitBegin; //auto Distributed Transaction commit begin
@@ -45,8 +46,11 @@ public class TraceResult implements Cloneable {
private ConcurrentMap recordStartMap = new ConcurrentHashMap<>();
private ConcurrentMap recordEndMap = new ConcurrentHashMap<>();
private long veryEnd;
+ private boolean complexQuery = false;
+ private boolean subQuery = false;
public void setVeryStartPrepare(long veryStartPrepare) {
+ prepareFinished = false;
this.veryStartPrepare = veryStartPrepare;
}
@@ -70,6 +74,21 @@ public void setPreExecuteEnd(TraceRecord preExecuteEnd) {
this.preExecuteEnd = preExecuteEnd;
}
+ public RouteResultsetNode[] getDataNodes() {
+ return dataNodes;
+ }
+
+ public void setDataNodes(RouteResultsetNode[] dataNodes) {
+ if (this.dataNodes == null) {
+ this.dataNodes = dataNodes;
+ } else {
+ RouteResultsetNode[] tempDataNodes = new RouteResultsetNode[this.dataNodes.length + dataNodes.length];
+ System.arraycopy(this.dataNodes, 0, tempDataNodes, 0, this.dataNodes.length);
+ System.arraycopy(dataNodes, 0, tempDataNodes, this.dataNodes.length, dataNodes.length);
+ this.dataNodes = tempDataNodes;
+ }
+ }
+
public void setSimpleHandler(ResponseHandler simpleHandler) {
this.simpleHandler = simpleHandler;
}
@@ -86,6 +105,14 @@ public void setAdtCommitEnd(TraceRecord adtCommitEnd) {
this.adtCommitEnd = adtCommitEnd;
}
+ public void setComplexQuery(boolean complexQuery) {
+ this.complexQuery = complexQuery;
+ }
+
+ public void setSubQuery(boolean subQuery) {
+ this.subQuery = subQuery;
+ }
+
public synchronized Boolean addToConnFlagMap(String item) {
return connFlagMap.putIfAbsent(item, true);
}
@@ -125,10 +152,14 @@ public void setVeryEnd(long veryEnd) {
}
public void ready() {
+ prepareFinished = true;
clear();
veryStart = veryStartPrepare;
requestStart = requestStartPrepare;
parseStart = parseStartPrepare;
+ veryStartPrepare = 0;
+ requestStartPrepare = null;
+ parseStartPrepare = null;
}
private void clear() {
@@ -138,9 +169,11 @@ private void clear() {
routeStart = null;
preExecuteStart = null;
preExecuteEnd = null;
+ dataNodes = null;
adtCommitBegin = null;
adtCommitEnd = null;
-
+ complexQuery = false;
+ subQuery = false;
simpleHandler = null;
builder = null; //for complex query
connFlagMap.clear();
@@ -157,6 +190,67 @@ private void clear() {
veryEnd = 0;
}
+ public List genRunningSQLStage() {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("start genRunningSQLStage");
+ }
+ List lst = new ArrayList<>();
+ if (!prepareFinished) {
+ if (requestStartPrepare == null) {
+ return lst;
+ } else {
+ if (parseStartPrepare == null) {
+ lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp()));
+ return lst;
+ } else {
+ lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp(), parseStartPrepare.getTimestamp()));
+ lst.add(genTraceRecord("Parse_SQL", parseStartPrepare.getTimestamp()));
+ return lst;
+ }
+ }
+ }
+ lst.add(genTraceRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
+
+ if (routeStart == null) {
+ lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp()));
+ return lst;
+ } else {
+ lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp(), routeStart.getTimestamp()));
+ }
+
+ if (preExecuteStart == null) {
+ lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp()));
+ return lst;
+ } else {
+ lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
+ }
+
+ if (preExecuteEnd == null) {
+ lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp()));
+ return lst;
+ } else {
+ lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
+ }
+ if (simpleHandler != null) {
+ genRunningSimpleResults(lst);
+ return lst;
+ } else if (builder != null) {
+ genRunningComplexQueryResults(lst);
+ return lst;
+ } else if (subQuery) {
+ lst.add(genTraceRecord("Doing_SubQuery", preExecuteEnd.getTimestamp()));
+ return lst;
+ } else if (dataNodes == null || complexQuery) {
+ lst.add(genTraceRecord("Generate_Query_Explain", preExecuteEnd.getTimestamp()));
+ return lst;
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("not support trace this query or unfinished");
+ }
+ return lst;
+ }
+ }
+
public List genTraceResult() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start genTraceResult");
@@ -190,6 +284,76 @@ public List genTraceResult() {
return lst;
}
+ private void genRunningComplexQueryResults(List lst) {
+ List results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
+ long lastChildFinished = preExecuteEnd.getTimestamp();
+ for (ReferenceHandlerInfo result : results) {
+ DMLResponseHandler handler = result.getHandler();
+ if (handler instanceof BaseSelectHandler) {
+ Map fetchStartRecordMap = connReceivedMap.get(handler);
+ if (fetchStartRecordMap == null) {
+ if (!result.isNestLoopQuery()) {
+ lst.add(genTraceRecord("Execute_SQL", lastChildFinished, result.getName(), result.getRefOrSQL())); // lastChildFinished may is Long.MAX_VALUE
+ } else {
+ lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
+ }
+ lst.add(genTraceRecord("Fetch_result", result.getName(), result.getRefOrSQL()));
+ } else {
+ TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next();
+ if (!result.isNestLoopQuery()) {
+ lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ } else {
+ TraceRecord handlerStart = recordStartMap.get(handler);
+ TraceRecord handlerEnd = recordEndMap.get(handler);
+ if (handlerStart == null) {
+ lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
+ } else if (handlerEnd == null) {
+ lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
+ lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ } else {
+ lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
+ lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ }
+ }
+ Map fetchEndRecordMap = connFinishedMap.get(handler);
+ if (fetchEndRecordMap == null) {
+ lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ } else {
+ TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next();
+ lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ }
+ }
+ } else if (handler instanceof OutputHandler) {
+ TraceRecord startWrite = recordStartMap.get(handler);
+ if (startWrite == null) {
+ lst.add(genTraceRecord("Write_to_Client"));
+ } else if (veryEnd == 0) {
+ lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp()));
+ } else {
+ lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), veryEnd));
+ }
+ } else {
+ TraceRecord handlerStart = recordStartMap.get(handler);
+ TraceRecord handlerEnd = recordEndMap.get(handler);
+ if (handlerStart == null) {
+ lst.add(genTraceRecord(result.getType()));
+ } else if (handlerEnd == null) {
+ lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ } else {
+ lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
+ }
+
+ if (handler.getNextHandler() == null) {
+ if (handlerEnd != null) {
+ lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp());
+ } else {
+ lastChildFinished = Long.MAX_VALUE;
+ }
+ }
+ }
+ }
+ }
+
private boolean genComplexQueryResults(List lst) {
lst.add(genTraceRecord("Try_Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
lst.add(genTraceRecord("Try_to_Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
@@ -291,6 +455,98 @@ private boolean genSimpleResults(List lst) {
return false;
}
+ private void genRunningSimpleResults(List lst) {
+ Map connFetchStartMap = connReceivedMap.get(simpleHandler);
+
+ Set receivedNode = new HashSet<>();
+ long minFetchStart = Long.MAX_VALUE;
+ long maxFetchEnd = 0;
+ if (connFetchStartMap != null) {
+ Map connFetchEndMap = connFinishedMap.get(simpleHandler);
+ List executeList = new ArrayList<>(connFetchStartMap.size());
+ List fetchList = new ArrayList<>(connFetchStartMap.size());
+ for (Map.Entry fetchStart : connFetchStartMap.entrySet()) {
+ TraceRecord fetchStartRecord = fetchStart.getValue();
+ receivedNode.add(fetchStartRecord.getDataNode());
+ minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp());
+ executeList.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef()));
+ if (connFetchEndMap == null) {
+ fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef()));
+ } else {
+ TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey());
+ if (fetchEndRecord == null) {
+ fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef()));
+ } else {
+ fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef()));
+ maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp());
+ }
+ }
+ }
+ lst.addAll(executeList);
+ if (receivedNode.size() != dataNodes.length) {
+ for (RouteResultsetNode dataNode : dataNodes) {
+ if (!receivedNode.contains(dataNode.getName())) {
+ lst.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), dataNode.getName(), dataNode.getStatement()));
+ fetchList.add(genTraceRecord("Fetch_result", dataNode.getName(), dataNode.getStatement()));
+ }
+ }
+ }
+ lst.addAll(fetchList);
+ } else {
+ for (RouteResultsetNode dataNode : dataNodes) {
+ if (!receivedNode.contains(dataNode.getName())) {
+ lst.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), dataNode.getName(), dataNode.getStatement()));
+ lst.add(genTraceRecord("Fetch_result", dataNode.getName(), dataNode.getStatement()));
+ }
+ }
+ }
+ if (adtCommitBegin != null) {
+ lst.add(genTraceRecord("Distributed_Transaction_Prepare", maxFetchEnd, adtCommitBegin.getTimestamp()));
+ lst.add(genTraceRecord("Distributed_Transaction_Commit", adtCommitBegin.getTimestamp(), adtCommitEnd.getTimestamp()));
+ }
+ if (minFetchStart == Long.MAX_VALUE) {
+ lst.add(genTraceRecord("Write_to_Client"));
+ } else if (veryEnd == 0) {
+ lst.add(genTraceRecord("Write_to_Client", minFetchStart));
+ } else {
+ lst.add(genTraceRecord("Write_to_Client", minFetchStart, veryEnd));
+ }
+ }
+
+ private String[] genTraceRecord(String operation, long start) {
+ return genTraceRecord(operation, start, "-", "-");
+
+ }
+
+ private String[] genTraceRecord(String operation, long start, String dataNode, String ref) {
+ if (start == Long.MAX_VALUE) {
+ return genTraceRecord(operation, dataNode, ref);
+ }
+ String[] readQuery = new String[6];
+ readQuery[0] = operation;
+ readQuery[1] = nanoToMilliSecond(start - veryStart);
+ readQuery[2] = "unfinished";
+ readQuery[3] = "unknown";
+ readQuery[4] = dataNode;
+ readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
+ return readQuery;
+ }
+
+ private String[] genTraceRecord(String operation, String dataNode, String ref) {
+ String[] readQuery = new String[6];
+ readQuery[0] = operation;
+ readQuery[1] = "not started";
+ readQuery[2] = "unfinished";
+ readQuery[3] = "unknown";
+ readQuery[4] = dataNode;
+ readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
+ return readQuery;
+ }
+
+ private String[] genTraceRecord(String operation) {
+ return genTraceRecord(operation, "-", "-");
+ }
+
private String[] genTraceRecord(String operation, long start, long end) {
return genTraceRecord(operation, start, end, "-", "-");
}