Skip to content

Commit

Permalink
#672 Tidy up the code, remove a few instances of code duplications.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Apr 22, 2024
1 parent 1235883 commit b2a5434
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,20 @@

package za.co.absa.cobrix.cobol.reader

import java.nio.charset.{Charset, StandardCharsets}
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.policies.FillerNamingPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{AsciiText, CobrixAsciiText, FixedLength}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{AsciiText, FixedLength}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.iterator.FixedLenNestedRowIterator
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema

import scala.collection.immutable.HashMap
import scala.reflect.ClassTag

/**
* The Cobol data reader that produces nested structure schema
*
* @param copyBookContents A copybook contents.
* @param startOffset Specifies the number of bytes at the beginning of each record that can be ignored.
* @param endOffset Specifies the number of bytes at the end of each record that can be ignored.
* @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
*/
class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
isEbcdic: Boolean,
ebcdicCodePage: CodePage,
floatingPointFormat: FloatingPointFormat,
startOffset: Int,
endOffset: Int,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
dropGroupFillers: Boolean,
dropValueFillers: Boolean,
fillerNamingPolicy: FillerNamingPolicy,
nonTerminals: Seq[String],
occursMappings: Map[String, Map[String, Int]],
readerProperties: ReaderParameters,
handler: RecordHandler[T]) extends FixedLenReader with Serializable {

Expand All @@ -63,22 +39,22 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],

override def getRecordSize: Int = {
val recordInternalsSize = readerProperties.recordLength.getOrElse(cobolSchema.getRecordSize)
recordInternalsSize + startOffset + endOffset
recordInternalsSize + readerProperties.startOffset + readerProperties.endOffset
}

@throws(classOf[Exception])
override def getRecordIterator(binaryData: Array[Byte]): Iterator[Seq[Any]] = {
checkBinaryDataValidity(binaryData)
val singleRecordIterator = readerProperties.recordFormat == AsciiText || readerProperties.recordFormat == FixedLength
new FixedLenNestedRowIterator(binaryData, cobolSchema, readerProperties, schemaRetentionPolicy, startOffset, endOffset, singleRecordIterator, handler)
new FixedLenNestedRowIterator(binaryData, cobolSchema, readerProperties, readerProperties.startOffset, readerProperties.endOffset, singleRecordIterator, handler)
}

