[FLINK-14490][table-api] Introduce UnresolvedIdentifier

dawidwys committed Oct 29, 2019
1 parent 54e69c3 commit 0fe3fe1
Showing 8 changed files with 180 additions and 63 deletions.
@@ -37,6 +37,7 @@
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
@@ -233,7 +234,7 @@ public Table scan(String... tablePath) {
}

private Optional<CatalogQueryOperation> scanInternal(String... tablePath) {
ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(tablePath);
ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(tablePath));
return catalogManager.getTable(objectIdentifier)
.map(t -> new CatalogQueryOperation(objectIdentifier, t.getSchema()));
}
@@ -333,7 +334,8 @@ public void insertInto(Table table, String path, String... pathContinued) {
List<String> fullPath = new ArrayList<>(Arrays.asList(pathContinued));
fullPath.add(0, path);

ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(fullPath.toArray(new String[0]));
ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(
UnresolvedIdentifier.of(fullPath.toArray(new String[0])));
List<ModifyOperation> modifyOperations = Collections.singletonList(
new CatalogSinkModifyOperation(
objectIdentifier,
@@ -452,9 +454,10 @@ private void buffer(List<ModifyOperation> modifyOperations) {

private ObjectIdentifier getTemporaryObjectIdentifier(String name) {
return catalogManager.qualifyIdentifier(
catalogManager.getBuiltInCatalogName(),
catalogManager.getBuiltInDatabaseName(),
name);
UnresolvedIdentifier.of(
catalogManager.getBuiltInCatalogName(),
catalogManager.getBuiltInDatabaseName(),
name));
}

private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
@@ -517,7 +520,7 @@ private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
}

private Optional<CatalogBaseTable> getCatalogTable(String... name) {
return catalogManager.getTable(catalogManager.qualifyIdentifier(name));
return catalogManager.getTable(catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name)));
}

protected TableImpl createTable(QueryOperation tableOperation) {
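
The call sites above all migrate the same way: a raw String[] table path is first wrapped in an UnresolvedIdentifier and only then handed to CatalogManager#qualifyIdentifier; the same pattern recurs in the SQL-to-operation converters and the Scala test environment further down. A minimal sketch of the new call shape, assuming an already configured CatalogManager instance (the surrounding TableEnvironmentImpl context is omitted):

import java.util.Optional;

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

class ScanSketch {

	// Hypothetical helper mirroring scanInternal(...) above. The catalogManager argument
	// is assumed to be an initialized CatalogManager with a current catalog and database set.
	static Optional<CatalogBaseTable> lookup(CatalogManager catalogManager, String... tablePath) {
		// Previously: catalogManager.qualifyIdentifier(tablePath)
		// Now the raw path segments are wrapped in an UnresolvedIdentifier first.
		ObjectIdentifier objectIdentifier =
			catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(tablePath));
		return catalogManager.getTable(objectIdentifier);
	}
}
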
@@ -31,7 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@@ -116,7 +115,7 @@ public Set<String> getCatalogs() {
* Gets the current catalog that will be used when resolving table path.
*
* @return the current catalog
* @see CatalogManager#qualifyIdentifier(String...)
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
*/
public String getCurrentCatalog() {
return currentCatalogName;
@@ -127,7 +126,7 @@ public String getCurrentCatalog() {
*
* @param catalogName catalog name to set as current catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
* @see CatalogManager#qualifyIdentifier(String...)
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
*/
public void setCurrentCatalog(String catalogName) throws CatalogNotExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
@@ -152,7 +151,7 @@ public void setCurrentCatalog(String catalogName) throws CatalogNotExistExceptio
* Gets the current database name that will be used when resolving table path.
*
* @return the current database
* @see CatalogManager#qualifyIdentifier(String...)
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
*/
public String getCurrentDatabase() {
return currentDatabaseName;
@@ -164,7 +163,7 @@ public String getCurrentDatabase() {
*
* @param databaseName database name to set as current database name
* @throws CatalogException thrown if the database doesn't exist in the current catalog
* @see CatalogManager#qualifyIdentifier(String...)
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
* @see CatalogManager#setCurrentCatalog(String)
*/
public void setCurrentDatabase(String databaseName) {
@@ -210,7 +209,7 @@ public String getBuiltInDatabaseName() {

/**
* Retrieves a fully qualified table. If the path is not yet fully qualified use
* {@link #qualifyIdentifier(String...)} first.
* {@link #qualifyIdentifier(UnresolvedIdentifier)} first.
*
* @param objectIdentifier full path of the table to retrieve
* @return table that the path points to.
@@ -232,42 +231,16 @@ public Optional<CatalogBaseTable> getTable(ObjectIdentifier objectIdentifier) {

/**
* Returns the full name of the given table path, this name may be padded
* with current catalog/database name based on the {@code paths} length.
* with current catalog/database name based on the {@code identifier's} length.
*
* @param path Table path whose format can be "catalog.db.table", "db.table" or "table"
* @return An array of complete table path
* @param identifier an unresolved identifier
* @return a fully qualified object identifier
*/
public ObjectIdentifier qualifyIdentifier(String... path) {
if (path == null) {
throw new ValidationException("Table paths can not be null!");
}
if (path.length < 1 || path.length > 3) {
throw new ValidationException("Table paths length must be " +
"between 1(inclusive) and 3(inclusive)");
}
if (Arrays.stream(path).anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
throw new ValidationException("Table paths contain null or " +
"while-space-only string");
}

String catalogName;
String dbName;
String tableName;
if (path.length == 3) {
catalogName = path[0];
dbName = path[1];
tableName = path[2];
} else if (path.length == 2) {
catalogName = getCurrentCatalog();
dbName = path[0];
tableName = path[1];
} else {
catalogName = getCurrentCatalog();
dbName = getCurrentDatabase();
tableName = path[0];
}

return ObjectIdentifier.of(catalogName, dbName, tableName);
public ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier) {
return ObjectIdentifier.of(
identifier.getCatalogName().orElseGet(this::getCurrentCatalog),
identifier.getDatabaseName().orElseGet(this::getCurrentDatabase),
identifier.getObjectName());
}

/**
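
With this change, qualifyIdentifier no longer parses a variable-length path itself; it only fills in whatever the UnresolvedIdentifier leaves unspecified, falling back to the current catalog and database. A minimal sketch of the expected resolution, assuming a CatalogManager whose current catalog is "cat1" and current database is "db1" (both names are illustrative):

import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

class QualifySketch {

	// The catalogManager argument is assumed to report "cat1" as the current catalog and
	// "db1" as the current database; the expected results follow from the orElseGet(...)
	// calls in the method body above.
	static void resolve(CatalogManager catalogManager) {
		ObjectIdentifier a = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of("tbl"));
		// a == ObjectIdentifier.of("cat1", "db1", "tbl") -- catalog and database are padded

		ObjectIdentifier b = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of("db2", "tbl"));
		// b == ObjectIdentifier.of("cat1", "db2", "tbl") -- only the catalog is padded

		ObjectIdentifier c = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of("cat2", "db2", "tbl"));
		// c == ObjectIdentifier.of("cat2", "db2", "tbl") -- already fully qualified
	}
}
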
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Identifier of an object, such as table, view, function or type in a catalog. This identifier
* cannot be used directly to access an object in a {@link CatalogManager}, but has to be first
* fully resolved into {@link ObjectIdentifier}.
*/
@Internal
public class UnresolvedIdentifier {

private final String catalogName;

private final String databaseName;

private final String objectName;

/**
* Constructs an {@link UnresolvedIdentifier} from an array of identifier segments.
* The length of the path must be between 1 (only object name) and 3 (fully qualified
* identifier with catalog, database and object name).
*
* @param path array of identifier segments
* @return an identifier that must be resolved before accessing an object from a {@link CatalogManager}
*/
public static UnresolvedIdentifier of(String... path) {
if (path == null) {
throw new ValidationException("Object identifier can not be null!");
}
if (path.length < 1 || path.length > 3) {
throw new ValidationException("Object identifier must consist of 1 to 3 parts.");
}
if (Arrays.stream(path).anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
throw new ValidationException("Parts of the object identifier are null or whitespace-only.");
}

if (path.length == 3) {
return new UnresolvedIdentifier(path[0], path[1], path[2]);
} else if (path.length == 2) {
return new UnresolvedIdentifier(null, path[0], path[1]);
} else {
return new UnresolvedIdentifier(null, null, path[0]);
}
}

private UnresolvedIdentifier(
@Nullable String catalogName,
@Nullable String databaseName,
String objectName) {
this.catalogName = catalogName;
this.databaseName = databaseName;
this.objectName = Preconditions.checkNotNull(objectName, "Object name must not be null.");
}

public Optional<String> getCatalogName() {
return Optional.ofNullable(catalogName);
}

public Optional<String> getDatabaseName() {
return Optional.ofNullable(databaseName);
}

public String getObjectName() {
return objectName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnresolvedIdentifier that = (UnresolvedIdentifier) o;
return Objects.equals(catalogName, that.catalogName) &&
Objects.equals(databaseName, that.databaseName) &&
objectName.equals(that.objectName);
}

@Override
public int hashCode() {
return Objects.hash(catalogName, databaseName, objectName);
}

@Override
public String toString() {
return Stream.of(
catalogName,
databaseName,
objectName
).filter(Objects::nonNull)
.map(EncodingUtils::escapeIdentifier)
.collect(Collectors.joining("."));
}
}
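
A short usage sketch for the new class, with illustrative names: of(...) accepts one to three segments, rejects null or whitespace-only parts with a ValidationException, and only the object name is guaranteed to be present on the result.

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

class UnresolvedIdentifierSketch {

	static void demo() {
		// Two segments: database and object name, no catalog.
		UnresolvedIdentifier partial = UnresolvedIdentifier.of("db1", "tbl");
		// partial.getCatalogName()  -> Optional.empty()
		// partial.getDatabaseName() -> Optional.of("db1")
		// partial.getObjectName()   -> "tbl"
		// partial.toString() joins the escaped segments with dots, e.g. `db1`.`tbl`

		// More than three segments (or null/whitespace-only ones) are rejected.
		try {
			UnresolvedIdentifier.of("cat", "db", "tbl", "extra");
		} catch (ValidationException e) {
			// "Object identifier must consist of 1 to 3 parts."
		}
	}
}
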
@@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -139,7 +140,8 @@ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
properties,
tableComment);

ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlCreateTable.fullTableName());
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new CreateTableOperation(
identifier,
@@ -149,7 +151,8 @@ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {

/** Convert DROP TABLE statement. */
private Operation convertDropTable(SqlDropTable sqlDropTable) {
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlDropTable.fullTableName());
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlDropTable.fullTableName());
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new DropTableOperation(identifier, sqlDropTable.getIfExists());
}
@@ -159,7 +162,8 @@ private Operation convertSqlInsert(RichSqlInsert insert) {
// get name of sink table
List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;

ObjectIdentifier identifier = catalogManager.qualifyIdentifier(targetTablePath.toArray(new String[0]));
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath.toArray(new String[0]));
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new CatalogSinkModifyOperation(
identifier,
@@ -33,7 +33,7 @@ import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => J
import org.apache.flink.table.api.java.{StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.internal.{StreamTableEnvironmentImpl => ScalaStreamTableEnvImpl}
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog, UnresolvedIdentifier}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.delegation.{Executor, ExecutorFactory, PlannerFactory}
import org.apache.flink.table.expressions.Expression
@@ -956,7 +956,8 @@ class TestingTableEnvironment private(

override def insertInto(table: Table, path: String, pathContinued: String*): Unit = {
val fullPath = List(path) ++ pathContinued.toList
val identifier = catalogManager.qualifyIdentifier(fullPath: _*)
val unresolvedIdentifier = UnresolvedIdentifier.of(fullPath:_*)
val identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier)

val modifyOperations = List(new CatalogSinkModifyOperation(identifier, table.getQueryOperation))
if (isEagerOperationTranslation) {
@@ -31,6 +31,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
@@ -139,7 +140,8 @@ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
properties,
tableComment);

ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlCreateTable.fullTableName());
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new CreateTableOperation(
identifier,
@@ -149,7 +151,8 @@ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {

/** Convert DROP TABLE statement. */
private Operation convertDropTable(SqlDropTable sqlDropTable) {
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(sqlDropTable.fullTableName());
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlDropTable.fullTableName());
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new DropTableOperation(identifier, sqlDropTable.getIfExists());
}
@@ -164,7 +167,8 @@ private Operation convertSqlInsert(RichSqlInsert insert) {
// get name of sink table
List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;

ObjectIdentifier identifier = catalogManager.qualifyIdentifier(targetTablePath.toArray(new String[0]));
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath.toArray(new String[0]));
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

return new CatalogSinkModifyOperation(
identifier,