diff --git a/.circleci/.maven.xml b/.circleci/.maven.xml index 683c303..a439654 100644 --- a/.circleci/.maven.xml +++ b/.circleci/.maven.xml @@ -4,8 +4,8 @@ ossrh - ${env.SONATYPE_USERNAME} - ${env.SONATYPE_PASSWORD} + ${env.SONATYPE_USERTOKEN_ID} + ${env.SONATYPE_USERTOKEN_TOKEN} \ No newline at end of file diff --git a/.circleci/config.yml b/.circleci/config.yml index 22fe97d..876efa7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -64,11 +64,11 @@ custom_filters: merge_only: &merge_only filters: branches: - only: "yosegi-2.0_spark-3.2" + only: "yosegi-2.0_spark-3.5" merge_ignore: &merge_ignore filters: branches: - ignore: "yosegi-2.0_spark-3.2" + ignore: "yosegi-2.0_spark-3.5" workflows: build-and-test: diff --git a/.gitignore b/.gitignore index 6f09b8c..4c49784 100644 --- a/.gitignore +++ b/.gitignore @@ -61,5 +61,9 @@ jars ## sbt build /project/build.properties +## spark +metastore_db +spark-warehouse + .bsp .scala-version diff --git a/pom.xml b/pom.xml index 51d06b4..ff1a5f3 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ jp.co.yahoo.yosegi yosegi-spark_2.12 - 2.0.1_spark-3.2-SNAPSHOT + 2.0.4_spark-3.5-SNAPSHOT jar Yosegi Spark Yosegi Spark library. @@ -57,11 +57,11 @@ 3.1.8 3.1.9 2.0 - ${yosegi.base}.2 - 3.2 - ${spark.base}.1 + ${yosegi.base}.5 + 3.5 + ${spark.base}.0 2.12 - ${scala.base}.15 + ${scala.base}.19 @@ -99,17 +99,17 @@ com.fasterxml.jackson.core jackson-core - 2.13.2 + 2.17.1 com.fasterxml.jackson.core jackson-databind - 2.13.2.2 + 2.17.1 com.fasterxml.jackson.module jackson-module-scala_${scala.base} - 2.13.2 + 2.17.1 diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java index e003ee7..bff80bc 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java @@ -18,7 +18,7 @@ import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkArrayLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkRunLengthEncodingArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionArrayLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -37,7 +37,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyArrayLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case ARRAY: @@ -47,8 +47,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) case UNION: return new SparkUnionArrayLoader(vector, loadSize); default: - // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyArrayLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java index b6a38ba..fc06b4c 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java @@ -17,8 +17,8 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyMapLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkMapLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionMapLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyMapLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case SPREAD: @@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) return new SparkUnionMapLoader(vector, loadSize); default: // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyMapLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java index 3c4fb3d..0f0b9f8 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java @@ -17,7 +17,7 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.ILoaderFactory; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionStructLoader; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (columnBinary == null) { // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyStructLoader(vector, loadSize); } switch (getLoadType(columnBinary, loadSize)) { case SPREAD: @@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize) return new SparkUnionStructLoader(vector, loadSize); default: // FIXME: - return new SparkNullLoader(vector, loadSize); + return new SparkEmptyStructLoader(vector, loadSize); } } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java index 9190f20..a272314 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java @@ -38,7 +38,7 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + vector.putArray(index, 0, 0); } @Override @@ -60,6 +60,10 @@ public void setArrayIndex(final int index, final int start, final int length) th public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException { vector.getChild(0).reset(); vector.getChild(0).reserve(childLength); + if (vector.getChild(0).hasDictionary()) { + vector.getChild(0).reserveDictionaryIds(0); + vector.getChild(0).setDictionary(null); + } SparkLoaderFactoryUtil.createLoaderFactory(vector.getChild(0)) .create(columnBinary, childLength); } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java new file mode 100644 index 0000000..29ad378 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; + +public class SparkEmptyArrayLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + + public SparkEmptyArrayLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + if (this.vector.getChild(0).hasDictionary()) { + this.vector.getChild(0).reserveDictionaryIds(0); + this.vector.getChild(0).setDictionary(null); + } + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + for (int i = 0; i < loadSize; i++) { + vector.putArray(i, 0, 0); + } + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java new file mode 100644 index 0000000..f0c6ea0 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +public class SparkEmptyLoader { + public static void load(final WritableColumnVector vector, final int loadSize) throws IOException { + final Class klass = vector.dataType().getClass(); + if (klass == ArrayType.class) { + new SparkEmptyArrayLoader(vector, loadSize).build(); + } else if (klass == StructType.class) { + new SparkEmptyStructLoader(vector, loadSize).build(); + } else if (klass == MapType.class) { + new SparkEmptyMapLoader(vector, loadSize).build(); + } else { + new SparkNullLoader(vector, loadSize).build(); + } + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java new file mode 100644 index 0000000..7e2f598 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; + +public class SparkEmptyMapLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + + public SparkEmptyMapLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + vector.getChild(0).reset(); + vector.getChild(0).reserve(0); + vector.getChild(1).reset(); + vector.getChild(1).reserve(0); + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java new file mode 100644 index 0000000..01a6bb0 --- /dev/null +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.inmemory.LoadType; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +public class SparkEmptyStructLoader implements ILoader { + + private final WritableColumnVector vector; + private final int loadSize; + private final String[] names; + + public SparkEmptyStructLoader(final WritableColumnVector vector, final int loadSize) { + this.vector = vector; + this.loadSize = loadSize; + final StructType structType = (StructType) vector.dataType(); + this.names = structType.fieldNames(); + for (int i = 0; i < names.length; i++) { + vector.getChild(i).reset(); + vector.getChild(i).reserve(loadSize); + if (vector.getChild(i).hasDictionary()) { + vector.getChild(i).reserveDictionaryIds(0); + vector.getChild(i).setDictionary(null); + } + } + } + + @Override + public LoadType getLoaderType() { + return LoadType.NULL; + } + + @Override + public int getLoadSize() { + return loadSize; + } + + @Override + public void setNull(final int index) throws IOException { + // FIXME: + } + + @Override + public void finish() throws IOException { + // FIXME: + } + + @Override + public WritableColumnVector build() throws IOException { + for (int i = 0; i < names.length; i++) { + SparkEmptyLoader.load(vector.getChild(i), loadSize); + } + return vector; + } + + @Override + public boolean isLoadingSkipped() { + return true; + } +} diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java index d8d04d1..9f8112f 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java @@ -14,13 +14,13 @@ */ package jp.co.yahoo.yosegi.spark.inmemory.loader; -import jp.co.yahoo.yosegi.inmemory.ISequentialLoader; +import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.inmemory.LoadType; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import java.io.IOException; -public class SparkNullLoader implements ISequentialLoader { +public class SparkNullLoader implements ILoader { private final WritableColumnVector vector; private final int loadSize; @@ -42,17 +42,22 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - // TODO: + // FIXME: } @Override public void finish() throws IOException { // FIXME: - vector.putNulls(0, loadSize); } @Override public WritableColumnVector build() throws IOException { + vector.putNulls(0, loadSize); return vector; } + + @Override + public boolean isLoadingSkipped() { + return true; + } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java index 9c5dda1..3ebb236 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java @@ -49,7 +49,7 @@ public WritableColumnVector build() throws IOException { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + vector.putArray(index, 0, 0); } @Override @@ -59,7 +59,7 @@ public void setRowGroupCount(final int count) throws IOException {} public void setNullAndRepetitions( final int startIndex, final int repetitions, final int rowGroupIndex) throws IOException { for (int i = 0; i < repetitions; i++) { - vector.putNull(rowId); + vector.putArray(rowId, 0, 0); rowId++; } } @@ -83,6 +83,10 @@ public void setRowGourpIndexAndRepetitions( public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException { vector.getChild(0).reset(); vector.getChild(0).reserve(childLength); + if (vector.getChild(0).hasDictionary()) { + vector.getChild(0).reserveDictionaryIds(0); + vector.getChild(0).setDictionary(null); + } SparkLoaderFactoryUtil.createLoaderFactory(vector.getChild(0)) .create(columnBinary, childLength); } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java index b00600e..828a1c7 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java @@ -65,13 +65,21 @@ public void finish() throws IOException { @Override public WritableColumnVector build() throws IOException { + final StructType structType = (StructType) vector.dataType(); + final String[] names = structType.fieldNames(); + // NOTE: Fill unloaded columns with nulls. + for (int i = 0; i < names.length; i++) { + if (loaderFactoryMap.containsKey(names[i])) { + SparkEmptyLoader.load(vector.getChild(i), loadSize); + } + } return vector; } @Override public void loadChild(final ColumnBinary columnBinary, final int loadSize) throws IOException { if (loaderFactoryMap.containsKey(columnBinary.columnName)) { - loaderFactoryMap.get(columnBinary.columnName).create(columnBinary, loadSize); + loaderFactoryMap.remove(columnBinary.columnName).create(columnBinary, loadSize); } else { // FIXME: } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java index 346fe26..dfa05f2 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java @@ -25,10 +25,17 @@ public class SparkUnionArrayLoader implements IUnionLoader { private final WritableColumnVector vector; private final int loadSize; + private boolean childLoaded; public SparkUnionArrayLoader(final WritableColumnVector vector, final int loadSize) { this.vector = vector; this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + if (this.vector.getChild(0).hasDictionary()) { + this.vector.getChild(0).reserveDictionaryIds(0); + this.vector.getChild(0).setDictionary(null); + } } @Override @@ -38,30 +45,33 @@ public int getLoadSize() { @Override public void setNull(final int index) throws IOException { - vector.putNull(index); + // FIXME: } @Override public void finish() throws IOException { - // + // FIXME: } @Override public WritableColumnVector build() throws IOException { + if (!childLoaded) { + for (int i = 0; i < loadSize; i++) { + vector.putArray(i, 0, 0); + } + } return vector; } @Override public void setIndexAndColumnType(final int index, final ColumnType columnType) throws IOException { // FIXME: - if (columnType != ColumnType.ARRAY) { - vector.putNull(index); - } } @Override public void loadChild(final ColumnBinary columnBinary, final int childLoadSize) throws IOException { if (columnBinary.columnType == ColumnType.ARRAY) { + childLoaded = true; SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize); } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java index 107ccb0..68274e2 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java @@ -29,6 +29,10 @@ public class SparkUnionMapLoader implements IUnionLoader { public SparkUnionMapLoader(WritableColumnVector vector, int loadSize) { this.vector = vector; this.loadSize = loadSize; + this.vector.getChild(0).reset(); + this.vector.getChild(0).reserve(0); + this.vector.getChild(1).reset(); + this.vector.getChild(1).reserve(0); } @Override @@ -38,12 +42,12 @@ public int getLoadSize() { @Override public void setNull(int index) throws IOException { - vector.putNull(index); + // FIXME: } @Override public void finish() throws IOException { - // + // FIXME: } @Override @@ -54,9 +58,6 @@ public WritableColumnVector build() throws IOException { @Override public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException { // FIXME: - if (columnType != ColumnType.SPREAD) { - vector.putNull(index); - } } @Override diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java index c4e86de..dfc2341 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java @@ -25,6 +25,7 @@ public class SparkUnionStructLoader implements IUnionLoader { private final WritableColumnVector vector; private final int loadSize; + private boolean childLoaded; public SparkUnionStructLoader(WritableColumnVector vector, int loadSize) { this.vector = vector; @@ -38,30 +39,31 @@ public int getLoadSize() { @Override public void setNull(int index) throws IOException { - vector.putNull(index); + // FIXME: } @Override public void finish() throws IOException { - // + // FIXME: } @Override public WritableColumnVector build() throws IOException { + if (!childLoaded) { + new SparkEmptyStructLoader(vector, loadSize).build(); + } return vector; } @Override public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException { // FIXME: - if (columnType != ColumnType.SPREAD) { - vector.putNull(index); - } } @Override public void loadChild(ColumnBinary columnBinary, int childLoadSize) throws IOException { if (columnBinary.columnType == ColumnType.SPREAD) { + childLoaded = true; SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize); } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java index ce265d8..19b9a58 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java @@ -17,10 +17,13 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.inmemory.IRawConverter; import jp.co.yahoo.yosegi.spark.inmemory.SparkLoaderFactoryUtil; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyLoader; import jp.co.yahoo.yosegi.spark.utils.PartitionColumnUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; import java.io.IOException; @@ -31,7 +34,7 @@ public class SparkColumnarBatchConverter implements IRawConverter private final StructType schema; private final StructType partitionSchema; private final InternalRow partitionValue; - private final WritableColumnVector[] childColumns; + private final ColumnVector[] childColumns; private final Map keyIndexMap; public SparkColumnarBatchConverter( @@ -39,7 +42,7 @@ public SparkColumnarBatchConverter( final StructType partitionSchema, final InternalRow partitionValue, final Map keyIndexMap, - final WritableColumnVector[] childColumns) { + final ColumnVector[] childColumns) { this.schema = schema; this.partitionSchema = partitionSchema; this.partitionValue = partitionValue; @@ -50,13 +53,13 @@ public SparkColumnarBatchConverter( @Override public ColumnarBatch convert(final List raw, final int loadSize) throws IOException { // NOTE: initialize - for (int i = 0; i < childColumns.length; i++) { + for (int i = 0; i < schema.length(); i++) { // FIXME: how to initialize vector with dictionary. - childColumns[i].reset(); - childColumns[i].reserve(loadSize); - if (childColumns[i].hasDictionary()) { - childColumns[i].reserveDictionaryIds(0); - childColumns[i].setDictionary(null); + ((WritableColumnVector) childColumns[i]).reset(); + ((WritableColumnVector) childColumns[i]).reserve(loadSize); + if (((WritableColumnVector) childColumns[i]).hasDictionary()) { + ((WritableColumnVector) childColumns[i]).reserveDictionaryIds(0); + ((WritableColumnVector) childColumns[i]).setDictionary(null); } } final ColumnarBatch result = new ColumnarBatch(childColumns); @@ -69,16 +72,16 @@ public ColumnarBatch convert(final List raw, final int loadSize) t } final int index = keyIndexMap.get(columnBinary.columnName); isSet[index] = true; - SparkLoaderFactoryUtil.createLoaderFactory(childColumns[index]).create(columnBinary, loadSize); + SparkLoaderFactoryUtil.createLoaderFactory(((WritableColumnVector) childColumns[index])).create(columnBinary, loadSize); } - // NOTE: null columns - for (int i = 0; i < childColumns.length; i++) { + // NOTE: Empty columns + for (int i = 0; i < schema.length(); i++) { if (!isSet[i]) { - childColumns[i].putNulls(0, loadSize); + SparkEmptyLoader.load((WritableColumnVector) childColumns[i], loadSize); } } // NOTE: partitionColumns - final WritableColumnVector[] partColumns = + final ColumnVector[] partColumns = PartitionColumnUtil.createPartitionColumns(partitionSchema, partitionValue, loadSize); for (int i = schema.length(), n = 0; i < childColumns.length; i++, n++) { childColumns[i] = partColumns[n]; diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java index aff02b0..883ffee 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java @@ -20,9 +20,9 @@ import jp.co.yahoo.yosegi.spread.expression.IExpressionNode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; -import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; import java.io.IOException; @@ -33,7 +33,7 @@ public class SparkColumnarBatchReader implements IColumnarBatchReader { private final WrapReader reader; - private final WritableColumnVector[] childColumns; + private final ColumnVector[] childColumns; public SparkColumnarBatchReader( final StructType partitionSchema, @@ -47,7 +47,7 @@ public SparkColumnarBatchReader( final IExpressionNode node) throws IOException { final StructField[] fields = schema.fields(); - childColumns = new OnHeapColumnVector[schema.length() + partitionSchema.length()]; + childColumns = new ColumnVector[schema.length() + partitionSchema.length()]; final Map keyIndexMap = new HashMap(); for (int i = 0; i < fields.length; i++) { keyIndexMap.put(fields[i].name(), i); @@ -85,6 +85,9 @@ public ColumnarBatch next() throws IOException { public void close() throws Exception { reader.close(); for (int i = 0; i < childColumns.length; i++) { + if (childColumns[i] == null) { + continue; + } childColumns[i].close(); } } diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java b/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java index c52ea16..c08d4df 100644 --- a/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java +++ b/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java @@ -17,20 +17,21 @@ */ package jp.co.yahoo.yosegi.spark.utils; +import org.apache.spark.sql.execution.vectorized.ConstantColumnVector; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.vectorized.ColumnVector; public final class PartitionColumnUtil{ - public static OnHeapColumnVector[] createPartitionColumns( final StructType partitionColumns , final InternalRow partitionValues , final int rowCount ){ + public static ColumnVector[] createPartitionColumns( final StructType partitionColumns , final InternalRow partitionValues , final int rowCount ){ StructField[] fields = partitionColumns.fields(); - OnHeapColumnVector[] vectors = new OnHeapColumnVector[ fields.length ]; + ColumnVector[] vectors = new ConstantColumnVector[ fields.length ]; for( int i = 0 ; i < vectors.length ; i++ ){ - vectors[i] = new OnHeapColumnVector( rowCount , fields[i].dataType() ); - ColumnVectorUtils.populate( vectors[i] , partitionValues , i ); + vectors[i] = new ConstantColumnVector( rowCount , fields[i].dataType() ); + ColumnVectorUtils.populate( (ConstantColumnVector) vectors[i] , partitionValues , i ); } return vectors; } diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..43dad69 --- /dev/null +++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more contributor license +# agreements. See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the License. You may obtain a +# copy of the License at +# +#

