Skip to content

Commit

Permalink
#614 Add a workaround for record extractors that prefetch a single re…
Browse files Browse the repository at this point in the history
…cord ahead.
  • Loading branch information
yruslan committed May 3, 2023
1 parent 694cb52 commit 42bee62
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ trait RawRecordExtractor extends Iterator[Array[Byte]] {
* The offset should point to the absolute beginning of the record, e.g. including headers,
* so that if a record extractor is started from this offset it would be able to extract the record
* by invoking .next().
*
* IMPORTANT: The offset must point to the next record to be fetched by .next(). If this invariant does not hold,
* the reader might produce inconsistent record ids, or fail in certain circumstances.
*/
def offset: Long

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ object IndexGenerator extends Logging {
val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex)
index += indexEntry

if (dataStream.offset != fileStartOffset && recordExtractor.isDefined) {
logger.warn("The record extractor has returned the offset that is not the beginning of the file. " +
s"Expected: $fileStartOffset. Got: ${dataStream.offset}. File: ${dataStream.inputFileName}. " +
"It will be assumed that the offset is shifted by 1 record, but if you have record id inconsistency, " +
"please fix the record extractor.")
recordIndex += 1
}

var endOfFileReached = false
while (!endOfFileReached) {
var record: Array[Byte] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private[source] object IndexBuilder extends Logging {
/**
  * Redistributes the sparse index entries across partitions and caches the result.
  *
  * The number of partitions is the number of index entries, capped by `Constants.maxNumPartitions`
  * and never less than 1.
  *
  * @param indexRDD The index entries to repartition.
  * @return The repartitioned and cached RDD of index entries.
  */
private def repartitionIndexes(indexRDD: RDD[SparseIndexEntry]): RDD[SparseIndexEntry] = {
  val indexCount = indexRDD.count()
  // repartition() requires a positive number of partitions, so an empty index still gets one partition
  // instead of failing with an IllegalArgumentException.
  val numPartitions = Math.max(1L, Math.min(indexCount, Constants.maxNumPartitions)).toInt
  // Logged once, at info level: this is routine diagnostic output, not a warning condition.
  logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions")
  indexRDD.repartition(numPartitions).cache()
}

Expand Down

0 comments on commit 42bee62

Please sign in to comment.