[FLINK-14490][table-planner] Extract CalciteParser from FlinkPlannerImpl
dawidwys committed Oct 29, 2019
1 parent 44c85e3 commit 54e69c3
Showing 14 changed files with 219 additions and 108 deletions.
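In short: SQL parsing moves out of FlinkPlannerImpl into a dedicated CalciteParser, which PlannerContext now creates on demand. A minimal sketch of the resulting call pattern, assuming a PlannerContext and FlinkPlannerImpl already built as in the diff below:

    CalciteParser parser = plannerContext.createCalciteParser();
    SqlNode parsed = parser.parse("SELECT a, b FROM t");  // parsing only; not yet validated
    SqlNode validated = planner.validate(parsed);         // validation stays on FlinkPlannerImpl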
CalciteParser.java (new file, package org.apache.flink.table.planner.calcite)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.calcite;

import org.apache.flink.table.api.SqlParserException;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

/**
* Thin wrapper around {@link SqlParser} that does exception conversion and {@link SqlNode} casting.
*/
public class CalciteParser {
    private final SqlParser.Config config;

    public CalciteParser(SqlParser.Config config) {
        this.config = config;
    }

    /**
     * Parses a SQL statement into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
     *
     * @param sql a SQL string to parse
     * @return a parsed SQL node
     * @throws SqlParserException if an exception is thrown when parsing the statement
     */
    public SqlNode parse(String sql) {
        try {
            SqlParser parser = SqlParser.create(sql, config);
            return parser.parseStmt();
        } catch (SqlParseException e) {
            // Keep the original exception as the cause so the parse position is not lost.
            throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
        }
    }

    /**
     * Parses a SQL string as an identifier into a {@link SqlIdentifier}.
     *
     * @param identifier a SQL string to parse as an identifier
     * @return the parsed SQL identifier
     * @throws SqlParserException if the string is not a valid identifier
     */
    public SqlIdentifier parseIdentifier(String identifier) {
        try {
            SqlParser parser = SqlParser.create(identifier, config);
            SqlNode sqlNode = parser.parseExpression();
            return (SqlIdentifier) sqlNode;
        } catch (Exception e) {
            throw new SqlParserException(String.format(
                "Invalid SQL identifier %s. All SQL keywords must be escaped.", identifier));
        }
    }
}
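For illustration, a hedged usage sketch of the class above. Real callers obtain the SqlParser.Config from PlannerContext.getSqlParserConfig(); Calcite's default config builder stands in here:

    SqlParser.Config config = SqlParser.configBuilder().build();
    CalciteParser parser = new CalciteParser(config);
    SqlNode stmt = parser.parse("SELECT * FROM orders");     // unvalidated AST, or SqlParserException
    SqlIdentifier id = parser.parseIdentifier("cat.db.tbl"); // compound identifier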
PlannerContext.java (package org.apache.flink.table.planner.delegation)
@@ -27,6 +27,7 @@
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.calcite.CalciteConfig$;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkContextImpl;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
@@ -150,6 +151,15 @@ public FlinkPlannerImpl createFlinkPlanner(String currentCatalog, String current
cluster);
}

/**
* Creates a configured instance of {@link CalciteParser}.
*
* @return configured calcite parser
*/
public CalciteParser createCalciteParser() {
return new CalciteParser(getSqlParserConfig());
}

private FlinkCalciteCatalogReader createCatalogReader(
boolean lenientCaseSensitivity,
String currentCatalog,
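Because createCalciteParser() reads getSqlParserConfig() at call time, every parser it returns reflects the dialect currently set on the table config. A hedged sketch (setSqlDialect mirrors the test helper further down):

    tableConfig.setSqlDialect(SqlDialect.HIVE);
    CalciteParser hiveParser = plannerContext.createCalciteParser();

    tableConfig.setSqlDialect(SqlDialect.DEFAULT);
    CalciteParser defaultParser = plannerContext.createCalciteParser(); // independent of hiveParser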
FlinkPlannerImpl.scala (package org.apache.flink.table.planner.calcite)
@@ -19,17 +19,15 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
import org.apache.flink.table.api.{TableException, ValidationException}

import com.google.common.collect.ImmutableList
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
import org.apache.calcite.sql.validate.SqlValidator
import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
@@ -54,24 +52,13 @@ class FlinkPlannerImpl(
cluster: RelOptCluster) {

val operatorTable: SqlOperatorTable = config.getOperatorTable
/** Holds the trait definitions to be registered with planner. May be null. */
val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
val parserConfig: SqlParser.Config = config.getParserConfig
val parser: CalciteParser = new CalciteParser(config.getParserConfig)
val convertletTable: SqlRexConvertletTable = config.getConvertletTable
val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig

var validator: FlinkCalciteSqlValidator = _
var root: RelRoot = _

private def ready() {
if (this.traitDefs != null) {
cluster.getPlanner.clearRelTraitDefs()
for (traitDef <- this.traitDefs) {
cluster.getPlanner.addRelTraitDef(traitDef)
}
}
}

def getCompletionHints(sql: String, cursor: Int): Array[String] = {
val advisorValidator = new SqlAdvisorValidator(
operatorTable,
@@ -109,18 +96,6 @@
validator
}

def parse(sql: String): SqlNode = {
try {
ready()
val parser: SqlParser = SqlParser.create(sql, parserConfig)
val sqlNode: SqlNode = parser.parseStmt
sqlNode
} catch {
case e: CSqlParseException =>
throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
}
}

def validate(sqlNode: SqlNode): SqlNode = {
val catalogReader = catalogReaderSupplier.apply(false)
// do pre-validate rewrite.
@@ -179,15 +154,7 @@
schemaPath: util.List[String],
viewPath: util.List[String]): RelRoot = {

val parser: SqlParser = SqlParser.create(queryString, parserConfig)
var sqlNode: SqlNode = null
try {
sqlNode = parser.parseQuery
}
catch {
case e: CSqlParseException =>
throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
}
val sqlNode: SqlNode = parser.parse(queryString)
val catalogReader: CalciteCatalogReader = catalogReaderSupplier.apply(false)
.withSchemaPath(schemaPath)
val validator: SqlValidator =
PlannerBase.scala
@@ -82,7 +82,8 @@ abstract class PlannerBase(

executor.asInstanceOf[ExecutorBase].setTableConfig(config)

private val plannerContext: PlannerContext =
@VisibleForTesting
private[flink] val plannerContext: PlannerContext =
new PlannerContext(
config,
functionCatalog,
@@ -122,8 +123,12 @@

override def parse(stmt: String): util.List[Operation] = {
val planner = createFlinkPlanner
// The parser is not cached so that the most up-to-date configuration is used:
// users might change the parser configuration in TableConfig between
// parsing statements.
val parser = plannerContext.createCalciteParser()
// parse the sql query
val parsed = planner.parse(stmt)
val parsed = parser.parse(stmt)
parsed match {
case insert: RichSqlInsert =>
List(SqlToOperationConverter.convert(planner, catalogManager, insert))
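A sketch of the behavior that the no-caching comment above protects, with illustrative statements borrowed from the tests below:

    tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    tEnv.sqlUpdate("CREATE TABLE t1 (a BIGINT) WITH ('connector' = 'kafka')");
    // The dialect switch takes effect for the very next statement,
    // because parse() builds a fresh CalciteParser from the current config.
    tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tEnv.sqlUpdate("insert into t1 partition(a=1) select b, c, d from t2");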
SqlToOperationConverterTest.java
@@ -37,6 +37,7 @@
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -119,7 +120,8 @@ public void testCreateTable() {
" 'kafka.topic' = 'log.test'\n" +
")\n";
FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
Operation operation = parse(sql, planner);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
Operation operation = parse(sql, planner, parser);
assert operation instanceof CreateTableOperation;
CreateTableOperation op = (CreateTableOperation) operation;
CatalogTable catalogTable = op.getCatalogTable();
@@ -137,6 +139,7 @@
@Test(expected = SqlConversionException.class)
public void testCreateTableWithPkUniqueKeys() {
FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint,\n" +
" b varchar, \n" +
@@ -150,7 +153,7 @@ public void testCreateTableWithPkUniqueKeys() {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
parse(sql, planner);
parse(sql, planner, parser);
}

@Test
@@ -165,7 +168,8 @@ public void testCreateTableWithMinusInOptionKey() {
" 'a.b-c-d.e-f1231.g' = 'ada',\n" +
" 'a.b-c-d.*' = 'adad')\n";
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = planner.parse(sql);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = parser.parse(sql);
assert node instanceof SqlCreateTable;
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node);
assert operation instanceof CreateTableOperation;
@@ -187,7 +191,8 @@
public void testSqlInsertWithStaticPartition() {
final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
Operation operation = parse(sql, planner);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
Operation operation = parse(sql, planner, parser);
assert operation instanceof CatalogSinkModifyOperation;
CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
final Map<String, String> expectedStaticPartitions = new HashMap<>();
@@ -323,7 +328,8 @@ public void testCreateTableWithFullDataTypes() {
}
final String sql = buffer.toString();
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = planner.parse(sql);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = parser.parse(sql);
assert node instanceof SqlCreateTable;
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node);
TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
@@ -345,8 +351,8 @@ private static TestItem createTestItem(Object... args) {
return testItem;
}

private Operation parse(String sql, FlinkPlannerImpl planner) {
SqlNode node = planner.parse(sql);
private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
SqlNode node = parser.parse(sql);
return SqlToOperationConverter.convert(planner, catalogManager, node);
}

@@ -356,6 +362,11 @@ private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
catalogManager.getCurrentDatabase());
}

private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
tableConfig.setSqlDialect(sqlDialect);
return plannerContext.createCalciteParser();
}

//~ Inner Classes ----------------------------------------------------------

private static class TestItem {
ExpressionTestBase.scala
@@ -69,6 +69,7 @@ abstract class ExpressionTestBase {
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
private val relBuilder = planner.getRelBuilder
private val calcitePlanner = planner.createFlinkPlanner
private val parser = planner.plannerContext.createCalciteParser()

// setup test utils
private val tableName = "testTable"
@@ -179,7 +180,7 @@

private def addSqlTestExpr(sqlExpr: String, expected: String): Unit = {
// create RelNode from SQL expression
val parsed = calcitePlanner.parse(s"SELECT $sqlExpr FROM $tableName")
val parsed = parser.parse(s"SELECT $sqlExpr FROM $tableName")
val validated = calcitePlanner.validate(parsed)
val converted = calcitePlanner.rel(validated).rel
addTestExpr(converted, expected, sqlExpr)
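The test utilities now spell out the pipeline stage by stage. Restating the flow from addSqlTestExpr above as a Java sketch (names follow the diff):

    SqlNode parsed = parser.parse("SELECT a + 1 FROM testTable"); // 1. parse via CalciteParser
    SqlNode validated = planner.validate(parsed);                 // 2. validate via FlinkPlannerImpl
    RelNode converted = planner.rel(validated).rel;               // 3. SqlNode to RelNode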
PatternTranslatorTestBase.scala
@@ -55,6 +55,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
private val tableName = "testTable"
private val context = prepareContext(testTableTypeInfo)
private val calcitePlanner: FlinkPlannerImpl = context._2.createFlinkPlanner
private val parser = context._2.plannerContext.createCalciteParser()

private def prepareContext(typeInfo: TypeInformation[Row])
: (RelBuilder, PlannerBase, StreamExecutionEnvironment) = {
@@ -80,7 +81,7 @@

def verifyPattern(matchRecognize: String, expected: Pattern[BaseRow, _ <: BaseRow]): Unit = {
// create RelNode from SQL expression
val parsed = calcitePlanner.parse(
val parsed = parser.parse(
s"""
|SELECT *
|FROM $tableName
CalciteParser.java (new file, identical copy for the legacy planner, package org.apache.flink.table.calcite)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.calcite;

import org.apache.flink.table.api.SqlParserException;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

/**
* Thin wrapper around {@link SqlParser} that does exception conversion and {@link SqlNode} casting.
*/
public class CalciteParser {
    private final SqlParser.Config config;

    public CalciteParser(SqlParser.Config config) {
        this.config = config;
    }

    /**
     * Parses a SQL statement into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
     *
     * @param sql a SQL string to parse
     * @return a parsed SQL node
     * @throws SqlParserException if an exception is thrown when parsing the statement
     */
    public SqlNode parse(String sql) {
        try {
            SqlParser parser = SqlParser.create(sql, config);
            return parser.parseStmt();
        } catch (SqlParseException e) {
            // Keep the original exception as the cause so the parse position is not lost.
            throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
        }
    }

    /**
     * Parses a SQL string as an identifier into a {@link SqlIdentifier}.
     *
     * @param identifier a SQL string to parse as an identifier
     * @return the parsed SQL identifier
     * @throws SqlParserException if the string is not a valid identifier
     */
    public SqlIdentifier parseIdentifier(String identifier) {
        try {
            SqlParser parser = SqlParser.create(identifier, config);
            SqlNode sqlNode = parser.parseExpression();
            return (SqlIdentifier) sqlNode;
        } catch (Exception e) {
            throw new SqlParserException(String.format(
                "Invalid SQL identifier %s. All SQL keywords must be escaped.", identifier));
        }
    }
}
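As the error message above indicates, keywords used as identifier parts must be escaped. A hedged example, assuming the configured quoting character is the backtick (Flink's default dialect); parserConfig is assumed to be in scope:

    CalciteParser parser = new CalciteParser(parserConfig);
    parser.parseIdentifier("`table`.`select`"); // ok: reserved words escaped
    parser.parseIdentifier("table.select");     // throws SqlParserException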