def checkBinaryDataValidity(binaryData: Array[Byte]): Unit = {
if (startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = $startOffset. A record start offset cannot be negative.")
if (readerProperties.startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = ${readerProperties.startOffset}. A record start offset cannot be negative.")
}
if (endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = $endOffset. A record end offset cannot be negative.")
if (readerProperties.endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = ${readerProperties.endOffset}. A record end offset cannot be negative.")
}
readerProperties.recordLength match {
case Some(len) =>
Expand All @@ -96,7 +72,7 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
}

private def getExpectedLength: Int = {
cobolSchema.getRecordSize + startOffset + endOffset
cobolSchema.getRecordSize + readerProperties.startOffset + readerProperties.endOffset
}

private def loadCopyBook(copyBookContents: Seq[String]): CobolSchema = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class FixedLenNestedRowIterator[T: ClassTag](
val binaryData: Array[Byte],
val cobolSchema: CobolSchema,
readerProperties: ReaderParameters,
policy: SchemaRetentionPolicy,
startOffset: Int,
endOffset: Int,
singleRecordOnly: Boolean,
Expand Down Expand Up @@ -89,7 +88,7 @@ class FixedLenNestedRowIterator[T: ClassTag](
cobolSchema.getCobolSchema.ast,
binaryData,
offset,
policy,
readerProperties.schemaPolicy,
readerProperties.variableSizeOccurs,
generateRecordBytes = readerProperties.generateRecordBytes,
activeSegmentRedefine = activeSegmentRedefine,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class CobolParameters(
isEbcdic: Boolean,
ebcdicCodePage: String,
ebcdicCodePageClass: Option[String],
asciiCharset: String,
asciiCharset: Option[String],
fieldCodePage: Map[String, String],
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ case class ReaderParameters(
isText: Boolean = false,
ebcdicCodePage: String = "common",
ebcdicCodePageClass: Option[String] = None,
asciiCharset: String = "",
asciiCharset: Option[String] = None,
fieldCodePage: Map[String, String] = Map.empty[String, String],
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
Expand All @@ -103,7 +103,7 @@ case class ReaderParameters(
fileEndOffset: Int = 0,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot,
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
allowPartialRecords: Boolean = false,
multisegment: Option[MultisegmentParameters] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ object CobolSchema {
val segmentRedefines = readerParameters.multisegment.map(r => r.segmentIdRedefineMap.values.toList.distinct).getOrElse(Nil)
val fieldParentMap = readerParameters.multisegment.map(r => r.fieldParentMap).getOrElse(HashMap[String, String]())
val codePage = getCodePage(readerParameters.ebcdicCodePage, readerParameters.ebcdicCodePageClass)
val asciiCharset = if (readerParameters.asciiCharset.isEmpty) StandardCharsets.UTF_8 else Charset.forName(readerParameters.asciiCharset)
val asciiCharset = readerParameters.asciiCharset match {
case Some(asciiCharset) => Charset.forName(asciiCharset)
case None => StandardCharsets.UTF_8
}

val schema = if (copyBookContents.size == 1)
CopybookParser.parseTree(encoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class FixedLenNestedReaderSpec extends AnyWordSpec {
}

"return an iterator for single ASCII record" in {
val reader = getUseCase(Seq(copybookContents), recordFormat = RecordFormat.AsciiText, asciiCharset = "us-ascii")
val reader = getUseCase(Seq(copybookContents), recordFormat = RecordFormat.AsciiText, asciiCharset = Some("us-ascii"))

val it = reader.getRecordIterator(fixedLengthDataExample)

Expand Down Expand Up @@ -206,28 +206,19 @@ class FixedLenNestedReaderSpec extends AnyWordSpec {
startOffset: Int = 0,
endOffset: Int = 0,
recordLength: Option[Int] = None,
asciiCharset: String = ""
asciiCharset: Option[String] = None
): FixedLenNestedReader[scala.Array[Any]] = {
val readerProperties = za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters(
isEbcdic = isEbcdic,
startOffset = startOffset,
endOffset = endOffset,
recordFormat = recordFormat,
recordLength = recordLength,
asciiCharset = asciiCharset
)

val reader = new FixedLenNestedReader[scala.Array[Any]](
copybooks,
isEbcdic = isEbcdic,
ebcdicCodePage = new CodePageCommon,
floatingPointFormat = FloatingPointFormat.IEEE754,
startOffset = startOffset,
endOffset = endOffset,
schemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot,
stringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
dropGroupFillers = false,
dropValueFillers = false,
fillerNamingPolicy = FillerNamingPolicy.SequenceNumbers,
nonTerminals = Nil,
occursMappings = Map.empty,
readerProperties = readerProperties,
handler = new SimpleRecordHandler)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ object CobolParametersParser extends Logging {
val stringTrimmingPolicy = getStringTrimmingPolicy(params)
val ebcdicCodePageName = params.getOrElse(PARAM_EBCDIC_CODE_PAGE, "common")
val ebcdicCodePageClass = params.get(PARAM_EBCDIC_CODE_PAGE_CLASS)
val asciiCharset = params.getOrElse(PARAM_ASCII_CHARSET, "")
val asciiCharset = params.get(PARAM_ASCII_CHARSET)

val recordFormatDefined = getRecordFormat(params)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ package za.co.absa.cobrix.spark.cobol.reader
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.StructType
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.policies.FillerNamingPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.{FixedLenNestedReader => ReaderFixedLenNestedReader}
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

Expand All @@ -33,31 +28,10 @@ import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
* The Cobol data reader that produces nested structure schema
*
* @param copyBookContents A copybook contents.
* @param startOffset Specifies the number of bytes at the beginning of each record that can be ignored.
* @param endOffset Specifies the number of bytes at the end of each record that can be ignored.
* @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param readerProperties Properties reflecting parsing copybooks and decoding data.
*/
final class FixedLenNestedReader(copyBookContents: Seq[String],
isEbcdic: Boolean = true,
ebcdicCodePage: CodePage,
floatingPointFormat: FloatingPointFormat,
startOffset: Int = 0,
endOffset: Int = 0,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
dropGroupFillers: Boolean,
dropValueFillers: Boolean,
fillerNamingPolicy: FillerNamingPolicy,
nonTerminals: Seq[String],
occursMappings: Map[String, Map[String, Int]],
readerProperties: ReaderParameters
)
extends ReaderFixedLenNestedReader[GenericRow](
copyBookContents, isEbcdic, ebcdicCodePage, floatingPointFormat,
startOffset, endOffset, schemaRetentionPolicy, stringTrimmingPolicy,
dropGroupFillers, dropValueFillers, fillerNamingPolicy, nonTerminals, occursMappings, readerProperties,
new RowHandler()
) with FixedLenReader with Serializable {
final class FixedLenNestedReader(copyBookContents: Seq[String], readerProperties: ReaderParameters)
extends ReaderFixedLenNestedReader[GenericRow](copyBookContents, readerProperties, new RowHandler()) with FixedLenReader with Serializable {

class RowIterator(iterator: Iterator[Seq[Any]]) extends Iterator[Row] {
override def hasNext: Boolean = iterator.hasNext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ package za.co.absa.cobrix.spark.cobol.reader
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.StructType
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.policies.FillerNamingPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.{FixedLenNestedReader => ReaderFixedLenNestedReader}
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

Expand All @@ -33,32 +28,10 @@ import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
* The Cobol data reader from text files that produces nested structure schema
*
* @param copyBookContents A copybook contents.
* @param startOffset Specifies the number of bytes at the beginning of each record that can be ignored.
* @param endOffset Specifies the number of bytes at the end of each record that can be ignored.
* @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param readerProperties Properties reflecting parsing copybooks and decoding data.
*/
final class FixedLenTextReader(copyBookContents: Seq[String],
isEbcdic: Boolean = true,
ebcdicCodePage: CodePage,
val asciiCharset: Option[String],
floatingPointFormat: FloatingPointFormat,
startOffset: Int = 0,
endOffset: Int = 0,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
dropGroupFillers: Boolean,
dropValueFillers: Boolean,
fillerNamingPolicy: FillerNamingPolicy,
nonTerminals: Seq[String],
occursMappings: Map[String, Map[String, Int]],
readerProperties: ReaderParameters
)
extends ReaderFixedLenNestedReader[GenericRow](
copyBookContents, isEbcdic, ebcdicCodePage, floatingPointFormat,
startOffset, endOffset, schemaRetentionPolicy, stringTrimmingPolicy,
dropGroupFillers, dropValueFillers, fillerNamingPolicy, nonTerminals, occursMappings, readerProperties,
new RowHandler()
) with FixedLenReader with Serializable {
final class FixedLenTextReader(copyBookContents: Seq[String], readerProperties: ReaderParameters)
extends ReaderFixedLenNestedReader[GenericRow](copyBookContents, readerProperties, new RowHandler()) with FixedLenReader with Serializable {

class RowIterator(iterator: Iterator[Seq[Any]]) extends Iterator[Row] {
override def hasNext: Boolean = iterator.hasNext
Expand All @@ -80,11 +53,11 @@ final class FixedLenTextReader(copyBookContents: Seq[String],
}

override def checkBinaryDataValidity(binaryData: Array[Byte]): Unit = {
if (startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = $startOffset. A record start offset cannot be negative.")
if (readerProperties.startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = ${readerProperties.startOffset}. A record start offset cannot be negative.")
}
if (endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = $endOffset. A record end offset cannot be negative.")
if (readerProperties.endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = ${readerProperties.endOffset}. A record end offset cannot be negative.")
}
}
}
Loading

0 comments on commit b2a5434

Please sign in to comment.