[FLINK-14490] Use ObjectIdentifier in TableOperations
dawidwys committed Oct 29, 2019
1 parent b7cd6a9 commit 44c85e3
Showing 22 changed files with 348 additions and 293 deletions.
@@ -333,9 +333,10 @@ public void insertInto(Table table, String path, String... pathContinued) {
         List<String> fullPath = new ArrayList<>(Arrays.asList(pathContinued));
         fullPath.add(0, path);
 
+        ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(fullPath.toArray(new String[0]));
         List<ModifyOperation> modifyOperations = Collections.singletonList(
             new CatalogSinkModifyOperation(
-                fullPath,
+                objectIdentifier,
                 table.getQueryOperation()));
 
         if (isEagerOperationTranslation()) {
@@ -366,15 +367,15 @@ public void sqlUpdate(String stmt) {
             }
         } else if (operation instanceof CreateTableOperation) {
             CreateTableOperation createTableOperation = (CreateTableOperation) operation;
-            ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(createTableOperation.getTablePath());
             catalogManager.createTable(
                 createTableOperation.getCatalogTable(),
-                objectIdentifier,
+                createTableOperation.getTableIdentifier(),
                 createTableOperation.isIgnoreIfExists());
         } else if (operation instanceof DropTableOperation) {
             DropTableOperation dropTableOperation = (DropTableOperation) operation;
-            ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(dropTableOperation.getTableName());
-            catalogManager.dropTable(objectIdentifier, dropTableOperation.isIfExists());
+            catalogManager.dropTable(
+                dropTableOperation.getTableIdentifier(),
+                dropTableOperation.isIfExists());
         } else {
             throw new TableException(
                 "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " +
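The core of the change above: insertInto and sqlUpdate now resolve partial paths into a fully qualified ObjectIdentifier at the API boundary, instead of handing raw string paths down to the planner. A minimal sketch (not part of this commit) of what that qualification yields, assuming the stock session defaults default_catalog/default_database; the class and table names are illustrative only:

import org.apache.flink.table.catalog.ObjectIdentifier;

public class QualifyIdentifierSketch {
    public static void main(String[] args) {
        // catalogManager.qualifyIdentifier("mySink") fills in the session's
        // current catalog and database for a single-part path, producing the
        // equivalent of (assumed defaults shown):
        ObjectIdentifier id = ObjectIdentifier.of(
            "default_catalog",   // assumed current catalog
            "default_database",  // assumed current database
            "mySink");           // the name handed to insertInto(...)

        System.out.println(id.getCatalogName());  // default_catalog
        System.out.println(id.getDatabaseName()); // default_database
        System.out.println(id.getObjectName());   // mySink
    }
}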
@@ -33,16 +33,16 @@
 @Internal
 public class CatalogQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier objectIdentifier;
+    private final ObjectIdentifier tableIdentifier;
     private final TableSchema tableSchema;
 
-    public CatalogQueryOperation(ObjectIdentifier objectIdentifier, TableSchema tableSchema) {
-        this.objectIdentifier = objectIdentifier;
+    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, TableSchema tableSchema) {
+        this.tableIdentifier = tableIdentifier;
         this.tableSchema = tableSchema;
     }
 
-    public ObjectIdentifier getObjectIdentifier() {
-        return objectIdentifier;
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
     }
 
     @Override
@@ -53,7 +53,7 @@ public TableSchema getTableSchema() {
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", objectIdentifier);
+        args.put("identifier", tableIdentifier);
         args.put("fields", tableSchema.getFieldNames());
 
         return OperationUtils.formatWithChildren("CatalogTable", args, getChildren(), Operation::asSummaryString);
@@ -19,11 +19,11 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,27 +33,28 @@
 @Internal
 public class CatalogSinkModifyOperation implements ModifyOperation {
 
+    private final ObjectIdentifier tableIdentifier;
     private final Map<String, String> staticPartitions;
-    private final List<String> tablePath;
     private final QueryOperation child;
     private final boolean overwrite;
 
-    public CatalogSinkModifyOperation(List<String> tablePath, QueryOperation child) {
-        this(tablePath, child, new HashMap<>(), false);
+    public CatalogSinkModifyOperation(ObjectIdentifier tableIdentifier, QueryOperation child) {
+        this(tableIdentifier, child, new HashMap<>(), false);
     }
 
-    public CatalogSinkModifyOperation(List<String> tablePath,
+    public CatalogSinkModifyOperation(
+            ObjectIdentifier tableIdentifier,
             QueryOperation child,
             Map<String, String> staticPartitions,
            boolean overwrite) {
-        this.tablePath = tablePath;
+        this.tableIdentifier = tableIdentifier;
        this.child = child;
        this.staticPartitions = staticPartitions;
        this.overwrite = overwrite;
    }
 
-    public List<String> getTablePath() {
-        return tablePath;
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
    }
 
    public Map<String, String> getStaticPartitions() {
@@ -77,7 +78,7 @@ public <T> T accept(ModifyOperationVisitor<T> visitor) {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("tablePath", tablePath);
+        params.put("identifier", tableIdentifier);
         params.put("staticPartitions", staticPartitions);
         params.put("overwrite", overwrite);
 
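With the List<String> tablePath field gone, every CatalogSinkModifyOperation now carries an already-resolved identifier. A sketch of the new call-site shape under assumed names; sourceQuery stands in for whatever QueryOperation the caller already holds (e.g. table.getQueryOperation()):

import java.util.Collections;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.QueryOperation;

final class SinkOperationSketch {
    static CatalogSinkModifyOperation describeInsert(QueryOperation sourceQuery) {
        // The sink is identified up front; the planner no longer re-resolves it.
        ObjectIdentifier sinkId = ObjectIdentifier.of("cat", "db", "sink_table"); // assumed names
        return new CatalogSinkModifyOperation(
            sinkId,
            sourceQuery,
            Collections.emptyMap(), // no static partitions
            false);                 // overwrite = false
    }
}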
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations.ddl;
 
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -30,14 +31,15 @@
  * Operation to describe a CREATE TABLE statement.
  */
 public class CreateTableOperation implements CreateOperation {
-    private final String[] tablePath;
+    private final ObjectIdentifier tableIdentifier;
     private CatalogTable catalogTable;
     private boolean ignoreIfExists;
 
-    public CreateTableOperation(String[] tablePath,
+    public CreateTableOperation(
+            ObjectIdentifier tableIdentifier,
             CatalogTable catalogTable,
             boolean ignoreIfExists) {
-        this.tablePath = tablePath;
+        this.tableIdentifier = tableIdentifier;
         this.catalogTable = catalogTable;
         this.ignoreIfExists = ignoreIfExists;
     }
@@ -46,8 +48,8 @@ public CatalogTable getCatalogTable() {
         return catalogTable;
     }
 
-    public String[] getTablePath() {
-        return tablePath;
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
     }
 
     public boolean isIgnoreIfExists() {
@@ -58,7 +60,7 @@ public boolean isIgnoreIfExists() {
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
         params.put("catalogTable", catalogTable.toProperties());
-        params.put("tablePath", tablePath);
+        params.put("identifier", tableIdentifier);
         params.put("ignoreIfExists", ignoreIfExists);
 
         return OperationUtils.formatWithChildren(
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.operations.ddl;
 
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -29,16 +30,16 @@
  * Operation to describe a DROP TABLE statement.
  */
 public class DropTableOperation implements DropOperation {
-    private final String[] tableName;
+    private final ObjectIdentifier tableIdentifier;
     private final boolean ifExists;
 
-    public DropTableOperation(String[] tableName, boolean ifExists) {
-        this.tableName = tableName;
+    public DropTableOperation(ObjectIdentifier tableIdentifier, boolean ifExists) {
+        this.tableIdentifier = tableIdentifier;
         this.ifExists = ifExists;
     }
 
-    public String[] getTableName() {
-        return this.tableName;
+    public ObjectIdentifier getTableIdentifier() {
+        return this.tableIdentifier;
     }
 
     public boolean isIfExists() {
@@ -48,7 +49,7 @@ public boolean isIfExists() {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("tableName", tableName);
+        params.put("identifier", tableIdentifier);
         params.put("IfExists", ifExists);
 
         return OperationUtils.formatWithChildren(
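Both DDL operations follow the same pattern: the converter qualifies the path once, and the operation exposes it via getTableIdentifier(). A sketch of constructing each with an assumed identifier; someCatalogTable is a placeholder for whatever CatalogTable the DDL converter produced:

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;

final class DdlOperationSketch {
    static CreateTableOperation describeCreate(CatalogTable someCatalogTable) {
        ObjectIdentifier id = ObjectIdentifier.of("cat", "db", "t"); // assumed names
        return new CreateTableOperation(id, someCatalogTable, true); // ignoreIfExists
    }

    static DropTableOperation describeDrop() {
        ObjectIdentifier id = ObjectIdentifier.of("cat", "db", "t"); // assumed names
        return new DropTableOperation(id, true); // ifExists
    }
}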
@@ -25,8 +25,10 @@
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -60,28 +62,36 @@
  */
 public class SqlToOperationConverter {
     private FlinkPlannerImpl flinkPlanner;
+    private CatalogManager catalogManager;
 
     //~ Constructors -----------------------------------------------------------
 
-    private SqlToOperationConverter(FlinkPlannerImpl flinkPlanner) {
+    private SqlToOperationConverter(
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager) {
         this.flinkPlanner = flinkPlanner;
+        this.catalogManager = catalogManager;
     }
 
     /**
      * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
      * SqlNode will have it's implementation in the #convert(type) method whose 'type' argument
      * is subclass of {@code SqlNode}.
      *
-     * @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to rel node
-     * @param sqlNode SqlNode to execute on
+     * @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to rel node
+     * @param catalogManager CatalogManager to resolve full path for operations
+     * @param sqlNode SqlNode to execute on
      */
-    public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
+    public static Operation convert(
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager,
+            SqlNode sqlNode) {
         // validate the query
         final SqlNode validated = flinkPlanner.validate(sqlNode);
-        SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
+        SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
         if (validated instanceof SqlCreateTable) {
             return converter.convertCreateTable((SqlCreateTable) validated);
-        } if (validated instanceof SqlDropTable) {
+        } else if (validated instanceof SqlDropTable) {
             return converter.convertDropTable((SqlDropTable) validated);
         } else if (validated instanceof RichSqlInsert) {
             return converter.convertSqlInsert((RichSqlInsert) validated);
@@ -128,22 +138,34 @@ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
             partitionKeys,
             properties,
             tableComment);
-        return new CreateTableOperation(sqlCreateTable.fullTableName(), catalogTable,
+
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlCreateTable.fullTableName());
+
+        return new CreateTableOperation(
+            identifier,
+            catalogTable,
             sqlCreateTable.isIfNotExists());
     }
 
     /** Convert DROP TABLE statement. */
     private Operation convertDropTable(SqlDropTable sqlDropTable) {
-        return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlDropTable.fullTableName());
+
+        return new DropTableOperation(identifier, sqlDropTable.getIfExists());
     }
 
     /** Convert insert into statement. */
     private Operation convertSqlInsert(RichSqlInsert insert) {
         // get name of sink table
         List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;
+
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(targetTablePath.toArray(new String[0]));
+
         return new CatalogSinkModifyOperation(
-            targetTablePath,
-            (PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+            identifier,
+            (PlannerQueryOperation) SqlToOperationConverter.convert(
+                flinkPlanner,
+                catalogManager,
                 insert.getSource()),
             insert.getStaticPartitionKVs(),
             insert.isOverwrite());
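Callers of the converter now have to thread the session's CatalogManager through, as the PlannerBase change below shows. A sketch of the updated entry point, assuming the Blink planner's package layout; the helper class and method names are illustrative only:

import org.apache.calcite.sql.SqlNode;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.operations.SqlToOperationConverter;

final class ConvertSketch {
    static Operation toOperation(FlinkPlannerImpl planner, CatalogManager catalogManager, String sql) {
        SqlNode parsed = planner.parse(sql); // parse only; convert(...) validates internally
        return SqlToOperationConverter.convert(planner, catalogManager, parsed);
    }
}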
@@ -297,7 +297,7 @@ public <U> RelNode visit(CalculatedQueryOperation<U> calculatedTable) {
 
     @Override
     public RelNode visit(CatalogQueryOperation catalogTable) {
-        ObjectIdentifier objectIdentifier = catalogTable.getObjectIdentifier();
+        ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
         return relBuilder.scan(
             objectIdentifier.getCatalogName(),
             objectIdentifier.getDatabaseName(),
@@ -126,11 +126,11 @@ abstract class PlannerBase(
     val parsed = planner.parse(stmt)
     parsed match {
       case insert: RichSqlInsert =>
-        List(SqlToOperationConverter.convert(planner, insert))
+        List(SqlToOperationConverter.convert(planner, catalogManager, insert))
       case query if query.getKind.belongsTo(SqlKind.QUERY) =>
-        List(SqlToOperationConverter.convert(planner, query))
+        List(SqlToOperationConverter.convert(planner, catalogManager, query))
       case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
-        List(SqlToOperationConverter.convert(planner, ddl))
+        List(SqlToOperationConverter.convert(planner, catalogManager, ddl))
       case _ =>
         throw new TableException(s"Unsupported query: $stmt")
     }
@@ -174,7 +174,7 @@
 
       case catalogSink: CatalogSinkModifyOperation =>
         val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
-        val identifier = catalogManager.qualifyIdentifier(catalogSink.getTablePath: _*)
+        val identifier = catalogSink.getTableIdentifier
         getTableSink(identifier).map { case (table, sink) =>
           TableSinkUtils.validateSink(catalogSink, identifier, sink, table.getPartitionKeys)
           sink match {
@@ -190,10 +190,11 @@
                 s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
                 sink.getClass.getName)
           }
-          LogicalSink.create(input, sink, catalogSink.getTablePath.mkString("."), table)
+          LogicalSink.create(input, sink, identifier.toString, table)
         } match {
           case Some(sinkRel) => sinkRel
-          case None => throw new TableException(s"Sink ${catalogSink.getTablePath} does not exists")
+          case None =>
+            throw new TableException(s"Sink ${catalogSink.getTableIdentifier} does not exists")
         }
 
       case outputConversion: OutputConversionModifyOperation =>
@@ -167,7 +167,7 @@ public void testCreateTableWithMinusInOptionKey() {
         final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = planner.parse(sql);
         assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, node);
+        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node);
         assert operation instanceof CreateTableOperation;
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
@@ -325,7 +325,7 @@ public void testCreateTableWithFullDataTypes() {
         final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = planner.parse(sql);
         assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, node);
+        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node);
         TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
         Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
         assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
@@ -347,7 +347,7 @@ private static TestItem createTestItem(Object... args) {
 
     private Operation parse(String sql, FlinkPlannerImpl planner) {
         SqlNode node = planner.parse(sql);
-        return SqlToOperationConverter.convert(planner, node);
+        return SqlToOperationConverter.convert(planner, catalogManager, node);
     }
 
     private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {