Skip to content

Commit

Permalink
#672 Implement a method that returns the Spark schema for copybooks and use it across the code base.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Apr 19, 2024
1 parent ced5908 commit 4c3d79f
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,57 +100,6 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
}

private def loadCopyBook(copyBookContents: Seq[String]): CobolSchema = {
  // Builds the COBOL schema for the given copybook(s) by delegating to the
  // shared factory. All parser options (encoding, filler handling, trimming
  // policy, code pages, etc.) are taken from `readerProperties`, so every
  // reader constructs schemas identically instead of duplicating the
  // 20-argument CopybookParser.parseTree() call inline.
  //
  // The previous inline implementation was removed: its locally built
  // `new CobolSchema(...)` value was dead code, discarded in favor of the
  // factory call that is now the method's only expression.
  CobolSchema.fromReaderParameters(copyBookContents, readerProperties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,59 +199,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
}

private def loadCopyBook(copyBookContents: Seq[String]): CobolSchema = {
  // Builds the COBOL schema for the given copybook(s) via the shared
  // factory so schema construction is identical across all readers.
  // The removed inline body was dead code: its `new CobolSchema(...)`
  // result was discarded in favor of this delegation call.
  //
  // NOTE(review): the shared factory defaults an empty `asciiCharset` to
  // UTF-8, whereas this reader previously defaulted to US-ASCII — confirm
  // this behavior change is intended for variable-length records.
  CobolSchema.fromReaderParameters(copyBookContents, readerProperties)
}

private def checkInputArgumentsValidity(): Unit = {
Expand All @@ -271,13 +219,6 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
}
}

// Resolves the EBCDIC code page to use for decoding: an explicitly supplied
// code-page class name takes precedence over the built-in named code page.
private def getCodePage(codePageName: String, codePageClass: Option[String]): CodePage =
  codePageClass.fold(CodePage.getCodePageByName(codePageName))(CodePage.getCodePageByClass)

private def getRecordHeaderParser: RecordHeaderParser = {
val adjustment1 = if (readerProperties.isRdwPartRecLength) -4 else 0
val adjustment2 = readerProperties.rdwAdjustment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package za.co.absa.cobrix.cobol.reader.schema

import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy

import java.nio.charset.{Charset, StandardCharsets}
import scala.collection.immutable.HashMap


/**
* This class provides a view on a COBOL schema from the perspective of Spark. When provided with a parsed copybook the class
Expand Down Expand Up @@ -59,3 +66,72 @@ class CobolSchema(val copybook: Copybook,
timestampFormat.format(now)
}
}

object CobolSchema {
  /**
    * Builds a [[CobolSchema]] from copybook text and reader parameters.
    *
    * A single copybook is parsed directly; multiple copybooks are each parsed
    * with the same options and merged into one schema via [[Copybook.merge]].
    *
    * @param copyBookContents one or more copybook texts (must be non-empty)
    * @param readerParameters parser and schema-construction options
    * @return the COBOL schema corresponding to the copybook(s)
    * @throws IllegalArgumentException if no copybooks are provided
    */
  def fromReaderParameters(copyBookContents: Seq[String], readerParameters: ReaderParameters): CobolSchema = {
    if (copyBookContents.isEmpty) {
      throw new IllegalArgumentException("At least one copybook must be specified.")
    }

    val encoding = if (readerParameters.isEbcdic) EBCDIC else ASCII
    val segmentRedefines = readerParameters.multisegment.map(r => r.segmentIdRedefineMap.values.toList.distinct).getOrElse(Nil)
    val fieldParentMap = readerParameters.multisegment.map(r => r.fieldParentMap).getOrElse(HashMap[String, String]())
    val codePage = getCodePage(readerParameters.ebcdicCodePage, readerParameters.ebcdicCodePageClass)
    // NOTE(review): an empty asciiCharset defaults to UTF-8 here; the
    // variable-length reader previously defaulted to US-ASCII — confirm the
    // unified default is intended.
    val asciiCharset = if (readerParameters.asciiCharset.isEmpty) StandardCharsets.UTF_8 else Charset.forName(readerParameters.asciiCharset)

    // Parses one copybook with the full set of reader options. Declared once
    // so the single- and multi-copybook code paths cannot drift apart (the
    // previous duplicated call sites had already diverged in argument style).
    def parseCopybook(contents: String): Copybook =
      CopybookParser.parseTree(encoding,
        contents,
        readerParameters.dropGroupFillers,
        readerParameters.dropValueFillers,
        readerParameters.fillerNamingPolicy,
        segmentRedefines,
        fieldParentMap,
        readerParameters.stringTrimmingPolicy,
        readerParameters.commentPolicy,
        readerParameters.strictSignOverpunch,
        readerParameters.improvedNullDetection,
        readerParameters.decodeBinaryAsHex,
        codePage,
        asciiCharset,
        readerParameters.isUtf16BigEndian,
        readerParameters.floatingPointFormat,
        readerParameters.nonTerminals,
        readerParameters.occursMappings,
        readerParameters.debugFieldsPolicy,
        readerParameters.fieldCodePage)

    val schema =
      if (copyBookContents.size == 1)
        parseCopybook(copyBookContents.head)
      else
        Copybook.merge(copyBookContents.map(parseCopybook))

    val segIdFieldCount = readerParameters.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0)
    val segmentIdPrefix = readerParameters.multisegment.map(p => p.segmentIdPrefix).getOrElse("")
    new CobolSchema(schema, readerParameters.schemaPolicy, readerParameters.inputFileNameColumn, readerParameters.generateRecordId, readerParameters.generateRecordBytes, segIdFieldCount, segmentIdPrefix, readerParameters.metadataPolicy)
  }

  /**
    * Resolves the EBCDIC code page: an explicitly supplied code-page class
    * name takes precedence over the built-in named code page.
    *
    * @param codePageName  name of a built-in code page (used when no class is given)
    * @param codePageClass optional fully-qualified class name of a custom code page
    * @return the resolved [[CodePage]] instance
    */
  def getCodePage(codePageName: String, codePageClass: Option[String]): CodePage = {
    codePageClass match {
      case Some(c) => CodePage.getCodePageByClass(c)
      case None => CodePage.getCodePageByName(codePageName)
    }
  }
}
Loading

0 comments on commit 4c3d79f

Please sign in to comment.