http://www.apache.org/licenses/LICENSE-2.0 +# +#

Unless required by applicable law or agreed to in writing, software distributed under the +# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing permissions and +# limitations under the License. +# + +jp.co.yahoo.yosegi.spark.YosegiFileFormat \ No newline at end of file diff --git a/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala b/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala index 7bec533..c8a4575 100644 --- a/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala +++ b/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala @@ -118,7 +118,7 @@ class YosegiFileFormat extends FileFormat with DataSourceRegister with Serializa val readSchema:DataType = DataType.fromJson( requiredSchemaJson ) val partSchema:DataType = DataType.fromJson( partitionSchemaJson ) assert(file.partitionValues.numFields == partitionSchema.size ) - val path:Path = new Path( new URI(file.filePath) ) + val path:Path = file.filePath.toPath val fs:FileSystem = path.getFileSystem( broadcastedHadoopConf.value.value ) val yosegiConfig = new jp.co.yahoo.yosegi.config.Configuration() if( expandOption.nonEmpty ){ diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java new file mode 100644 index 0000000..1ed096b --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.blackbox; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DecimalType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class DataSourceTest { + private static SparkSession spark; + private static SQLContext sqlContext; + private static final String appName = "DataSourceTest"; + + public boolean deleteDirectory(final File directory) { + final File[] allContents = directory.listFiles(); + if (allContents != null) { + for (final File file : allContents) { + deleteDirectory(file); + } + } + return directory.delete(); + } + + public String getTmpPath() { + String tmpdir = System.getProperty("java.io.tmpdir"); + if (tmpdir.endsWith("/")) { + tmpdir = tmpdir.substring(0, tmpdir.length() - 1); + } + return tmpdir + "/" + appName; + } + + public String getLocation(final String table) { + String tmpPath = getTmpPath(); + return tmpPath + "/" + table; + } + + @BeforeAll + static void initAll() { + spark = SparkSession.builder().appName(appName).master("local[*]").getOrCreate(); + sqlContext = spark.sqlContext(); + } + + @AfterAll + static void tearDownAll() { + spark.close(); + } + + @AfterEach + void tearDown() { + deleteDirectory(new File(getTmpPath())); + } + + @Test + void T_DataSource_Primitive_1() throws IOException { + final String location = getLocation("primitive1"); + final int precision = DecimalType.MAX_PRECISION(); + final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE(); + /** + * CREATE TABLE primitive1 ( + * id INT, + * bo BOOLEAN, + * by BYTE, + * de DECIMAL(38,6), + * do DOUBLE, + * fl FLOAT, + * in INT, + * lo LONG, + * sh SHORT, + * st STRING + * ) + * USING yosegi + * LOCATION '' + */ + final String ddl = String.format("CREATE TABLE primitive1 (\n" + + " id INT,\n" + + " bo BOOLEAN,\n" + + " by BYTE,\n" + + " de DECIMAL(%d, %d),\n" + + " do DOUBLE,\n" + + " fl FLOAT,\n" + + " in INT,\n" + + " lo LONG,\n" + + " sh SHORT,\n" + + " st STRING\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s';", precision, scale, location); + spark.sql(ddl); + + /** + * FIXME: cannot insert decimal value. + */ + final String insertSql = "INSERT INTO primitive1\n" + + "(id, bo, by, de, do, fl, in, lo, sh, st)\n" + + "VALUES\n" + + "(0, true, 127, 123.45678901, 1.7976931348623157e+308, 3.402823e+37, 2147483647, 9223372036854775807, 32767, 'value1');"; + spark.sql(insertSql); + + List rows = spark.sql("SELECT * FROM primitive1;").collectAsList(); + assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(0).getAs("bo"), Boolean.valueOf(true)); + assertEquals(rows.get(0).getAs("by"), Byte.valueOf((byte) 127)); + assertNull(rows.get(0).getAs("de")); + assertEquals(rows.get(0).getAs("do"), Double.valueOf(1.7976931348623157e+308)); + assertEquals(rows.get(0).getAs("fl"), Float.valueOf(3.402823e+37F)); + assertEquals(rows.get(0).getAs("in"), Integer.valueOf(2147483647)); + assertEquals(rows.get(0).getAs("lo"), Long.valueOf(9223372036854775807L)); + assertEquals(rows.get(0).getAs("sh"), Short.valueOf((short) 32767)); + assertEquals(rows.get(0).getAs("st"), "value1"); + } + + @Test + void T_DataSource_Primitive_2() throws IOException { + final String location = getLocation("primitive2"); + final int precision = DecimalType.MAX_PRECISION(); + final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE(); + /** + * CREATE TABLE primitive2 ( + * id INT, + * bo BOOLEAN, + * by BYTE, + * de DOUBLE, + * do DOUBLE, + * fl FLOAT, + * in INT, + * lo LONG, + * sh SHORT, + * st STRING + * ) + * USING yosegi + * LOCATION '' + */ + final String ddl1 = String.format("CREATE TABLE primitive2 (\n" + + " id INT,\n" + + " bo BOOLEAN,\n" + + " by BYTE,\n" + + " de DOUBLE,\n" + + " do DOUBLE,\n" + + " fl FLOAT,\n" + + " in INT,\n" + + " lo LONG,\n" + + " sh SHORT,\n" + + " st STRING\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s';", location); + spark.sql(ddl1); + + final String insertSql = "INSERT INTO primitive2\n" + + "(id, bo, by, de, do, fl, in, lo, sh, st)\n" + + "VALUES\n" + + "(0, true, 127, 123.45678901, 1.7976931348623157e+308, 3.402823e+37, 2147483647, 9223372036854775807, 32767, 'value1');"; + spark.sql(insertSql); + + spark.sql("DROP TABLE primitive2;"); + + /** + * CREATE TABLE primitive2 ( + * id INT, + * bo BOOLEAN, + * by BYTE, + * de DECIMAL(38,6), + * do DOUBLE, + * fl FLOAT, + * in INT, + * lo LONG, + * sh SHORT, + * st STRING + * ) + * USING yosegi + * LOCATION '' + */ + final String ddl2 = String.format("CREATE TABLE primitive2 (\n" + + " id INT,\n" + + " bo BOOLEAN,\n" + + " by BYTE,\n" + + " de DECIMAL(%d, %d),\n" + + " do DOUBLE,\n" + + " fl FLOAT,\n" + + " in INT,\n" + + " lo LONG,\n" + + " sh SHORT,\n" + + " st STRING\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s';", precision, scale, location); + spark.sql(ddl2); + + List rows = spark.sql("SELECT * FROM primitive2;").collectAsList(); + assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(0).getAs("bo"), Boolean.valueOf(true)); + assertEquals(rows.get(0).getAs("by"), Byte.valueOf((byte) 127)); + assertEquals(rows.get(0).getAs("de"), BigDecimal.valueOf(123.456789)); + assertEquals(rows.get(0).getAs("do"), Double.valueOf(1.7976931348623157e+308)); + assertEquals(rows.get(0).getAs("fl"), Float.valueOf(3.402823e+37F)); + assertEquals(rows.get(0).getAs("in"), Integer.valueOf(2147483647)); + assertEquals(rows.get(0).getAs("lo"), Long.valueOf(9223372036854775807L)); + assertEquals(rows.get(0).getAs("sh"), Short.valueOf((short) 32767)); + assertEquals(rows.get(0).getAs("st"), "value1"); + } + + @Test + void T_DataSource_Expand_1() throws IOException { + final String location = getLocation("flatten1"); + /** + * CREATE TABLE expand1 ( + * id INT, + * a ARRAY + * ) + * USING yosegi + * LOCATION ''; + */ + final String ddl1 = String.format("CREATE TABLE expand1 (\n" + + " id INT,\n" + + " a ARRAY\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s';", location); + spark.sql(ddl1); + + final String insertSql = "INSERT INTO expand1\n" + + "(id, a)\n" + + "VALUES\n" + + "(0, array(1,2,3));"; + spark.sql(insertSql); + + spark.sql("DROP TABLE expand1;"); + + /** + * CREATE TABLE expand1( + * id INT, + * aa INT + * ) + * USING yosegi + * LOCATION '' + * OPTIONS ( + * 'spread.reader.expand.column'='{"base":{"node":"a", "link_name":"aa"}}' + * ); + */ + final String ddl2 = String.format("CREATE TABLE expand1(\n" + + " id INT,\n" + + " aa INT\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s'\n" + + "OPTIONS (\n" + + " 'spread.reader.expand.column'='{\"base\":{\"node\":\"a\", \"link_name\":\"aa\"}}'\n" + + ");", location); + spark.sql(ddl2); + + List rows = spark.sql("SELECT * FROM expand1 ORDER BY id, aa;").collectAsList(); + assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(1).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(2).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(0).getAs("aa"), Integer.valueOf(1)); + assertEquals(rows.get(1).getAs("aa"), Integer.valueOf(2)); + assertEquals(rows.get(2).getAs("aa"), Integer.valueOf(3)); + } + + @Test + void T_DataSource_Flatten_1() throws IOException { + final String location = getLocation("flatten1"); + /** + * CREATE TABLE flatten1 ( + * id INT, + * m MAP + * ) + * USING yosegi + * LOCATION ''; + */ + final String ddl1 = String.format("CREATE TABLE flatten1 (\n" + + " id INT,\n" + + " m MAP\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s';", location); + spark.sql(ddl1); + + final String insertSql = "INSERT INTO flatten1\n" + + "(id, m)\n" + + "VALUES\n" + + "(0, map('k1', 'v1', 'k2', 'v2'));"; + spark.sql(insertSql); + + spark.sql("DROP TABLE flatten1;"); + + /** + * CREATE TABLE flatten1 ( + * id INT, + * mk1 STRING, + * mk2 STRING + * ) + * USING yosegi + * LOCATION '' + * OPTIONS ( + * 'spread.reader.flatten.column'='[{"link_name":"id", "nodes":["id"]}, {"link_name":"mk1", "nodes":["m","k1"]}, {"link_name":"mk2", "nodes":["m","k2"]}]' + * ); + */ + final String ddl2 = String.format("CREATE TABLE flatten1 (\n" + + " id INT,\n" + + " mk1 STRING,\n" + + " mk2 STRING\n" + + ")\n" + + "USING yosegi\n" + + "LOCATION '%s'\n" + + "OPTIONS (\n" + + " 'spread.reader.flatten.column'='[{\"link_name\":\"id\", \"nodes\":[\"id\"]}, {\"link_name\":\"mk1\", \"nodes\":[\"m\",\"k1\"]}, {\"link_name\":\"mk2\", \"nodes\":[\"m\",\"k2\"]}]'\n" + + ");", location); + spark.sql(ddl2); + + List rows = spark.sql("SELECT * FROM flatten1;").collectAsList(); + assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0)); + assertEquals(rows.get(0).getAs("mk1"), "v1"); + assertEquals(rows.get(0).getAs("mk2"), "v2"); + } +} diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java similarity index 97% rename from src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java rename to src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java index 3773720..790588d 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java @@ -40,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class ExpandFlatten { +public class ExpandFlattenTest { private static SparkSession spark; private static SQLContext sqlContext; private static final String appName = "ExpandFlattenTest"; @@ -60,7 +60,11 @@ public String getResourcePath(final String resource) { } public String getTmpPath() { - return System.getProperty("java.io.tmpdir") + appName + ".yosegi"; + String tmpdir = System.getProperty("java.io.tmpdir"); + if (tmpdir.endsWith("/")) { + tmpdir = tmpdir.substring(0, tmpdir.length() - 1); + } + return tmpdir + "/" + appName + ".yosegi"; } public Dataset loadJsonFile(final String resource, final StructType schema) { diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java similarity index 89% rename from src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java rename to src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java index 9b8fa97..65ddaf7 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java @@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class Load { +public class LoadTest { private static SparkSession spark; private static SQLContext sqlContext; private static String appName = "LoadTest"; @@ -63,7 +63,11 @@ public String getResourcePath(final String resource) { } public String getTmpPath() { - return System.getProperty("java.io.tmpdir") + appName + ".yosegi"; + String tmpdir = System.getProperty("java.io.tmpdir"); + if (tmpdir.endsWith("/")) { + tmpdir = tmpdir.substring(0, tmpdir.length() - 1); + } + return tmpdir + "/" + appName + ".yosegi"; } public Dataset loadJsonFile(final String resource, final StructType schema) { @@ -318,33 +322,24 @@ void T_load_Array_Array_Integer_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("aa"); final int yIndex = ldfy.get(i).fieldIndex("aa"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { // NOTE: json:[], yosegi:null - assertEquals(0, ldfj.get(i).getList(jIndex).size()); + assertTrue(false); } else { final List> ldfj2 = ldfj.get(i).getList(jIndex); final List> ldfy2 = ldfy.get(i).getList(yIndex); for (int j = 0; j < ldfj2.size(); j++) { final WrappedArray waj = ldfj2.get(j); final WrappedArray way = ldfy2.get(j); - if (way == null) { - if (waj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(waj); - } else { - // NOTE: json:[[]], yosegi:[null] - assertEquals(0, waj.size()); - } + if (waj == null) { + // NOTE: json:[null], yosegi:[[]] + assertEquals(0, way.size()); } else { // NOTE: json:[[]], yosegi:[[]] + assertEquals(waj.size(), way.size()); for (int k = 0; k < waj.size(); k++) { assertEquals(waj.apply(k), way.apply(k)); } @@ -393,59 +388,28 @@ void T_load_Array_Struct_Primitive_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("as"); final int yIndex = ldfy.get(i).fieldIndex("as"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { - final List lrj = ldfj.get(i).getList(jIndex); - for (int j = 0; j < lrj.size(); j++) { - final Row rj = lrj.get(j); - if (rj == null) { - // NOTE: json:[null], yosegi:null - assertNull(rj); - } else { - // NOTE: json[as.field:null], yosegi:null - for (final StructField field : fields) { - final String name = field.name(); - assertNull(rj.getAs(name)); - } - } - } + assertTrue(false); } else { final List lrj = ldfj.get(i).getList(jIndex); final List lry = ldfy.get(i).getList(yIndex); for (int j = 0; j < lrj.size(); j++) { final Row rj = lrj.get(j); final Row ry = lry.get(j); - if (ry == null) { - if (rj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(rj); - } else { - // NOTE: json:[{}], yosegi:[null] - for (final StructField field : fields) { - final String name = field.name(); - assertNull(rj.getAs(name)); - } + if (rj == null) { + // NOTE: json:[null], yosegi:[{}] + for (final StructField field : fields) { + final String name = field.name(); + assertNull(ry.getAs(name)); } } else { - if (rj == null) { - // NOTE: json:[null], yosegi:[{}] - for (final StructField field : fields) { - final String name = field.name(); - assertNull(ry.getAs(name)); - } - } else { - // NOTE: json:[{}], yosegi:[{}] - for (final StructField field : fields) { - final String name = field.name(); - assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name)); - } + // NOTE: json:[{}], yosegi:[{}] + for (final StructField field : fields) { + final String name = field.name(); + assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name)); } } } @@ -480,31 +444,12 @@ void T_load_Array_Map_Integer_1() throws IOException { final int jIndex = ldfj.get(i).fieldIndex("am"); final int yIndex = ldfy.get(i).fieldIndex("am"); if (ldfj.get(i).isNullAt(jIndex)) { - if (ldfy.get(i).isNullAt(yIndex)) { - // NOTE: json:null, yosegi:null - assertTrue(ldfy.get(i).isNullAt(yIndex)); - } else { - // FIXME: json:null, yosegi:[] - assertTrue(false); - } + // NOTE: json:null, yosegi:[] + assertEquals(0, ldfy.get(i).getList(yIndex).size()); } else { if (ldfy.get(i).isNullAt(yIndex)) { // NOTE: json:[], yosegi:null - final List> lmj = ldfj.get(i).getList(jIndex); - for (int j = 0; j < lmj.size(); j++) { - final Map mj = lmj.get(j); - if (mj == null) { - // NOTE: json:[null], yosegi:null - assertNull(mj); - } else { - // NOTE: json:[am.key:null], yosegi:null - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(mj.get(key).get()); - } - } - } + assertTrue(false); } else { // NOTE: json:[], yosegi:[] final List> lmj = ldfj.get(i).getList(jIndex); @@ -512,38 +457,20 @@ void T_load_Array_Map_Integer_1() throws IOException { for (int j = 0; j < lmj.size(); j++) { final Map mj = lmj.get(j); final Map my = lmy.get(j); - if (my == null) { - if (mj == null) { - // NOTE: json:[null], yosegi:[null] - assertNull(mj); - } else { - // NOTE: json:[{}], yosegi:[null] - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(mj.get(key).get()); - } - } + if (mj == null) { + // NOTE: json:[null], yosegi:[{}] + assertEquals(0, my.size()); } else { - if (mj == null) { - // NOTE: json:[null], yosegi:[{}] - final Iterator iter = my.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - assertNull(my.get(key).get()); - } - } else { - // NOTE: json[{}], yosegi:[{}] - final Iterator iter = mj.keysIterator(); - while (iter.hasNext()) { - final String key = iter.next(); - if (mj.get(key).get() == null) { - // NOTE: json:[{key:null}], yosegi:[{key:not exist}] - assertTrue(my.get(key).isEmpty()); - } else { - // NOTE: json:[{key}], yosegi:[{key}] - assertEquals(mj.get(key), my.get(key)); - } + // NOTE: json[{}], yosegi:[{}] + final Iterator iter = mj.keysIterator(); + while (iter.hasNext()) { + final String key = iter.next(); + if (mj.get(key).get() == null) { + // NOTE: json:[{key:null}], yosegi:[{key:not exist}] + assertTrue(my.get(key).isEmpty()); + } else { + // NOTE: json:[{key}], yosegi:[{key}] + assertEquals(mj.get(key), my.get(key)); } } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/PartitionLoadTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/PartitionLoadTest.java new file mode 100644 index 0000000..ce2b988 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/PartitionLoadTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.blackbox; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.apache.spark.sql.functions.col; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PartitionLoadTest { + private static SparkSession spark; + private static SQLContext sqlContext; + private static String appName = "PartitionLoadTest"; + + public boolean deleteDirectory(final File directory) { + final File[] allContents = directory.listFiles(); + if (allContents != null) { + for (final File file : allContents) { + deleteDirectory(file); + } + } + return directory.delete(); + } + + public String getResourcePath(final String resource) { + return Thread.currentThread().getContextClassLoader().getResource(resource).getPath(); + } + + public String getTmpPath() { + String tmpdir = System.getProperty("java.io.tmpdir"); + if (tmpdir.endsWith("/")) { + tmpdir = tmpdir.substring(0, tmpdir.length() - 1); + } + return tmpdir + "/" + appName + ".yosegi"; + } + + public Dataset loadJsonFile(final String resource, final StructType schema) { + final String resourcePath = getResourcePath(resource); + if (schema == null) { + return sqlContext.read().json(resourcePath).orderBy(col("id").asc()); + } + return sqlContext.read().schema(schema).json(resourcePath).orderBy(col("id").asc()); + } + + public void createYosegiFile(final String resource, final String... partitions) { + final Dataset df = loadJsonFile(resource, null); + final String tmpPath = getTmpPath(); + df.write() + .mode(SaveMode.Overwrite) + .partitionBy(partitions) + .format("jp.co.yahoo.yosegi.spark.YosegiFileFormat") + .save(tmpPath); + } + + public Dataset loadYosegiFile(final StructType schema) { + final String tmpPath = getTmpPath(); + if (schema == null) { + return sqlContext + .read() + .format("jp.co.yahoo.yosegi.spark.YosegiFileFormat") + .load(tmpPath) + .orderBy(col("id").asc()); + } + return sqlContext + .read() + .format("jp.co.yahoo.yosegi.spark.YosegiFileFormat") + .schema(schema) + .load(tmpPath) + .orderBy(col("id").asc()); + } + + @BeforeAll + static void initAll() { + spark = SparkSession.builder().appName(appName).master("local[*]").getOrCreate(); + sqlContext = spark.sqlContext(); + } + + @AfterAll + static void tearDownAll() { + spark.close(); + } + + @AfterEach + void tearDown() { + deleteDirectory(new File(getTmpPath())); + } + + /* + * FIXME: The rows cannot be loaded if rows in a partition have only null values. + * * {"id":1} + */ + @Test + void T_load_Partition_Primitive_1() throws IOException { + // NOTE: create yosegi file + final String resource = "blackbox/Partition_Primitive_1.txt"; + createYosegiFile(resource, "id"); + + // NOTE: schema + final int precision = DecimalType.MAX_PRECISION(); + final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE(); + final List fields = + Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("de", DataTypes.createDecimalType(precision, scale), true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final StructType structType = DataTypes.createStructType(fields); + // NOTE: load + final Dataset dfj = loadJsonFile(resource, structType); + final Dataset dfy = loadYosegiFile(structType); + + // NOTE: assert + final List ldfj = dfj.collectAsList(); + final List ldfy = dfy.collectAsList(); + for (int i = 0; i < ldfj.size(); i++) { + for (final StructField field : fields) { + final String name = field.name(); + final DataType dataType = field.dataType(); + assertEquals((Object) ldfj.get(i).getAs(name), (Object) ldfy.get(i).getAs(name)); + } + } + } + + @Test + void T_load_Partition_Primitive_2() throws IOException { + // NOTE: create yosegi file + final String resource = "blackbox/Partition_Primitive_2.txt"; + createYosegiFile(resource, "p1", "p2"); + + // NOTE: schema + final List fields = + Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("p1", DataTypes.IntegerType, true), + DataTypes.createStructField("p2", DataTypes.StringType, true), + DataTypes.createStructField("v", DataTypes.StringType, true)); + final StructType structType = DataTypes.createStructType(fields); + // NOTE: load + final Dataset dfj = loadJsonFile(resource, structType); + final Dataset dfy = loadYosegiFile(structType); + + // NOTE: assert + final List ldfj = dfj.collectAsList(); + final List ldfy = dfy.collectAsList(); + for (int i = 0; i < ldfj.size(); i++) { + for (final StructField field : fields) { + final String name = field.name(); + final DataType dataType = field.dataType(); + assertEquals((Object) ldfj.get(i).getAs(name), (Object) ldfy.get(i).getAs(name)); + } + } + } +} diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java index 7cab7a4..0dbab82 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactoryTest.java @@ -19,7 +19,7 @@ import jp.co.yahoo.yosegi.binary.maker.MaxLengthBasedArrayColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkArrayLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkRunLengthEncodingArrayLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionArrayLoader; import jp.co.yahoo.yosegi.spark.test.Utils; @@ -99,6 +99,6 @@ void T_createLoader_Null() throws IOException { final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); final ILoader loader = new SparkArrayLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyArrayLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java index 21b2f8e..1a823fe 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactoryTest.java @@ -17,8 +17,8 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.binary.maker.DumpSpreadColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyMapLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkMapLoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; import jp.co.yahoo.yosegi.spark.test.Utils; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -64,6 +64,6 @@ void T_createLoader_Null() throws IOException { final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); final ILoader loader = new SparkMapLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyMapLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java index 1a458fd..1e66d38 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactoryTest.java @@ -17,7 +17,7 @@ import jp.co.yahoo.yosegi.binary.ColumnBinary; import jp.co.yahoo.yosegi.binary.maker.DumpSpreadColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.ILoader; -import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader; +import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyStructLoader; import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkStructLoader; import jp.co.yahoo.yosegi.spark.test.Utils; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; @@ -89,6 +89,6 @@ void T_createLoader_Null() throws IOException { final ILoader loader = new SparkStructLoaderFactory(vector).createLoader(columnBinary, loadSize); - assertTrue(loader instanceof SparkNullLoader); + assertTrue(loader instanceof SparkEmptyStructLoader); } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoaderTest.java index 8c80909..f244a9e 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoaderTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoaderTest.java @@ -17,9 +17,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import jp.co.yahoo.yosegi.binary.ColumnBinary; +import jp.co.yahoo.yosegi.binary.ColumnBinaryMakerConfig; import jp.co.yahoo.yosegi.binary.FindColumnBinaryMaker; import jp.co.yahoo.yosegi.binary.maker.IColumnBinaryMaker; import jp.co.yahoo.yosegi.binary.maker.MaxLengthBasedArrayColumnBinaryMaker; +import jp.co.yahoo.yosegi.binary.maker.OptimizedNullArrayDumpStringColumnBinaryMaker; +import jp.co.yahoo.yosegi.binary.maker.OptimizedNullArrayStringColumnBinaryMaker; import jp.co.yahoo.yosegi.inmemory.IArrayLoader; import jp.co.yahoo.yosegi.message.parser.json.JacksonMessageReader; import jp.co.yahoo.yosegi.spark.test.Utils; @@ -370,4 +373,46 @@ void T_load_String_1(final String binaryMakerClassName) throws IOException { // NOTE: assert assertArray(values, vector, loadSize, elmDataType); } + + @ParameterizedTest + @MethodSource("D_arrayColumnBinaryMaker") + void T_load_String_checkDictionaryReset(final String binaryMakerClassName) throws IOException { + // NOTE: test data + // NOTE: expected + String resource = "SparkArrayLoaderTest/String_1.txt"; + List> values = toValues(resource); + int loadSize = values.size(); + + // NOTE: create ColumnBinary + IColumn column = toArrayColumn(resource); + IColumnBinaryMaker binaryMaker = FindColumnBinaryMaker.get(binaryMakerClassName); + + ColumnBinaryMakerConfig config = new ColumnBinaryMakerConfig(); + config.stringMakerClass = new OptimizedNullArrayStringColumnBinaryMaker(); + ColumnBinary columnBinary = Utils.getColumnBinary(binaryMaker, column, config, null, null); + + // NOTE: load + final DataType elmDataType = DataTypes.StringType; + final DataType dataType = DataTypes.createArrayType(elmDataType); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + IArrayLoader loader = new SparkArrayLoader(vector, loadSize); + binaryMaker.load(columnBinary, loader); + + // NOTE: assert + assertArray(values, vector, loadSize, elmDataType); + + // NOTE: Check if the vector is reset + String resource2 = "SparkArrayLoaderTest/String_2.txt"; + List> values2 = toValues(resource2); + int loadSize2 = values2.size(); + IColumn column2 = toArrayColumn(resource2); + IColumnBinaryMaker binaryMaker2 = FindColumnBinaryMaker.get(binaryMakerClassName); + config.stringMakerClass = new OptimizedNullArrayDumpStringColumnBinaryMaker(); + ColumnBinary columnBinary2 = Utils.getColumnBinary(binaryMaker2, column2, config, null, null); + vector.reset(); + vector.reserve(loadSize2); + IArrayLoader loader2 = new SparkArrayLoader(vector, loadSize2); + binaryMaker.load(columnBinary2, loader2); + assertArray(values2, vector, loadSize2, elmDataType); + } } diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java new file mode 100644 index 0000000..1348aa1 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoaderTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyArrayLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Primitive(final DataType elmDataType, final int loadSize) throws IOException { + final DataType dataType = DataTypes.createArrayType(elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Array(final DataType elmDataType, final int loadSize) throws IOException { + final DataType arrayDataType = DataTypes.createArrayType(elmDataType); + final DataType dataType = DataTypes.createArrayType(arrayDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Map(final DataType elmDataType, final int loadSize) throws IOException { + final DataType mapDataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final DataType dataType = DataTypes.createArrayType(mapDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @Test + void T_load_Array_Struct() throws IOException { + final List fields = + Arrays.asList( + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("bi", DataTypes.BinaryType, true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final DataType structDataType = DataTypes.createStructType(fields); + final DataType dataType = DataTypes.createArrayType(structDataType, true); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyArrayLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java new file mode 100644 index 0000000..a1c9f31 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoaderTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + public void assertNull(final WritableColumnVector vector, final int loadSize) { + for (int i = 0; i < loadSize; i++) { + assertTrue(vector.isNullAt(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Primitive(final DataType dataType, final int loadSize) throws IOException { + final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + assertNull(vector, loadSize); + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Primitive(final DataType elmDataType, final int loadSize) throws IOException { + final DataType dataType = DataTypes.createArrayType(elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Array(final DataType elmDataType, final int loadSize) throws IOException { + final DataType arrayDataType = DataTypes.createArrayType(elmDataType); + final DataType dataType = DataTypes.createArrayType(arrayDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Array_Map(final DataType elmDataType, final int loadSize) throws IOException { + final DataType mapDataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final DataType dataType = DataTypes.createArrayType(mapDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @Test + void T_load_Array_Struct() throws IOException { + final List fields = + Arrays.asList( + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("bi", DataTypes.BinaryType, true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final DataType structDataType = DataTypes.createStructType(fields); + final DataType dataType = DataTypes.createArrayType(structDataType, true); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + assertEquals(0, vector.getArrayLength(i)); + } + } + + @ParameterizedTest + @MethodSource("data") + void T_load_Map(final DataType elmDataType, final int loadSize) throws Exception { + final DataType dataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + final ColumnarMap cm = vector.getMap(i); + assertEquals(0, cm.numElements()); + } + } + + @Test + void T_load_Struct() throws IOException { + final List e4Fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.LongType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.LongType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType, true), true) + ); + final List fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.StringType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.StringType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true), true), + DataTypes.createStructField("e4", DataTypes.createStructType(e4Fields), true) + ); + final DataType dataType = DataTypes.createStructType(fields); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + SparkEmptyLoader.load(vector, loadSize); + for (int i = 0; i < loadSize; i++) { + for (int j = 0; j < fields.size(); j++) { + final StructField field = fields.get(j); + final String name = field.name(); + final DataType type = field.dataType(); + if (name.equals("e1")) { + assertTrue(vector.getChild(j).isNullAt(i)); + } else if (name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getArrayLength(i)); + } else if (name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap cm = vector.getChild(j).getMap(i); + assertEquals(0, cm.numElements()); + } else if (name.equals("e4")) { + for (int k = 0; k < e4Fields.size(); k++) { + final StructField e4Field = e4Fields.get(k); + final String e4Name = e4Field.name(); + final DataType e4Type = e4Field.dataType(); + if (e4Name.equals("e1")) { + assertTrue(vector.getChild(j).getChild(k).isNullAt(i)); + } else if (e4Name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getChild(k).getArrayLength(i)); + } else if (e4Name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap e4Cm = vector.getChild(j).getChild(k).getMap(i); + assertEquals(0, e4Cm.numElements()); + } else { + assertTrue(false); + } + } + } else { + assertTrue(false); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java new file mode 100644 index 0000000..9e325bb --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoaderTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class SparkEmptyMapLoaderTest { + public static Stream data() { + final int loadSize = 5; + return Stream.of( + arguments(DataTypes.BooleanType, loadSize), + arguments(DataTypes.ByteType, loadSize), + arguments(DataTypes.StringType, loadSize), + arguments(DataTypes.createDecimalType(), loadSize), + arguments(DataTypes.DoubleType, loadSize), + arguments(DataTypes.FloatType, loadSize), + arguments(DataTypes.IntegerType, loadSize), + arguments(DataTypes.LongType, loadSize), + arguments(DataTypes.ShortType, loadSize) + ); + } + + @ParameterizedTest + @MethodSource("data") + void T_load(final DataType elmDataType, final int loadSize) throws Exception { + final DataType dataType = DataTypes.createMapType(DataTypes.StringType, elmDataType, true); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyMapLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + assertFalse(vector.isNullAt(i)); + final ColumnarMap cm = vector.getMap(i); + assertEquals(0, cm.numElements()); + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java new file mode 100644 index 0000000..27a1551 --- /dev/null +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoaderTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package jp.co.yahoo.yosegi.spark.inmemory.loader; + +import jp.co.yahoo.yosegi.inmemory.ILoader; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class SparkEmptyStructLoaderTest { + @Test + void T_load() throws IOException { + final List e4Fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.LongType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.LongType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType, true), true) + ); + final List fields = + Arrays.asList( + DataTypes.createStructField("e1", DataTypes.StringType, true), + DataTypes.createStructField("e2", DataTypes.createArrayType(DataTypes.StringType, true), true), + DataTypes.createStructField("e3", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true), true), + DataTypes.createStructField("e4", DataTypes.createStructType(e4Fields), true) + ); + final DataType dataType = DataTypes.createStructType(fields); + final int loadSize = 5; + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ILoader loader = new SparkEmptyStructLoader(vector, loadSize); + loader.build(); + for (int i = 0; i < loadSize; i++) { + for (int j = 0; j < fields.size(); j++) { + final StructField field = fields.get(j); + final String name = field.name(); + final DataType type = field.dataType(); + if (name.equals("e1")) { + assertTrue(vector.getChild(j).isNullAt(i)); + } else if (name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getArrayLength(i)); + } else if (name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap cm = vector.getChild(j).getMap(i); + assertEquals(0, cm.numElements()); + } else if (name.equals("e4")) { + for (int k = 0; k < e4Fields.size(); k++) { + final StructField e4Field = e4Fields.get(k); + final String e4Name = e4Field.name(); + final DataType e4Type = e4Field.dataType(); + if (e4Name.equals("e1")) { + assertTrue(vector.getChild(j).getChild(k).isNullAt(i)); + } else if (e4Name.equals("e2")) { + assertFalse(vector.getChild(j).isNullAt(i)); + assertEquals(0, vector.getChild(j).getChild(k).getArrayLength(i)); + } else if (e4Name.equals("e3")) { + assertFalse(vector.getChild(j).isNullAt(i)); + final ColumnarMap e4Cm = vector.getChild(j).getChild(k).getMap(i); + assertEquals(0, e4Cm.numElements()); + } else { + assertTrue(false); + } + } + } else { + assertTrue(false); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java index 6893be5..86ae384 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoaderTest.java @@ -14,7 +14,7 @@ */ package jp.co.yahoo.yosegi.spark.inmemory.loader; -import jp.co.yahoo.yosegi.inmemory.ISequentialLoader; +import jp.co.yahoo.yosegi.inmemory.ILoader; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataType; @@ -38,8 +38,8 @@ void T_load_Boolean_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.BooleanType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -51,8 +51,8 @@ void T_load_Byte_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.ByteType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -64,8 +64,8 @@ void T_load_Bytes_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.BinaryType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -77,8 +77,8 @@ void T_load_Double_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.DoubleType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -90,8 +90,8 @@ void T_load_Float_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.FloatType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -103,8 +103,8 @@ void T_load_Integer_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.IntegerType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -116,8 +116,8 @@ void T_load_Long_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.LongType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -129,8 +129,8 @@ void T_load_Short_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.ShortType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); @@ -142,8 +142,8 @@ void T_load_String_1() throws IOException { final int loadSize = 5; final DataType dataType = DataTypes.StringType; final OnHeapColumnVector vector = new OnHeapColumnVector(loadSize, dataType); - final ISequentialLoader loader = new SparkNullLoader(vector, loadSize); - loader.finish(); + final ILoader loader = new SparkNullLoader(vector, loadSize); + loader.build(); // NOTE: assert assertNull(vector, loadSize); diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoaderTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoaderTest.java index 2a0b266..80bc78c 100644 --- a/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoaderTest.java +++ b/src/test/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoaderTest.java @@ -220,4 +220,55 @@ void T_load_Struct_1(final String binaryMakerClassName) throws IOException { // NOTE: assert assertStruct(values, vector, loadSize, fields); } + + @ParameterizedTest + @MethodSource("D_spreadColumnBinaryMaker") + void T_load_Struct_checkDictionaryReset(final String binaryMakerClassName) throws IOException { + // NOTE: test data + // NOTE: expected + final String resource = "SparkStructLoaderTest/Struct_1.txt"; + final List> values = toValues(resource); + final int loadSize = values.size(); + + // NOTE: create ColumnBinary + final IColumn column = toSpreadColumn(resource); + final IColumnBinaryMaker binaryMaker = FindColumnBinaryMaker.get(binaryMakerClassName); + final ColumnBinary columnBinary = Utils.getColumnBinary(binaryMaker, column, null, null, null); + + // NOTE: load + final List fields = + Arrays.asList( + DataTypes.createStructField("bo", DataTypes.BooleanType, true), + DataTypes.createStructField("by", DataTypes.ByteType, true), + DataTypes.createStructField("bi", DataTypes.BinaryType, true), + DataTypes.createStructField("do", DataTypes.DoubleType, true), + DataTypes.createStructField("fl", DataTypes.FloatType, true), + DataTypes.createStructField("in", DataTypes.IntegerType, true), + DataTypes.createStructField("lo", DataTypes.LongType, true), + DataTypes.createStructField("sh", DataTypes.ShortType, true), + DataTypes.createStructField("st", DataTypes.StringType, true)); + final DataType dataType = DataTypes.createStructType(fields); + final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType); + final ISpreadLoader loader = new SparkStructLoader(vector, loadSize); + binaryMaker.load(columnBinary, loader); + + // NOTE: assert + assertStruct(values, vector, loadSize, fields); + + // NOTE: Check if the vector is reset + final String resource2 = "SparkStructLoaderTest/Struct_2.txt"; + final List> values2 = toValues(resource2); + final int loadSize2 = values2.size(); + + vector.reset(); + vector.reserve(loadSize2); + // NOTE: create ColumnBinary + final IColumn column2 = toSpreadColumn(resource2); + final IColumnBinaryMaker binaryMaker2 = FindColumnBinaryMaker.get(binaryMakerClassName); + final ColumnBinary columnBinary2 = Utils.getColumnBinary(binaryMaker2, column2, null, null, null); + final ISpreadLoader loader2 = new SparkStructLoader(vector, loadSize2); + binaryMaker.load(columnBinary2, loader2); + loader2.build(); + assertStruct(values2, vector, loadSize2, fields); + } } diff --git a/src/test/resources/SparkArrayLoaderTest/String_2.txt b/src/test/resources/SparkArrayLoaderTest/String_2.txt new file mode 100644 index 0000000..61b215e --- /dev/null +++ b/src/test/resources/SparkArrayLoaderTest/String_2.txt @@ -0,0 +1,7 @@ +["b"] +[] +["cc",null] +["aa",null,null] +["dd"] +["a",null,null] +["bb",null,null] diff --git a/src/test/resources/SparkStructLoaderTest/Struct_2.txt b/src/test/resources/SparkStructLoaderTest/Struct_2.txt new file mode 100644 index 0000000..3d03465 --- /dev/null +++ b/src/test/resources/SparkStructLoaderTest/Struct_2.txt @@ -0,0 +1,7 @@ +{"by":-128,"bi":"aaa","do":-1.7976931348623157e+308,"fl":-3.402823e+37,"in":-2147483648,"lo":-9223372036854775808,"sh":-32768,"st":"bbb"} +{"by":127,"bi":"aaa","do":1.7976931348623157e+308,"fl":3.402823e+38,"in":2147483647,"lo":9223372036854775807,"sh":32767,"st":"bbb"} +{} +{"by":null,"bi":null,"do":null,"fl":null,"in":null,"lo":null,"sh":null,"st":null} +{"by":-128,"bi":"aaa"} +{"by":-128,"bi":null} +{"by":null,"bi":"aaa"} diff --git a/src/test/resources/blackbox/Partition_Primitive_1.txt b/src/test/resources/blackbox/Partition_Primitive_1.txt new file mode 100644 index 0000000..b191b37 --- /dev/null +++ b/src/test/resources/blackbox/Partition_Primitive_1.txt @@ -0,0 +1,4 @@ +{"id":0, "bo":true, "by":127, "de":123.45678901, "do":1.7976931348623157e+308, "fl":3.402823e+37, "in":2147483647, "lo":9223372036854775807, "sh":32767, "st":"value1"} +{"id":1} +{"id":1, "bo":true} +{"id":2, "bo":false, "by":-128, "de":234.56789012, "do":-1.7976931348623157e+308, "fl":-3.402823e+37, "in":-2147483648, "lo":-9223372036854775808, "sh":-32768, "st":"value2"} \ No newline at end of file diff --git a/src/test/resources/blackbox/Partition_Primitive_2.txt b/src/test/resources/blackbox/Partition_Primitive_2.txt new file mode 100644 index 0000000..02b388b --- /dev/null +++ b/src/test/resources/blackbox/Partition_Primitive_2.txt @@ -0,0 +1,5 @@ +{"id":0, "p1":0, "p2":"a", "v":"v0a1"} +{"id":1, "p1":1, "p2":"b", "v":"v1b1"} +{"id":2, "p1":1, "p2":"b", "v":"v1b2"} +{"id":3, "p1":2, "p2":"a", "v":"v2a1"} +{"id":4, "p1":2, "p2":"b", "v":"v2b1"} \ No newline at end of file