Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

turn single value to multi values in insert after splitting dump file #1493

Merged
merged 7 commits into from
Nov 12, 2019
Prev Previous commit
turn single value to multi values in insert after splitting dump file
  • Loading branch information
PanternBao committed Nov 12, 2019
commit 77750f53a2f722e4ae114e05750e91345e0a9a53
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class DumpFileConfig {
private int readQueueSize = 500;
private String writePath;
private int writeQueueSize = 500;
private int maxValues = 10000;

public String getReadFile() {
return readFile;
Expand Down Expand Up @@ -44,4 +45,11 @@ public void setWriteQueueSize(int writeQueueSize) {
this.writeQueueSize = writeQueueSize;
}

public int getMaxValues() {
return maxValues;
}

public void setMaxValues(int maxValues) {
this.maxValues = maxValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ public final class DumpFileContext {
private TableConfig tableConfig;
private int partitionColumnIndex = -1;
private int incrementColumnIndex = -1;

private boolean isSkip = false;
private boolean globalCheck = DbleServer.getInstance().getConfig().getSystem().getUseGlobleTableCheck() == 1;
private DumpFileWriter writer;
private List<ErrorMsg> errors;
private boolean needSkipError;
private DumpFileConfig config;

public DumpFileContext(DumpFileWriter writer) {
public DumpFileContext(DumpFileWriter writer, DumpFileConfig config) {
this.writer = writer;
this.errors = new ArrayList<>(10);
this.config = config;
}

public void setStmt(String stmt) {
Expand Down Expand Up @@ -80,6 +83,8 @@ public void setTable(String table) throws DumpException {
}
this.table = table;
this.isSkip = false;
this.partitionColumnIndex = -1;
this.incrementColumnIndex = -1;
this.needSkipError = false;
if (this.schema == null) {
throw new DumpException("Can't tell which schema the table[" + table + "] belongs to.");
Expand Down Expand Up @@ -138,4 +143,9 @@ public boolean isNeedSkipError() {
public void setNeedSkipError(boolean needSkipError) {
this.needSkipError = needSkipError;
}

public DumpFileConfig getConfig() {
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLNonTransientException;
import java.sql.SQLSyntaxErrorException;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Matcher;

/**
* @author Baofengqi
Expand All @@ -22,9 +23,9 @@ public final class DumpFileExecutor implements Runnable {
private BlockingQueue<String> queue;
private DumpFileContext context;

public DumpFileExecutor(BlockingQueue<String> queue, DumpFileWriter writer) {
public DumpFileExecutor(BlockingQueue<String> queue, DumpFileWriter writer, DumpFileConfig config) {
this.queue = queue;
this.context = new DumpFileContext(writer);
this.context = new DumpFileContext(writer, config);
}

@Override
Expand All @@ -36,19 +37,10 @@ public void run() {
stmt = queue.take();
context.setStmt(stmt);
int type = ServerParse.parse(stmt);
if (ServerParse.CREATE_DATABASE != type && context.getSchema() == null) {
writer.writeAll(stmt);
// pre handle
if (preHandle(writer, type, stmt)) {
continue;
}
// footer
if (stmt.contains("=@OLD_")) {
writer.writeAll(stmt);
continue;
}
if (stmt.equals(DumpFileReader.EOF)) {
writer.writeAll(stmt);
return;
}

SQLStatement statement = null;
// parse ddl or create database
Expand All @@ -63,19 +55,25 @@ public void run() {
if (ServerParse.INSERT == type && !context.isPushDown()) {
statement = RouteStrategyFactory.getRouteStrategy().parserSQL(stmt);
}
StatementHandler handler = StatementHandlerManager.getHandler(statement);
StatementHandler handler = StatementHandlerManager.getHandler(context, statement);
if (handler.preHandle(context, statement)) {
continue;
}
handler.handle(context, statement);

} catch (SQLNonTransientException e) {
} catch (DumpException | SQLSyntaxErrorException e) {
String currentStmt = context.getStmt().length() <= 1024 ? context.getStmt() : context.getStmt().substring(0, 1024);
context.skipCurrentContext();
context.addError(e.getMessage());
} catch (InterruptedException e) {
context.addError("current stmt[" + currentStmt + "] error,because:" + e.getMessage());
} catch (Exception e) {
LOGGER.warn("dump file executor exit, due to :" + e.getMessage());
context.addError("dump file executor exit, due to :" + e.getMessage());
try {
writer.writeAll(DumpFileReader.EOF);
} catch (InterruptedException ex) {
// ignore
LOGGER.warn("dump file executor exit.");
}
return;
}
}
}
Expand All @@ -84,4 +82,35 @@ public DumpFileContext getContext() {
return context;
}

private boolean preHandle(DumpFileWriter writer, int type, String stmt) throws InterruptedException {
// push down statement util containing schema
if (!(ServerParse.CREATE_DATABASE == type || ServerParse.USE == (0xff & type)) && context.getSchema() == null) {
writer.writeAll(stmt);
return true;
}
// skip view
if ((ServerParse.MYSQL_CMD_COMMENT == type || ServerParse.MYSQL_COMMENT == type) && skipView(stmt)) {
return true;
}
// footer
if (stmt.contains("=@OLD_")) {
writer.writeAll(stmt);
return true;
}
if (stmt.equals(DumpFileReader.EOF)) {
writer.writeAll(stmt);
return true;
}
return false;
}

private boolean skipView(String stmt) {
Matcher matcher = DumpFileReader.HINT.matcher(stmt);
if (matcher.find()) {
int type = ServerParse.parse(matcher.group(1));
return type >= ServerParse.CREATE_VIEW && type <= ServerParse.ALTER_VIEW;
}
return false;
}

}
20 changes: 10 additions & 10 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;

/**
* @author Baofengqi
Expand All @@ -17,7 +18,8 @@ public final class DumpFileReader {

public static final Logger LOGGER = LoggerFactory.getLogger(DumpFileReader.class);
public static final String EOF = "dump file eof";
private String tempStr;
public static final Pattern HINT = Pattern.compile("/\\*!\\d+\\s+(.*)\\*/", Pattern.CASE_INSENSITIVE);
private StringBuilder tempStr = new StringBuilder(200);
private BlockingQueue<String> readQueue;
private FileChannel fileChannel;

Expand All @@ -36,7 +38,7 @@ public void start(BlockingQueue<String> queue) throws IOException {
byteRead = fileChannel.read(buffer);
}
if (tempStr != null) {
this.readQueue.put(tempStr);
this.readQueue.put(tempStr.toString());
this.tempStr = null;
}
} catch (IOException e) {
Expand Down Expand Up @@ -66,17 +68,15 @@ private void readSQLByEOF(byte[] linesByte, int byteRead) throws InterruptedExce

int i = 0;
if (tempStr != null) {
if (len > 1) {
this.readQueue.put(tempStr + lines[0]);
this.tempStr = null;
} else {
tempStr += lines[0];
}
tempStr.append(lines[0]);
i = 1;
}
if (!endWithEOF) {
this.tempStr = lines[len - 1];
if (len > 1 && !endWithEOF) {
len = len - 1;
if (tempStr != null) {
this.readQueue.put(tempStr.toString());
}
tempStr = new StringBuilder(lines[len]);
}
for (; i < len; i++) {
this.readQueue.put(lines[i]);
Expand Down
25 changes: 15 additions & 10 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,45 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DumpFileWriter {

public static final Logger LOGGER = LoggerFactory.getLogger(DumpFileWriter.class);
private Map<String, DataNodeWriter> dataNodeWriters = new HashMap<>();
private AtomicInteger finished;
private Map<String, DataNodeWriter> dataNodeWriters = new ConcurrentHashMap<>();
private AtomicInteger finished = new AtomicInteger(0);

public void open(String writePath, int writeQueueSize) throws IOException {
Set<String> dataNodes = DbleServer.getInstance().getConfig().getDataNodes().keySet();
this.finished = new AtomicInteger(dataNodes.size());
for (String dataNode : dataNodes) {
DataNodeWriter writer = new DataNodeWriter(writeQueueSize);
DataNodeWriter writer = new DataNodeWriter(dataNode, writeQueueSize);
writer.open(writePath + dataNode + ".dump");
dataNodeWriters.put(dataNode, writer);
}
}

public void start() {
for (DataNodeWriter writer : dataNodeWriters.values()) {
new Thread(writer).start();
new Thread(writer, "dataNode-writer-" + finished.incrementAndGet()).start();
}
}

public void write(String dataNode, String stmt, boolean isChanged) throws InterruptedException {
public void write(String dataNode, String stmt, boolean isChanged, boolean needEOF) throws InterruptedException {
DataNodeWriter writer = this.dataNodeWriters.get(dataNode);
if (writer != null) {
if (isChanged) writer.write("\n");
writer.write(stmt);
writer.write(";");
if (needEOF) writer.write(";");
}
}

public void write(String dataNode, String stmt) throws InterruptedException {
write(dataNode, stmt, false);
write(dataNode, stmt, true, true);
}

public void writeAll(String stmt) throws InterruptedException {
Expand All @@ -64,12 +63,17 @@ public boolean isFinished() {
class DataNodeWriter implements Runnable {
private FileChannel fileChannel;
private BlockingQueue<String> queue;
private String dataNode;

DataNodeWriter(int queueSize) {
DataNodeWriter(String dataNode, int queueSize) {
this.dataNode = dataNode;
this.queue = new ArrayBlockingQueue<>(queueSize);
}

void open(String fileName) throws IOException {
if (FileUtils.exists(fileName)) {
FileUtils.delete(fileName);
}
this.fileChannel = FileUtils.open(fileName, "rw");
}

Expand All @@ -79,6 +83,7 @@ void write(String stmt) throws InterruptedException {

void close() throws IOException {
this.fileChannel.close();
dataNodeWriters.remove(dataNode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,69 @@
public class CreateTableHandler extends DefaultHandler {

@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws DumpException, InterruptedException {
public boolean preHandle(DumpFileContext context, SQLStatement sqlStatement) throws DumpException, InterruptedException {
MySqlCreateTableStatement create = (MySqlCreateTableStatement) sqlStatement;
String tableName = StringUtil.removeBackQuote(create.getTableSource().getName().getSimpleName());
context.setTable(tableName);

boolean isFinished = preHandle(context);
if (isFinished) {
return;
if (super.preHandle(context, sqlStatement)) {
return true;
}

boolean isChanged = false;
TableConfig tableConfig = context.getTableConfig();
// check column
List<SQLTableElement> columns = create.getTableElementList();
if (tableConfig.isAutoIncrement() || tableConfig.getPartitionColumn() != null) {
// check columns for sharing column index or increment column index
checkColumns(context, columns);
// partition column check
if (tableConfig.getPartitionColumn() != null && context.getPartitionColumnIndex() == -1) {
throw new DumpException("table[" + context.getTable() + "] can't find partition column in create.");
}
}
return false;
}

@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws InterruptedException {
boolean isChanged = false;
List<SQLTableElement> columns = ((MySqlCreateTableStatement) sqlStatement).getTableElementList();
TableConfig tableConfig = context.getTableConfig();
if (tableConfig.isAutoIncrement()) {
// add increment column if not exists
if (tableConfig.isAutoIncrement() && context.getIncrementColumnIndex() == -1) {
if (context.getIncrementColumnIndex() == -1) {
SQLColumnDefinition column = new SQLColumnDefinition();
column.setDataType(new SQLCharacterDataType("bigint"));
column.setDefaultExpr(new SQLNullExpr());
column.setName(new SQLIdentifierExpr(tableConfig.getTrueIncrementColumn()));
columns.add(column);
context.setPartitionColumnIndex(columns.size());
isChanged = true;
}

// partition column check
if (tableConfig.getPartitionColumn() != null && context.getPartitionColumnIndex() == -1) {
throw new DumpException("table[" + context.getTable() + "] can't find partition column in create.");
} else {
SQLColumnDefinition column = (SQLColumnDefinition) columns.get(context.getIncrementColumnIndex());
if (!column.getDataType().getName().equals("bigint")) {
context.addError("data type of increment column isn't bigint, dble replaced it by itself.");
column.setDataType(new SQLCharacterDataType("bigint"));
isChanged = true;
}
}
}

// if table is global, add column
if (tableConfig.isGlobalTable() && context.isGlobalCheck()) {
// if table is global, add column
columns.add(GlobalTableUtil.createCheckColumn());
isChanged = true;
}

String stmt = isChanged ? SQLUtils.toMySqlString(create) : context.getStmt();
String stmt = isChanged ? SQLUtils.toMySqlString(sqlStatement) : context.getStmt();
for (String dataNode : tableConfig.getDataNodes()) {
context.getWriter().write(dataNode, stmt, isChanged);
context.getWriter().write(dataNode, stmt);
}
}

private void checkColumns(DumpFileContext context, List<SQLTableElement> columns) {
SQLTableElement column;
TableConfig tableConfig = context.getTableConfig();
for (int j = 0; j < columns.size(); j++) {
SQLTableElement column = columns.get(j);
column = columns.get(j);
if (!(columns.get(j) instanceof SQLColumnDefinition)) {
continue;
}
Expand Down
Loading