diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java
index e003ee7..bff80bc 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkArrayLoaderFactory.java
@@ -18,7 +18,7 @@
import jp.co.yahoo.yosegi.inmemory.ILoader;
import jp.co.yahoo.yosegi.inmemory.ILoaderFactory;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkArrayLoader;
-import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader;
+import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyArrayLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkRunLengthEncodingArrayLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionArrayLoader;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -37,7 +37,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
throws IOException {
if (columnBinary == null) {
// FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyArrayLoader(vector, loadSize);
}
switch (getLoadType(columnBinary, loadSize)) {
case ARRAY:
@@ -47,8 +47,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
case UNION:
return new SparkUnionArrayLoader(vector, loadSize);
default:
- // FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyArrayLoader(vector, loadSize);
}
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java
index b6a38ba..fc06b4c 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkMapLoaderFactory.java
@@ -17,8 +17,8 @@
import jp.co.yahoo.yosegi.binary.ColumnBinary;
import jp.co.yahoo.yosegi.inmemory.ILoader;
import jp.co.yahoo.yosegi.inmemory.ILoaderFactory;
+import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyMapLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkMapLoader;
-import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionMapLoader;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
throws IOException {
if (columnBinary == null) {
// FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyMapLoader(vector, loadSize);
}
switch (getLoadType(columnBinary, loadSize)) {
case SPREAD:
@@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
return new SparkUnionMapLoader(vector, loadSize);
default:
// FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyMapLoader(vector, loadSize);
}
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java
index 3c4fb3d..0f0b9f8 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/factory/SparkStructLoaderFactory.java
@@ -17,7 +17,7 @@
import jp.co.yahoo.yosegi.binary.ColumnBinary;
import jp.co.yahoo.yosegi.inmemory.ILoader;
import jp.co.yahoo.yosegi.inmemory.ILoaderFactory;
-import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkNullLoader;
+import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyStructLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkStructLoader;
import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkUnionStructLoader;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -36,7 +36,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
throws IOException {
if (columnBinary == null) {
// FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyStructLoader(vector, loadSize);
}
switch (getLoadType(columnBinary, loadSize)) {
case SPREAD:
@@ -45,7 +45,7 @@ public ILoader createLoader(final ColumnBinary columnBinary, final int loadSize)
return new SparkUnionStructLoader(vector, loadSize);
default:
// FIXME:
- return new SparkNullLoader(vector, loadSize);
+ return new SparkEmptyStructLoader(vector, loadSize);
}
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java
index 9190f20..a272314 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkArrayLoader.java
@@ -38,7 +38,7 @@ public int getLoadSize() {
@Override
public void setNull(final int index) throws IOException {
- vector.putNull(index);
+ vector.putArray(index, 0, 0);
}
@Override
@@ -60,6 +60,10 @@ public void setArrayIndex(final int index, final int start, final int length) th
public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException {
vector.getChild(0).reset();
vector.getChild(0).reserve(childLength);
+ if (vector.getChild(0).hasDictionary()) {
+ vector.getChild(0).reserveDictionaryIds(0);
+ vector.getChild(0).setDictionary(null);
+ }
SparkLoaderFactoryUtil.createLoaderFactory(vector.getChild(0))
.create(columnBinary, childLength);
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java
new file mode 100644
index 0000000..29ad378
--- /dev/null
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyArrayLoader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package jp.co.yahoo.yosegi.spark.inmemory.loader;
+
+import jp.co.yahoo.yosegi.inmemory.ILoader;
+import jp.co.yahoo.yosegi.inmemory.LoadType;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+import java.io.IOException;
+
+public class SparkEmptyArrayLoader implements ILoader {
+
+ private final WritableColumnVector vector;
+ private final int loadSize;
+
+ public SparkEmptyArrayLoader(final WritableColumnVector vector, final int loadSize) {
+ this.vector = vector;
+ this.loadSize = loadSize;
+ this.vector.getChild(0).reset();
+ this.vector.getChild(0).reserve(0);
+ if (this.vector.getChild(0).hasDictionary()) {
+ this.vector.getChild(0).reserveDictionaryIds(0);
+ this.vector.getChild(0).setDictionary(null);
+ }
+ }
+
+ @Override
+ public LoadType getLoaderType() {
+ return LoadType.NULL;
+ }
+
+ @Override
+ public int getLoadSize() {
+ return loadSize;
+ }
+
+ @Override
+ public void setNull(final int index) throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public void finish() throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public WritableColumnVector build() throws IOException {
+ for (int i = 0; i < loadSize; i++) {
+ vector.putArray(i, 0, 0);
+ }
+ return vector;
+ }
+
+ @Override
+ public boolean isLoadingSkipped() {
+ return true;
+ }
+}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java
new file mode 100644
index 0000000..f0c6ea0
--- /dev/null
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyLoader.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package jp.co.yahoo.yosegi.spark.inmemory.loader;
+
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+public class SparkEmptyLoader {
+ public static void load(final WritableColumnVector vector, final int loadSize) throws IOException {
+ final Class klass = vector.dataType().getClass();
+ if (klass == ArrayType.class) {
+ new SparkEmptyArrayLoader(vector, loadSize).build();
+ } else if (klass == StructType.class) {
+ new SparkEmptyStructLoader(vector, loadSize).build();
+ } else if (klass == MapType.class) {
+ new SparkEmptyMapLoader(vector, loadSize).build();
+ } else {
+ new SparkNullLoader(vector, loadSize).build();
+ }
+ }
+}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java
new file mode 100644
index 0000000..7e2f598
--- /dev/null
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyMapLoader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package jp.co.yahoo.yosegi.spark.inmemory.loader;
+
+import jp.co.yahoo.yosegi.inmemory.ILoader;
+import jp.co.yahoo.yosegi.inmemory.LoadType;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+import java.io.IOException;
+
+public class SparkEmptyMapLoader implements ILoader {
+
+ private final WritableColumnVector vector;
+ private final int loadSize;
+
+ public SparkEmptyMapLoader(final WritableColumnVector vector, final int loadSize) {
+ this.vector = vector;
+ this.loadSize = loadSize;
+ }
+
+ @Override
+ public LoadType getLoaderType() {
+ return LoadType.NULL;
+ }
+
+ @Override
+ public int getLoadSize() {
+ return loadSize;
+ }
+
+ @Override
+ public void setNull(final int index) throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public void finish() throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public WritableColumnVector build() throws IOException {
+ vector.getChild(0).reset();
+ vector.getChild(0).reserve(0);
+ vector.getChild(1).reset();
+ vector.getChild(1).reserve(0);
+ return vector;
+ }
+
+ @Override
+ public boolean isLoadingSkipped() {
+ return true;
+ }
+}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java
new file mode 100644
index 0000000..01a6bb0
--- /dev/null
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkEmptyStructLoader.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package jp.co.yahoo.yosegi.spark.inmemory.loader;
+
+import jp.co.yahoo.yosegi.inmemory.ILoader;
+import jp.co.yahoo.yosegi.inmemory.LoadType;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+public class SparkEmptyStructLoader implements ILoader {
+
+ private final WritableColumnVector vector;
+ private final int loadSize;
+ private final String[] names;
+
+ public SparkEmptyStructLoader(final WritableColumnVector vector, final int loadSize) {
+ this.vector = vector;
+ this.loadSize = loadSize;
+ final StructType structType = (StructType) vector.dataType();
+ this.names = structType.fieldNames();
+ for (int i = 0; i < names.length; i++) {
+ vector.getChild(i).reset();
+ vector.getChild(i).reserve(loadSize);
+ if (vector.getChild(i).hasDictionary()) {
+ vector.getChild(i).reserveDictionaryIds(0);
+ vector.getChild(i).setDictionary(null);
+ }
+ }
+ }
+
+ @Override
+ public LoadType getLoaderType() {
+ return LoadType.NULL;
+ }
+
+ @Override
+ public int getLoadSize() {
+ return loadSize;
+ }
+
+ @Override
+ public void setNull(final int index) throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public void finish() throws IOException {
+ // FIXME:
+ }
+
+ @Override
+ public WritableColumnVector build() throws IOException {
+ for (int i = 0; i < names.length; i++) {
+ SparkEmptyLoader.load(vector.getChild(i), loadSize);
+ }
+ return vector;
+ }
+
+ @Override
+ public boolean isLoadingSkipped() {
+ return true;
+ }
+}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java
index d8d04d1..9f8112f 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkNullLoader.java
@@ -14,13 +14,13 @@
*/
package jp.co.yahoo.yosegi.spark.inmemory.loader;
-import jp.co.yahoo.yosegi.inmemory.ISequentialLoader;
+import jp.co.yahoo.yosegi.inmemory.ILoader;
import jp.co.yahoo.yosegi.inmemory.LoadType;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
-public class SparkNullLoader implements ISequentialLoader {
+public class SparkNullLoader implements ILoader {
private final WritableColumnVector vector;
private final int loadSize;
@@ -42,17 +42,22 @@ public int getLoadSize() {
@Override
public void setNull(final int index) throws IOException {
- // TODO:
+ // FIXME:
}
@Override
public void finish() throws IOException {
// FIXME:
- vector.putNulls(0, loadSize);
}
@Override
public WritableColumnVector build() throws IOException {
+ vector.putNulls(0, loadSize);
return vector;
}
+
+ @Override
+ public boolean isLoadingSkipped() {
+ return true;
+ }
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java
index 9c5dda1..3ebb236 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkRunLengthEncodingArrayLoader.java
@@ -49,7 +49,7 @@ public WritableColumnVector build() throws IOException {
@Override
public void setNull(final int index) throws IOException {
- vector.putNull(index);
+ vector.putArray(index, 0, 0);
}
@Override
@@ -59,7 +59,7 @@ public void setRowGroupCount(final int count) throws IOException {}
public void setNullAndRepetitions(
final int startIndex, final int repetitions, final int rowGroupIndex) throws IOException {
for (int i = 0; i < repetitions; i++) {
- vector.putNull(rowId);
+ vector.putArray(rowId, 0, 0);
rowId++;
}
}
@@ -83,6 +83,10 @@ public void setRowGourpIndexAndRepetitions(
public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException {
vector.getChild(0).reset();
vector.getChild(0).reserve(childLength);
+ if (vector.getChild(0).hasDictionary()) {
+ vector.getChild(0).reserveDictionaryIds(0);
+ vector.getChild(0).setDictionary(null);
+ }
SparkLoaderFactoryUtil.createLoaderFactory(vector.getChild(0))
.create(columnBinary, childLength);
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java
index b00600e..828a1c7 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkStructLoader.java
@@ -65,13 +65,21 @@ public void finish() throws IOException {
@Override
public WritableColumnVector build() throws IOException {
+ final StructType structType = (StructType) vector.dataType();
+ final String[] names = structType.fieldNames();
+ // NOTE: Fill unloaded columns with nulls.
+ for (int i = 0; i < names.length; i++) {
+ if (loaderFactoryMap.containsKey(names[i])) {
+ SparkEmptyLoader.load(vector.getChild(i), loadSize);
+ }
+ }
return vector;
}
@Override
public void loadChild(final ColumnBinary columnBinary, final int loadSize) throws IOException {
if (loaderFactoryMap.containsKey(columnBinary.columnName)) {
- loaderFactoryMap.get(columnBinary.columnName).create(columnBinary, loadSize);
+ loaderFactoryMap.remove(columnBinary.columnName).create(columnBinary, loadSize);
} else {
// FIXME:
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java
index 346fe26..dfa05f2 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionArrayLoader.java
@@ -25,10 +25,17 @@
public class SparkUnionArrayLoader implements IUnionLoader {
private final WritableColumnVector vector;
private final int loadSize;
+ private boolean childLoaded;
public SparkUnionArrayLoader(final WritableColumnVector vector, final int loadSize) {
this.vector = vector;
this.loadSize = loadSize;
+ this.vector.getChild(0).reset();
+ this.vector.getChild(0).reserve(0);
+ if (this.vector.getChild(0).hasDictionary()) {
+ this.vector.getChild(0).reserveDictionaryIds(0);
+ this.vector.getChild(0).setDictionary(null);
+ }
}
@Override
@@ -38,30 +45,33 @@ public int getLoadSize() {
@Override
public void setNull(final int index) throws IOException {
- vector.putNull(index);
+ // FIXME:
}
@Override
public void finish() throws IOException {
- //
+ // FIXME:
}
@Override
public WritableColumnVector build() throws IOException {
+ if (!childLoaded) {
+ for (int i = 0; i < loadSize; i++) {
+ vector.putArray(i, 0, 0);
+ }
+ }
return vector;
}
@Override
public void setIndexAndColumnType(final int index, final ColumnType columnType) throws IOException {
// FIXME:
- if (columnType != ColumnType.ARRAY) {
- vector.putNull(index);
- }
}
@Override
public void loadChild(final ColumnBinary columnBinary, final int childLoadSize) throws IOException {
if (columnBinary.columnType == ColumnType.ARRAY) {
+ childLoaded = true;
SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize);
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java
index 107ccb0..68274e2 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionMapLoader.java
@@ -29,6 +29,10 @@ public class SparkUnionMapLoader implements IUnionLoader {
public SparkUnionMapLoader(WritableColumnVector vector, int loadSize) {
this.vector = vector;
this.loadSize = loadSize;
+ this.vector.getChild(0).reset();
+ this.vector.getChild(0).reserve(0);
+ this.vector.getChild(1).reset();
+ this.vector.getChild(1).reserve(0);
}
@Override
@@ -38,12 +42,12 @@ public int getLoadSize() {
@Override
public void setNull(int index) throws IOException {
- vector.putNull(index);
+ // FIXME:
}
@Override
public void finish() throws IOException {
- //
+ // FIXME:
}
@Override
@@ -54,9 +58,6 @@ public WritableColumnVector build() throws IOException {
@Override
public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException {
// FIXME:
- if (columnType != ColumnType.SPREAD) {
- vector.putNull(index);
- }
}
@Override
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java
index c4e86de..dfc2341 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/inmemory/loader/SparkUnionStructLoader.java
@@ -25,6 +25,7 @@
public class SparkUnionStructLoader implements IUnionLoader {
private final WritableColumnVector vector;
private final int loadSize;
+ private boolean childLoaded;
public SparkUnionStructLoader(WritableColumnVector vector, int loadSize) {
this.vector = vector;
@@ -38,30 +39,31 @@ public int getLoadSize() {
@Override
public void setNull(int index) throws IOException {
- vector.putNull(index);
+ // FIXME:
}
@Override
public void finish() throws IOException {
- //
+ // FIXME:
}
@Override
public WritableColumnVector build() throws IOException {
+ if (!childLoaded) {
+ new SparkEmptyStructLoader(vector, loadSize).build();
+ }
return vector;
}
@Override
public void setIndexAndColumnType(int index, ColumnType columnType) throws IOException {
// FIXME:
- if (columnType != ColumnType.SPREAD) {
- vector.putNull(index);
- }
}
@Override
public void loadChild(ColumnBinary columnBinary, int childLoadSize) throws IOException {
if (columnBinary.columnType == ColumnType.SPREAD) {
+ childLoaded = true;
SparkLoaderFactoryUtil.createLoaderFactory(vector).create(columnBinary, childLoadSize);
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java
index ce265d8..19b9a58 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchConverter.java
@@ -17,10 +17,13 @@
import jp.co.yahoo.yosegi.binary.ColumnBinary;
import jp.co.yahoo.yosegi.inmemory.IRawConverter;
import jp.co.yahoo.yosegi.spark.inmemory.SparkLoaderFactoryUtil;
+import jp.co.yahoo.yosegi.spark.inmemory.loader.SparkEmptyLoader;
import jp.co.yahoo.yosegi.spark.utils.PartitionColumnUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.io.IOException;
@@ -31,7 +34,7 @@ public class SparkColumnarBatchConverter implements IRawConverter
private final StructType schema;
private final StructType partitionSchema;
private final InternalRow partitionValue;
- private final WritableColumnVector[] childColumns;
+ private final ColumnVector[] childColumns;
private final Map keyIndexMap;
public SparkColumnarBatchConverter(
@@ -39,7 +42,7 @@ public SparkColumnarBatchConverter(
final StructType partitionSchema,
final InternalRow partitionValue,
final Map keyIndexMap,
- final WritableColumnVector[] childColumns) {
+ final ColumnVector[] childColumns) {
this.schema = schema;
this.partitionSchema = partitionSchema;
this.partitionValue = partitionValue;
@@ -50,13 +53,13 @@ public SparkColumnarBatchConverter(
@Override
public ColumnarBatch convert(final List raw, final int loadSize) throws IOException {
// NOTE: initialize
- for (int i = 0; i < childColumns.length; i++) {
+ for (int i = 0; i < schema.length(); i++) {
// FIXME: how to initialize vector with dictionary.
- childColumns[i].reset();
- childColumns[i].reserve(loadSize);
- if (childColumns[i].hasDictionary()) {
- childColumns[i].reserveDictionaryIds(0);
- childColumns[i].setDictionary(null);
+ ((WritableColumnVector) childColumns[i]).reset();
+ ((WritableColumnVector) childColumns[i]).reserve(loadSize);
+ if (((WritableColumnVector) childColumns[i]).hasDictionary()) {
+ ((WritableColumnVector) childColumns[i]).reserveDictionaryIds(0);
+ ((WritableColumnVector) childColumns[i]).setDictionary(null);
}
}
final ColumnarBatch result = new ColumnarBatch(childColumns);
@@ -69,16 +72,16 @@ public ColumnarBatch convert(final List raw, final int loadSize) t
}
final int index = keyIndexMap.get(columnBinary.columnName);
isSet[index] = true;
- SparkLoaderFactoryUtil.createLoaderFactory(childColumns[index]).create(columnBinary, loadSize);
+ SparkLoaderFactoryUtil.createLoaderFactory(((WritableColumnVector) childColumns[index])).create(columnBinary, loadSize);
}
- // NOTE: null columns
- for (int i = 0; i < childColumns.length; i++) {
+ // NOTE: Empty columns
+ for (int i = 0; i < schema.length(); i++) {
if (!isSet[i]) {
- childColumns[i].putNulls(0, loadSize);
+ SparkEmptyLoader.load((WritableColumnVector) childColumns[i], loadSize);
}
}
// NOTE: partitionColumns
- final WritableColumnVector[] partColumns =
+ final ColumnVector[] partColumns =
PartitionColumnUtil.createPartitionColumns(partitionSchema, partitionValue, loadSize);
for (int i = schema.length(), n = 0; i < childColumns.length; i++, n++) {
childColumns[i] = partColumns[n];
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java
index aff02b0..883ffee 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/reader/SparkColumnarBatchReader.java
@@ -20,9 +20,9 @@
import jp.co.yahoo.yosegi.spread.expression.IExpressionNode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.io.IOException;
@@ -33,7 +33,7 @@
public class SparkColumnarBatchReader implements IColumnarBatchReader {
private final WrapReader reader;
- private final WritableColumnVector[] childColumns;
+ private final ColumnVector[] childColumns;
public SparkColumnarBatchReader(
final StructType partitionSchema,
@@ -47,7 +47,7 @@ public SparkColumnarBatchReader(
final IExpressionNode node)
throws IOException {
final StructField[] fields = schema.fields();
- childColumns = new OnHeapColumnVector[schema.length() + partitionSchema.length()];
+ childColumns = new ColumnVector[schema.length() + partitionSchema.length()];
final Map keyIndexMap = new HashMap();
for (int i = 0; i < fields.length; i++) {
keyIndexMap.put(fields[i].name(), i);
@@ -85,6 +85,9 @@ public ColumnarBatch next() throws IOException {
public void close() throws Exception {
reader.close();
for (int i = 0; i < childColumns.length; i++) {
+ if (childColumns[i] == null) {
+ continue;
+ }
childColumns[i].close();
}
}
diff --git a/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java b/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java
index c52ea16..c08d4df 100644
--- a/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java
+++ b/src/main/java/jp/co/yahoo/yosegi/spark/utils/PartitionColumnUtil.java
@@ -17,20 +17,21 @@
*/
package jp.co.yahoo.yosegi.spark.utils;
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.vectorized.ColumnVector;
public final class PartitionColumnUtil{
- public static OnHeapColumnVector[] createPartitionColumns( final StructType partitionColumns , final InternalRow partitionValues , final int rowCount ){
+ public static ColumnVector[] createPartitionColumns( final StructType partitionColumns , final InternalRow partitionValues , final int rowCount ){
StructField[] fields = partitionColumns.fields();
- OnHeapColumnVector[] vectors = new OnHeapColumnVector[ fields.length ];
+ ColumnVector[] vectors = new ConstantColumnVector[ fields.length ];
for( int i = 0 ; i < vectors.length ; i++ ){
- vectors[i] = new OnHeapColumnVector( rowCount , fields[i].dataType() );
- ColumnVectorUtils.populate( vectors[i] , partitionValues , i );
+ vectors[i] = new ConstantColumnVector( rowCount , fields[i].dataType() );
+ ColumnVectorUtils.populate( (ConstantColumnVector) vectors[i] , partitionValues , i );
}
return vectors;
}
diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..43dad69
--- /dev/null
+++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements. See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License. You may obtain a
+# copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+#
Unless required by applicable law or agreed to in writing, software distributed under the
+# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+jp.co.yahoo.yosegi.spark.YosegiFileFormat
\ No newline at end of file
diff --git a/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala b/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala
index 7bec533..c8a4575 100644
--- a/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala
+++ b/src/main/scala/jp/co/yahoo/yosegi/spark/YosegiFileFormat.scala
@@ -118,7 +118,7 @@ class YosegiFileFormat extends FileFormat with DataSourceRegister with Serializa
val readSchema:DataType = DataType.fromJson( requiredSchemaJson )
val partSchema:DataType = DataType.fromJson( partitionSchemaJson )
assert(file.partitionValues.numFields == partitionSchema.size )
- val path:Path = new Path( new URI(file.filePath) )
+ val path:Path = file.filePath.toPath
val fs:FileSystem = path.getFileSystem( broadcastedHadoopConf.value.value )
val yosegiConfig = new jp.co.yahoo.yosegi.config.Configuration()
if( expandOption.nonEmpty ){
diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java
new file mode 100644
index 0000000..1ed096b
--- /dev/null
+++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/DataSourceTest.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package jp.co.yahoo.yosegi.spark.blackbox;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DecimalType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class DataSourceTest {
+ private static SparkSession spark;
+ private static SQLContext sqlContext;
+ private static final String appName = "DataSourceTest";
+
+ public boolean deleteDirectory(final File directory) {
+ final File[] allContents = directory.listFiles();
+ if (allContents != null) {
+ for (final File file : allContents) {
+ deleteDirectory(file);
+ }
+ }
+ return directory.delete();
+ }
+
+ public String getTmpPath() {
+ String tmpdir = System.getProperty("java.io.tmpdir");
+ if (tmpdir.endsWith("/")) {
+ tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
+ }
+ return tmpdir + "/" + appName;
+ }
+
+ public String getLocation(final String table) {
+ String tmpPath = getTmpPath();
+ return tmpPath + "/" + table;
+ }
+
+ @BeforeAll
+ static void initAll() {
+ spark = SparkSession.builder().appName(appName).master("local[*]").getOrCreate();
+ sqlContext = spark.sqlContext();
+ }
+
+ @AfterAll
+ static void tearDownAll() {
+ spark.close();
+ }
+
+ @AfterEach
+ void tearDown() {
+ deleteDirectory(new File(getTmpPath()));
+ }
+
+ @Test
+ void T_DataSource_Primitive_1() throws IOException {
+ final String location = getLocation("primitive1");
+ final int precision = DecimalType.MAX_PRECISION();
+ final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE();
+ /**
+ * CREATE TABLE primitive1 (
+ * id INT,
+ * bo BOOLEAN,
+ * by BYTE,
+ * de DECIMAL(38,6),
+ * do DOUBLE,
+ * fl FLOAT,
+ * in INT,
+ * lo LONG,
+ * sh SHORT,
+ * st STRING
+ * )
+ * USING yosegi
+ * LOCATION ''
+ */
+ final String ddl = String.format("CREATE TABLE primitive1 (\n" +
+ " id INT,\n" +
+ " bo BOOLEAN,\n" +
+ " by BYTE,\n" +
+ " de DECIMAL(%d, %d),\n" +
+ " do DOUBLE,\n" +
+ " fl FLOAT,\n" +
+ " in INT,\n" +
+ " lo LONG,\n" +
+ " sh SHORT,\n" +
+ " st STRING\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s';", precision, scale, location);
+ spark.sql(ddl);
+
+ /**
+ * FIXME: cannot insert decimal value.
+ */
+ final String insertSql = "INSERT INTO primitive1\n" +
+ "(id, bo, by, de, do, fl, in, lo, sh, st)\n" +
+ "VALUES\n" +
+ "(0, true, 127, 123.45678901, 1.7976931348623157e+308, 3.402823e+37, 2147483647, 9223372036854775807, 32767, 'value1');";
+ spark.sql(insertSql);
+
+ List rows = spark.sql("SELECT * FROM primitive1;").collectAsList();
+ assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(0).getAs("bo"), Boolean.valueOf(true));
+ assertEquals(rows.get(0).getAs("by"), Byte.valueOf((byte) 127));
+ assertNull(rows.get(0).getAs("de"));
+ assertEquals(rows.get(0).getAs("do"), Double.valueOf(1.7976931348623157e+308));
+ assertEquals(rows.get(0).getAs("fl"), Float.valueOf(3.402823e+37F));
+ assertEquals(rows.get(0).getAs("in"), Integer.valueOf(2147483647));
+ assertEquals(rows.get(0).getAs("lo"), Long.valueOf(9223372036854775807L));
+ assertEquals(rows.get(0).getAs("sh"), Short.valueOf((short) 32767));
+ assertEquals(rows.get(0).getAs("st"), "value1");
+ }
+
+ @Test
+ void T_DataSource_Primitive_2() throws IOException {
+ final String location = getLocation("primitive2");
+ final int precision = DecimalType.MAX_PRECISION();
+ final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE();
+ /**
+ * CREATE TABLE primitive2 (
+ * id INT,
+ * bo BOOLEAN,
+ * by BYTE,
+ * de DOUBLE,
+ * do DOUBLE,
+ * fl FLOAT,
+ * in INT,
+ * lo LONG,
+ * sh SHORT,
+ * st STRING
+ * )
+ * USING yosegi
+ * LOCATION ''
+ */
+ final String ddl1 = String.format("CREATE TABLE primitive2 (\n" +
+ " id INT,\n" +
+ " bo BOOLEAN,\n" +
+ " by BYTE,\n" +
+ " de DOUBLE,\n" +
+ " do DOUBLE,\n" +
+ " fl FLOAT,\n" +
+ " in INT,\n" +
+ " lo LONG,\n" +
+ " sh SHORT,\n" +
+ " st STRING\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s';", location);
+ spark.sql(ddl1);
+
+ final String insertSql = "INSERT INTO primitive2\n" +
+ "(id, bo, by, de, do, fl, in, lo, sh, st)\n" +
+ "VALUES\n" +
+ "(0, true, 127, 123.45678901, 1.7976931348623157e+308, 3.402823e+37, 2147483647, 9223372036854775807, 32767, 'value1');";
+ spark.sql(insertSql);
+
+ spark.sql("DROP TABLE primitive2;");
+
+ /**
+ * CREATE TABLE primitive2 (
+ * id INT,
+ * bo BOOLEAN,
+ * by BYTE,
+ * de DECIMAL(38,6),
+ * do DOUBLE,
+ * fl FLOAT,
+ * in INT,
+ * lo LONG,
+ * sh SHORT,
+ * st STRING
+ * )
+ * USING yosegi
+ * LOCATION ''
+ */
+ final String ddl2 = String.format("CREATE TABLE primitive2 (\n" +
+ " id INT,\n" +
+ " bo BOOLEAN,\n" +
+ " by BYTE,\n" +
+ " de DECIMAL(%d, %d),\n" +
+ " do DOUBLE,\n" +
+ " fl FLOAT,\n" +
+ " in INT,\n" +
+ " lo LONG,\n" +
+ " sh SHORT,\n" +
+ " st STRING\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s';", precision, scale, location);
+ spark.sql(ddl2);
+
+ List rows = spark.sql("SELECT * FROM primitive2;").collectAsList();
+ assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(0).getAs("bo"), Boolean.valueOf(true));
+ assertEquals(rows.get(0).getAs("by"), Byte.valueOf((byte) 127));
+ assertEquals(rows.get(0).getAs("de"), BigDecimal.valueOf(123.456789));
+ assertEquals(rows.get(0).getAs("do"), Double.valueOf(1.7976931348623157e+308));
+ assertEquals(rows.get(0).getAs("fl"), Float.valueOf(3.402823e+37F));
+ assertEquals(rows.get(0).getAs("in"), Integer.valueOf(2147483647));
+ assertEquals(rows.get(0).getAs("lo"), Long.valueOf(9223372036854775807L));
+ assertEquals(rows.get(0).getAs("sh"), Short.valueOf((short) 32767));
+ assertEquals(rows.get(0).getAs("st"), "value1");
+ }
+
+ @Test
+ void T_DataSource_Expand_1() throws IOException {
+ final String location = getLocation("flatten1");
+ /**
+ * CREATE TABLE expand1 (
+ * id INT,
+ * a ARRAY
+ * )
+ * USING yosegi
+ * LOCATION '';
+ */
+ final String ddl1 = String.format("CREATE TABLE expand1 (\n" +
+ " id INT,\n" +
+ " a ARRAY\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s';", location);
+ spark.sql(ddl1);
+
+ final String insertSql = "INSERT INTO expand1\n" +
+ "(id, a)\n" +
+ "VALUES\n" +
+ "(0, array(1,2,3));";
+ spark.sql(insertSql);
+
+ spark.sql("DROP TABLE expand1;");
+
+ /**
+ * CREATE TABLE expand1(
+ * id INT,
+ * aa INT
+ * )
+ * USING yosegi
+ * LOCATION ''
+ * OPTIONS (
+ * 'spread.reader.expand.column'='{"base":{"node":"a", "link_name":"aa"}}'
+ * );
+ */
+ final String ddl2 = String.format("CREATE TABLE expand1(\n" +
+ " id INT,\n" +
+ " aa INT\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s'\n" +
+ "OPTIONS (\n" +
+ " 'spread.reader.expand.column'='{\"base\":{\"node\":\"a\", \"link_name\":\"aa\"}}'\n" +
+ ");", location);
+ spark.sql(ddl2);
+
+ List rows = spark.sql("SELECT * FROM expand1 ORDER BY id, aa;").collectAsList();
+ assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(1).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(2).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(0).getAs("aa"), Integer.valueOf(1));
+ assertEquals(rows.get(1).getAs("aa"), Integer.valueOf(2));
+ assertEquals(rows.get(2).getAs("aa"), Integer.valueOf(3));
+ }
+
+ @Test
+ void T_DataSource_Flatten_1() throws IOException {
+ final String location = getLocation("flatten1");
+ /**
+ * CREATE TABLE flatten1 (
+ * id INT,
+ * m MAP
+ * )
+ * USING yosegi
+ * LOCATION '';
+ */
+ final String ddl1 = String.format("CREATE TABLE flatten1 (\n" +
+ " id INT,\n" +
+ " m MAP\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s';", location);
+ spark.sql(ddl1);
+
+ final String insertSql = "INSERT INTO flatten1\n" +
+ "(id, m)\n" +
+ "VALUES\n" +
+ "(0, map('k1', 'v1', 'k2', 'v2'));";
+ spark.sql(insertSql);
+
+ spark.sql("DROP TABLE flatten1;");
+
+ /**
+ * CREATE TABLE flatten1 (
+ * id INT,
+ * mk1 STRING,
+ * mk2 STRING
+ * )
+ * USING yosegi
+ * LOCATION ''
+ * OPTIONS (
+ * 'spread.reader.flatten.column'='[{"link_name":"id", "nodes":["id"]}, {"link_name":"mk1", "nodes":["m","k1"]}, {"link_name":"mk2", "nodes":["m","k2"]}]'
+ * );
+ */
+ final String ddl2 = String.format("CREATE TABLE flatten1 (\n" +
+ " id INT,\n" +
+ " mk1 STRING,\n" +
+ " mk2 STRING\n" +
+ ")\n" +
+ "USING yosegi\n" +
+ "LOCATION '%s'\n" +
+ "OPTIONS (\n" +
+ " 'spread.reader.flatten.column'='[{\"link_name\":\"id\", \"nodes\":[\"id\"]}, {\"link_name\":\"mk1\", \"nodes\":[\"m\",\"k1\"]}, {\"link_name\":\"mk2\", \"nodes\":[\"m\",\"k2\"]}]'\n" +
+ ");", location);
+ spark.sql(ddl2);
+
+ List rows = spark.sql("SELECT * FROM flatten1;").collectAsList();
+ assertEquals(rows.get(0).getAs("id"), Integer.valueOf(0));
+ assertEquals(rows.get(0).getAs("mk1"), "v1");
+ assertEquals(rows.get(0).getAs("mk2"), "v2");
+ }
+}
diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java
similarity index 97%
rename from src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java
rename to src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java
index 3773720..790588d 100644
--- a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlatten.java
+++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/ExpandFlattenTest.java
@@ -40,7 +40,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-public class ExpandFlatten {
+public class ExpandFlattenTest {
private static SparkSession spark;
private static SQLContext sqlContext;
private static final String appName = "ExpandFlattenTest";
@@ -60,7 +60,11 @@ public String getResourcePath(final String resource) {
}
public String getTmpPath() {
- return System.getProperty("java.io.tmpdir") + appName + ".yosegi";
+ String tmpdir = System.getProperty("java.io.tmpdir");
+ if (tmpdir.endsWith("/")) {
+ tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
+ }
+ return tmpdir + "/" + appName + ".yosegi";
}
public Dataset loadJsonFile(final String resource, final StructType schema) {
diff --git a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java
similarity index 89%
rename from src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java
rename to src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java
index 9b8fa97..65ddaf7 100644
--- a/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/Load.java
+++ b/src/test/java/jp/co/yahoo/yosegi/spark/blackbox/LoadTest.java
@@ -43,7 +43,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class Load {
+public class LoadTest {
private static SparkSession spark;
private static SQLContext sqlContext;
private static String appName = "LoadTest";
@@ -63,7 +63,11 @@ public String getResourcePath(final String resource) {
}
public String getTmpPath() {
- return System.getProperty("java.io.tmpdir") + appName + ".yosegi";
+ String tmpdir = System.getProperty("java.io.tmpdir");
+ if (tmpdir.endsWith("/")) {
+ tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
+ }
+ return tmpdir + "/" + appName + ".yosegi";
}
public Dataset loadJsonFile(final String resource, final StructType schema) {
@@ -318,33 +322,24 @@ void T_load_Array_Array_Integer_1() throws IOException {
final int jIndex = ldfj.get(i).fieldIndex("aa");
final int yIndex = ldfy.get(i).fieldIndex("aa");
if (ldfj.get(i).isNullAt(jIndex)) {
- if (ldfy.get(i).isNullAt(yIndex)) {
- // NOTE: json:null, yosegi:null
- assertTrue(ldfy.get(i).isNullAt(yIndex));
- } else {
- // FIXME: json:null, yosegi:[]
- assertTrue(false);
- }
+ // NOTE: json:null, yosegi:[]
+ assertEquals(0, ldfy.get(i).getList(yIndex).size());
} else {
if (ldfy.get(i).isNullAt(yIndex)) {
// NOTE: json:[], yosegi:null
- assertEquals(0, ldfj.get(i).getList(jIndex).size());
+ assertTrue(false);
} else {
final List> ldfj2 = ldfj.get(i).getList(jIndex);
final List> ldfy2 = ldfy.get(i).getList(yIndex);
for (int j = 0; j < ldfj2.size(); j++) {
final WrappedArray waj = ldfj2.get(j);
final WrappedArray way = ldfy2.get(j);
- if (way == null) {
- if (waj == null) {
- // NOTE: json:[null], yosegi:[null]
- assertNull(waj);
- } else {
- // NOTE: json:[[]], yosegi:[null]
- assertEquals(0, waj.size());
- }
+ if (waj == null) {
+ // NOTE: json:[null], yosegi:[[]]
+ assertEquals(0, way.size());
} else {
// NOTE: json:[[]], yosegi:[[]]
+ assertEquals(waj.size(), way.size());
for (int k = 0; k < waj.size(); k++) {
assertEquals(waj.apply(k), way.apply(k));
}
@@ -393,59 +388,28 @@ void T_load_Array_Struct_Primitive_1() throws IOException {
final int jIndex = ldfj.get(i).fieldIndex("as");
final int yIndex = ldfy.get(i).fieldIndex("as");
if (ldfj.get(i).isNullAt(jIndex)) {
- if (ldfy.get(i).isNullAt(yIndex)) {
- // NOTE: json:null, yosegi:null
- assertTrue(ldfy.get(i).isNullAt(yIndex));
- } else {
- // FIXME: json:null, yosegi:[]
- assertTrue(false);
- }
+ // NOTE: json:null, yosegi:[]
+ assertEquals(0, ldfy.get(i).getList(yIndex).size());
} else {
if (ldfy.get(i).isNullAt(yIndex)) {
- final List lrj = ldfj.get(i).getList(jIndex);
- for (int j = 0; j < lrj.size(); j++) {
- final Row rj = lrj.get(j);
- if (rj == null) {
- // NOTE: json:[null], yosegi:null
- assertNull(rj);
- } else {
- // NOTE: json[as.field:null], yosegi:null
- for (final StructField field : fields) {
- final String name = field.name();
- assertNull(rj.getAs(name));
- }
- }
- }
+ assertTrue(false);
} else {
final List lrj = ldfj.get(i).getList(jIndex);
final List lry = ldfy.get(i).getList(yIndex);
for (int j = 0; j < lrj.size(); j++) {
final Row rj = lrj.get(j);
final Row ry = lry.get(j);
- if (ry == null) {
- if (rj == null) {
- // NOTE: json:[null], yosegi:[null]
- assertNull(rj);
- } else {
- // NOTE: json:[{}], yosegi:[null]
- for (final StructField field : fields) {
- final String name = field.name();
- assertNull(rj.getAs(name));
- }
+ if (rj == null) {
+ // NOTE: json:[null], yosegi:[{}]
+ for (final StructField field : fields) {
+ final String name = field.name();
+ assertNull(ry.getAs(name));
}
} else {
- if (rj == null) {
- // NOTE: json:[null], yosegi:[{}]
- for (final StructField field : fields) {
- final String name = field.name();
- assertNull(ry.getAs(name));
- }
- } else {
- // NOTE: json:[{}], yosegi:[{}]
- for (final StructField field : fields) {
- final String name = field.name();
- assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name));
- }
+ // NOTE: json:[{}], yosegi:[{}]
+ for (final StructField field : fields) {
+ final String name = field.name();
+ assertEquals((Object) rj.getAs(name), (Object) ry.getAs(name));
}
}
}
@@ -480,31 +444,12 @@ void T_load_Array_Map_Integer_1() throws IOException {
final int jIndex = ldfj.get(i).fieldIndex("am");
final int yIndex = ldfy.get(i).fieldIndex("am");
if (ldfj.get(i).isNullAt(jIndex)) {
- if (ldfy.get(i).isNullAt(yIndex)) {
- // NOTE: json:null, yosegi:null
- assertTrue(ldfy.get(i).isNullAt(yIndex));
- } else {
- // FIXME: json:null, yosegi:[]
- assertTrue(false);
- }
+ // NOTE: json:null, yosegi:[]
+ assertEquals(0, ldfy.get(i).getList(yIndex).size());
} else {
if (ldfy.get(i).isNullAt(yIndex)) {
// NOTE: json:[], yosegi:null
- final List