From 87822417d566addef0f14e33cdbe0e94285481f3 Mon Sep 17 00:00:00 2001
From: Ruslan Yushchenko
Date: Sun, 23 Feb 2020 08:34:02 +0100
Subject: [PATCH] #251 Fix glob support and divisibility check for a large
 number of files.
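
The 'with_input_file_name_col' option is now accepted whenever the reader
operates in record-sequence mode, i.e. when 'is_xcom', 'is_record_sequence'
or 'variable_size_occurs' is true, or when 'record_length',
'file_start_offset' or 'file_end_offset' is set. The redundant-option check
is also moved after this validation so the more specific error is reported
first. A minimal usage sketch of the newly supported combination (the
copybook contents and the input path are hypothetical):

    val df = spark
      .read
      .format("cobol")
      .option("copybook_contents", copybook)      // assumed defined elsewhere
      .option("with_input_file_name_col", "file") // adds a column with the source file name
      .option("file_start_offset", "4")           // skip a 4-byte header in each file
      .option("file_end_offset", "5")             // ignore a 5-byte footer in each file
      .load("/path/to/files/*.dat")               // glob patterns resolve to many files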
---
.../parameters/CobolParametersParser.scala | 29 ++++---
.../integration/Test20InputFileNameSpec.scala | 4 +-
.../regression/Test08InputFileName.scala | 75 +++++++++++++++++++
3 files changed, 95 insertions(+), 13 deletions(-)
create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test08InputFileName.scala
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
index b64fbdbbf..ecbf7711f 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
@@ -436,7 +436,12 @@ object CobolParametersParser {
*/
private def validateSparkCobolOptions(params: Parameters): Unit = {
val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, "false").toBoolean ||
- params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean
+ params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean ||
+ params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean ||
+ params.contains(PARAM_FILE_START_OFFSET) ||
+ params.contains(PARAM_FILE_END_OFFSET) ||
+ params.contains(PARAM_RECORD_LENGTH)
+
val isPedantic = params.getOrElse(PARAM_PEDANTIC, "false").toBoolean
val keysPassed = params.getMap.keys.toSeq
val unusedKeys = keysPassed.flatMap(key => {
@@ -446,15 +451,6 @@ object CobolParametersParser {
Some(key)
}
})
- if (unusedKeys.nonEmpty) {
- val unusedKeyStr = unusedKeys.mkString(",")
- val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
- if (isPedantic) {
- throw new IllegalArgumentException(msg)
- } else {
- logger.error(msg)
- }
- }
val segmentRedefineParents = getSegmentRedefineParents(params)
if (segmentRedefineParents.nonEmpty) {
val segmentIdLevels = parseSegmentLevels(params)
@@ -464,7 +460,18 @@ object CobolParametersParser {
}
}
if (!isRecordSequence && params.contains(PARAM_INPUT_FILE_COLUMN)) {
- throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when '$PARAM_IS_RECORD_SEQUENCE' = true.")
+      val recordSequenceCondition = s"one of these holds: '$PARAM_IS_RECORD_SEQUENCE' = true or '$PARAM_VARIABLE_SIZE_OCCURS' = true" +
+ s" or one of these options is set: '$PARAM_RECORD_LENGTH', '$PARAM_FILE_START_OFFSET', '$PARAM_FILE_END_OFFSET'"
+ throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when $recordSequenceCondition")
+ }
+ if (unusedKeys.nonEmpty) {
+ val unusedKeyStr = unusedKeys.mkString(",")
+ val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
+ if (isPedantic) {
+ throw new IllegalArgumentException(msg)
+ } else {
+ logger.error(msg)
+ }
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
index 978fe7c22..6258adce5 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
@@ -16,8 +16,8 @@
package za.co.absa.cobrix.spark.cobol.source.integration
-import org.apache.spark.sql.{DataFrame, DataFrameReader}
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, DataFrameReader}
import org.scalatest.WordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
@@ -85,7 +85,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase {
.load(inputDataPath)
}
- assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when 'is_record_sequence' = true"))
+    assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when one of these holds"))
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test08InputFileName.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test08InputFileName.scala
new file mode 100644
index 000000000..13be6674f
--- /dev/null
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test08InputFileName.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.regression
+
+import org.apache.spark.sql.functions.col
+import org.scalatest.FunSuite
+import org.slf4j.{Logger, LoggerFactory}
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+
+class Test08InputFileName extends FunSuite with SparkTestBase with BinaryFileFixture {
+
+ private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+ private val copybook =
+ """ 01 R.
+ 03 A PIC X(1).
+ 03 B PIC X(2).
+ """
+
+  private val binFileContents: Array[Byte] = Array[Byte](
+ // File offset start
+ 0x00, 0x00, 0x00, 0x00,
+ // Records
+ 0xF0.toByte, 0xF1.toByte, 0xF2.toByte,
+ 0xF3.toByte, 0xF4.toByte, 0xF5.toByte,
+ 0xF6.toByte, 0xF7.toByte, 0xF8.toByte,
+ // File offset end
+ 0x00, 0x00, 0x00, 0x00, 0x00
+ )
+
+ test("Test input data has the input file column and file offsets") {
+ withTempBinFile("bin_file", ".dat", binFileContents) { tmpFileName =>
+ val df = spark
+ .read
+ .format("cobol")
+ .option("copybook_contents", copybook)
+ .option("with_input_file_name_col", "file")
+ .option("file_start_offset", "4")
+ .option("file_end_offset", "5")
+ .option("schema_retention_policy", "collapse_root")
+ .load(tmpFileName)
+ .filter(col("file").contains("bin_file"))
+
+ assert(df.count == 3)
+ }
+ }
+
+  test("Test Cobrix throws an exception when conditions for 'with_input_file_name_col' are not met") {
+ intercept[IllegalArgumentException] {
+ spark
+ .read
+ .format("cobol")
+ .option("copybook_contents", copybook)
+ .option("with_input_file_name_col", "file")
+ .option("schema_retention_policy", "collapse_root")
+ .load("dummy.dat")
+ }
+ }
+
+}