diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractor.scala index 47c8ad63..b583a37b 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractor.scala @@ -30,6 +30,9 @@ trait RawRecordExtractor extends Iterator[Array[Byte]] { * The offset should point to the absolute beginning of the record, e.g. including headers, * so that if a record extractor is started from this offset it would be able to extract the record * by invoking .next(). + * + * IMPORTANT. The offset points to the next record to be fetched by .next(). If this invariant is not held, + * the reader might get inconsistent record ids, or can fail in certain circumstances. */ def offset: Long diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala index e9b04220..41778a67 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala @@ -57,6 +57,14 @@ object IndexGenerator extends Logging { val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex) index += indexEntry + if (dataStream.offset != fileStartOffset && recordExtractor.isDefined) { + logger.warn("The record extractor has returned the offset that is not the beginning of the file. " + + s"Expected: $fileStartOffset. Got: ${dataStream.offset}. File: ${dataStream.inputFileName}. " + + "It will be assumed that the offset is shifted by 1 record, but if you have record id inconsistency, " + + "please fix the record extractor.") + recordIndex += 1 + } + var endOfFileReached = false while (!endOfFileReached) { var record: Array[Byte] = null diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala index 910135e6..66bb902a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala @@ -229,7 +229,7 @@ private[source] object IndexBuilder extends Logging { private def repartitionIndexes(indexRDD: RDD[SparseIndexEntry]): RDD[SparseIndexEntry] = { val indexCount = indexRDD.count() val numPartitions = Math.min(indexCount, Constants.maxNumPartitions).toInt - logger.warn(s"Index elements count: $indexCount, number of partitions = $numPartitions") + logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions") indexRDD.repartition(numPartitions).cache() }