Skip to content

Commit

Permalink
Merge pull request #34 from koijima/yosegi-2.0_spark-3.3
Browse files Browse the repository at this point in the history
[3.3] Reset the vectors of Array and Struct children.
  • Loading branch information
jufukuka authored Aug 6, 2024
2 parents c2d6483 + 24d093e commit 75c450c
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public void setArrayIndex(final int index, final int start, final int length) th
public void loadChild(final ColumnBinary columnBinary, final int childLength) throws IOException {
  // Reset the element child vector before loading so that state left over
  // from a previous load (values, capacity, dictionary) does not leak into
  // this one.
  final WritableColumnVector child = vector.getChild(0);
  child.reset();
  child.reserve(childLength);
  if (child.hasDictionary()) {
    // A prior dictionary-encoded load attached a dictionary; drop it so a
    // non-dictionary load of the same vector reads plain values.
    child.reserveDictionaryIds(0);
    child.setDictionary(null);
  }
  SparkLoaderFactoryUtil.createLoaderFactory(child).create(columnBinary, childLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@ public void finish() throws IOException {

@Override
public WritableColumnVector build() throws IOException {
  // Children that were actually loaded are removed from loaderFactoryMap by
  // loadChild, so any field name still present in the map was never loaded.
  // Fill those children with nulls so every child reports loadSize rows.
  final StructType structType = (StructType) vector.dataType();
  final String[] fieldNames = structType.fieldNames();
  for (int idx = 0; idx < fieldNames.length; idx++) {
    final boolean unloaded = loaderFactoryMap.containsKey(fieldNames[idx]);
    if (unloaded) {
      vector.getChild(idx).putNulls(0, loadSize);
    }
  }
  return vector;
}

@Override
public void loadChild(final ColumnBinary columnBinary, final int loadSize) throws IOException {
if (loaderFactoryMap.containsKey(columnBinary.columnName)) {
loaderFactoryMap.get(columnBinary.columnName).create(columnBinary, loadSize);
loaderFactoryMap.remove(columnBinary.columnName).create(columnBinary, loadSize);
} else {
// FIXME:
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jp.co.yahoo.yosegi.binary.ColumnBinary;
import jp.co.yahoo.yosegi.binary.ColumnBinaryMakerConfig;
import jp.co.yahoo.yosegi.binary.FindColumnBinaryMaker;
import jp.co.yahoo.yosegi.binary.maker.IColumnBinaryMaker;
import jp.co.yahoo.yosegi.binary.maker.MaxLengthBasedArrayColumnBinaryMaker;
import jp.co.yahoo.yosegi.binary.maker.OptimizedNullArrayDumpStringColumnBinaryMaker;
import jp.co.yahoo.yosegi.binary.maker.OptimizedNullArrayStringColumnBinaryMaker;
import jp.co.yahoo.yosegi.inmemory.IArrayLoader;
import jp.co.yahoo.yosegi.message.parser.json.JacksonMessageReader;
import jp.co.yahoo.yosegi.spark.test.Utils;
Expand Down Expand Up @@ -370,4 +373,46 @@ void T_load_String_1(final String binaryMakerClassName) throws IOException {
// NOTE: assert
assertArray(values, vector, loadSize, elmDataType);
}

@ParameterizedTest
@MethodSource("D_arrayColumnBinaryMaker")
void T_load_String_checkDictionaryReset(final String binaryMakerClassName) throws IOException {
  // First load uses a dictionary-based string maker so the child vector ends
  // up with a dictionary attached.
  String resource = "SparkArrayLoaderTest/String_1.txt";
  List<List<String>> values = toValues(resource);
  int loadSize = values.size();

  // NOTE: create ColumnBinary
  IColumn column = toArrayColumn(resource);
  IColumnBinaryMaker binaryMaker = FindColumnBinaryMaker.get(binaryMakerClassName);

  ColumnBinaryMakerConfig config = new ColumnBinaryMakerConfig();
  config.stringMakerClass = new OptimizedNullArrayStringColumnBinaryMaker();
  ColumnBinary columnBinary = Utils.getColumnBinary(binaryMaker, column, config, null, null);

  // NOTE: load
  final DataType elmDataType = DataTypes.StringType;
  final DataType dataType = DataTypes.createArrayType(elmDataType);
  final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType);
  IArrayLoader<WritableColumnVector> loader = new SparkArrayLoader(vector, loadSize);
  binaryMaker.load(columnBinary, loader);

  // NOTE: assert
  assertArray(values, vector, loadSize, elmDataType);

  // NOTE: Check if the vector is reset.
  // Second load reuses the SAME vector with a non-dictionary (dump) string
  // maker: if the loader failed to clear the stale dictionary, this read
  // would produce wrong values.
  String resource2 = "SparkArrayLoaderTest/String_2.txt";
  List<List<String>> values2 = toValues(resource2);
  int loadSize2 = values2.size();
  IColumn column2 = toArrayColumn(resource2);
  IColumnBinaryMaker binaryMaker2 = FindColumnBinaryMaker.get(binaryMakerClassName);
  config.stringMakerClass = new OptimizedNullArrayDumpStringColumnBinaryMaker();
  ColumnBinary columnBinary2 = Utils.getColumnBinary(binaryMaker2, column2, config, null, null);
  vector.reset();
  vector.reserve(loadSize2);
  IArrayLoader<WritableColumnVector> loader2 = new SparkArrayLoader(vector, loadSize2);
  // FIX: the second load previously called binaryMaker, leaving binaryMaker2
  // unused; use the maker created for the second binary.
  binaryMaker2.load(columnBinary2, loader2);
  assertArray(values2, vector, loadSize2, elmDataType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,55 @@ void T_load_Struct_1(final String binaryMakerClassName) throws IOException {
// NOTE: assert
assertStruct(values, vector, loadSize, fields);
}

@ParameterizedTest
@MethodSource("D_spreadColumnBinaryMaker")
void T_load_Struct_checkDictionaryReset(final String binaryMakerClassName) throws IOException {
  // NOTE: test data
  // NOTE: expected
  final String resource = "SparkStructLoaderTest/Struct_1.txt";
  final List<Map<String, Object>> values = toValues(resource);
  final int loadSize = values.size();

  // NOTE: create ColumnBinary
  final IColumn column = toSpreadColumn(resource);
  final IColumnBinaryMaker binaryMaker = FindColumnBinaryMaker.get(binaryMakerClassName);
  final ColumnBinary columnBinary = Utils.getColumnBinary(binaryMaker, column, null, null, null);

  // NOTE: load
  final List<StructField> fields =
      Arrays.asList(
          DataTypes.createStructField("bo", DataTypes.BooleanType, true),
          DataTypes.createStructField("by", DataTypes.ByteType, true),
          DataTypes.createStructField("bi", DataTypes.BinaryType, true),
          DataTypes.createStructField("do", DataTypes.DoubleType, true),
          DataTypes.createStructField("fl", DataTypes.FloatType, true),
          DataTypes.createStructField("in", DataTypes.IntegerType, true),
          DataTypes.createStructField("lo", DataTypes.LongType, true),
          DataTypes.createStructField("sh", DataTypes.ShortType, true),
          DataTypes.createStructField("st", DataTypes.StringType, true));
  final DataType dataType = DataTypes.createStructType(fields);
  final WritableColumnVector vector = new OnHeapColumnVector(loadSize, dataType);
  final ISpreadLoader<WritableColumnVector> loader = new SparkStructLoader(vector, loadSize);
  binaryMaker.load(columnBinary, loader);

  // NOTE: assert
  assertStruct(values, vector, loadSize, fields);

  // NOTE: Check if the vector is reset.
  // Second load reuses the SAME vector; stale per-child state from the first
  // load must have been cleared by the loader for this to pass.
  final String resource2 = "SparkStructLoaderTest/Struct_2.txt";
  final List<Map<String, Object>> values2 = toValues(resource2);
  final int loadSize2 = values2.size();

  vector.reset();
  vector.reserve(loadSize2);
  // NOTE: create ColumnBinary
  final IColumn column2 = toSpreadColumn(resource2);
  final IColumnBinaryMaker binaryMaker2 = FindColumnBinaryMaker.get(binaryMakerClassName);
  final ColumnBinary columnBinary2 = Utils.getColumnBinary(binaryMaker2, column2, null, null, null);
  final ISpreadLoader<WritableColumnVector> loader2 = new SparkStructLoader(vector, loadSize2);
  // FIX: the second load previously called binaryMaker, leaving binaryMaker2
  // unused; use the maker created for the second binary.
  binaryMaker2.load(columnBinary2, loader2);
  // build() fills children that were absent from the second binary with nulls.
  loader2.build();
  assertStruct(values2, vector, loadSize2, fields);
}
}
7 changes: 7 additions & 0 deletions src/test/resources/SparkArrayLoaderTest/String_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
["b"]
[]
["cc",null]
["aa",null,null]
["dd"]
["a",null,null]
["bb",null,null]
7 changes: 7 additions & 0 deletions src/test/resources/SparkStructLoaderTest/Struct_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"by":-128,"bi":"aaa","do":-1.7976931348623157e+308,"fl":-3.402823e+37,"in":-2147483648,"lo":-9223372036854775808,"sh":-32768,"st":"bbb"}
{"by":127,"bi":"aaa","do":1.7976931348623157e+308,"fl":3.402823e+38,"in":2147483647,"lo":9223372036854775807,"sh":32767,"st":"bbb"}
{}
{"by":null,"bi":null,"do":null,"fl":null,"in":null,"lo":null,"sh":null,"st":null}
{"by":-128,"bi":"aaa"}
{"by":-128,"bi":null}
{"by":null,"bi":"aaa"}

0 comments on commit 75c450c

Please sign in to comment.