From 17b0580f34cc322636352a351902540719e74873 Mon Sep 17 00:00:00 2001 From: "tiger.yan" Date: Thu, 25 Apr 2019 21:23:32 -0500 Subject: [PATCH 1/5] split merge hander #1139 (#1140) --- .../handler/builder/BaseHandlerBuilder.java | 16 +- .../nio/handler/builder/HandlerBuilder.java | 4 +- .../builder/NoNameNodeHandlerBuilder.java | 4 +- .../query/impl/MultiNodeEasyMergeHandler.java | 112 ++++++++ .../impl/MultiNodeMergeAndOrderHandler.java | 241 ++++++++++++++++ .../query/impl/MultiNodeMergeHandler.java | 260 +----------------- .../dble/plan/util/ComplexQueryPlanUtil.java | 12 +- 7 files changed, 386 insertions(+), 263 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java create mode 100644 src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java index 0229107658..7d5b050009 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java @@ -9,8 +9,8 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.GlobalVisitor; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*; -import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler; +import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.InSubQueryHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SingleRowSubQueryHandler; @@ -186,8 +186,8 @@ protected final void noShardBuild() { } } - MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), - session, null); + MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), + session); addHandler(mh); } @@ -391,11 +391,15 @@ public static long getSequenceId() { protected void buildMergeHandler(PlanNode planNode, RouteResultsetNode[] rrssArray) { hBuilder.checkRRSs(rrssArray); - MultiNodeMergeHandler mh = null; List orderBys = planNode.getGroupBys().size() > 0 ? planNode.getGroupBys() : planNode.getOrderBys(); + boolean isEasyMerge = rrssArray.length == 1 || (orderBys == null || orderBys.size() == 0); - mh = new MultiNodeMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session, - orderBys); + MultiNodeMergeHandler mh; + if (isEasyMerge) { + mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session); + } else { + mh = new MultiNodeMergeAndOrderHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session, orderBys); + } addHandler(mh); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java index f4f9dd3298..3ee368da0c 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java @@ -69,8 +69,8 @@ public BaseHandlerBuilder build() throws Exception { //set slave only into rrsNode for (DMLResponseHandler startHandler : fh.getMerges()) { MultiNodeMergeHandler mergeHandler = (MultiNodeMergeHandler) startHandler; - for (BaseSelectHandler bshandler : mergeHandler.getExeHandlers()) { - bshandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave()); + for (BaseSelectHandler baseHandler : mergeHandler.getExeHandlers()) { + baseHandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave()); } } session.endComplexRoute(); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java index 7512117e96..65095db31c 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/NoNameNodeHandlerBuilder.java @@ -7,6 +7,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.PushDownVisitor; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; +import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeEasyMergeHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler; import com.actiontech.dble.config.model.SchemaConfig; import com.actiontech.dble.plan.node.NoNameNode; @@ -54,8 +55,7 @@ public void buildOwn() { String randomDatenode = getRandomNode(schemaConfig.getAllDataNodes()); RouteResultsetNode[] rrss = new RouteResultsetNode[]{new RouteResultsetNode(randomDatenode, ServerParse.SELECT, sql)}; hBuilder.checkRRSs(rrss); - MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), - session, null); + MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session); addHandler(mh); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java new file mode 100644 index 0000000000..ef8645c17e --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2016-2019 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.backend.mysql.nio.handler.query.impl; + +import com.actiontech.dble.backend.BackendConnection; +import com.actiontech.dble.backend.mysql.nio.MySQLConnection; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.route.RouteResultsetNode; +import com.actiontech.dble.server.NonBlockingSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * mergeHandler will merge data,if contains aggregate function,use group by handler + * + * @author ActionTech + */ +public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeEasyMergeHandler.class); + + public MultiNodeEasyMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) { + super(id, route, autocommit, session); + this.merges.add(this); + } + + @Override + public void execute() throws Exception { + synchronized (exeHandlers) { + if (terminate.get()) + return; + for (BaseSelectHandler exeHandler : exeHandlers) { + session.setHandlerStart(exeHandler); //base start execute + MySQLConnection exeConn = exeHandler.initConnection(); + if (exeConn != null) { + exeConn.setComplexQuery(true); + exeHandler.execute(exeConn); + } + } + } + } + + @Override + public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, + boolean isLeft, BackendConnection conn) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(conn.toString() + "'s field is reached."); + } + session.setHandlerStart(this); + // if terminated + if (terminate.get()) { + return; + } + lock.lock(); // for combine + try { + if (this.fieldPackets.isEmpty()) { + this.fieldPackets = fieldPackets; + nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn); + } + startEasyMerge(); + if (++reachedConCount == route.length) { + session.allBackendConnReceive(); + } + } finally { + lock.unlock(); + } + } + + @Override + public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) { + if (terminate.get()) + return true; + return nextHandler.rowResponse(null, rowPacket, this.isLeft, conn); + } + + @Override + public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(conn.toString() + " 's rowEof is reached."); + } + + if (this.terminate.get()) + return; + lock.lock(); + try { + if (reachedConCount == route.length) { + session.setHandlerEnd(this); + nextHandler.rowEofResponse(null, this.isLeft, conn); + } + } finally { + lock.unlock(); + } + } + + @Override + protected void ownThreadJob(Object... objects) { + } + + @Override + protected void terminateThread() throws Exception { + recycleConn(); + } + + @Override + protected void recycleResources() { + } +} diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java new file mode 100644 index 0000000000..bc89303e3f --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2016-2019 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.backend.mysql.nio.handler.query.impl; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.BackendConnection; +import com.actiontech.dble.backend.mysql.nio.MySQLConnection; +import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap; +import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem; +import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.plan.Order; +import com.actiontech.dble.route.RouteResultsetNode; +import com.actiontech.dble.server.NonBlockingSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * mergeHandler will merge data,if contains aggregate function,use group by handler + * + * @author ActionTech + */ +public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeMergeAndOrderHandler.class); + + private final int queueSize; + // map;conn->blocking queue.if receive row packet, add to the queue,if receive rowEof packet, add NullHeapItem into queue; + private Map> queues; + private List orderBys; + private RowDataComparator rowComparator; + private volatile boolean noNeedRows = false; + + public MultiNodeMergeAndOrderHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session, + List orderBys) { + super(id, route, autocommit, session); + this.orderBys = orderBys; + this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize(); + this.queues = new ConcurrentHashMap<>(); + this.merges.add(this); + } + + @Override + public void execute() throws Exception { + synchronized (exeHandlers) { + if (terminate.get()) + return; + for (BaseSelectHandler exeHandler : exeHandlers) { + session.setHandlerStart(exeHandler); //base start execute + MySQLConnection exeConn = exeHandler.initConnection(); + if (exeConn != null) { + exeConn.setComplexQuery(true); + queues.put(exeConn, new LinkedBlockingQueue<>(queueSize)); + exeHandler.execute(exeConn); + } + } + } + } + + @Override + public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, + boolean isLeft, BackendConnection conn) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(conn.toString() + "'s field is reached."); + } + session.setHandlerStart(this); + // if terminated + if (terminate.get()) { + return; + } + lock.lock(); // for combine + try { + if (this.fieldPackets.isEmpty()) { + this.fieldPackets = fieldPackets; + rowComparator = new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type()); + nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn); + } + if (++reachedConCount == route.length) { + session.allBackendConnReceive(); + startOwnThread(); + } + } finally { + lock.unlock(); + } + } + + @Override + public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) { + if (terminate.get() || noNeedRows) + return true; + + MySQLConnection mySQLConn = (MySQLConnection) conn; + BlockingQueue queue = queues.get(mySQLConn); + if (queue == null) + return true; + HeapItem item = new HeapItem(row, rowPacket, mySQLConn); + try { + queue.put(item); + } catch (InterruptedException e) { + //ignore error + } + return false; + } + + @Override + public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) { + MySQLConnection mySQLConn = (MySQLConnection) conn; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(mySQLConn.toString() + " 's rowEof is reached."); + } + + if (this.terminate.get()) + return; + BlockingQueue queue = queues.get(mySQLConn); + if (queue == null) + return; + try { + queue.put(HeapItem.nullItem()); + } catch (InterruptedException e) { + //ignore error + } + } + + @Override + protected void ownThreadJob(Object... objects) { + try { + ArrayMinHeap heap = new ArrayMinHeap<>(new Comparator() { + @Override + public int compare(HeapItem o1, HeapItem o2) { + RowDataPacket row1 = o1.getRowPacket(); + RowDataPacket row2 = o2.getRowPacket(); + if (row1 == null || row2 == null) { + if (row1 == row2) + return 0; + if (row1 == null) + return -1; + return 1; + } + return rowComparator.compare(row1, row2); + } + }); + // init heap + for (Entry> entry : queues.entrySet()) { + HeapItem firstItem = entry.getValue().take(); + heap.add(firstItem); + } + while (!heap.isEmpty()) { + if (terminate.get()) + return; + HeapItem top = heap.peak(); + if (top.isNullItem()) { + heap.poll(); + } else { + BlockingQueue topItemQueue = queues.get(top.getIndex()); + HeapItem item = topItemQueue.take(); + heap.replaceTop(item); + if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) { + noNeedRows = true; + while (!heap.isEmpty()) { + HeapItem itemToDiscard = heap.poll(); + if (!itemToDiscard.isNullItem()) { + BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); + while (true) { + if (discardQueue.take().isNullItem() || terminate.get()) { + break; + } + } + } + } + } + } + } + if (LOGGER.isDebugEnabled()) { + String executeQueries = getRoutesSql(route); + LOGGER.debug(executeQueries + " heap send eof: "); + } + session.setHandlerEnd(this); + nextHandler.rowEofResponse(null, this.isLeft, queues.keySet().iterator().next()); + } catch (Exception e) { + String msg = "Merge thread error, " + e.getLocalizedMessage(); + LOGGER.info(msg, e); + session.onQueryError(msg.getBytes()); + } + } + + @Override + protected void terminateThread() throws Exception { + for (Entry> entry : this.queues.entrySet()) { + // add EOF to signal atoMerge thread + entry.getValue().clear(); + entry.getValue().put(new HeapItem(null, null, entry.getKey())); + } + recycleConn(); + } + + @Override + protected void recycleResources() { + Iterator>> iterator = this.queues.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + // fair lock queue,poll for clear + while (true) { + if (entry.getValue().poll() == null) { + break; + } + } + iterator.remove(); + } + } + + private String getRoutesSql(RouteResultsetNode[] nodes) { + StringBuilder sb = new StringBuilder(); + sb.append('{'); + Map> sqlMap = new HashMap<>(); + for (RouteResultsetNode rrss : nodes) { + String sql = rrss.getStatement(); + if (!sqlMap.containsKey(sql)) { + List rrssList = new ArrayList<>(); + rrssList.add(rrss); + sqlMap.put(sql, rrssList); + } else { + List rrssList = sqlMap.get(sql); + rrssList.add(rrss); + } + } + for (Entry> entry : sqlMap.entrySet()) { + sb.append(entry.getKey()).append(entry.getValue()).append(';'); + } + sb.append('}'); + return sb.toString(); + } +} diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java index 839c2bfb11..a3a7ca573d 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java @@ -5,29 +5,15 @@ package com.actiontech.dble.backend.mysql.nio.handler.query.impl; -import com.actiontech.dble.DbleServer; -import com.actiontech.dble.backend.BackendConnection; -import com.actiontech.dble.backend.mysql.nio.MySQLConnection; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler; -import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap; -import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem; -import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.net.mysql.FieldPacket; -import com.actiontech.dble.net.mysql.RowDataPacket; -import com.actiontech.dble.plan.Order; import com.actiontech.dble.plan.common.exception.MySQLOutPutException; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.server.NonBlockingSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.locks.ReentrantLock; /** @@ -35,23 +21,14 @@ * * @author ActionTech */ -public class MultiNodeMergeHandler extends OwnThreadDMLHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeMergeHandler.class); +public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler { - private final int queueSize; - private final ReentrantLock lock; - private final List exeHandlers; - // map;conn->blocking queue.if receive row packet, add to the queue,if receive rowEof packet, add NullHeapItem into queue; - private Map> queues; - private List orderBys; - private RowDataComparator rowComparator; - private RouteResultsetNode[] route; - private int reachedConCount; - private boolean isEasyMerge; - private volatile boolean noNeedRows = false; + protected final ReentrantLock lock; + final List exeHandlers; + protected RouteResultsetNode[] route; + int reachedConCount; - public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session, - List orderBys) { + public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) { super(id, session); this.exeHandlers = new ArrayList<>(); this.lock = new ReentrantLock(); @@ -63,204 +40,14 @@ public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autoco this.exeHandlers.add(exeHandler); } this.route = route; - this.orderBys = orderBys; - this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize(); - this.isEasyMerge = route.length == 1 || (orderBys == null || orderBys.size() == 0); - this.queues = new ConcurrentHashMap<>(); - this.merges.add(this); } + public abstract void execute() throws Exception; + public List getExeHandlers() { return exeHandlers; } - public void execute() throws Exception { - synchronized (exeHandlers) { - if (terminate.get()) - return; - for (BaseSelectHandler exeHandler : exeHandlers) { - session.setHandlerStart(exeHandler); //base start execute - MySQLConnection exeConn = exeHandler.initConnection(); - if (exeConn != null) { - exeConn.setComplexQuery(true); - queues.put(exeConn, new LinkedBlockingQueue(queueSize)); - exeHandler.execute(exeConn); - } - } - } - } - - @Override - public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, - boolean isLeft, BackendConnection conn) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(conn.toString() + "'s field is reached."); - } - session.setHandlerStart(this); - // if terminated - if (terminate.get()) { - return; - } - lock.lock(); // for combine - try { - if (this.fieldPackets.isEmpty()) { - this.fieldPackets = fieldPackets; - rowComparator = makeRowDataSorter(); - nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn); - } - if (isEasyMerge) { - startEasyMerge(); - } else { - if (++reachedConCount == route.length) { - session.allBackendConnReceive(); - startOwnThread(); - } - } - } finally { - lock.unlock(); - } - } - - @Override - public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) { - if (terminate.get() || noNeedRows) - return true; - - if (isEasyMerge) { - nextHandler.rowResponse(null, rowPacket, this.isLeft, conn); - } else { - MySQLConnection mySQLConn = (MySQLConnection) conn; - BlockingQueue queue = queues.get(mySQLConn); - if (queue == null) - return true; - HeapItem item = new HeapItem(row, rowPacket, mySQLConn); - try { - queue.put(item); - } catch (InterruptedException e) { - //ignore error - } - } - return false; - } - - @Override - public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(conn.toString() + " 's rowEof is reached."); - } - - if (this.terminate.get()) - return; - if (isEasyMerge) { - lock.lock(); - try { - if (++reachedConCount == route.length) { - session.setHandlerEnd(this); - nextHandler.rowEofResponse(null, this.isLeft, conn); - } - } finally { - lock.unlock(); - } - } else { - BlockingQueue queue = queues.get(conn); - if (queue == null) - return; - try { - queue.put(HeapItem.nullItem()); - } catch (InterruptedException e) { - //ignore error - } - } - } - - @Override - protected void ownThreadJob(Object... objects) { - try { - ArrayMinHeap heap = new ArrayMinHeap<>(new Comparator() { - - @Override - public int compare(HeapItem o1, HeapItem o2) { - RowDataPacket row1 = o1.getRowPacket(); - RowDataPacket row2 = o2.getRowPacket(); - if (row1 == null || row2 == null) { - if (row1 == row2) - return 0; - if (row1 == null) - return -1; - return 1; - } - return rowComparator.compare(row1, row2); - } - }); - // init heap - for (Map.Entry> entry : queues.entrySet()) { - HeapItem firstItem = entry.getValue().take(); - heap.add(firstItem); - } - while (!heap.isEmpty()) { - if (terminate.get()) - return; - HeapItem top = heap.peak(); - if (top.isNullItem()) { - heap.poll(); - } else { - BlockingQueue topItemQueue = queues.get(top.getIndex()); - HeapItem item = topItemQueue.take(); - heap.replaceTop(item); - if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) { - noNeedRows = true; - while (!heap.isEmpty()) { - HeapItem itemToDiscard = heap.poll(); - if (!itemToDiscard.isNullItem()) { - BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); - while (true) { - if (discardQueue.take().isNullItem() || terminate.get()) { - break; - } - } - } - } - } - } - } - if (LOGGER.isDebugEnabled()) { - String executeQueries = getRoutesSql(route); - LOGGER.debug(executeQueries + " heap send eof: "); - } - session.setHandlerEnd(this); - nextHandler.rowEofResponse(null, this.isLeft, queues.keySet().iterator().next()); - } catch (Exception e) { - String msg = "Merge thread error, " + e.getLocalizedMessage(); - LOGGER.info(msg, e); - session.onQueryError(msg.getBytes()); - } - } - - @Override - protected void terminateThread() throws Exception { - for (Entry> entry : this.queues.entrySet()) { - // add EOF to signal atoMerge thread - entry.getValue().clear(); - entry.getValue().put(new HeapItem(null, null, entry.getKey())); - } - recycleConn(); - } - - @Override - protected void recycleResources() { - Iterator>> iterator = this.queues.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - // fair lock queue,poll for clear - while (true) { - if (entry.getValue().poll() == null) { - break; - } - } - iterator.remove(); - } - } - protected void recycleConn() { synchronized (exeHandlers) { for (BaseSelectHandler exeHandler : exeHandlers) { @@ -284,36 +71,9 @@ private void terminatePreHandler(DMLResponseHandler handler) { } } - private RowDataComparator makeRowDataSorter() { - if (!isEasyMerge) - return new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type()); - return null; - } - @Override public HandlerType type() { return HandlerType.MERGE; } - private String getRoutesSql(RouteResultsetNode[] nodes) { - StringBuilder sb = new StringBuilder(); - sb.append('{'); - Map> sqlMap = new HashMap<>(); - for (RouteResultsetNode rrss : nodes) { - String sql = rrss.getStatement(); - if (!sqlMap.containsKey(sql)) { - List rrssList = new ArrayList<>(); - rrssList.add(rrss); - sqlMap.put(sql, rrssList); - } else { - List rrssList = sqlMap.get(sql); - rrssList.add(rrss); - } - } - for (Map.Entry> entry : sqlMap.entrySet()) { - sb.append(entry.getKey()).append(entry.getValue()).append(';'); - } - sb.append('}'); - return sb.toString(); - } } diff --git a/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java b/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java index d1b5ed9dd2..ec5b45b242 100644 --- a/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java +++ b/src/main/java/com/actiontech/dble/plan/util/ComplexQueryPlanUtil.java @@ -8,8 +8,8 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*; -import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler; +import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.JoinHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.NotInHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler; @@ -74,10 +74,16 @@ private static String buildHandlerTree(DMLResponseHandler endHandler, Map mergeList = new ArrayList<>(); mergeList.addAll(((MultiNodeMergeHandler) startHandler).getExeHandlers()); - String mergeNode = genHandlerName("MERGE", nameMap); - ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, "MERGE", mergeHandler); + String mergeNode = genHandlerName(mergeName, nameMap); + ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, mergeName, mergeHandler); handlerMap.put(mergeHandler, refInfo); refMap.put(mergeNode, refInfo); for (BaseSelectHandler exeHandler : mergeList) { From ca67bc2d2b133086644f01e7161c7aa9949c7f00 Mon Sep 17 00:00:00 2001 From: yanhuqing Date: Sun, 28 Apr 2019 10:30:42 +0800 Subject: [PATCH 2/5] reachedConCount thread safe #1139 --- .../nio/handler/query/impl/MultiNodeEasyMergeHandler.java | 3 ++- .../mysql/nio/handler/query/impl/MultiNodeMergeHandler.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java index ef8645c17e..7fcce41589 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java @@ -23,6 +23,7 @@ */ public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler { private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeEasyMergeHandler.class); + private int rowEndConCount = 0; public MultiNodeEasyMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) { super(id, route, autocommit, session); @@ -88,7 +89,7 @@ public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) return; lock.lock(); try { - if (reachedConCount == route.length) { + if (++rowEndConCount == route.length) { session.setHandlerEnd(this); nextHandler.rowEofResponse(null, this.isLeft, conn); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java index a3a7ca573d..adfd70df44 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java @@ -26,7 +26,7 @@ public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler { protected final ReentrantLock lock; final List exeHandlers; protected RouteResultsetNode[] route; - int reachedConCount; + int reachedConCount = 0; public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) { super(id, session); From 3e1ef65389be352d96ce0fc32ba27cb338448674 Mon Sep 17 00:00:00 2001 From: Lordess John Date: Thu, 28 May 2020 10:04:33 +0800 Subject: [PATCH 3/5] import feature from 2.19.11.0/rel to 2.19.03/lts * #1564 [Improve] add STAGE for show @@connection.sql --- .../nio/handler/query/DMLResponseHandler.java | 2 +- .../query/impl/MultiNodeEasyMergeHandler.java | 5 ++ .../impl/MultiNodeMergeAndOrderHandler.java | 5 ++ .../query/impl/MultiNodeMergeHandler.java | 5 -- .../dble/manager/ManagerConnection.java | 5 ++ .../manager/response/ShowConnectionSQL.java | 12 +++- .../dble/net/FrontendConnection.java | 3 + .../dble/server/NonBlockingSession.java | 23 +++++++ .../dble/server/ServerConnection.java | 5 ++ .../actiontech/dble/server/SessionStage.java | 62 +++++++++++++++++++ 10 files changed, 120 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/server/SessionStage.java diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java index 8eafebd834..8bf7c8e7e0 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/DMLResponseHandler.java @@ -11,7 +11,7 @@ public interface DMLResponseHandler extends ResponseHandler { enum HandlerType { - DIRECT, TEMPTABLE, BASESEL, REFRESHFP, MERGE, JOIN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD + TEMPTABLE, BASESEL, EASY_MERGE, MERGE_AND_ORDER, JOIN, NOT_IN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD } HandlerType type(); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java index 7fcce41589..27a229aec1 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeEasyMergeHandler.java @@ -110,4 +110,9 @@ protected void terminateThread() throws Exception { @Override protected void recycleResources() { } + + @Override + public HandlerType type() { + return HandlerType.EASY_MERGE; + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java index bc89303e3f..cc9c7600b3 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java @@ -238,4 +238,9 @@ private String getRoutesSql(RouteResultsetNode[] nodes) { sb.append('}'); return sb.toString(); } + + @Override + public HandlerType type() { + return HandlerType.MERGE_AND_ORDER; + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java index adfd70df44..b33a668583 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java @@ -71,9 +71,4 @@ private void terminatePreHandler(DMLResponseHandler handler) { } } - @Override - public HandlerType type() { - return HandlerType.MERGE; - } - } diff --git a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java index eda04c993e..a12c0d0d05 100644 --- a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java +++ b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java @@ -41,6 +41,11 @@ public void startProcess() { //do nothing } + @Override + public void markFinished() { + //do nothing + } + @Override public void handle(final byte[] data) { handler.handle(data); diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java index 1949034248..5aea6eec2f 100644 --- a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java +++ b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java @@ -15,6 +15,7 @@ import com.actiontech.dble.net.mysql.FieldPacket; import com.actiontech.dble.net.mysql.ResultSetHeaderPacket; import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.server.ServerConnection; import com.actiontech.dble.util.FormatUtil; import com.actiontech.dble.util.LongUtil; import com.actiontech.dble.util.StringUtil; @@ -29,7 +30,7 @@ public final class ShowConnectionSQL { private ShowConnectionSQL() { } - private static final int FIELD_COUNT = 7; + private static final int FIELD_COUNT = 8; private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT); private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT]; private static final EOFPacket EOF = new EOFPacket(); @@ -60,6 +61,9 @@ private ShowConnectionSQL() { FIELDS[i] = PacketUtil.getField("SQL", Fields.FIELD_TYPE_VAR_STRING); FIELDS[i++].setPacketId(++packetId); + FIELDS[i] = PacketUtil.getField("STAGE", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i].setPacketId(++packetId); + EOF.setPacketId(++packetId); } @@ -113,6 +117,12 @@ private static RowDataPacket getRow(FrontendConnection c, String charset) { executeSql = c.getExecuteSql().length() <= 1024 ? c.getExecuteSql() : c.getExecuteSql().substring(0, 1024); } row.add(StringUtil.encode(executeSql, charset)); + if (c instanceof ServerConnection) { + ServerConnection sc = (ServerConnection) c; + row.add(StringUtil.encode(sc.getSession2().getSessionStage().toString(), charset)); + } else { + row.add(StringUtil.encode("Manager connection", charset)); + } return row; } diff --git a/src/main/java/com/actiontech/dble/net/FrontendConnection.java b/src/main/java/com/actiontech/dble/net/FrontendConnection.java index 6aa731709c..342ffdd503 100755 --- a/src/main/java/com/actiontech/dble/net/FrontendConnection.java +++ b/src/main/java/com/actiontech/dble/net/FrontendConnection.java @@ -211,6 +211,7 @@ public void writeErrMessage(byte id, int vendorCode, String msg) { } protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) { + markFinished(); ErrorPacket err = new ErrorPacket(); err.setPacketId(id); err.setErrNo(vendorCode); @@ -223,6 +224,8 @@ protected void writeErrMessage(byte id, int vendorCode, String sqlState, String public abstract void startProcess(); + protected abstract void markFinished(); + public void initDB(byte[] data) { MySQLMessage mm = new MySQLMessage(data); diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index 3be272f445..40c70a5f26 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -108,6 +108,7 @@ public class NonBlockingSession implements Session { private volatile boolean traceEnable = false; private volatile TraceResult traceResult = new TraceResult(); private volatile RouteResultset complexRrs = null; + private volatile SessionStage sessionStage = SessionStage.Init; public NonBlockingSession(ServerConnection source) { this.source = source; @@ -127,7 +128,9 @@ public ServerConnection getSource() { } void setRequestTime() { + sessionStage = SessionStage.Read_SQL; long requestTime = 0; + if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { requestTime = System.nanoTime(); traceResult.setVeryStartPrepare(requestTime); @@ -158,6 +161,7 @@ void setRequestTime() { } void startProcess() { + sessionStage = SessionStage.Parse_SQL; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime())); } @@ -168,6 +172,7 @@ void startProcess() { } public void endParse() { + sessionStage = SessionStage.Route_Calculation; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.ready(); traceResult.setRouteStart(new TraceRecord(System.nanoTime())); @@ -180,6 +185,7 @@ public void endParse() { void endRoute(RouteResultset rrs) { + sessionStage = SessionStage.Prepare_to_Push; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.setPreExecuteStart(new TraceRecord(System.nanoTime())); } @@ -212,6 +218,7 @@ public void readyToDeliver() { } public void setPreExecuteEnd() { + sessionStage = SessionStage.Execute_SQL; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime())); traceResult.clearConnReceivedMap(); @@ -220,6 +227,7 @@ public void setPreExecuteEnd() { } public void setBackendRequestTime(long backendID) { + sessionStage = SessionStage.First_Node_Fetching_Result; if (!timeCost) { return; } @@ -290,6 +298,7 @@ public void allBackendConnReceive() { } public void setResponseTime(boolean isSuccess) { + sessionStage = SessionStage.Finished; long responseTime = 0; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { responseTime = System.nanoTime(); @@ -312,7 +321,12 @@ public void setResponseTime(boolean isSuccess) { QueryTimeCostContainer.getInstance().add(queryTimeCost); } + public void setStageFinished() { + sessionStage = SessionStage.Finished; + } + public void setBackendResponseEndTime(MySQLConnection conn) { + sessionStage = SessionStage.First_Node_Fetched_Result; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment(); ResponseHandler responseHandler = conn.getRespHandler(); @@ -331,6 +345,7 @@ public void setBackendResponseEndTime(MySQLConnection conn) { } public void setBeginCommitTime() { + sessionStage = SessionStage.Distributed_Transaction_Commit; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.setAdtCommitBegin(new TraceRecord(System.nanoTime())); } @@ -349,6 +364,10 @@ public void setHandlerStart(DMLResponseHandler handler) { } public void setHandlerEnd(DMLResponseHandler handler) { + if (handler.getNextHandler() != null) { + DMLResponseHandler next = handler.getNextHandler(); + sessionStage = SessionStage.changeFromHandlerType(next.type()); + } if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { traceResult.addToRecordEndMap(handler, new TraceRecord(System.nanoTime())); } @@ -391,6 +410,10 @@ public boolean isNeedWaitFinished() { return needWaitFinished; } + public SessionStage getSessionStage() { + return sessionStage; + } + /** * SET CANCELABLE STATUS */ diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index dacd4ac629..7a47803f5e 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -180,6 +180,11 @@ public void startProcess() { session.startProcess(); } + @Override + public void markFinished() { + session.setStageFinished(); + } + public void executeTask() { for (Pair> task : contextTask) { switch (task.getKey()) { diff --git a/src/main/java/com/actiontech/dble/server/SessionStage.java b/src/main/java/com/actiontech/dble/server/SessionStage.java new file mode 100644 index 0000000000..59a2c9d2f5 --- /dev/null +++ b/src/main/java/com/actiontech/dble/server/SessionStage.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2016-2019 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.server; + +import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; + +public enum SessionStage { + Init, + Read_SQL, Parse_SQL, Route_Calculation, Prepare_to_Push, Execute_SQL, First_Node_Fetching_Result, + First_Node_Fetched_Result, Distributed_Transaction_Commit, Finished, Generate_New_Query, + Nested_Loop, Easy_Merge, Merge_and_Order, Join, Not_In, Where_Filter, Aggregate, Having_filter, + Order, Limit, Union, Distinct, Send_Maker, Write_to_Client, Scalar_Sub_Query, In_Sub_Query, All_Any_Sub_Query, Renamed_Filed,; + + public static SessionStage changeFromHandlerType(DMLResponseHandler.HandlerType handlerType) { + switch (handlerType) { + case TEMPTABLE: + return Nested_Loop; + case BASESEL: + return Generate_New_Query; + case EASY_MERGE: + return Easy_Merge; + case MERGE_AND_ORDER: + return Merge_and_Order; + case JOIN: + return Join; + case NOT_IN: + return Not_In; + case WHERE: + return Where_Filter; + case GROUPBY: + return Aggregate; + case HAVING: + return Having_filter; + case ORDERBY: + return Order; + case LIMIT: + return Limit; + case UNION: + return Union; + case DISTINCT: + return Distinct; + case SENDMAKER: + return Send_Maker; + case FINAL: + return Write_to_Client; + case SCALAR_SUB_QUERY: + return Scalar_Sub_Query; + case IN_SUB_QUERY: + return In_Sub_Query; + case ALL_ANY_SUB_QUERY: + return All_Any_Sub_Query; + case RENAME_FIELD: + return Renamed_Filed; + default: + //not happen + } + return Write_to_Client; //not happen + } +} From 74ad660245a44f5b3a8640787486d872be1591ca Mon Sep 17 00:00:00 2001 From: Lordess John Date: Fri, 29 May 2020 07:41:59 +0800 Subject: [PATCH 4/5] import feature from 2.19.11.0/rel to 2.19.03/lts * #1575 [Improve]add an manager command to show recently(doing or done) query detail stage --- .../nio/handler/MultiNodeDdlHandler.java | 2 +- .../mysql/nio/handler/SingleNodeHandler.java | 2 +- .../impl/MultiNodeMergeAndOrderHandler.java | 1 + .../query/impl/MultiNodeMergeHandler.java | 1 + .../dble/manager/handler/ShowHandler.java | 8 + .../manager/response/ShowConnectionSQL.java | 1 - .../response/ShowConnectionSQLStatus.java | 125 +++++++++ .../dble/manager/response/ShowHelp.java | 1 + .../dble/route/parser/ManagerParseShow.java | 102 ++++++- .../dble/server/NonBlockingSession.java | 33 ++- .../dble/server/trace/TraceResult.java | 264 +++++++++++++++++- 11 files changed, 527 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java index 037d0de2d8..dde700ebb4 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java @@ -255,7 +255,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection c } finishedTest = true; session.setTraceSimpleHandler(handler); - session.setPreExecuteEnd(); + session.setPreExecuteEnd(false); handler.execute(); } catch (Exception e) { LOGGER.warn(String.valueOf(source) + oriRrs, e); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java index d91835a94f..2b5b7e1ab5 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java @@ -112,7 +112,7 @@ private void execute(BackendConnection conn) { TxnLogHelper.putTxnLog(session.getSource(), node.getStatement()); } session.readyToDeliver(); - session.setPreExecuteEnd(); + session.setPreExecuteEnd(false); conn.execute(node, session.getSource(), isAutocommit); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java index cc9c7600b3..462632151f 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java @@ -47,6 +47,7 @@ public MultiNodeMergeAndOrderHandler(long id, RouteResultsetNode[] route, boolea this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize(); this.queues = new ConcurrentHashMap<>(); this.merges.add(this); + } @Override diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java index b33a668583..57b1043ef7 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java @@ -40,6 +40,7 @@ public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autoco this.exeHandlers.add(exeHandler); } this.route = route; + session.setRouteResultToTrace(route); } public abstract void execute() throws Exception; diff --git a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java index ab678a1879..68693ce6fa 100644 --- a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java +++ b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java @@ -223,6 +223,14 @@ public static void handle(String stmt, ManagerConnection c, int offset) { case ManagerParseShow.DDL_STATE: ShowDdlState.execute(c); break; + case ManagerParseShow.CONNECTION_SQL_STATUS: + String id = stmt.substring(rs >>> 8).trim(); + if (StringUtil.isEmpty(id)) { + c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement"); + } else { + ShowConnectionSQLStatus.execute(c, id); + } + break; default: if (isSupportShow(stmt)) { Iterator iterator = DbleServer.getInstance().getConfig().getDataHosts().values().iterator(); diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java index 5aea6eec2f..b3664a1dbf 100644 --- a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java +++ b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQL.java @@ -125,5 +125,4 @@ private static RowDataPacket getRow(FrontendConnection c, String charset) { } return row; } - } diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java new file mode 100644 index 0000000000..1f47a73ef4 --- /dev/null +++ b/src/main/java/com/actiontech/dble/manager/response/ShowConnectionSQLStatus.java @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2016-2019 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.manager.response; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.mysql.PacketUtil; +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.Fields; +import com.actiontech.dble.manager.ManagerConnection; +import com.actiontech.dble.net.FrontendConnection; +import com.actiontech.dble.net.NIOProcessor; +import com.actiontech.dble.net.mysql.EOFPacket; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.ResultSetHeaderPacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.server.ServerConnection; +import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.util.StringUtil; + +import java.nio.ByteBuffer; +import java.util.List; + +public final class ShowConnectionSQLStatus { + private ShowConnectionSQLStatus() { + } + private static final int FIELD_COUNT = 6; + private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT); + private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT]; + private static final EOFPacket EOF = new EOFPacket(); + + static { + int i = 0; + byte packetId = 0; + HEADER.setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("OPERATION", Fields.FIELD_TYPE_LONG); + FIELDS[i++].setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("START(ms)", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i++].setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("END(ms)", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i++].setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("DURATION(ms)", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i++].setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("DATA_NODE", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i++].setPacketId(++packetId); + + FIELDS[i] = PacketUtil.getField("SQL/REF", Fields.FIELD_TYPE_VAR_STRING); + FIELDS[i].setPacketId(++packetId); + EOF.setPacketId(++packetId); + } + + public static void execute(ManagerConnection c, String id) { + if (!SlowQueryLog.getInstance().isEnableSlowLog()) { + c.writeErrMessage(ErrorCode.ER_YES, "please enable @@slow_query_log first"); + return; + } + long realId = 0; + try { + realId = Long.parseLong(id); + } catch (NumberFormatException e) { + c.writeErrMessage(ErrorCode.ER_YES, "front_id must be a number"); + return; + } + NIOProcessor[] processors = DbleServer.getInstance().getFrontProcessors(); + FrontendConnection target = null; + for (NIOProcessor p : processors) { + for (FrontendConnection fc : p.getFrontends().values()) { + if (fc != null && fc.getId() == realId) { + target = fc; + break; + } + } + } + if (target == null) { + c.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " doesn't exist"); + return; + } + if (target instanceof ManagerConnection) { + c.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a manager connection"); + return; + } + ByteBuffer buffer = c.allocate(); + + // write header + buffer = HEADER.write(buffer, c, true); + + // write fields + for (FieldPacket field : FIELDS) { + buffer = field.write(buffer, c, true); + } + + // write eof + buffer = EOF.write(buffer, c, true); + + // write rows + byte packetId = EOF.getPacketId(); + + List results = ((ServerConnection) target).getSession2().genRunningSQLStage(); + if (results != null) { + for (String[] result : results) { + RowDataPacket row = new RowDataPacket(FIELD_COUNT); + for (int i = 0; i < FIELD_COUNT; i++) { + row.add(StringUtil.encode(result[i], c.getCharset().getResults())); + } + row.setPacketId(++packetId); + buffer = row.write(buffer, c, true); + } + } + + // write last eof + EOFPacket lastEof = new EOFPacket(); + lastEof.setPacketId(++packetId); + buffer = lastEof.write(buffer, c, true); + + // write buffer + c.write(buffer); + } +} diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java b/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java index 93c0a21bf7..c9c3fdb909 100644 --- a/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java +++ b/src/main/java/com/actiontech/dble/manager/response/ShowHelp.java @@ -114,6 +114,7 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) { HELPS.put("show @@session", "Report front session details"); HELPS.put("show @@session.xa", "Report front session and associated xa transaction details"); HELPS.put("show @@connection.sql", "Report connection sql"); + HELPS.put("show @@connection.sql.status where FRONT_ID= ?;", "Show current connection sql status and detail"); HELPS.put("show @@sql", "Report SQL list"); // helps.put("show @@sql where id = ?", "Report specify SQL"); HELPS.put("show @@sql.high", "Report Hight Frequency SQL"); diff --git a/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java b/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java index 5c6e51cd65..949643e06c 100644 --- a/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java +++ b/src/main/java/com/actiontech/dble/route/parser/ManagerParseShow.java @@ -38,6 +38,7 @@ private ManagerParseShow() { public static final int TIME_CURRENT = 22; public static final int TIME_STARTUP = 23; public static final int VERSION = 24; + public static final int CONNECTION_SQL_STATUS = 26; public static final int CONNECTION_SQL = 27; public static final int DATANODE_SCHEMA = 28; public static final int DATASOURCE_WHERE = 29; @@ -1604,8 +1605,14 @@ private static int show2ConnectonSQL(String stmt, int offset) { char c1 = stmt.charAt(++offset); char c2 = stmt.charAt(++offset); if ((c1 == 'q' || c1 == 'Q') && (c2 == 'l' || c2 == 'L')) { - if (ParseUtil.isErrorTail(++offset, stmt)) { - return OTHER; + while (stmt.length() > ++offset) { + if (ParseUtil.isSpace(stmt.charAt(offset))) { + continue; + } else if ('.' == stmt.charAt(offset)) { + return show2ConnectonStatusCheck(stmt, offset); + } else { + return OTHER; + } } return CONNECTION_SQL; } @@ -1613,6 +1620,97 @@ private static int show2ConnectonSQL(String stmt, int offset) { return OTHER; } + private static int show2ConnectonStatusCheck(String stmt, int offset) { + if (stmt.length() > offset + "STATUS".length()) { + char c1 = stmt.charAt(++offset); + char c2 = stmt.charAt(++offset); + char c3 = stmt.charAt(++offset); + char c4 = stmt.charAt(++offset); + char c5 = stmt.charAt(++offset); + char c6 = stmt.charAt(++offset); + if ((c1 == 'S' || c1 == 's') && (c2 == 'T' || c2 == 't') && (c3 == 'A' || c3 == 'a') && + (c4 == 'T' || c4 == 't') && (c5 == 'U' || c5 == 'u') && (c6 == 'S' || c6 == 's')) { + while (stmt.length() > ++offset) { + if (ParseUtil.isSpace(stmt.charAt(offset))) { + continue; + } + switch (stmt.charAt(offset)) { + case 'W': + case 'w': + if (!ParseUtil.isSpace(stmt.charAt(offset - 1))) { + return OTHER; + } + return show2ConnectonStatusWhereCheck(stmt, offset); + default: + return OTHER; + } + } + } + } + return OTHER; + } + + private static int show2ConnectonStatusWhereCheck(String stmt, int offset) { + if (stmt.length() > offset + "HERE".length()) { + char c1 = stmt.charAt(++offset); + char c2 = stmt.charAt(++offset); + char c3 = stmt.charAt(++offset); + char c4 = stmt.charAt(++offset); + if ((c1 == 'H' || c1 == 'h') && (c2 == 'E' || c2 == 'e') && (c3 == 'R' || c3 == 'r') && + (c4 == 'E' || c4 == 'e')) { + while (stmt.length() > ++offset) { + if (ParseUtil.isSpace(stmt.charAt(offset))) { + continue; + } + switch (stmt.charAt(offset)) { + case 'f': + case 'F': + if (!ParseUtil.isSpace(stmt.charAt(offset - 1))) { + return OTHER; + } + return show2ConnectonStatusFrontCheck(stmt, offset); + default: + return OTHER; + } + } + } + } + return OTHER; + } + + private static int show2ConnectonStatusFrontCheck(String stmt, int offset) { + if (stmt.length() > offset + "RONT_ID".length()) { + char c1 = stmt.charAt(++offset); + char c2 = stmt.charAt(++offset); + char c3 = stmt.charAt(++offset); + char c4 = stmt.charAt(++offset); + char c5 = stmt.charAt(++offset); + char c6 = stmt.charAt(++offset); + char c7 = stmt.charAt(++offset); + if ((c1 == 'R' || c1 == 'r') && (c2 == 'O' || c2 == 'o') && + (c3 == 'N' || c3 == 'n') && (c4 == 'T' || c4 == 't') && c5 == '_' && + (c6 == 'I' || c6 == 'i') && (c7 == 'D' || c7 == 'd')) { + while (stmt.length() > ++offset) { + if (ParseUtil.isSpace(stmt.charAt(offset))) { + continue; + } + switch (stmt.charAt(offset)) { + case '=': + while (stmt.length() > ++offset) { + if (!ParseUtil.isSpace(stmt.charAt(offset))) { + return (offset << 8) | CONNECTION_SQL_STATUS; + } + } + return OTHER; + default: + return OTHER; + } + } + } + } + return OTHER; + } + // SHOW @@TIME.CURRENT private static int show2TimeCCheck(String stmt, int offset) { if (stmt.length() > offset + "URRENT".length()) { diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index 40c70a5f26..c5bf1edf1c 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -217,15 +217,22 @@ public void readyToDeliver() { provider.readyToDeliver(source.getId()); } - public void setPreExecuteEnd() { + public void setPreExecuteEnd(boolean isComplexQuery) { sessionStage = SessionStage.Execute_SQL; if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setComplexQuery(isComplexQuery); traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime())); traceResult.clearConnReceivedMap(); traceResult.clearConnFlagMap(); } } + public void setSubQuery() { + if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setSubQuery(true); + } + } + public void setBackendRequestTime(long backendID) { sessionStage = SessionStage.First_Node_Fetching_Result; if (!timeCost) { @@ -381,6 +388,15 @@ public List genTraceResult() { } } + public List genRunningSQLStage() { + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + TraceResult tmpResult = (TraceResult) traceResult.clone(); + return tmpResult.genRunningSQLStage(); + } else { + return null; + } + } + @Override public int getTargetCount() { return target.size(); @@ -458,6 +474,8 @@ public void execute(RouteResultset rrs) { } return; } + + setRouteResultToTrace(rrs.getNodes()); if (this.getSessionXaID() != null && this.xaState == TxState.TX_INITIALIZE_STATE) { this.xaState = TxState.TX_STARTED_STATE; } @@ -482,6 +500,12 @@ public void execute(RouteResultset rrs) { } } + public void setRouteResultToTrace(RouteResultsetNode[] nodes) { + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setDataNodes(nodes); + } + } + private void executeMultiResultSet(RouteResultset rrs) { if (rrs.getSqlType() == ServerParse.DDL) { /* @@ -505,7 +529,7 @@ private void executeMultiResultSet(RouteResultset rrs) { } else if (ServerParse.SELECT == rrs.getSqlType() && rrs.getGroupByCols() != null) { MultiNodeSelectHandler multiNodeSelectHandler = new MultiNodeSelectHandler(rrs, this); setTraceSimpleHandler(multiNodeSelectHandler); - setPreExecuteEnd(); + setPreExecuteEnd(false); readyToDeliver(); if (this.isPrepared()) { multiNodeSelectHandler.setPrepared(true); @@ -522,7 +546,7 @@ private void executeMultiResultSet(RouteResultset rrs) { } else { MultiNodeQueryHandler multiNodeHandler = new MultiNodeQueryHandler(rrs, this); setTraceSimpleHandler(multiNodeHandler); - setPreExecuteEnd(); + setPreExecuteEnd(false); readyToDeliver(); if (this.isPrepared()) { multiNodeHandler.setPrepared(true); @@ -593,8 +617,9 @@ public void executeMultiSelect(RouteResultset rrs) { return; } } - setPreExecuteEnd(); + setPreExecuteEnd(true); if (PlanUtil.containsSubQuery(node)) { + setSubQuery(); final PlanNode finalNode = node; DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() { //sub Query build will be blocked, so use ComplexQueryExecutor diff --git a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java index 7476f14d60..5bdd238ecf 100644 --- a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java +++ b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java @@ -13,17 +13,17 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler; import com.actiontech.dble.plan.util.ComplexQueryPlanUtil; import com.actiontech.dble.plan.util.ReferenceHandlerInfo; +import com.actiontech.dble.route.RouteResultsetNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class TraceResult implements Cloneable { private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class); + private boolean prepareFinished = false; private long veryStartPrepare; private long veryStart; private TraceRecord requestStartPrepare; @@ -32,6 +32,7 @@ public class TraceResult implements Cloneable { private TraceRecord parseStart; //requestEnd private TraceRecord routeStart; //parseEnd private TraceRecord preExecuteStart; //routeEnd + private RouteResultsetNode[] dataNodes; private TraceRecord preExecuteEnd; private TraceRecord adtCommitBegin; //auto Distributed Transaction commit begin @@ -45,8 +46,11 @@ public class TraceResult implements Cloneable { private ConcurrentMap recordStartMap = new ConcurrentHashMap<>(); private ConcurrentMap recordEndMap = new ConcurrentHashMap<>(); private long veryEnd; + private boolean complexQuery = false; + private boolean subQuery = false; public void setVeryStartPrepare(long veryStartPrepare) { + prepareFinished = false; this.veryStartPrepare = veryStartPrepare; } @@ -70,6 +74,21 @@ public void setPreExecuteEnd(TraceRecord preExecuteEnd) { this.preExecuteEnd = preExecuteEnd; } + public RouteResultsetNode[] getDataNodes() { + return dataNodes; + } + + public void setDataNodes(RouteResultsetNode[] dataNodes) { + if (this.dataNodes == null) { + this.dataNodes = dataNodes; + } else { + RouteResultsetNode[] tempDataNodes = new RouteResultsetNode[this.dataNodes.length + dataNodes.length]; + System.arraycopy(this.dataNodes, 0, tempDataNodes, 0, this.dataNodes.length); + System.arraycopy(dataNodes, 0, tempDataNodes, this.dataNodes.length, dataNodes.length); + this.dataNodes = tempDataNodes; + } + } + public void setSimpleHandler(ResponseHandler simpleHandler) { this.simpleHandler = simpleHandler; } @@ -86,6 +105,14 @@ public void setAdtCommitEnd(TraceRecord adtCommitEnd) { this.adtCommitEnd = adtCommitEnd; } + public void setComplexQuery(boolean complexQuery) { + this.complexQuery = complexQuery; + } + + public void setSubQuery(boolean subQuery) { + this.subQuery = subQuery; + } + public synchronized Boolean addToConnFlagMap(String item) { return connFlagMap.putIfAbsent(item, true); } @@ -125,10 +152,14 @@ public void setVeryEnd(long veryEnd) { } public void ready() { + prepareFinished = true; clear(); veryStart = veryStartPrepare; requestStart = requestStartPrepare; parseStart = parseStartPrepare; + veryStartPrepare = 0; + requestStartPrepare = null; + parseStartPrepare = null; } private void clear() { @@ -138,9 +169,11 @@ private void clear() { routeStart = null; preExecuteStart = null; preExecuteEnd = null; + dataNodes = null; adtCommitBegin = null; adtCommitEnd = null; - + complexQuery = false; + subQuery = false; simpleHandler = null; builder = null; //for complex query connFlagMap.clear(); @@ -157,6 +190,67 @@ private void clear() { veryEnd = 0; } + public List genRunningSQLStage() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("start genRunningSQLStage"); + } + List lst = new ArrayList<>(); + if (!prepareFinished) { + if (requestStartPrepare == null) { + return lst; + } else { + if (parseStartPrepare == null) { + lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp())); + return lst; + } else { + lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp(), parseStartPrepare.getTimestamp())); + lst.add(genTraceRecord("Parse_SQL", parseStartPrepare.getTimestamp())); + return lst; + } + } + } + lst.add(genTraceRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp())); + + if (routeStart == null) { + lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp())); + return lst; + } else { + lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp(), routeStart.getTimestamp())); + } + + if (preExecuteStart == null) { + lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp())); + return lst; + } else { + lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp())); + } + + if (preExecuteEnd == null) { + lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp())); + return lst; + } else { + lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp())); + } + if (simpleHandler != null) { + genRunningSimpleResults(lst); + return lst; + } else if (builder != null) { + genRunningComplexQueryResults(lst); + return lst; + } else if (subQuery) { + lst.add(genTraceRecord("Doing_SubQuery", preExecuteEnd.getTimestamp())); + return lst; + } else if (dataNodes == null || complexQuery) { + lst.add(genTraceRecord("Generate_Query_Explain", preExecuteEnd.getTimestamp())); + return lst; + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("not support trace this query or unfinished"); + } + return lst; + } + } + public List genTraceResult() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("start genTraceResult"); @@ -190,6 +284,76 @@ public List genTraceResult() { return lst; } + private void genRunningComplexQueryResults(List lst) { + List results = ComplexQueryPlanUtil.getComplexQueryResult(builder); + long lastChildFinished = preExecuteEnd.getTimestamp(); + for (ReferenceHandlerInfo result : results) { + DMLResponseHandler handler = result.getHandler(); + if (handler instanceof BaseSelectHandler) { + Map fetchStartRecordMap = connReceivedMap.get(handler); + if (fetchStartRecordMap == null) { + if (!result.isNestLoopQuery()) { + lst.add(genTraceRecord("Execute_SQL", lastChildFinished, result.getName(), result.getRefOrSQL())); // lastChildFinished may is Long.MAX_VALUE + } else { + lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE + } + lst.add(genTraceRecord("Fetch_result", result.getName(), result.getRefOrSQL())); + } else { + TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next(); + if (!result.isNestLoopQuery()) { + lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL())); + } else { + TraceRecord handlerStart = recordStartMap.get(handler); + TraceRecord handlerEnd = recordEndMap.get(handler); + if (handlerStart == null) { + lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE + } else if (handlerEnd == null) { + lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp())); + lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL())); + } else { + lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp())); + lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL())); + } + } + Map fetchEndRecordMap = connFinishedMap.get(handler); + if (fetchEndRecordMap == null) { + lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL())); + } else { + TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next(); + lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL())); + } + } + } else if (handler instanceof OutputHandler) { + TraceRecord startWrite = recordStartMap.get(handler); + if (startWrite == null) { + lst.add(genTraceRecord("Write_to_Client")); + } else if (veryEnd == 0) { + lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp())); + } else { + lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), veryEnd)); + } + } else { + TraceRecord handlerStart = recordStartMap.get(handler); + TraceRecord handlerEnd = recordEndMap.get(handler); + if (handlerStart == null) { + lst.add(genTraceRecord(result.getType())); + } else if (handlerEnd == null) { + lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL())); + } else { + lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL())); + } + + if (handler.getNextHandler() == null) { + if (handlerEnd != null) { + lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp()); + } else { + lastChildFinished = Long.MAX_VALUE; + } + } + } + } + } + private boolean genComplexQueryResults(List lst) { lst.add(genTraceRecord("Try_Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp())); lst.add(genTraceRecord("Try_to_Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp())); @@ -291,6 +455,98 @@ private boolean genSimpleResults(List lst) { return false; } + private void genRunningSimpleResults(List lst) { + Map connFetchStartMap = connReceivedMap.get(simpleHandler); + + Set receivedNode = new HashSet<>(); + long minFetchStart = Long.MAX_VALUE; + long maxFetchEnd = 0; + if (connFetchStartMap != null) { + Map connFetchEndMap = connFinishedMap.get(simpleHandler); + List executeList = new ArrayList<>(connFetchStartMap.size()); + List fetchList = new ArrayList<>(connFetchStartMap.size()); + for (Map.Entry fetchStart : connFetchStartMap.entrySet()) { + TraceRecord fetchStartRecord = fetchStart.getValue(); + receivedNode.add(fetchStartRecord.getDataNode()); + minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp()); + executeList.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef())); + if (connFetchEndMap == null) { + fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef())); + } else { + TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey()); + if (fetchEndRecord == null) { + fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef())); + } else { + fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), fetchStartRecord.getDataNode(), fetchStartRecord.getRef())); + maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp()); + } + } + } + lst.addAll(executeList); + if (receivedNode.size() != dataNodes.length) { + for (RouteResultsetNode dataNode : dataNodes) { + if (!receivedNode.contains(dataNode.getName())) { + lst.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), dataNode.getName(), dataNode.getStatement())); + fetchList.add(genTraceRecord("Fetch_result", dataNode.getName(), dataNode.getStatement())); + } + } + } + lst.addAll(fetchList); + } else { + for (RouteResultsetNode dataNode : dataNodes) { + if (!receivedNode.contains(dataNode.getName())) { + lst.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), dataNode.getName(), dataNode.getStatement())); + lst.add(genTraceRecord("Fetch_result", dataNode.getName(), dataNode.getStatement())); + } + } + } + if (adtCommitBegin != null) { + lst.add(genTraceRecord("Distributed_Transaction_Prepare", maxFetchEnd, adtCommitBegin.getTimestamp())); + lst.add(genTraceRecord("Distributed_Transaction_Commit", adtCommitBegin.getTimestamp(), adtCommitEnd.getTimestamp())); + } + if (minFetchStart == Long.MAX_VALUE) { + lst.add(genTraceRecord("Write_to_Client")); + } else if (veryEnd == 0) { + lst.add(genTraceRecord("Write_to_Client", minFetchStart)); + } else { + lst.add(genTraceRecord("Write_to_Client", minFetchStart, veryEnd)); + } + } + + private String[] genTraceRecord(String operation, long start) { + return genTraceRecord(operation, start, "-", "-"); + + } + + private String[] genTraceRecord(String operation, long start, String dataNode, String ref) { + if (start == Long.MAX_VALUE) { + return genTraceRecord(operation, dataNode, ref); + } + String[] readQuery = new String[6]; + readQuery[0] = operation; + readQuery[1] = nanoToMilliSecond(start - veryStart); + readQuery[2] = "unfinished"; + readQuery[3] = "unknown"; + readQuery[4] = dataNode; + readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " "); + return readQuery; + } + + private String[] genTraceRecord(String operation, String dataNode, String ref) { + String[] readQuery = new String[6]; + readQuery[0] = operation; + readQuery[1] = "not started"; + readQuery[2] = "unfinished"; + readQuery[3] = "unknown"; + readQuery[4] = dataNode; + readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " "); + return readQuery; + } + + private String[] genTraceRecord(String operation) { + return genTraceRecord(operation, "-", "-"); + } + private String[] genTraceRecord(String operation, long start, long end) { return genTraceRecord(operation, start, end, "-", "-"); } From 923fa87f872f3126513186ad19deee8482afd523 Mon Sep 17 00:00:00 2001 From: Lordess John Date: Fri, 29 May 2020 07:49:40 +0800 Subject: [PATCH 5/5] release 2.19.03.4/lts --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d0b7c15f0c..89d34ef35a 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ 4.0.0 com.actiontech dble - 2.19.03.3 + 2.19.03.4 jar dble-server The project of dble-server