[FLINK-14397][hive] Failed to run Hive UDTF with array arguments
Fix the issue where calling a Hive UDTF with array arguments causes a cast exception.

This closes apache#9927.
lirui-apache authored and bowenli86 committed Oct 28, 2019
1 parent 96640ca commit e18320b
Showing 3 changed files with 128 additions and 2 deletions.
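Background for the changes below, with a standalone illustration that is not part of the patch: the old conversion built an Object[] for every Hive array, while Flink passes array fields around as arrays of the concrete element type (for example Integer[] for array<int>), so a later cast to the typed array fails at runtime.

// Hypothetical stand-alone demo of the failure mode; the class name is illustrative.
public class ArrayCastDemo {
    public static void main(String[] args) {
        Object[] generic = new Object[]{1, 2, 3};  // what the old conversion produced for array<int>
        Integer[] typed = (Integer[]) generic;     // throws java.lang.ClassCastException at runtime
        System.out.println(typed.length);
    }
}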
@@ -96,7 +96,9 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -312,9 +314,11 @@ public static Object toFlinkObject(ObjectInspector inspector, Object data) {
             ListObjectInspector listInspector = (ListObjectInspector) inspector;
             List<?> list = listInspector.getList(data);
 
-            Object[] result = new Object[list.size()];
+            // flink expects a specific array type (e.g. Integer[] instead of Object[]), so we have to get the element class
+            ObjectInspector elementInspector = listInspector.getListElementObjectInspector();
+            Object[] result = (Object[]) Array.newInstance(getClassFromObjectInspector(elementInspector), list.size());
             for (int i = 0; i < list.size(); i++) {
-                result[i] = toFlinkObject(listInspector.getListElementObjectInspector(), list.get(i));
+                result[i] = toFlinkObject(elementInspector, list.get(i));
             }
             return result;
         }
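The new allocation relies on java.lang.reflect.Array.newInstance to create an array whose runtime component type matches the Hive element type. A minimal standalone sketch of the technique, assuming Integer elements; the class name is hypothetical and the snippet is not part of the commit:

import java.lang.reflect.Array;

public class TypedArrayAllocationDemo {
    public static void main(String[] args) {
        // Allocate an Integer[3] rather than an Object[3], even though the element class is only known at runtime.
        Object[] result = (Object[]) Array.newInstance(Integer.class, 3);
        result[0] = 1;
        result[1] = 2;
        result[2] = 3;
        System.out.println(result.getClass().getSimpleName());  // prints Integer[]
        Integer[] typed = (Integer[]) result;                    // succeeds, unlike the Object[] case
        System.out.println(typed.length);
    }
}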
@@ -450,4 +454,56 @@ private static ObjectInspector getObjectInspector(TypeInfo type) {
    public static DataType toFlinkType(ObjectInspector inspector) {
        return HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(inspector.getTypeName()));
    }

    // given a Hive ObjectInspector, get the class for corresponding Flink object
    private static Class<?> getClassFromObjectInspector(ObjectInspector inspector) {
        switch (inspector.getCategory()) {
            case PRIMITIVE: {
                PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) inspector;
                switch (primitiveOI.getPrimitiveCategory()) {
                    case STRING:
                    case CHAR:
                    case VARCHAR:
                        return String.class;
                    case INT:
                        return Integer.class;
                    case LONG:
                        return Long.class;
                    case BYTE:
                        return Byte.class;
                    case SHORT:
                        return Short.class;
                    case FLOAT:
                        return Float.class;
                    case DOUBLE:
                        return Double.class;
                    case DECIMAL:
                        return BigDecimal.class;
                    case BOOLEAN:
                        return Boolean.class;
                    case BINARY:
                        return byte[].class;
                    case DATE:
                        return Date.class;
                    case TIMESTAMP:
                    case INTERVAL_DAY_TIME:
                    case INTERVAL_YEAR_MONTH:
                    default:
                        throw new IllegalArgumentException(
                            "Unsupported primitive type " + primitiveOI.getPrimitiveCategory().name());
                }
            }
            case LIST:
                ListObjectInspector listInspector = (ListObjectInspector) inspector;
                Class elementClz = getClassFromObjectInspector(listInspector.getListElementObjectInspector());
                return Array.newInstance(elementClz, 0).getClass();
            case MAP:
                return Map.class;
            case STRUCT:
                return Row.class;
            default:
                throw new IllegalArgumentException("Unsupported type " + inspector.getCategory().name());
        }
    }
}
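In the LIST branch above, Array.newInstance(elementClz, 0).getClass() turns an element class into the matching array class, and the recursion makes nested Hive types such as array<array<int>> map to nested Java array classes. A small standalone sketch of that derivation, not part of the commit; the class name is hypothetical:

import java.lang.reflect.Array;

public class NestedArrayClassDemo {
    public static void main(String[] args) {
        // array<int> maps to Integer[].class
        Class<?> intArrayClass = Array.newInstance(Integer.class, 0).getClass();
        System.out.println(intArrayClass == Integer[].class);       // true
        // array<array<int>> maps to Integer[][].class via one more level of recursion
        Class<?> nestedArrayClass = Array.newInstance(intArrayClass, 0).getClass();
        System.out.println(nestedArrayClass == Integer[][].class);  // true
    }
}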
@@ -35,14 +35,17 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import scala.collection.JavaConverters;

@@ -260,6 +263,41 @@ public void testPartialDynamicPartition() throws Exception {
        }
    }

    @Test
    public void testUDTF() throws Exception {
        // Without https://issues.apache.org/jira/browse/HIVE-11878, Hive registers the App classloader as the classloader
        // for the UDTF and closes the App classloader when we tear down the session. This causes problems for JUnit code
        // and shutdown hooks that have to run after the test finishes, because the App classloader can no longer load
        // new classes, which in turn crashes the forked JVM and fails the test phase.
        // Therefore disable such tests for older Hive versions.
        String hiveVersion = HiveShimLoader.getHiveVersion();
        Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || hiveVersion.compareTo("1.3.0") >= 0);
        hiveShell.execute("create database db1");
        try {
            hiveShell.execute("create table db1.simple (i int,a array<int>)");
            hiveShell.execute("create table db1.nested (a array<map<int, string>>)");
            hiveShell.execute("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
            hiveShell.insertInto("db1", "simple").addRow(3, Arrays.asList(1, 2, 3)).commit();
            Map<Integer, String> map1 = new HashMap<>();
            map1.put(1, "a");
            map1.put(2, "b");
            Map<Integer, String> map2 = new HashMap<>();
            map2.put(3, "c");
            hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();

            TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
            List<Row> results = HiveTestUtils.collectTable(tableEnv,
                    tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)"));
            assertEquals("[1, 2, 3]", results.toString());
            results = HiveTestUtils.collectTable(tableEnv,
                    tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)"));
            assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());
        } finally {
            hiveShell.execute("drop database db1 cascade");
            hiveShell.execute("drop function hiveudtf");
        }
    }

    private TableEnvironment getTableEnvWithHiveCatalog() {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -18,12 +18,23 @@

package org.apache.flink.table.catalog.hive;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.planner.sinks.CollectRowTableSink;
import org.apache.flink.table.planner.sinks.CollectTableSink;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +44,9 @@
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
@@ -108,4 +122,22 @@ public static TableEnvironment createTableEnv() {
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        return tableEnv;
    }

    public static List<Row> collectTable(TableEnvironment tableEnv, Table table) throws Exception {
        CollectTableSink sink = new CollectRowTableSink();
        TableSchema tableSchema = table.getSchema();
        sink = (CollectTableSink) sink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
        final String id = new AbstractID().toString();
        TypeSerializer serializer = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType())
                .createSerializer(new ExecutionConfig());
        sink.init(serializer, id);
        String sinkName = UUID.randomUUID().toString();
        tableEnv.registerTableSink(sinkName, sink);
        final String builtInCatalogName = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
        final String builtInDBName = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
        tableEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName);
        JobExecutionResult result = tableEnv.execute("collect-table");
        ArrayList<byte[]> data = result.getAccumulatorResult(id);
        return SerializedListAccumulator.deserializeList(data, serializer);
    }
}
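A sketch of how a test can use the new collectTable helper, mirroring testUDTF above. It is not part of the commit and assumes the same setup as that test (a TableEnvironment with the Hive catalog registered, plus the db1.simple table and the hiveudtf function); the test method name is hypothetical:

@Test
public void testCollectSimpleArray() throws Exception {
    // Assumes getTableEnvWithHiveCatalog() from the test class above and its catalog/table setup.
    TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
    Table query = tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)");
    // collectTable registers a CollectRowTableSink, runs the job and deserializes the accumulated rows.
    List<Row> rows = HiveTestUtils.collectTable(tableEnv, query);
    assertEquals("[1, 2, 3]", rows.toString());
}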
