turn single value to multi values in insert after splitting dump file (#1493)

* split dump file

* turn single value to multi values in insert after splitting dump file
PanternBao authored and yanhuqing666 committed Nov 12, 2019
1 parent efa89e4 commit d075315
Showing 15 changed files with 361 additions and 160 deletions.
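
The heart of this change is capping how many value rows a rewritten INSERT may carry, governed by the new maxValues option below. A minimal sketch of that splitting idea, using the Druid AST types the project already depends on; InsertSplitSketch and its split method are illustrative names, not dble's actual handler:

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;

import java.util.ArrayList;
import java.util.List;

public final class InsertSplitSketch {

    // Splits one multi-row INSERT into several INSERTs of at most maxValues rows each.
    static List<String> split(MySqlInsertStatement insert, int maxValues) {
        List<SQLInsertStatement.ValuesClause> values = insert.getValuesList();
        List<String> chunks = new ArrayList<>();
        for (int from = 0; from < values.size(); from += maxValues) {
            int to = Math.min(from + maxValues, values.size());
            MySqlInsertStatement chunk = new MySqlInsertStatement();
            chunk.setTableSource(insert.getTableSource()); // shared AST node; clone it if mutating
            chunk.getColumns().addAll(insert.getColumns());
            chunk.getValuesList().addAll(values.subList(from, to));
            chunks.add(SQLUtils.toMySqlString(chunk));
        }
        return chunks;
    }
}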
@@ -8,6 +8,7 @@ public class DumpFileConfig {
private int readQueueSize = 500;
private String writePath;
private int writeQueueSize = 500;
private int maxValues = 10000;

public String getReadFile() {
return readFile;
@@ -44,4 +45,11 @@ public void setWriteQueueSize(int writeQueueSize) {
this.writeQueueSize = writeQueueSize;
}

public int getMaxValues() {
return maxValues;
}

public void setMaxValues(int maxValues) {
this.maxValues = maxValues;
}
}
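
A usage sketch for the new option; the 10000 default comes from the field initializer above, and the 5000 here is an arbitrary example value:

DumpFileConfig config = new DumpFileConfig();
config.setMaxValues(5000); // each rewritten INSERT will carry at most 5000 value rows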
@@ -19,15 +19,18 @@ public final class DumpFileContext {
private TableConfig tableConfig;
private int partitionColumnIndex = -1;
private int incrementColumnIndex = -1;

private boolean isSkip = false;
private boolean globalCheck = DbleServer.getInstance().getConfig().getSystem().getUseGlobleTableCheck() == 1;
private DumpFileWriter writer;
private List<ErrorMsg> errors;
private boolean needSkipError;
private DumpFileConfig config;

public DumpFileContext(DumpFileWriter writer) {
public DumpFileContext(DumpFileWriter writer, DumpFileConfig config) {
this.writer = writer;
this.errors = new ArrayList<>(10);
this.config = config;
}

public void setStmt(String stmt) {
@@ -80,6 +83,8 @@ public void setTable(String table) throws DumpException {
}
this.table = table;
this.isSkip = false;
this.partitionColumnIndex = -1;
this.incrementColumnIndex = -1;
this.needSkipError = false;
if (this.schema == null) {
throw new DumpException("Can't tell which schema the table[" + table + "] belongs to.");
@@ -138,4 +143,9 @@ public boolean isNeedSkipError() {
public void setNeedSkipError(boolean needSkipError) {
this.needSkipError = needSkipError;
}

public DumpFileConfig getConfig() {
return config;
}

}
@@ -8,8 +8,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLNonTransientException;
import java.sql.SQLSyntaxErrorException;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Matcher;

/**
* @author Baofengqi
@@ -22,9 +23,9 @@ public final class DumpFileExecutor implements Runnable {
private BlockingQueue<String> queue;
private DumpFileContext context;

public DumpFileExecutor(BlockingQueue<String> queue, DumpFileWriter writer) {
public DumpFileExecutor(BlockingQueue<String> queue, DumpFileWriter writer, DumpFileConfig config) {
this.queue = queue;
this.context = new DumpFileContext(writer);
this.context = new DumpFileContext(writer, config);
}

@Override
@@ -36,19 +37,10 @@ public void run() {
stmt = queue.take();
context.setStmt(stmt);
int type = ServerParse.parse(stmt);
if (ServerParse.CREATE_DATABASE != type && context.getSchema() == null) {
writer.writeAll(stmt);
// pre handle
if (preHandle(writer, type, stmt)) {
continue;
}
// footer
if (stmt.contains("=@OLD_")) {
writer.writeAll(stmt);
continue;
}
if (stmt.equals(DumpFileReader.EOF)) {
writer.writeAll(stmt);
return;
}

SQLStatement statement = null;
// parse ddl or create database
@@ -63,19 +55,25 @@
if (ServerParse.INSERT == type && !context.isPushDown()) {
statement = RouteStrategyFactory.getRouteStrategy().parserSQL(stmt);
}
StatementHandler handler = StatementHandlerManager.getHandler(statement);
StatementHandler handler = StatementHandlerManager.getHandler(context, statement);
if (handler.preHandle(context, statement)) {
continue;
}
handler.handle(context, statement);

} catch (SQLNonTransientException e) {
} catch (DumpException | SQLSyntaxErrorException e) {
String currentStmt = context.getStmt().length() <= 1024 ? context.getStmt() : context.getStmt().substring(0, 1024);
context.skipCurrentContext();
context.addError(e.getMessage());
} catch (InterruptedException e) {
context.addError("current stmt[" + currentStmt + "] error,because:" + e.getMessage());
} catch (Exception e) {
LOGGER.warn("dump file executor exit, due to :" + e.getMessage());
context.addError("dump file executor exit, due to :" + e.getMessage());
try {
writer.writeAll(DumpFileReader.EOF);
} catch (InterruptedException ex) {
// ignore
LOGGER.warn("dump file executor exit.");
}
return;
}
}
}
@@ -84,4 +82,35 @@ public DumpFileContext getContext() {
return context;
}

private boolean preHandle(DumpFileWriter writer, int type, String stmt) throws InterruptedException {
// push the statement down as-is until a schema is known
if (!(ServerParse.CREATE_DATABASE == type || ServerParse.USE == (0xff & type)) && context.getSchema() == null) {
writer.writeAll(stmt);
return true;
}
// skip view
if ((ServerParse.MYSQL_CMD_COMMENT == type || ServerParse.MYSQL_COMMENT == type) && skipView(stmt)) {
return true;
}
// footer
if (stmt.contains("=@OLD_")) {
writer.writeAll(stmt);
return true;
}
if (stmt.equals(DumpFileReader.EOF)) {
writer.writeAll(stmt);
return true;
}
return false;
}

private boolean skipView(String stmt) {
Matcher matcher = DumpFileReader.HINT.matcher(stmt);
if (matcher.find()) {
int type = ServerParse.parse(matcher.group(1));
return type >= ServerParse.CREATE_VIEW && type <= ServerParse.ALTER_VIEW;
}
return false;
}

}
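
The view skipping above leans on DumpFileReader.HINT (defined in the next file) to unwrap mysqldump's versioned comments before re-parsing them. A quick, self-contained illustration of what that pattern extracts; HintDemo is just a scratch class:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HintDemo {
    public static void main(String[] args) {
        // same pattern as DumpFileReader.HINT
        Pattern hint = Pattern.compile("/\\*!\\d+\\s+(.*)\\*/", Pattern.CASE_INSENSITIVE);
        Matcher m = hint.matcher("/*!50001 CREATE VIEW v1 AS SELECT 1 */");
        if (m.find()) {
            System.out.println(m.group(1).trim()); // prints: CREATE VIEW v1 AS SELECT 1
        }
    }
}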
20 changes: 10 additions & 10 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileReader.java
@@ -9,6 +9,7 @@
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;

/**
* @author Baofengqi
@@ -17,7 +18,8 @@ public final class DumpFileReader {

public static final Logger LOGGER = LoggerFactory.getLogger(DumpFileReader.class);
public static final String EOF = "dump file eof";
private String tempStr;
public static final Pattern HINT = Pattern.compile("/\\*!\\d+\\s+(.*)\\*/", Pattern.CASE_INSENSITIVE);
private StringBuilder tempStr = new StringBuilder(200);
private BlockingQueue<String> readQueue;
private FileChannel fileChannel;

@@ -36,7 +38,7 @@ public void start(BlockingQueue<String> queue) throws IOException {
byteRead = fileChannel.read(buffer);
}
if (tempStr != null) {
this.readQueue.put(tempStr);
this.readQueue.put(tempStr.toString());
this.tempStr = null;
}
} catch (IOException e) {
@@ -66,17 +68,15 @@ private void readSQLByEOF(byte[] linesByte, int byteRead) throws InterruptedException {

int i = 0;
if (tempStr != null) {
if (len > 1) {
this.readQueue.put(tempStr + lines[0]);
this.tempStr = null;
} else {
tempStr += lines[0];
}
tempStr.append(lines[0]);
i = 1;
}
if (!endWithEOF) {
this.tempStr = lines[len - 1];
if (len > 1 && !endWithEOF) {
len = len - 1;
if (tempStr != null) {
this.readQueue.put(tempStr.toString());
}
tempStr = new StringBuilder(lines[len]);
}
for (; i < len; i++) {
this.readQueue.put(lines[i]);
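The switch from String to StringBuilder above is what lets a statement that straddles multiple read buffers accumulate cheaply until its terminating newline arrives. A simplified, self-contained model of that carry-over logic; StatementStitcher and its names are illustrative, and the queue and byte decoding are left out:

import java.util.ArrayList;
import java.util.List;

final class StatementStitcher {
    private StringBuilder pending; // unterminated tail carried from the previous chunk

    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        String[] parts = chunk.split("\n", -1); // keep the trailing piece, even when empty
        // every element except the last is a completed line
        for (int i = 0; i < parts.length - 1; i++) {
            String line = parts[i];
            if (pending != null) { // finish the statement started in an earlier chunk
                line = pending.append(line).toString();
                pending = null;
            }
            complete.add(line);
        }
        // the last piece is unterminated (possibly empty); carry it forward
        String tail = parts[parts.length - 1];
        if (!tail.isEmpty()) {
            if (pending == null) {
                pending = new StringBuilder(tail);
            } else {
                pending.append(tail);
            }
        }
        return complete;
    }
}

Feeding "INSERT INTO t VAL" and then "UES (1);\n" yields nothing on the first call and the one stitched statement on the second.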
25 changes: 15 additions & 10 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileWriter.java
@@ -8,46 +8,45 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DumpFileWriter {

public static final Logger LOGGER = LoggerFactory.getLogger(DumpFileWriter.class);
private Map<String, DataNodeWriter> dataNodeWriters = new HashMap<>();
private AtomicInteger finished;
private Map<String, DataNodeWriter> dataNodeWriters = new ConcurrentHashMap<>();
private AtomicInteger finished = new AtomicInteger(0);

public void open(String writePath, int writeQueueSize) throws IOException {
Set<String> dataNodes = DbleServer.getInstance().getConfig().getDataNodes().keySet();
this.finished = new AtomicInteger(dataNodes.size());
for (String dataNode : dataNodes) {
DataNodeWriter writer = new DataNodeWriter(writeQueueSize);
DataNodeWriter writer = new DataNodeWriter(dataNode, writeQueueSize);
writer.open(writePath + dataNode + ".dump");
dataNodeWriters.put(dataNode, writer);
}
}

public void start() {
for (DataNodeWriter writer : dataNodeWriters.values()) {
new Thread(writer).start();
new Thread(writer, "dataNode-writer-" + finished.incrementAndGet()).start();
}
}

public void write(String dataNode, String stmt, boolean isChanged) throws InterruptedException {
public void write(String dataNode, String stmt, boolean isChanged, boolean needEOF) throws InterruptedException {
DataNodeWriter writer = this.dataNodeWriters.get(dataNode);
if (writer != null) {
if (isChanged) writer.write("\n");
writer.write(stmt);
writer.write(";");
if (needEOF) writer.write(";");
}
}

public void write(String dataNode, String stmt) throws InterruptedException {
write(dataNode, stmt, false);
write(dataNode, stmt, true, true);
}

public void writeAll(String stmt) throws InterruptedException {
@@ -64,12 +63,17 @@ public boolean isFinished() {
class DataNodeWriter implements Runnable {
private FileChannel fileChannel;
private BlockingQueue<String> queue;
private String dataNode;

DataNodeWriter(int queueSize) {
DataNodeWriter(String dataNode, int queueSize) {
this.dataNode = dataNode;
this.queue = new ArrayBlockingQueue<>(queueSize);
}

void open(String fileName) throws IOException {
if (FileUtils.exists(fileName)) {
FileUtils.delete(fileName);
}
this.fileChannel = FileUtils.open(fileName, "rw");
}

@@ -79,6 +83,7 @@ void write(String stmt) throws InterruptedException {

void close() throws IOException {
this.fileChannel.close();
dataNodeWriters.remove(dataNode);
}

@Override
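The run() body itself is collapsed in this view. For orientation, a writer of this shape typically just drains its queue into the file channel until the EOF sentinel arrives; the following is a hypothetical sketch of such a loop, not the committed code (it assumes java.nio.charset.StandardCharsets is imported alongside ByteBuffer):

    // Hypothetical sketch of the collapsed run() body, not the committed code.
    public void run() {
        String stmt;
        try {
            while (!(stmt = queue.take()).equals(DumpFileReader.EOF)) {
                fileChannel.write(ByteBuffer.wrap(stmt.getBytes(StandardCharsets.UTF_8)));
            }
            close(); // also removes this writer from dataNodeWriters, see close() above
        } catch (InterruptedException | IOException e) {
            LOGGER.warn("data node writer for " + dataNode + " exits: " + e.getMessage());
        }
    }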
@@ -19,54 +19,69 @@
public class CreateTableHandler extends DefaultHandler {

@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws DumpException, InterruptedException {
public boolean preHandle(DumpFileContext context, SQLStatement sqlStatement) throws DumpException, InterruptedException {
MySqlCreateTableStatement create = (MySqlCreateTableStatement) sqlStatement;
String tableName = StringUtil.removeBackQuote(create.getTableSource().getName().getSimpleName());
context.setTable(tableName);

boolean isFinished = preHandle(context);
if (isFinished) {
return;
if (super.preHandle(context, sqlStatement)) {
return true;
}

boolean isChanged = false;
TableConfig tableConfig = context.getTableConfig();
// check column
List<SQLTableElement> columns = create.getTableElementList();
if (tableConfig.isAutoIncrement() || tableConfig.getPartitionColumn() != null) {
// check columns for sharding column index or increment column index
checkColumns(context, columns);
// partition column check
if (tableConfig.getPartitionColumn() != null && context.getPartitionColumnIndex() == -1) {
throw new DumpException("table[" + context.getTable() + "] can't find partition column in create.");
}
}
return false;
}

@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws InterruptedException {
boolean isChanged = false;
List<SQLTableElement> columns = ((MySqlCreateTableStatement) sqlStatement).getTableElementList();
TableConfig tableConfig = context.getTableConfig();
if (tableConfig.isAutoIncrement()) {
// add increment column if not exists
if (tableConfig.isAutoIncrement() && context.getIncrementColumnIndex() == -1) {
if (context.getIncrementColumnIndex() == -1) {
SQLColumnDefinition column = new SQLColumnDefinition();
column.setDataType(new SQLCharacterDataType("bigint"));
column.setDefaultExpr(new SQLNullExpr());
column.setName(new SQLIdentifierExpr(tableConfig.getTrueIncrementColumn()));
columns.add(column);
context.setPartitionColumnIndex(columns.size());
isChanged = true;
}

// partition column check
if (tableConfig.getPartitionColumn() != null && context.getPartitionColumnIndex() == -1) {
throw new DumpException("table[" + context.getTable() + "] can't find partition column in create.");
} else {
SQLColumnDefinition column = (SQLColumnDefinition) columns.get(context.getIncrementColumnIndex());
if (!column.getDataType().getName().equals("bigint")) {
context.addError("data type of increment column isn't bigint, dble replaced it by itself.");
column.setDataType(new SQLCharacterDataType("bigint"));
isChanged = true;
}
}
}

// if table is global, add column
if (tableConfig.isGlobalTable() && context.isGlobalCheck()) {
// if table is global, add column
columns.add(GlobalTableUtil.createCheckColumn());
isChanged = true;
}

String stmt = isChanged ? SQLUtils.toMySqlString(create) : context.getStmt();
String stmt = isChanged ? SQLUtils.toMySqlString(sqlStatement) : context.getStmt();
for (String dataNode : tableConfig.getDataNodes()) {
context.getWriter().write(dataNode, stmt, isChanged);
context.getWriter().write(dataNode, stmt);
}
}

private void checkColumns(DumpFileContext context, List<SQLTableElement> columns) {
SQLTableElement column;
TableConfig tableConfig = context.getTableConfig();
for (int j = 0; j < columns.size(); j++) {
SQLTableElement column = columns.get(j);
column = columns.get(j);
if (!(columns.get(j) instanceof SQLColumnDefinition)) {
continue;
}
