From 87822417d566addef0f14e33cdbe0e94285481f3 Mon Sep 17 00:00:00 2001
From: Ruslan Yushchenko
Date: Sun, 23 Feb 2020 08:34:02 +0100
Subject: [PATCH] #251 Fix glob support and divisibility check for a large
 number of files.

---
 .../parameters/CobolParametersParser.scala    | 29 ++-
 .../integration/Test20InputFileNameSpec.scala |  4 +-
 .../regression/Test08InputFileName.scala      | 75 +++++++++++++++++++
 3 files changed, 95 insertions(+), 13 deletions(-)
 create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test08InputFileName.scala

diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
index b64fbdbbf..ecbf7711f 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersParser.scala
@@ -436,7 +436,12 @@ object CobolParametersParser {
     */
   private def validateSparkCobolOptions(params: Parameters): Unit = {
     val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, "false").toBoolean ||
-      params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean
+      params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean ||
+      params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean ||
+      params.contains(PARAM_FILE_START_OFFSET) ||
+      params.contains(PARAM_FILE_END_OFFSET) ||
+      params.contains(PARAM_RECORD_LENGTH)
+
     val isPedantic = params.getOrElse(PARAM_PEDANTIC, "false").toBoolean
     val keysPassed = params.getMap.keys.toSeq
     val unusedKeys = keysPassed.flatMap(key => {
@@ -446,15 +451,6 @@
         Some(key)
       }
     })
-    if (unusedKeys.nonEmpty) {
-      val unusedKeyStr = unusedKeys.mkString(",")
-      val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
-      if (isPedantic) {
-        throw new IllegalArgumentException(msg)
-      } else {
-        logger.error(msg)
-      }
-    }
     val segmentRedefineParents = getSegmentRedefineParents(params)
     if (segmentRedefineParents.nonEmpty) {
       val segmentIdLevels = parseSegmentLevels(params)
@@ -464,7 +460,18 @@
       }
     }
     if (!isRecordSequence && params.contains(PARAM_INPUT_FILE_COLUMN)) {
-      throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when '$PARAM_IS_RECORD_SEQUENCE' = true.")
+      val recordSequenceCondition = s"one of these holds: '$PARAM_IS_RECORD_SEQUENCE' = true or '$PARAM_VARIABLE_SIZE_OCCURS' = true" +
+        s" or one of these options is set: '$PARAM_RECORD_LENGTH', '$PARAM_FILE_START_OFFSET', '$PARAM_FILE_END_OFFSET'"
+      throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when $recordSequenceCondition")
+    }
+    if (unusedKeys.nonEmpty) {
+      val unusedKeyStr = unusedKeys.mkString(",")
+      val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
+      if (isPedantic) {
+        throw new IllegalArgumentException(msg)
+      } else {
+        logger.error(msg)
+      }
     }
   }
 
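Note: with the widened isRecordSequence condition above, 'with_input_file_name_col' is accepted whenever any option implies record-by-record reading, not only 'is_record_sequence' = true. A minimal sketch of a read that the new validation accepts, assuming a SparkSession in scope as spark; the copybook, record length, and path are placeholders, not part of the patch:

    // Sketch only: 'record_length' now also enables the input file name column,
    // because a fixed record length implies record-sequence style reading.
    val copybook =
      """      01 R.
                  03 A  PIC X(1).
                  03 B  PIC X(2).
      """
    val df = spark
      .read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_length", "3")                    // or file_start_offset / file_end_offset
      .option("with_input_file_name_col", "file_name")
      .load("/path/to/files/*.dat")                    // placeholder glob; glob support is part of this fix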
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
index 978fe7c22..6258adce5 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala
@@ -16,8 +16,8 @@
 
 package za.co.absa.cobrix.spark.cobol.source.integration
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, DataFrameReader}
 import org.scalatest.WordSpec
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 
@@ -85,7 +85,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase {
           .load(inputDataPath)
       }
 
-      assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when 'is_record_sequence' = true"))
+      assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when one of these holds"))
     }
   }
 
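The new regression test below exercises the divisibility check from the commit title: the fixture file is 18 bytes, a 4-byte start offset plus three 3-byte records plus a 5-byte end offset. A quick sanity check of that arithmetic (illustrative only, not part of the patch):

    // After stripping file_start_offset = 4 and file_end_offset = 5 bytes,
    // the remaining payload must divide evenly into fixed-length records.
    val fileLength = 4 + 3 * 3 + 5               // 18 bytes in the fixture below
    val payload    = fileLength - 4 - 5          // 9 bytes of record data
    assert(payload % 3 == 0 && payload / 3 == 3) // three 3-byte records => df.count == 3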
+ """ + + val binFileContents: Array[Byte] = Array[Byte]( + // File offset start + 0x00, 0x00, 0x00, 0x00, + // Records + 0xF0.toByte, 0xF1.toByte, 0xF2.toByte, + 0xF3.toByte, 0xF4.toByte, 0xF5.toByte, + 0xF6.toByte, 0xF7.toByte, 0xF8.toByte, + // File offset end + 0x00, 0x00, 0x00, 0x00, 0x00 + ) + + test("Test input data has the input file column and file offsets") { + withTempBinFile("bin_file", ".dat", binFileContents) { tmpFileName => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("with_input_file_name_col", "file") + .option("file_start_offset", "4") + .option("file_end_offset", "5") + .option("schema_retention_policy", "collapse_root") + .load(tmpFileName) + .filter(col("file").contains("bin_file")) + + assert(df.count == 3) + } + } + + test("Test Cobrix throws an exceptions when conditions for 'schema_retention_policy' are not met ") { + intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("with_input_file_name_col", "file") + .option("schema_retention_policy", "collapse_root") + .load("dummy.dat") + } + } + +}