Skip to content

Commit

Permalink
split merge hander actiontech#1139 (actiontech#1140)
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhuqing666 authored and Lordess committed May 27, 2020
1 parent 48e3e64 commit 17b0580
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.GlobalVisitor;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.InSubQueryHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SingleRowSubQueryHandler;
Expand Down Expand Up @@ -186,8 +186,8 @@ protected final void noShardBuild() {
}
}

MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
session, null);
MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
session);
addHandler(mh);
}

Expand Down Expand Up @@ -391,11 +391,15 @@ public static long getSequenceId() {

protected void buildMergeHandler(PlanNode planNode, RouteResultsetNode[] rrssArray) {
hBuilder.checkRRSs(rrssArray);
MultiNodeMergeHandler mh = null;
List<Order> orderBys = planNode.getGroupBys().size() > 0 ? planNode.getGroupBys() : planNode.getOrderBys();
boolean isEasyMerge = rrssArray.length == 1 || (orderBys == null || orderBys.size() == 0);

mh = new MultiNodeMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session,
orderBys);
MultiNodeMergeHandler mh;
if (isEasyMerge) {
mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session);
} else {
mh = new MultiNodeMergeAndOrderHandler(getSequenceId(), rrssArray, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session, orderBys);
}
addHandler(mh);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public BaseHandlerBuilder build() throws Exception {
//set slave only into rrsNode
for (DMLResponseHandler startHandler : fh.getMerges()) {
MultiNodeMergeHandler mergeHandler = (MultiNodeMergeHandler) startHandler;
for (BaseSelectHandler bshandler : mergeHandler.getExeHandlers()) {
bshandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave());
for (BaseSelectHandler baseHandler : mergeHandler.getExeHandlers()) {
baseHandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave());
}
}
session.endComplexRoute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.PushDownVisitor;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeEasyMergeHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
import com.actiontech.dble.config.model.SchemaConfig;
import com.actiontech.dble.plan.node.NoNameNode;
Expand Down Expand Up @@ -54,8 +55,7 @@ public void buildOwn() {
String randomDatenode = getRandomNode(schemaConfig.getAllDataNodes());
RouteResultsetNode[] rrss = new RouteResultsetNode[]{new RouteResultsetNode(randomDatenode, ServerParse.SELECT, sql)};
hBuilder.checkRRSs(rrss);
MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
session, null);
MultiNodeMergeHandler mh = new MultiNodeEasyMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(), session);
addHandler(mh);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2016-2019 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/

package com.actiontech.dble.backend.mysql.nio.handler.query.impl;

import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* mergeHandler will merge data,if contains aggregate function,use group by handler
*
* @author ActionTech
*/
public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeEasyMergeHandler.class);

public MultiNodeEasyMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session) {
super(id, route, autocommit, session);
this.merges.add(this);
}

@Override
public void execute() throws Exception {
synchronized (exeHandlers) {
if (terminate.get())
return;
for (BaseSelectHandler exeHandler : exeHandlers) {
session.setHandlerStart(exeHandler); //base start execute
MySQLConnection exeConn = exeHandler.initConnection();
if (exeConn != null) {
exeConn.setComplexQuery(true);
exeHandler.execute(exeConn);
}
}
}
}

@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof,
boolean isLeft, BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(conn.toString() + "'s field is reached.");
}
session.setHandlerStart(this);
// if terminated
if (terminate.get()) {
return;
}
lock.lock(); // for combine
try {
if (this.fieldPackets.isEmpty()) {
this.fieldPackets = fieldPackets;
nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn);
}
startEasyMerge();
if (++reachedConCount == route.length) {
session.allBackendConnReceive();
}
} finally {
lock.unlock();
}
}

@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
if (terminate.get())
return true;
return nextHandler.rowResponse(null, rowPacket, this.isLeft, conn);
}

@Override
public void rowEofResponse(byte[] data, boolean isLeft, BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(conn.toString() + " 's rowEof is reached.");
}

if (this.terminate.get())
return;
lock.lock();
try {
if (reachedConCount == route.length) {
session.setHandlerEnd(this);
nextHandler.rowEofResponse(null, this.isLeft, conn);
}
} finally {
lock.unlock();
}
}

@Override
protected void ownThreadJob(Object... objects) {
}

@Override
protected void terminateThread() throws Exception {
recycleConn();
}

@Override
protected void recycleResources() {
}
}
Loading

0 comments on commit 17b0580

Please sign in to comment.