#146 Add support for S3 storage for files with variable-length records.
yruslan committed Dec 30, 2020
1 parent eba6922 commit 9ef2404
Showing 5 changed files with 104 additions and 53 deletions.
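For context: after this change both the data file and the copybook can live on any Hadoop-compatible filesystem, including S3 via the s3a:// scheme. A minimal usage sketch, not part of this commit — the bucket and file names are placeholders, and the option names (copybook, is_record_sequence) are the ones spark-cobol documented around this release, so treat them as assumptions:

    // Hypothetical example: reading a variable-length (RDW-prefixed) EBCDIC file from S3.
    // Requires the hadoop-aws / S3A connector on the classpath; all paths are made up.
    val df = spark.read
      .format("cobol")
      .option("copybook", "s3a://my-bucket/copybooks/records.cpy")
      .option("is_record_sequence", "true")
      .load("s3a://my-bucket/data/records.dat")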

@@ -40,8 +40,6 @@ class SparseIndexSpecSpec extends WordSpec {
10 B2 PIC X(5).
"""

""

"sparseIndexGenerator()" should {
"Generate a sparse index for ASCII text data" in {
val copybook = CopybookParser.parse(copybookContents, ASCII)

@@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths}

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
import za.co.absa.cobrix.spark.cobol.utils.FileNameUtils

@@ -63,8 +63,9 @@ object CopybookContentLoader {
}

private def loadCopybookFromHDFS(hadoopConfiguration: Configuration, copyBookHDFSPath: String): String = {
val hdfs = FileSystem.get(hadoopConfiguration)
val stream = hdfs.open(new Path(copyBookHDFSPath))
val copybookPath = new Path(copyBookHDFSPath)
val hdfs = copybookPath.getFileSystem(hadoopConfiguration)
val stream = hdfs.open(copybookPath)
try IOUtils.readLines(stream).asScala.mkString("\n") finally stream.close()
}
}
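The core pattern of this commit is visible above: FileSystem.get(conf) always returns the default filesystem from fs.defaultFS (typically HDFS), whereas Path.getFileSystem(conf) selects the implementation that matches the path's URI scheme, so an s3a:// copybook path resolves to the S3A filesystem. A minimal sketch of the difference, with a made-up path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()

    // Old approach: always the cluster's default filesystem, regardless of the path's scheme.
    val defaultFs: FileSystem = FileSystem.get(conf)

    // New approach: the filesystem matching the path's scheme (S3AFileSystem for s3a://).
    val copybookPath = new Path("s3a://my-bucket/copybooks/records.cpy") // placeholder path
    val fs: FileSystem = copybookPath.getFileSystem(conf)
    val stream = fs.open(copybookPath)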

@@ -17,20 +17,23 @@
package za.co.absa.cobrix.spark.cobol.source.index

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory
import za.co.absa.cobrix.cobol.reader.common.Constants
import za.co.absa.cobrix.spark.cobol.reader.{Reader, VarLenReader}
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.spark.cobol.reader.{Reader, VarLenReader}
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}

import scala.collection.mutable.ArrayBuffer

/**
* Builds offset indexes for distributed processing of variable-length records.
*
@@ -39,20 +39,25 @@ import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}
*
* In a nutshell, ideally, there will be as many partitions as there are indexes.
*/
private [source] object IndexBuilder {
private[source] object IndexBuilder {

private val logger = LoggerFactory.getLogger(this.getClass)

def buildIndex(filesList: Array[FileWithOrder], cobolReader: Reader, sqlContext: SQLContext)(localityParams: LocalityParameters): RDD[SparseIndexEntry] = {
val fs = new Path(filesList.head.filePath).getFileSystem(sqlContext.sparkSession.sparkContext.hadoopConfiguration)
val isIndexSupported = isFileRandomAccessSupported(fs)

cobolReader match {
case reader: VarLenReader => {
if (reader.isIndexGenerationNeeded && localityParams.improveLocality){
case reader: VarLenReader if isIndexSupported => {
if (reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs)) {
buildIndexForVarLenReaderWithFullLocality(filesList, reader, sqlContext)(localityParams.optimizeAllocation)
}
else {
buildIndexForVarLenReader(filesList, reader, sqlContext)
}
}
case reader: VarLenReader =>
buildIndexForFullFiles(filesList, reader, sqlContext)
case _ => null
}
}
@@ -62,7 +70,7 @@ private [source] object IndexBuilder {
* to those records in those locations.
*/
private def buildIndexForVarLenReaderWithFullLocality(filesList: Array[FileWithOrder], reader: VarLenReader, sqlContext: SQLContext)
(optimizeAllocation: Boolean): RDD[SparseIndexEntry] = {
(optimizeAllocation: Boolean): RDD[SparseIndexEntry] = {

val conf = sqlContext.sparkContext.hadoopConfiguration

@@ -72,14 +80,11 @@

val indexes = filesRDD.mapPartitions(
partition => {
val fileSystem = FileSystem.get(sconf.value)
partition.flatMap(row => {
val filePath = row.filePath
val fileOrder = row.order
val index = generateIndexEntry(row, sconf.value, reader)

logger.info(s"Going to generate index for the file: $filePath")
val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0, 0),
fileOrder, reader.isRdwBigEndian)
val filePath = row.filePath
val fileSystem = new Path(filePath).getFileSystem(sconf.value)

index.map(entry => {
val offset = if (entry.offsetFrom >= 0) entry.offsetFrom else 0
@@ -91,7 +96,7 @@
})

logger.info("Going to collect located indexes into driver.")
val offsetsLocations: Seq[(SparseIndexEntry,Seq[String])] = if (optimizeAllocation) {
val offsetsLocations: Seq[(SparseIndexEntry, Seq[String])] = if (optimizeAllocation) {
optimizeDistribution(indexes.collect(), sqlContext.sparkContext)
}
else {
@@ -105,9 +110,61 @@
offsetsLocations.foreach(allocation => logger.debug(allocation.toString()))
}

sqlContext.sparkContext.makeRDD(offsetsLocations)
val indexRDD = sqlContext.sparkContext.makeRDD(offsetsLocations)

repartitionIndexes(indexRDD)
}

/**
* Builds record indexes. Does not take locality into account. May be removed in future releases.
*/
def buildIndexForVarLenReader(filesList: Array[FileWithOrder], reader: VarLenReader, sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
val conf = sqlContext.sparkContext.hadoopConfiguration
val sconf = new SerializableConfiguration(conf)

val indexRDD = filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
generateIndexEntry(row, sconf.value, reader)
})
}).cache

repartitionIndexes(indexRDD)
}

/**
* Builds record indexes for filesystems that do not support random access (reading from the middle of a file).
*/
def buildIndexForFullFiles(filesList: Array[FileWithOrder], reader: VarLenReader, sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)

val indexRDD = filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
val fileId = row.order

val element = SparseIndexEntry(0, -1, fileId, 0L)
ArrayBuffer[SparseIndexEntry](element)
})
}).cache

repartitionIndexes(indexRDD)
}

private def generateIndexEntry(fileWithOrder: FileWithOrder, config: Configuration, reader: VarLenReader): ArrayBuffer[SparseIndexEntry] = {
val filePath = fileWithOrder.filePath
val path = new Path(filePath)
val fileOrder = fileWithOrder.order
val fileSystem = path.getFileSystem(config)

logger.info(s"Going to generate index for the file: $filePath")
val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0, 0),
fileOrder, reader.isRdwBigEndian)
index
}


private def getBlockLengthByIndexEntry(entry: SparseIndexEntry): Long = {
val indexedLength = if (entry.offsetTo > 0) entry.offsetTo else Long.MaxValue

@@ -119,7 +176,7 @@ private [source] object IndexBuilder {
// of the block.
// In other words we don't care if the last megabyte is not node local as long as
// most of the split chunk is node local.
val significantLength = if (indexedLength < 10L*Constants.megabyte) {
val significantLength = if (indexedLength < 10L * Constants.megabyte) {
indexedLength
} else {
indexedLength - Constants.megabyte
@@ -130,7 +187,7 @@
/**
* Tries to balance the allocation among unused executors.
*/
private def optimizeDistribution(allocation: Seq[(SparseIndexEntry,Seq[String])], sc: SparkContext): Seq[(SparseIndexEntry,Seq[String])] = {
private def optimizeDistribution(allocation: Seq[(SparseIndexEntry, Seq[String])], sc: SparkContext): Seq[(SparseIndexEntry, Seq[String])] = {
val availableExecutors = SparkUtils.currentActiveExecutors(sc)
logger.info(s"Trying to balance ${allocation.size} partitions among all available executors ($availableExecutors)")
LocationBalancer.balance(allocation, availableExecutors)
@@ -146,36 +203,30 @@ private [source] object IndexBuilder {
(file, HDFSUtils.getBlocksLocations(new Path(file.filePath), fileSystem))
}).toSeq

filesWithPreferredLocations.foreach(a => logger.debug(a.toString()))
if (logger.isDebugEnabled()) {
filesWithPreferredLocations.foreach(a => logger.debug(a.toString()))
}

sqlContext.sparkContext.makeRDD(filesWithPreferredLocations)
}

/**
* Builds records indexes. Does not take locality into account. Might be removed in further releases.
*/
def buildIndexForVarLenReader(filesList: Array[FileWithOrder], reader: VarLenReader, sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
val conf = sqlContext.sparkContext.hadoopConfiguration
val sconf = new SerializableConfiguration(conf)

val indexes = filesRDD.mapPartitions(
partition => {
val fileSystem = FileSystem.get(sconf.value)
partition.flatMap(row => {
val filePath = row.filePath
val fileOrder = row.order

logger.info(s"Going to generate index for the file: $filePath")
val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0, 0),
fileOrder, reader.isRdwBigEndian)
index
}
)
}).cache
val indexCount = indexes.count()
private def repartitionIndexes(indexRDD: RDD[SparseIndexEntry]): RDD[SparseIndexEntry] = {
val indexCount = indexRDD.count()
val numPartitions = Math.min(indexCount, Constants.maxNumPartitions).toInt
logger.warn(s"Index elements count: $indexCount, number of partitions = $numPartitions")
indexes.repartition(numPartitions).cache()
indexRDD.repartition(numPartitions).cache()
}

def isFileRandomAccessSupported(fs: FileSystem): Boolean = {
fs.isInstanceOf[DistributedFileSystem] ||
fs.isInstanceOf[RawLocalFileSystem] ||
fs.isInstanceOf[FilterFileSystem] ||
fs.isInstanceOf[LocalFileSystem] ||
fs.isInstanceOf[ChecksumFileSystem]
}

def isDataLocalitySupported(fs: FileSystem): Boolean = {
fs.isInstanceOf[DistributedFileSystem]
}

}
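Since the filesystem is now resolved from each path, buildIndex can branch on its capabilities: only HDFS and the local-style filesystems count as supporting random access, and only HDFS as supporting data locality, so an S3 path falls through to buildIndexForFullFiles, which emits a single index entry spanning the whole file. A hypothetical illustration of that dispatch (the bucket is made up, and IndexBuilder is private[source], so this is only for reading along):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val path = new Path("s3a://my-bucket/data/records.dat") // placeholder S3 path
    val fs = path.getFileSystem(new Configuration())        // S3AFileSystem, not a DistributedFileSystem

    IndexBuilder.isFileRandomAccessSupported(fs) // false -> buildIndexForFullFiles is used
    IndexBuilder.isDataLocalitySupported(fs)     // false -> the locality optimization is skipped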

@@ -19,7 +19,7 @@ package za.co.absa.cobrix.spark.cobol.source.scanners
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.slf4j.LoggerFactory
@@ -41,9 +41,10 @@ private[source] object CobolScanners {
val sconf = new SerializableConfiguration(conf)

indexes.flatMap(indexEntry => {
val fileSystem = FileSystem.get(sconf.value)
val filePathName = filesMap(indexEntry.fileId)
val fileName = new Path(filePathName).getName
val path = new Path(filePathName)
val fileSystem = path.getFileSystem(sconf.value)
val fileName = path.getName
val numOfBytes = if (indexEntry.offsetTo > 0L) indexEntry.offsetTo - indexEntry.offsetFrom else 0L
val numOfBytesMsg = if (numOfBytes > 0) s"${numOfBytes / Constants.megabyte} MB" else "until the end"

@@ -60,10 +61,11 @@
val sconf = new SerializableConfiguration(conf)
filesRDD.mapPartitions(
partition => {
val fileSystem = FileSystem.get(sconf.value)
partition.flatMap(row => {
val filePath = row.filePath
val fileOrder = row.order
val path = new Path(filePath)
val fileSystem = path.getFileSystem(sconf.value)

logger.info(s"Going to parse file: $filePath")
reader.getRowIterator(new FileStreamer(filePath, fileSystem), 0L, fileOrder, 0L)
@@ -87,8 +89,6 @@
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
}

val schema = reader.getSparkSchema

val records = sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration)
recordParser(reader, records)
}
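The same idea is applied on executors: the Hadoop Configuration travels inside a SerializableConfiguration, and the filesystem is resolved per file path inside the partition processing rather than once via FileSystem.get, so different input files can sit on different filesystems. A simplified sketch of that executor-side pattern, reusing the names from the code above and omitting the record-parsing details:

    // Runs on executors; sconf is the SerializableConfiguration created on the driver.
    filesRDD.mapPartitions { partition =>
      partition.flatMap { row =>
        val path = new Path(row.filePath)
        val fileSystem = path.getFileSystem(sconf.value) // resolved per path, so s3a:// works
        reader.getRowIterator(new FileStreamer(row.filePath, fileSystem), 0L, row.order, 0L)
      }
    }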

@@ -19,6 +19,7 @@ package za.co.absa.cobrix.spark.cobol.source.integration
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
Expand Down Expand Up @@ -123,7 +124,7 @@ class Test13aFixedLenFileHeadersSpec extends FunSuite with SparkTestBase {
s"$actualSchemaPath for details.")
}

val actual = df.toJSON.take(60)
val actual = df.orderBy(col("COMPANY_ID")).toJSON.take(60)
val expected = Files.readAllLines(Paths.get(expectedResultsPath), StandardCharsets.ISO_8859_1).toArray

if (!actual.sameElements(expected)) {