Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

field can terminated by string in load data #1484

Merged
merged 5 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
split dump file
  • Loading branch information
PanternBao committed Nov 5, 2019
commit 9854648aeba13974c7f352618078ad9c62e49b6d
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,31 @@ public void run() {
stmt = queue.take();
context.setStmt(stmt);
int type = ServerParse.parse(stmt);
if ((type == ServerParse.MYSQL_COMMENT || type == ServerParse.MYSQL_CMD_COMMENT) &&
context.getSchema() == null) {
if (ServerParse.CREATE_DATABASE != type && context.getSchema() == null) {
writer.writeAll(stmt);
continue;
}
// footer
if (stmt.contains("OLD_")) {
if (stmt.contains("=@OLD_")) {
writer.writeAll(stmt);
continue;
}
if (stmt.equals(DumpFileReader.EOF)) {
writer.writeAll(stmt);
return;
}
// parse

SQLStatement statement = null;
if (ServerParse.DDL == type || ServerParse.CREATE_DATABASE == type) {
// parse ddl or create database
if (ServerParse.DDL == type || ServerParse.CREATE_DATABASE == type || ServerParse.USE == (0xff & type)) {
stmt = stmt.replace("/*!", "/*#");
statement = RouteStrategyFactory.getRouteStrategy().parserSQL(stmt);
}
// if ddl is wrong,the following statement is skip.
if (context.isSkip()) {
continue;
} else if (ServerParse.INSERT == type && !context.isPushDown()) {
}
if (ServerParse.INSERT == type && !context.isPushDown()) {
statement = RouteStrategyFactory.getRouteStrategy().parserSQL(stmt);
}
StatementHandler handler = StatementHandlerManager.getHandler(statement);
Expand Down
29 changes: 19 additions & 10 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileReader.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.actiontech.dble.manager.dump;

import com.actiontech.dble.backend.mysql.store.fs.FileUtils;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -32,7 +31,7 @@ public void start(BlockingQueue<String> queue) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(0x20000);
int byteRead = fileChannel.read(buffer);
while (byteRead != -1) {
readSQLByEOF(buffer.array());
readSQLByEOF(buffer.array(), byteRead);
buffer.clear();
byteRead = fileChannel.read(buffer);
}
Expand All @@ -59,17 +58,27 @@ public void start(BlockingQueue<String> queue) throws IOException {
}

// read one statement by ;
private void readSQLByEOF(byte[] linesByte) throws InterruptedException {
String[] lines = new String(linesByte, StandardCharsets.UTF_8).split(";");
int length = lines.length;
for (int i = 0; i < length; i++) {
if (i == 0 && this.tempStr != null) {
private void readSQLByEOF(byte[] linesByte, int byteRead) throws InterruptedException {
String stmts = new String(linesByte, 0, byteRead, StandardCharsets.UTF_8);
boolean endWithEOF = stmts.endsWith(";");
String[] lines = stmts.split(";");
int len = lines.length;

int i = 0;
if (tempStr != null) {
if (len > 1) {
this.readQueue.put(tempStr + lines[0]);
this.tempStr = null;
} else if (i == length - 1 && !StringUtil.isEmpty(lines[i])) {
this.tempStr = lines[i];
continue;
} else {
tempStr += lines[0];
}
i = 1;
}
if (!endWithEOF) {
this.tempStr = lines[len - 1];
len = len - 1;
}
for (; i < len; i++) {
this.readQueue.put(lines[i]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void write(String dataNode, String stmt) throws InterruptedException {
public void writeAll(String stmt) throws InterruptedException {
for (DataNodeWriter writer : dataNodeWriters.values()) {
writer.write(stmt);
writer.write(";");
}
}

Expand Down Expand Up @@ -87,8 +88,8 @@ public void run() {
while (true) {
stmt = this.queue.take();
if (stmt.equals(DumpFileReader.EOF)) {
finished.decrementAndGet();
close();
finished.decrementAndGet();
return;
}
if (this.fileChannel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public void handle(DumpFileContext context, SQLStatement sqlStatement) throws Du
boolean isAutoIncrement = tableConfig.isAutoIncrement();
long time = new Date().getTime();
for (SQLInsertStatement.ValuesClause valueClause : insert.getValuesList()) {
// 全局序列
boolean isChanged = false;
if (isAutoIncrement) {
if (!handleIncrementColumn(context, valueClause.getValues())) continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,36 @@
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLCreateDatabaseStatement;
import com.alibaba.druid.sql.ast.statement.SQLUseStatement;

import java.util.Map;
import java.util.Set;

public class CreateDatabaseHandler implements StatementHandler {
public class SchemaHandler implements StatementHandler {

@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws DumpException, InterruptedException {
SQLCreateDatabaseStatement createDatabase = (SQLCreateDatabaseStatement) sqlStatement;
String schema = StringUtil.removeBackQuote(createDatabase.getName().getSimpleName());
SchemaConfig schemaConfig = DbleServer.getInstance().getConfig().getSchemas().get(schema);
String schema;
if (sqlStatement instanceof SQLUseStatement) {
SQLUseStatement use = (SQLUseStatement) sqlStatement;
schema = use.getDatabase().getSimpleName();
} else {
SQLCreateDatabaseStatement createDatabase = (SQLCreateDatabaseStatement) sqlStatement;
schema = createDatabase.getName().getSimpleName();
}
String realSchema = StringUtil.removeBackQuote(schema);
SchemaConfig schemaConfig = DbleServer.getInstance().getConfig().getSchemas().get(realSchema);
if (schemaConfig == null) {
throw new DumpException("schema[" + schema + "] doesn't exist in config.");
}
context.setSchema(schema);
context.setSchema(realSchema);
context.setDefaultDataNode(schemaConfig.getDataNode());
context.setTable(null);

Set<String> allDataNodes = DbleServer.getInstance().getConfig().getSchemas().get(schema).getAllDataNodes();
Set<String> allDataNodes = DbleServer.getInstance().getConfig().getSchemas().get(realSchema).getAllDataNodes();
Map<String, PhysicalDBNode> dbs = DbleServer.getInstance().getConfig().getDataNodes();
for (String dataNode : allDataNodes) {
context.getWriter().write(dataNode, context.getStmt().replace("`" + schema + "`", "`" + dbs.get(dataNode).getDatabase() + "`"));
context.getWriter().write(dataNode, context.getStmt().replace(schema, "`" + dbs.get(dataNode).getDatabase() + "`"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLCreateDatabaseStatement;
import com.alibaba.druid.sql.ast.statement.SQLDropTableStatement;
import com.alibaba.druid.sql.ast.statement.SQLUseStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;

Expand All @@ -12,13 +13,13 @@
public final class StatementHandlerManager {

public enum Type {
INSERT, CREATE_DATABASE, CREATE_TABLE, DROP, DEFAULT
INSERT, DATABASE, CREATE_TABLE, DROP, DEFAULT
}

private static Map<Type, StatementHandler> handlers = new ConcurrentHashMap<>(8);

static {
handlers.put(Type.CREATE_DATABASE, new CreateDatabaseHandler());
handlers.put(Type.DATABASE, new SchemaHandler());
handlers.put(Type.INSERT, new InsertHandler());
handlers.put(Type.CREATE_TABLE, new CreateTableHandler());
handlers.put(Type.DROP, new DropHandler());
Expand All @@ -31,8 +32,8 @@ private StatementHandlerManager() {
public static StatementHandler getHandler(SQLStatement statement) {
if (statement instanceof SQLDropTableStatement) {
return handlers.get(Type.DROP);
} else if (statement instanceof SQLCreateDatabaseStatement) {
return handlers.get(Type.CREATE_DATABASE);
} else if (statement instanceof SQLCreateDatabaseStatement || statement instanceof SQLUseStatement) {
return handlers.get(Type.DATABASE);
} else if (statement instanceof MySqlCreateTableStatement) {
return handlers.get(Type.CREATE_TABLE);
} else if (statement instanceof MySqlInsertStatement) {
Expand Down