Skip to content

Commit

Permalink
set stament actiontech#78
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhuqing666 committed Sep 25, 2017
1 parent ef7ca64 commit ffddd07
Show file tree
Hide file tree
Showing 34 changed files with 1,416 additions and 1,887 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<configuration>
<encoding>${app.encoding}</encoding>
<attach>true</attach>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface BackendConnection extends ClosableConnection {
void execute(RouteResultsetNode node, ServerConnection source,
boolean autocommit);

boolean syncAndExcute();
boolean syncAndExecute();

void rollback();

Expand Down
199 changes: 159 additions & 40 deletions src/main/java/com/actiontech/dble/backend/mysql/nio/MySQLConnection.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

/**
* company where id=(select company_id from customer where id=3); the one which
* return data (id) is the datanode to store child table's records
* return data (id) is the data node to store child table's records
*
* @author wuzhih, huqing.yan
*/
Expand Down Expand Up @@ -73,7 +73,7 @@ public String execute(String schema, ArrayList<String> dataNodes) {
PhysicalDBNode mysqlDN = conf.getDataNodes().get(dn);
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute in datanode " + dn);
LOGGER.debug("execute in data_node " + dn);
}
RouteResultsetNode node = new RouteResultsetNode(dn, ServerParse.SELECT, sql);
node.setRunOnSlave(false); // get child node from master
Expand Down Expand Up @@ -161,7 +161,7 @@ public void okResponse(byte[] ok, BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("okResponse " + conn);
}
boolean executeResponse = conn.syncAndExcute();
boolean executeResponse = conn.syncAndExecute();
if (executeResponse) {
finished.incrementAndGet();
if (canReleaseConn()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void connectionAcquired(BackendConnection conn) {

@Override
public void okResponse(byte[] data, BackendConnection conn) {
boolean executeResponse = conn.syncAndExcute();
boolean executeResponse = conn.syncAndExecute();
if (executeResponse) {
if (clearIfSessionClosed(session)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public MultiNodeDdlHandler(int sqlType, RouteResultset rrs, NonBlockingSession s
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute mutinode query " + rrs.getStatement());
LOGGER.debug("execute multiNode query " + rrs.getStatement());
}

this.rrs = RouteResultCopy.rrCopy(rrs, ServerParse.SELECT, STMT);
Expand Down Expand Up @@ -138,7 +138,7 @@ public void connectionClose(BackendConnection conn, String reason) {
errConnection = new ArrayList<>();
}
errConnection.add(conn);
if (!conn.syncAndExcute()) {
if (!conn.syncAndExecute()) {
return;
}
if (--nodeCount <= 0) {
Expand Down Expand Up @@ -167,7 +167,7 @@ public void connectionError(Throwable e, BackendConnection conn) {
errConnection = new ArrayList<>();
}
errConnection.add(conn);
if (!conn.syncAndExcute()) {
if (!conn.syncAndExecute()) {
return;
}
if (--nodeCount <= 0) {
Expand Down Expand Up @@ -196,7 +196,7 @@ public void errorResponse(byte[] data, BackendConnection conn) {
try {
if (!isFail())
setFail(err.toString());
if (!conn.syncAndExcute()) {
if (!conn.syncAndExecute()) {
return;
}
if (--nodeCount > 0)
Expand All @@ -210,18 +210,18 @@ public void errorResponse(byte[] data, BackendConnection conn) {
/* arriving here is impossible */
@Override
public void okResponse(byte[] data, BackendConnection conn) {
if (!conn.syncAndExcute()) {
if (!conn.syncAndExecute()) {
LOGGER.debug("MultiNodeDdlHandler should not arrive here(okResponse) !");
}
}

@Override
public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("on row end reseponse " + conn);
LOGGER.debug("on row end response " + conn);
}

if (errorRepsponsed.get()) {
if (errorResponsed.get()) {
return;
}

Expand Down Expand Up @@ -262,14 +262,14 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection c
}

@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsnull, byte[] eof,
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsNull, byte[] eof,
boolean isLeft, BackendConnection conn) {
}

@Override
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketnull, boolean isLeft, BackendConnection conn) {
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, BackendConnection conn) {
/* It is impossible arriving here, because we set limit to 0 */
return errorRepsponsed.get();
return errorResponsed.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class MultiNodeHandler implements ResponseHandler {
private AtomicBoolean isFailed = new AtomicBoolean(false);
protected volatile String error;
protected byte packetId;
protected final AtomicBoolean errorRepsponsed = new AtomicBoolean(false);
protected final AtomicBoolean errorResponsed = new AtomicBoolean(false);

public MultiNodeHandler(NonBlockingSession session) {
if (session == null) {
Expand All @@ -48,7 +48,7 @@ public boolean isFail() {


protected boolean canClose(BackendConnection conn, boolean tryErrorFinish) {
// realse this connection if safe
// release this connection if safe
session.releaseConnectionIfSafe(conn, false);
boolean allFinished = false;
if (tryErrorFinish) {
Expand All @@ -69,9 +69,9 @@ public void errorResponse(byte[] data, BackendConnection conn) {
session.releaseConnectionIfSafe(conn, false);
ErrorPacket err = new ErrorPacket();
err.read(data);
String errmsg = new String(err.getMessage());
this.setFail(errmsg);
LOGGER.warn("error response from " + conn + " err " + errmsg + " code:" + err.getErrno());
String errMsg = new String(err.getMessage());
this.setFail(errMsg);
LOGGER.warn("error response from " + conn + " err " + errMsg + " code:" + err.getErrno());
this.tryErrorFinished(this.decrementCountBy(1));
}

Expand Down Expand Up @@ -109,7 +109,7 @@ protected void reset(int initCount) {
packetId = 0;
}

protected ErrorPacket createErrPkg(String errmgs) {
protected ErrorPacket createErrPkg(String errMsg) {
ErrorPacket err = new ErrorPacket();
lock.lock();
try {
Expand All @@ -118,13 +118,13 @@ protected ErrorPacket createErrPkg(String errmgs) {
lock.unlock();
}
err.setErrno(ErrorCode.ER_UNKNOWN_ERROR);
err.setMessage(StringUtil.encode(errmgs, session.getSource().getCharset().getResults()));
err.setMessage(StringUtil.encode(errMsg, session.getSource().getCharset().getResults()));
return err;
}

protected void tryErrorFinished(boolean allEnd) {
if (allEnd && !session.closed()) {
if (errorRepsponsed.compareAndSet(false, true)) {
if (errorResponsed.compareAndSet(false, true)) {
createErrPkg(this.error).write(session.getSource());
}
// clear session resources,release all
Expand All @@ -135,7 +135,7 @@ protected void tryErrorFinished(boolean allEnd) {
session.closeAndClearResources(error);
} else {
session.getSource().setTxInterrupt(this.error);
// clear resouces
// clear resources
clearResources();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void errorResponse(byte[] data, BackendConnection conn) {
@Override
public void okResponse(byte[] data, BackendConnection conn) {
this.netOutBytes += data.length;
boolean executeResponse = conn.syncAndExcute();
boolean executeResponse = conn.syncAndExecute();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + conn);
}
Expand Down Expand Up @@ -321,7 +321,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection c

this.netOutBytes += eof.length;

if (errorRepsponsed.get()) {
if (errorResponsed.get()) {
return;
}

Expand Down Expand Up @@ -645,7 +645,7 @@ private void mergeFieldEof(List<byte[]> fields, byte[] eof) throws IOException {
}

public void handleDataProcessException(Exception e) {
if (!errorRepsponsed.get()) {
if (!errorResponsed.get()) {
this.error = e.toString();
LOGGER.warn("caught exception ", e);
setFail(e.toString());
Expand All @@ -656,7 +656,7 @@ public void handleDataProcessException(Exception e) {
@Override
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketnull, boolean isLeft, BackendConnection conn) {

if (errorRepsponsed.get()) {
if (errorResponsed.get()) {
// the connection has been closed or set to "txInterrupt" properly
//in tryErrorFinished() method! If we close it here, it can
// lead to tx error such as blocking rollback tx for ever.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/

package com.actiontech.dble.backend.mysql.nio.handler;

import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ResetConnHandler implements ResponseHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(ResetConnHandler.class);
@Override
public void connectionError(Throwable e, BackendConnection conn) {
String msg = e.getMessage() == null ? e.toString() : e.getMessage();
LOGGER.warn(msg);
}


@Override
public void errorResponse(byte[] err, BackendConnection conn) {
ErrorPacket errPg = new ErrorPacket();
errPg.read(err);
conn.close(new String(errPg.getMessage()));
}

@Override
public void okResponse(byte[] ok, BackendConnection conn) {
MySQLConnection mysqlConn = (MySQLConnection) conn;
mysqlConn.resetContextStatus();
conn.release();
}

@Override
public void connectionAcquired(BackendConnection conn) {
//not happen
}
@Override
public void connectionClose(BackendConnection conn, String reason) {
LOGGER.warn(reason);
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof, boolean isLeft, BackendConnection conn) {
//not happen
}

@Override
public boolean rowResponse(byte[] rownull, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
//not happen
return false;
}

@Override
public void rowEofResponse(byte[] eof, boolean isLeft, BackendConnection conn) {
//not happen
}

@Override
public void relayPacketResponse(byte[] relayPacket, BackendConnection conn) {
//not happen
}

@Override
public void endPacketResponse(byte[] endPacket, BackendConnection conn) {
//not happen
}

@Override
public void writeQueueAvailable() {
//not happen
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void okResponse(byte[] data, BackendConnection conn) {
//
this.netOutBytes += data.length;

boolean executeResponse = conn.syncAndExcute();
boolean executeResponse = conn.syncAndExecute();
if (executeResponse) {
session.handleSpecial(rrs, session.getSource().getSchema(), true);
ServerConnection source = session.getSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void errorResponse(byte[] err, BackendConnection conn) {

@Override
public void okResponse(byte[] data, BackendConnection conn) {
boolean executeResponse = conn.syncAndExcute();
boolean executeResponse = conn.syncAndExecute();
if (executeResponse) {
boolean isEndPack = decrementCountBy(1);
session.releaseConnection(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RouteResultsetNode getRrss() {

@Override
public void okResponse(byte[] ok, BackendConnection conn) {
conn.syncAndExcute();
conn.syncAndExecute();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/actiontech/dble/config/Versions.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public abstract class Versions {

public static final byte PROTOCOL_VERSION = 10;

private static byte[] serverVersion = "5.6.29-dble-2.17.08.0-dev-20170904220535".getBytes();
private static byte[] serverVersion = "5.6.29-dble-2.17.08.0-20170922131413".getBytes();
public static final byte[] VERSION_COMMENT = "dble Server (ActionTech)".getBytes();
public static final String ANNOTATION_NAME = "dble:";
public static final String ROOT_PREFIX = "dble";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class ShowBackend {
private ShowBackend() {
}

private static final int FIELD_COUNT = 18;
private static final int FIELD_COUNT = 20;
private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT);
private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT];
private static final EOFPacket EOF = new EOFPacket();
Expand All @@ -41,8 +41,7 @@ private ShowBackend() {
int i = 0;
byte packetId = 0;
HEADER.setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("processor",
Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i] = PacketUtil.getField("processor", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("ID", Fields.FIELD_TYPE_LONG);
FIELDS[i++].setPacketId(++packetId);
Expand Down Expand Up @@ -76,10 +75,15 @@ private ShowBackend() {
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("CHARACTER_SET_RESULTS", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("TXLEVEL", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i] = PacketUtil.getField("TX_ISOLATION_LEVEL", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("AUTOCOMMIT", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("SYS_VARIABLES", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("USER_VARIABLES", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i].setPacketId(++packetId);

EOF.setPacketId(++packetId);
}

Expand Down Expand Up @@ -132,6 +136,8 @@ private static RowDataPacket getRow(BackendConnection c, String charset) {
row.add(conn.getCharset().getResults().getBytes());
row.add((conn.getTxIsolation() + "").getBytes());
row.add((conn.isAutocommit() + "").getBytes());
row.add(StringUtil.encode(conn.getStringOfSysVariables(), charset));
row.add(StringUtil.encode(conn.getStringOfUsrVariables(), charset));
return row;
}
}
Loading

0 comments on commit ffddd07

Please sign in to comment.