Skip to content

Commit

Permalink
charset actiontech#35
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhuqing666 committed Sep 25, 2017
1 parent 5949c3d commit ef7ca64
Show file tree
Hide file tree
Showing 98 changed files with 1,931 additions and 1,236 deletions.
6 changes: 0 additions & 6 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@
<Bug pattern="SF_SWITCH_NO_DEFAULT"/>
<Class name="com.actiontech.dble.server.parser.ServerParseSelect"/>
</Match>
<!-- need refactor -->
<Match>
<Bug category="STYLE"/>
<Bug pattern="SF_SWITCH_NO_DEFAULT"/>
<Class name="com.actiontech.dble.server.response.CharacterSet"/>
</Match>

<!-- use enum? -->
<Match>
Expand Down
602 changes: 312 additions & 290 deletions src/main/java/com/actiontech/dble/backend/mysql/CharsetUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static byte[] encode(String src, String charset) {

public static FieldPacket getField(String name, String orgName, int type) {
FieldPacket packet = new FieldPacket();
packet.setCharsetIndex(CharsetUtil.getIndex(UTF8));
packet.setCharsetIndex(CharsetUtil.getCharsetDefaultIndex(UTF8));
packet.setName(encode(name, UTF8));
packet.setOrgName(encode(orgName, UTF8));
packet.setType(type);
Expand All @@ -50,7 +50,7 @@ public static FieldPacket getField(String name, String orgName, int type) {

public static FieldPacket getField(String name, int type) {
FieldPacket packet = new FieldPacket();
packet.setCharsetIndex(CharsetUtil.getIndex(UTF8));
packet.setCharsetIndex(CharsetUtil.getCharsetDefaultIndex(UTF8));
packet.setName(encode(name, UTF8));
packet.setType(type);
return packet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.SystemVariables;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.util.TimeUtil;
import com.actiontech.dble.util.exception.UnknownTxIsolationException;
Expand Down Expand Up @@ -223,7 +224,10 @@ public void authenticate() {
packet.setPacketId(1);
packet.setClientFlags(clientFlags);
packet.setMaxPacketSize(maxPacketSize);
packet.setCharsetIndex(this.charsetIndex);
//TODO:CHECK
int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemVariables.getDefaultValue("character_set_server"));
packet.setCharsetIndex(charsetIndex);

packet.setUser(user);
try {
packet.setPassword(passwd(password, handshake));
Expand All @@ -250,21 +254,27 @@ public boolean isClosedOrQuit() {
return isClosed() || isQuit.get();
}

protected void sendQueryCmd(String query) {
protected void sendQueryCmd(String query, CharsetNames clientCharset) {
CommandPacket packet = new CommandPacket();
packet.setPacketId(0);
packet.setCommand(MySQLPacket.COM_QUERY);
try {
packet.setArg(query.getBytes(CharsetUtil.getJavaCharset(charset)));
packet.setArg(query.getBytes(CharsetUtil.getJavaCharset(clientCharset.getClient())));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
lastTime = TimeUtil.currentTimeMillis();
packet.write(this);
}

private static void getCharsetCommand(StringBuilder sb, int clientCharIndex) {
sb.append("SET names ").append(CharsetUtil.getCharset(clientCharIndex)).append(";");
private static void getCharsetCommand(StringBuilder sb, CharsetNames clientCharset) {
sb.append("SET CHARACTER_SET_CLIENT = ");
sb.append(clientCharset.getClient());
sb.append(",CHARACTER_SET_RESULTS = ");
sb.append(clientCharset.getResults());
sb.append(",COLLATION_CONNECTION = ");
sb.append(clientCharset.getCollation());
sb.append(";");
}

private static void getTxIsolationCommand(StringBuilder sb, int txIsolation) {
Expand Down Expand Up @@ -306,7 +316,7 @@ public void execute(RouteResultsetNode rrn, ServerConnection sc,
if (!sc.isAutocommit() && !sc.isTxstart() && modifiedSQLExecuted) {
sc.setTxstart(true);
}
synAndDoExecute(xaTxId, rrn, sc.getCharsetIndex(), sc.getTxIsolation(), isAutoCommit);
synAndDoExecute(xaTxId, rrn, sc.getCharset(), sc.getTxIsolation(), isAutoCommit);
}

public String getConnXID(NonBlockingSession session) {
Expand All @@ -319,7 +329,7 @@ public String getConnXID(NonBlockingSession session) {
}

private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
int clientCharSetIndex, int clientTxIsoLation,
CharsetNames clientCharset, int clientTxIsoLation,
boolean expectAutocommit) {
String xaCmd = null;
boolean conAutoComit = this.autocommit;
Expand All @@ -332,20 +342,13 @@ private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
xaSyn = 1;
}
int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
int charsetSyn = 0;
if (this.charsetIndex != clientCharSetIndex) {
//need to syn the charset of connection.
//set current connection charset to client charset.
//otherwise while sending commend to server the charset will not coincidence.
setCharset(CharsetUtil.getCharset(clientCharSetIndex));
charsetSyn = 1;
}
int charsetSyn = (charsetName.equals(clientCharset)) ? 0 : 1;
int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn + xaSyn;
if (synCount == 0) {
// not need syn connection
sendQueryCmd(rrn.getStatement());
sendQueryCmd(rrn.getStatement(), clientCharset);
return;
}
CommandPacket schemaCmd = null;
Expand All @@ -356,7 +359,7 @@ private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
}

if (charsetSyn == 1) {
getCharsetCommand(sb, clientCharSetIndex);
getCharsetCommand(sb, clientCharset);
}
if (txIsoLationSyn == 1) {
getTxIsolationCommand(sb, clientTxIsoLation);
Expand All @@ -374,7 +377,7 @@ private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
}
metaDataSyned = false;
statusSync = new StatusSync(conSchema,
clientCharSetIndex, clientTxIsoLation, expectAutocommit,
clientCharset, clientTxIsoLation, expectAutocommit,
synCount);
// syn schema
if (schemaCmd != null) {
Expand All @@ -383,7 +386,7 @@ private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
// and our query sql to multi command at last
sb.append(rrn.getStatement() + ";");
// syn and execute others
this.sendQueryCmd(sb.toString());
this.sendQueryCmd(sb.toString(), clientCharset);
// waiting syn result...

}
Expand All @@ -406,7 +409,7 @@ public void query(String query) throws UnsupportedEncodingException {
RouteResultsetNode rrn = new RouteResultsetNode("default",
ServerParse.SELECT, query);

synAndDoExecute(null, rrn, this.charsetIndex, this.txIsolation, true);
synAndDoExecute(null, rrn, this.charsetName, this.txIsolation, true);

}

Expand Down Expand Up @@ -456,13 +459,12 @@ public void terminate(String reason) {
}

public void commit() {

COMMIT.write(this);

}

public void execCmd(String cmd) {
this.sendQueryCmd(cmd);
this.sendQueryCmd(cmd, this.charsetName);
}

public void rollback() {
Expand Down Expand Up @@ -541,7 +543,7 @@ public void setBorrowed(boolean borrowed) {
public String toString() {
return "MySQLConnection [id=" + id + ", lastTime=" + lastTime + ", user=" + user + ", schema=" + schema +
", old shema=" + oldSchema + ", borrowed=" + borrowed + ", fromSlaveDB=" + fromSlaveDB + ", threadId=" +
threadId + ", charset=" + charset + ", txIsolation=" + txIsolation + ", autocommit=" + autocommit +
threadId + "," + charsetName.toString() + ", txIsolation=" + txIsolation + ", autocommit=" + autocommit +
", attachment=" + attachment + ", respHandler=" + respHandler + ", host=" + host + ", port=" + port +
", statusSync=" + statusSync + ", writeQueue=" + this.getWriteQueue().size() +
", modifiedSQLExecuted=" + modifiedSQLExecuted + "]";
Expand Down Expand Up @@ -586,17 +588,17 @@ public boolean syncAndExcute() {

private static class StatusSync {
private final String schema;
private final Integer charsetIndex;
private final CharsetNames clientCharset;
private final Integer txtIsolation;
private final Boolean autocommit;
private final AtomicInteger synCmdCount;

StatusSync(String schema,
Integer charsetIndex, Integer txtIsolation, Boolean autocommit,
CharsetNames clientCharset, Integer txtIsolation, Boolean autocommit,
int synCount) {
super();
this.schema = schema;
this.charsetIndex = charsetIndex;
this.clientCharset = clientCharset;
this.txtIsolation = txtIsolation;
this.autocommit = autocommit;
this.synCmdCount = new AtomicInteger(synCount);
Expand All @@ -619,8 +621,8 @@ private void updateConnectionInfo(MySQLConnection conn) {
conn.schema = schema;
conn.oldSchema = conn.schema;
}
if (charsetIndex != null) {
conn.setCharset(CharsetUtil.getCharset(charsetIndex));
if (clientCharset != null) {
conn.setCharsetName(clientCharset);
}
if (txtIsolation != null) {
conn.txIsolation = txtIsolation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void processHandShakePacket(byte[] data) {
int charsetIndex = (packet.getServerCharsetIndex() & 0xff);
String charset = CharsetUtil.getCharset(charsetIndex);
if (charset != null) {
source.setCharset(charset);
source.setCharacterSet(charset);
} else {
throw new RuntimeException("Unknown charsetIndex:" + charsetIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.actiontech.dble.backend.mysql.nio.handler;

import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.server.NonBlockingSession;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void errorResponse(byte[] data, BackendConnection conn) {
err.read(data);
String msg = null;
try {
msg = new String(err.getMessage(), conn.getCharset());
msg = new String(err.getMessage(), CharsetUtil.getJavaCharset(conn.getCharset().getResults()));
} catch (UnsupportedEncodingException e) {
msg = new String(err.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void connectionClose(BackendConnection conn, String reason) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setPacketId(++packetId);
errPacket.setErrno(ErrorCode.ER_ABORTING_CONNECTION);
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset()));
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset().getResults()));
err = errPacket;

lock.lock();
Expand Down Expand Up @@ -155,7 +155,7 @@ public void connectionError(Throwable e, BackendConnection conn) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setPacketId(++packetId);
errPacket.setErrno(ErrorCode.ER_ABORTING_CONNECTION);
errPacket.setMessage(StringUtil.encode(e.toString(), session.getSource().getCharset()));
errPacket.setMessage(StringUtil.encode(e.toString(), session.getSource().getCharset().getResults()));
err = errPacket;

lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected ErrorPacket createErrPkg(String errmgs) {
lock.unlock();
}
err.setErrno(ErrorCode.ER_UNKNOWN_ERROR);
err.setMessage(StringUtil.encode(errmgs, session.getSource().getCharset()));
err.setMessage(StringUtil.encode(errmgs, session.getSource().getCharset().getResults()));
return err;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void connectionClose(BackendConnection conn, String reason) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setPacketId(++packetId);
errPacket.setErrno(ErrorCode.ER_ABORTING_CONNECTION);
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset()));
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset().getResults()));
err = errPacket;
lock.lock();
try {
Expand All @@ -211,7 +211,7 @@ public void connectionError(Throwable e, BackendConnection conn) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setPacketId(++packetId);
errPacket.setErrno(ErrorCode.ER_ABORTING_CONNECTION);
errPacket.setMessage(StringUtil.encode(e.toString(), session.getSource().getCharset()));
errPacket.setMessage(StringUtil.encode(e.toString(), session.getSource().getCharset().getResults()));
err = errPacket;
lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacke
List<FieldPacket> fieldPackets = new ArrayList<>(2);
packetId = ShowTables.writeFullTablesHeader(buffer, source, showTableSchema, fieldPackets);
if (info.getWhere() != null) {
MySQLItemVisitor mev = new MySQLItemVisitor(source.getSchema(), source.getCharsetIndex());
MySQLItemVisitor mev = new MySQLItemVisitor(source.getSchema(), source.getCharset().getResultsIndex());
info.getWhereExpr().accept(mev);
sourceFields = HandlerTool.createFields(fieldPackets);
whereItem = HandlerTool.createItem(mev.getItem(), sourceFields, 0, false, DMLResponseHandler.HandlerType.WHERE);
Expand All @@ -73,7 +73,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacke
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
RowDataPacket rowDataPacket = new RowDataPacket(1);
rowDataPacket.read(row);
String table = StringUtil.decode(rowDataPacket.fieldValues.get(0), session.getSource().getCharset());
String table = StringUtil.decode(rowDataPacket.fieldValues.get(0), session.getSource().getCharset().getResults());
if (shardingTablesMap.containsKey(table)) {
this.netOutBytes += row.length;
this.selectRows++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void connectionClose(BackendConnection conn, String reason) {
ErrorPacket err = new ErrorPacket();
err.setPacketId(++packetId);
err.setErrno(ErrorCode.ER_ERROR_ON_CLOSE);
err.setMessage(StringUtil.encode(reason, session.getSource().getCharset()));
err.setMessage(StringUtil.encode(reason, session.getSource().getCharset().getResults()));
this.backConnectionErr(err, conn);
session.getSource().close(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.datasource.PhysicalDBNode;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
import com.actiontech.dble.config.ServerConfig;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void errorResponse(byte[] err, BackendConnection conn) {
errPacket.read(err);
String errMsg;
try {
errMsg = new String(errPacket.getMessage(), conn.getCharset());
errMsg = new String(errPacket.getMessage(), CharsetUtil.getJavaCharset(conn.getCharset().getResults()));
} catch (UnsupportedEncodingException e) {
errMsg = "UnsupportedEncodingException:" + conn.getCharset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
Expand Down Expand Up @@ -70,8 +71,8 @@ public void fieldEofResponse(byte[] headernull, List<byte[]> fieldsnull, final L
List<Order> orders = this.fixedOrders;
if (orders == null)
orders = HandlerTool.makeOrder(this.distincts);
RowDataComparator cmptor = new RowDataComparator(this.fieldPackets, orders, this.isAllPushDown(), type(), conn.getCharset());
localResult = new DistinctLocalResult(pool, sourceFields.size(), cmptor, conn.getCharset()).
RowDataComparator cmptor = new RowDataComparator(this.fieldPackets, orders, this.isAllPushDown(), type());
localResult = new DistinctLocalResult(pool, sourceFields.size(), cmptor, CharsetUtil.getJavaCharset(conn.getCharset().getResults())).
setMemSizeController(session.getOtherBufferMC());
nextHandler.fieldEofResponse(null, null, this.fieldPackets, null, this.isLeft, conn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,7 @@ private void terminatePreHandler(DMLResponseHandler handler) {

private RowDataComparator makeRowDataSorter(MySQLConnection conn) {
if (!isEasyMerge)
return new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type(),
conn.getCharset());
return new RowDataComparator(this.fieldPackets, orderBys, this.isAllPushDown(), this.type());
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
Expand Down Expand Up @@ -54,8 +55,8 @@ public void fieldEofResponse(byte[] headernull, List<byte[]> fieldsnull, final L
this.pool = DbleServer.getInstance().getBufferPool();

this.fieldPackets = fieldPackets;
RowDataComparator cmp = new RowDataComparator(this.fieldPackets, orders, isAllPushDown(), type(), conn.getCharset());
localResult = new SortedLocalResult(pool, fieldPackets.size(), cmp, conn.getCharset()).
RowDataComparator cmp = new RowDataComparator(this.fieldPackets, orders, isAllPushDown(), type());
localResult = new SortedLocalResult(pool, fieldPackets.size(), cmp, CharsetUtil.getJavaCharset(conn.getCharset().getResults())).
setMemSizeController(session.getOrderBufferMC());
nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, conn);
startOwnThread(conn);
Expand Down
Loading

0 comments on commit ef7ca64

Please sign in to comment.