Commit fd1c03d

Fix initialization for partitionSchema
jufukuka committed Jun 14, 2024
1 parent dab5ea8 commit fd1c03d
Showing 8 changed files with 220 additions and 11 deletions.
10 changes: 5 additions & 5 deletions pom.xml
@@ -57,11 +57,11 @@
     <spotbugs-maven-plugin.version>3.1.8</spotbugs-maven-plugin.version>
     <spotbugs.version>3.1.9</spotbugs.version>
     <yosegi.base>2.0</yosegi.base>
-    <yosegi.version>${yosegi.base}.2</yosegi.version>
+    <yosegi.version>${yosegi.base}.5</yosegi.version>
     <spark.base>3.3</spark.base>
     <spark.version>${spark.base}.0</spark.version>
     <scala.base>2.12</scala.base>
-    <scala.version>${scala.base}.16</scala.version>
+    <scala.version>${scala.base}.19</scala.version>
   </properties>

   <distributionManagement>
@@ -99,17 +99,17 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <version>2.13.3</version>
+      <version>2.17.1</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <version>2.13.3</version>
+      <version>2.17.1</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.base}</artifactId>
-      <version>2.13.3</version>
+      <version>2.17.1</version>
     </dependency>

     <dependency>
@@ -50,7 +50,7 @@ public SparkColumnarBatchConverter(
   @Override
   public ColumnarBatch convert(final List<ColumnBinary> raw, final int loadSize) throws IOException {
     // NOTE: initialize
-    for (int i = 0; i < childColumns.length; i++) {
+    for (int i = 0; i < schema.length(); i++) {
       // FIXME: how to initialize vector with dictionary.
       childColumns[i].reset();
       childColumns[i].reserve(loadSize);
@@ -72,7 +72,7 @@ public ColumnarBatch convert(final List<ColumnBinary> raw, final int loadSize) throws IOException {
       SparkLoaderFactoryUtil.createLoaderFactory(childColumns[index]).create(columnBinary, loadSize);
     }
     // NOTE: null columns
-    for (int i = 0; i < childColumns.length; i++) {
+    for (int i = 0; i < schema.length(); i++) {
       if (!isSet[i]) {
         childColumns[i].putNulls(0, loadSize);
       }
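The loop-bound change above is the core of the fix. With a partitioned read, the batch's column vectors evidently cover the data schema plus the appended partition columns, so childColumns.length can exceed schema.length(); resetting or null-filling the trailing partition vectors on every batch would clobber their constant values. A minimal sketch of that layout assumption, using hypothetical names rather than the project's classes:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PartitionLayoutSketch {
  public static void main(final String[] args) {
    // Hypothetical schemas standing in for the reader's dataSchema and
    // partitionSchema; only the layout relationship matters here.
    final StructType dataSchema = new StructType()
        .add("v", DataTypes.StringType)
        .add("in", DataTypes.IntegerType);
    final StructType partitionSchema = new StructType()
        .add("id", DataTypes.IntegerType);

    // Vectors are allocated for data columns first, then partition columns,
    // so the vector array is longer than the data schema alone.
    final int numVectors = dataSchema.length() + partitionSchema.length(); // 3

    // Per-batch initialization must stop at the data-schema boundary; the
    // trailing partition vectors keep their constant values for the split.
    for (int i = 0; i < dataSchema.length(); i++) {
      System.out.println("reset data column vector " + i);
    }
    System.out.println("total vectors: " + numVectors);
  }
}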
@@ -85,6 +85,9 @@ public ColumnarBatch next() throws IOException {
   public void close() throws Exception {
     reader.close();
     for (int i = 0; i < childColumns.length; i++) {
+      if (childColumns[i] == null) {
+        continue;
+      }
       childColumns[i].close();
     }
   }
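The null guard added to close() implies that some childColumns entries may never be allocated, for instance when partition columns are populated by other machinery, so tearing down the reader has to skip them. A self-contained sketch of the same pattern, with a hypothetical vector interface standing in for the real type:

// ColumnVectorLike is a hypothetical stand-in for the actual vector type
// held in childColumns.
interface ColumnVectorLike extends AutoCloseable {}

final class NullSafeCloseSketch {
  static void closeAll(final ColumnVectorLike[] columns) throws Exception {
    for (final ColumnVectorLike column : columns) {
      if (column == null) {
        continue; // skip entries that were never allocated
      }
      column.close();
    }
  }
}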
@@ -40,7 +40,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;

-public class ExpandFlatten {
+public class ExpandFlattenTest {
   private static SparkSession spark;
   private static SQLContext sqlContext;
   private static final String appName = "ExpandFlattenTest";
@@ -60,7 +60,11 @@ public String getResourcePath(final String resource) {
   }

   public String getTmpPath() {
-    return System.getProperty("java.io.tmpdir") + appName + ".yosegi";
+    String tmpdir = System.getProperty("java.io.tmpdir");
+    if (tmpdir.endsWith("/")) {
+      tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
+    }
+    return tmpdir + "/" + appName + ".yosegi";
   }

   public Dataset<Row> loadJsonFile(final String resource, final StructType schema) {
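The getTmpPath() rewrite here (and its twins in LoadTest and PartitionLoadTest below) normalizes java.io.tmpdir, whose value may or may not end with a separator depending on the JVM and platform, before appending the file name. A hedged alternative sketch, not what the commit does: java.nio.file joins the segments itself, so no trailing-slash check is needed.

import java.nio.file.Paths;

public class TmpPathSketch {
  public static void main(final String[] args) {
    // appName is illustrative; the tests use values like "ExpandFlattenTest".
    final String appName = "ExpandFlattenTest";
    final String tmpPath =
        Paths.get(System.getProperty("java.io.tmpdir"), appName + ".yosegi").toString();
    System.out.println(tmpPath); // e.g. /tmp/ExpandFlattenTest.yosegi
  }
}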
@@ -43,7 +43,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-public class Load {
+public class LoadTest {
   private static SparkSession spark;
   private static SQLContext sqlContext;
   private static String appName = "LoadTest";
@@ -63,7 +63,11 @@ public String getResourcePath(final String resource) {
   }

   public String getTmpPath() {
-    return System.getProperty("java.io.tmpdir") + appName + ".yosegi";
+    String tmpdir = System.getProperty("java.io.tmpdir");
+    if (tmpdir.endsWith("/")) {
+      tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
+    }
+    return tmpdir + "/" + appName + ".yosegi";
   }

   public Dataset<Row> loadJsonFile(final String resource, final StructType schema) {
189 changes: 189 additions & 0 deletions src/test/java/jp/co/yahoo/yosegi/spark/blackbox/PartitionLoadTest.java
@@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package jp.co.yahoo.yosegi.spark.blackbox;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class PartitionLoadTest {
private static SparkSession spark;
private static SQLContext sqlContext;
private static String appName = "PartitionLoadTest";

public boolean deleteDirectory(final File directory) {
final File[] allContents = directory.listFiles();
if (allContents != null) {
for (final File file : allContents) {
deleteDirectory(file);
}
}
return directory.delete();
}

public String getResourcePath(final String resource) {
return Thread.currentThread().getContextClassLoader().getResource(resource).getPath();
}

public String getTmpPath() {
String tmpdir = System.getProperty("java.io.tmpdir");
if (tmpdir.endsWith("/")) {
tmpdir = tmpdir.substring(0, tmpdir.length() - 1);
}
return tmpdir + "/" + appName + ".yosegi";
}

public Dataset<Row> loadJsonFile(final String resource, final StructType schema) {
final String resourcePath = getResourcePath(resource);
if (schema == null) {
return sqlContext.read().json(resourcePath).orderBy(col("id").asc());
}
return sqlContext.read().schema(schema).json(resourcePath).orderBy(col("id").asc());
}

public void createYosegiFile(final String resource, final String... partitions) {
final Dataset<Row> df = loadJsonFile(resource, null);
final String tmpPath = getTmpPath();
df.write()
.mode(SaveMode.Overwrite)
.partitionBy(partitions)
.format("jp.co.yahoo.yosegi.spark.YosegiFileFormat")
.save(tmpPath);
}

public Dataset<Row> loadYosegiFile(final StructType schema) {
final String tmpPath = getTmpPath();
if (schema == null) {
return sqlContext
.read()
.format("jp.co.yahoo.yosegi.spark.YosegiFileFormat")
.load(tmpPath)
.orderBy(col("id").asc());
}
return sqlContext
.read()
.format("jp.co.yahoo.yosegi.spark.YosegiFileFormat")
.schema(schema)
.load(tmpPath)
.orderBy(col("id").asc());
}

@BeforeAll
static void initAll() {
spark = SparkSession.builder().appName(appName).master("local[*]").getOrCreate();
sqlContext = spark.sqlContext();
}

@AfterAll
static void tearDownAll() {
spark.close();
}

@AfterEach
void tearDown() {
deleteDirectory(new File(getTmpPath()));
}

  /*
   * FIXME: Rows cannot be loaded if the rows in a partition have only null
   * values in their non-partition columns, e.g. {"id":1}.
   */
@Test
void T_load_Partition_Primitive_1() throws IOException {
// NOTE: create yosegi file
final String resource = "blackbox/Partition_Primitive_1.txt";
createYosegiFile(resource, "id");

// NOTE: schema
final int precision = DecimalType.MAX_PRECISION();
final int scale = DecimalType.MINIMUM_ADJUSTED_SCALE();
final List<StructField> fields =
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, true),
DataTypes.createStructField("bo", DataTypes.BooleanType, true),
DataTypes.createStructField("by", DataTypes.ByteType, true),
DataTypes.createStructField("de", DataTypes.createDecimalType(precision, scale), true),
DataTypes.createStructField("do", DataTypes.DoubleType, true),
DataTypes.createStructField("fl", DataTypes.FloatType, true),
DataTypes.createStructField("in", DataTypes.IntegerType, true),
DataTypes.createStructField("lo", DataTypes.LongType, true),
DataTypes.createStructField("sh", DataTypes.ShortType, true),
DataTypes.createStructField("st", DataTypes.StringType, true));
final StructType structType = DataTypes.createStructType(fields);
// NOTE: load
final Dataset<Row> dfj = loadJsonFile(resource, structType);
final Dataset<Row> dfy = loadYosegiFile(structType);

// NOTE: assert
final List<Row> ldfj = dfj.collectAsList();
final List<Row> ldfy = dfy.collectAsList();
for (int i = 0; i < ldfj.size(); i++) {
for (final StructField field : fields) {
final String name = field.name();
final DataType dataType = field.dataType();
assertEquals((Object) ldfj.get(i).getAs(name), (Object) ldfy.get(i).getAs(name));
}
}
}

@Test
void T_load_Partition_Primitive_2() throws IOException {
// NOTE: create yosegi file
final String resource = "blackbox/Partition_Primitive_2.txt";
createYosegiFile(resource, "p1", "p2");

// NOTE: schema
final List<StructField> fields =
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, true),
DataTypes.createStructField("p1", DataTypes.IntegerType, true),
DataTypes.createStructField("p2", DataTypes.StringType, true),
DataTypes.createStructField("v", DataTypes.StringType, true));
final StructType structType = DataTypes.createStructType(fields);
// NOTE: load
final Dataset<Row> dfj = loadJsonFile(resource, structType);
final Dataset<Row> dfy = loadYosegiFile(structType);

// NOTE: assert
final List<Row> ldfj = dfj.collectAsList();
final List<Row> ldfy = dfy.collectAsList();
for (int i = 0; i < ldfj.size(); i++) {
for (final StructField field : fields) {
final String name = field.name();
final DataType dataType = field.dataType();
assertEquals((Object) ldfj.get(i).getAs(name), (Object) ldfy.get(i).getAs(name));
}
}
}
}
4 changes: 4 additions & 0 deletions src/test/resources/blackbox/Partition_Primitive_1.txt
@@ -0,0 +1,4 @@
{"id":0, "bo":true, "by":127, "de":123.45678901, "do":1.7976931348623157e+308, "fl":3.402823e+37, "in":2147483647, "lo":9223372036854775807, "sh":32767, "st":"value1"}
{"id":1}
{"id":1, "bo":true}
{"id":2, "bo":false, "by":-128, "de":234.56789012, "do":-1.7976931348623157e+308, "fl":-3.402823e+37, "in":-2147483648, "lo":-9223372036854775808, "sh":-32768, "st":"value2"}
5 changes: 5 additions & 0 deletions src/test/resources/blackbox/Partition_Primitive_2.txt
@@ -0,0 +1,5 @@
{"id":0, "p1":0, "p2":"a", "v":"v0a1"}
{"id":1, "p1":1, "p2":"b", "v":"v1b1"}
{"id":2, "p1":1, "p2":"b", "v":"v1b2"}
{"id":3, "p1":2, "p2":"a", "v":"v2a1"}
{"id":4, "p1":2, "p2":"b", "v":"v2b1"}
