
file wildcards / file globbing unstable #251

Closed
bart-at-qqdatafruits opened this issue Feb 20, 2020 · 7 comments
Labels
accepted (Accepted for implementation), bug (Something isn't working)

Comments

@bart-at-qqdatafruits

Describe the bug

When using a generic wildcard (e.g. path_to_folder/* or path_to_folder), issues arise when there are lots of files.

A workaround is to reduce the number of files matched per load by using more specific wildcards (e.g. per year: path_to_folder/D2016*, path_to_folder/D2017*, path_to_folder/D2018*, path_to_folder/D2019*, path_to_folder/D2020*).
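The per-year patterns in the workaround can be generated rather than typed by hand. This is a minimal sketch, assuming file names start with `D` followed by a four-digit year as in the examples above; `YearGlobs`, `yearGlobs` and `matching` are hypothetical names. The `PathMatcher` from `java.nio.file` is used only to check locally which names a pattern would match; Hadoop's glob implementation is a different one, although `*` behaves similarly within a single path component.

```scala
import java.nio.file.{FileSystems, Paths}

object YearGlobs {
  // Hypothetical helper: one glob per year, e.g. "path_to_folder/D2016*"
  def yearGlobs(folder: String, years: Range): Seq[String] =
    years.map(year => s"$folder/D$year*")

  // Local check of which file names a name-level glob such as "D2017*" matches
  def matching(namePattern: String, names: Seq[String]): Seq[String] = {
    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:$namePattern")
    names.filter(name => matcher.matches(Paths.get(name)))
  }
}
```

`YearGlobs.yearGlobs("path_to_folder", 2016 to 2020)` yields the five patterns listed above; each could then be passed to its own `.load(...)` call and the resulting DataFrames combined with `union`.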

    Name: java.io.FileNotFoundException
    Message: File file:/home/jovyan/data/SOURCE/BRAND/initial_transformed/* does not exist
    StackTrace:
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
        at za.co.absa.cobrix.spark.cobol.utils.FileUtils$.findAndLogFirstNonDivisibleFile(FileUtils.scala:198)
        at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.areThereNonDivisibleFiles(CobolScanners.scala:107)
        at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.buildScanForFixedLength(CobolScanners.scala:87)
        at za.co.absa.cobrix.spark.cobol.source.CobolRelation.buildScan(CobolRelation.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)

To Reproduce

Steps to reproduce the behaviour OR commands run:

%AddDeps za.co.absa.cobrix spark-cobol_2.11 2.0.3 --transitive

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Example").getOrCreate()

// Extract the file name from a full input path
spark.udf.register("get_file_name", (path: String) => path.split("/").last)

val cobolDataframe = spark
  .read
  .format("za.co.absa.cobrix.spark.cobol.source")
  .option("pedantic", "true")
  .option("copybook", "file:///home/jovyan/data/SOURCE/COPYBOOK.txt")
  .load("file:///home/jovyan/data/SOURCE/BRAND/initial_transformed/*")
  // Record which source file each row came from
  .withColumn("DPSource", callUDF("get_file_name", input_file_name()))

cobolDataframe
  //.filter("RECORD.ID % 2 = 0") // keep only rows with an even value of the nested field 'RECORD.ID'
  .take(20)
  .foreach(v => println(v))
```

Expected behaviour

Data is loaded from all files matching the wildcard.

Screenshots


Screenshot from 2020-02-20 16-21-54-anonymized

Additional context

Thank you for any feedback.

@bart-at-qqdatafruits bart-at-qqdatafruits added the bug Something isn't working label Feb 20, 2020
@bart-at-qqdatafruits
Author

Environment: Docker image jupyter/all-spark-notebook:latest with Apache Toree (Scala kernel).

Loaded package: za.co.absa.cobrix spark-cobol_2.11 2.0.3

@yruslan
Collaborator

yruslan commented Feb 20, 2020

Thanks for the bug report. We will take a look.

@yruslan yruslan added the accepted Accepted for implementation label Feb 20, 2020
@bart-at-qqdatafruits
Author

bart-at-qqdatafruits commented Feb 21, 2020

@yruslan

  • It looks like the following code doesn't filter out hidden folders:

https://github.com/AbsaOSS/cobrix/blob/master/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala

    private val hiddenFileFilter = new PathFilter() {
      def accept(p: Path): Boolean = {
        val name = p.getName
        !name.startsWith("_") && !name.startsWith(".")
      }
    }

  • I found a folder ".ipynb_checkpoints" in the data directory.

  • This causes the following error:

    Name: java.lang.IllegalArgumentException
    Message: There are some files in /home/jovyan/data/SOURCE/BRAND/initial_transformed that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook (100 bytes per record). Check the logs for the names of the files.
    StackTrace:
        at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.buildScanForFixedLength(CobolScanners.scala:88)

  • This can be tested by creating a folder whose name starts with a dot (e.g. ".folder").

  • This kind of error is specific to my test bed, however, and should not occur in a production environment.
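The hidden-entry rule from `hiddenFileFilter`, combined with a record-size divisibility check, can be sketched with plain `java.nio.file`. This is a minimal sketch assuming a local directory and a fixed record size (100 bytes in the error above); `DivisibilityCheck`, `isHidden` and `firstNonDivisibleFile` are hypothetical names, and this is not the Cobrix implementation, which works through Hadoop's `FileSystem` API. With the filter applied, entries such as `.ipynb_checkpoints` or `_SUCCESS` are never inspected.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object DivisibilityCheck {
  // Same rule as hiddenFileFilter: names starting with "_" or "." are ignored
  def isHidden(p: Path): Boolean = {
    val name = p.getFileName.toString
    name.startsWith("_") || name.startsWith(".")
  }

  // Returns some visible regular file in dir whose size is not a multiple of recordSize.
  // The listing is not recursive, and its order is not guaranteed.
  def firstNonDivisibleFile(dir: Path, recordSize: Long): Option[Path] = {
    val stream = Files.list(dir)
    try {
      stream.iterator().asScala
        .filterNot(isHidden)
        .filter(p => Files.isRegularFile(p))
        .find(p => Files.size(p) % recordSize != 0)
    } finally stream.close()
  }
}
```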

@bart-at-qqdatafruits
Author

bart-at-qqdatafruits commented Feb 21, 2020

The real wildcard/globbing issue seems to come down to the code that checks "areThereNonDivisibleFiles". Oddly, it does not seem to use the method "getFiles"; I think "getFiles" should be OK.

https://github.com/AbsaOSS/cobrix/blob/master/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala

Loading with a prefix glob:

    val cobolDataframe = spark
      .read
      .format("za.co.absa.cobrix.spark.cobol.source")
      .option("pedantic", "true")
      .option("copybook", "file:///home/jovyan/data/SOURCE/COPYBOOK.txt")
      .load("file:///home/jovyan/data/SOURCE/BRAND/initial_transformed/PREFIX*")
      .withColumn("DPSource", callUDF("get_file_name", input_file_name()))

fails with:

    Name: java.io.FileNotFoundException
    Message: File file:/home/jovyan/data/SOURCE/BRAND/initial_transformed/PREFIX* does not exist
    StackTrace:
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
        at za.co.absa.cobrix.spark.cobol.utils.FileUtils$.findAndLogFirstNonDivisibleFile(FileUtils.scala:198)
        at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.areThereNonDivisibleFiles(CobolScanners.scala:107)
        at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.buildScanForFixedLength(CobolScanners.scala:87)
        at za.co.absa.cobrix.spark.cobol.source.CobolRelation.buildScan(CobolRelation.scala:90)

@bart-at-qqdatafruits
Author

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    val dirPath = new Path("file:///home/jovyan/data/SOURCE/BRAND/initial_transformed/PREFIX*")

  • The code below works with wildcards / globbing and returns the matching files, as the method "getFiles" does:

        val stats: Array[FileStatus] = FileSystem.get(spark.sqlContext.sparkContext.hadoopConfiguration).globStatus(dirPath)

  • The code below does not work with wildcards / globbing and throws an exception:

        val stats: Array[FileStatus] = FileSystem.get(spark.sqlContext.sparkContext.hadoopConfiguration).listStatus(dirPath)

  • The exception is:

        Name: java.io.FileNotFoundException
        Message: File file:/home/jovyan/data/SAP/AUDI/initial_transformed/SAP* does not exist
        StackTrace:
            at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
            at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)

  • Hence I recommend using the method "globStatus" instead of "listStatus" in the method "findAndLogFirstNonDivisibleFile".
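The difference between `globStatus` and `listStatus` has a close analogue in `java.nio.file`: `Files.newDirectoryStream(dir, glob)` expands a file-name pattern, whereas `Files.list` treats `PREFIX*` as a literal directory name and fails, much like the Hadoop `FileNotFoundException` above. The sketch below illustrates the recommended order (expand the glob first, then inspect the matches) using java.nio rather than Hadoop; `GlobVsList`, `expandGlob` and `listLiteral` are hypothetical names.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import scala.util.Try

object GlobVsList {
  // Expands a file-name glob such as "PREFIX*" inside dir (analogous to globStatus)
  def expandGlob(dir: Path, pattern: String): Seq[Path] = {
    val stream = Files.newDirectoryStream(dir, pattern)
    try stream.iterator().asScala.toList.sortBy(_.getFileName.toString)
    finally stream.close()
  }

  // Treats the pattern as a literal directory name (analogous to listStatus);
  // fails because no directory is literally named "PREFIX*"
  def listLiteral(dir: Path, pattern: String): Try[Seq[Path]] = Try {
    val stream = Files.list(dir.resolve(pattern))
    try stream.iterator().asScala.toList
    finally stream.close()
  }
}
```

When applying the same idea to Hadoop, note that, according to the `FileSystem.globStatus` Javadoc, it returns `null` rather than an empty array when the pattern contains no glob characters and the path does not exist, so a real fix would need to guard against that case.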

@bart-at-qqdatafruits
Author

@yruslan

I am currently not equipped or knowledgeable enough to submit a pull request.

Please review my findings.

Thanks in advance,

Bart

@yruslan
Collaborator

yruslan commented Feb 22, 2020

This should now be fixed and deployed as a snapshot. A PR has been created for the fix: #253. You can check it out.

Here is the current snapshot version:

groupId: za.co.absa.cobrix
artifactId: spark-cobol_2.11
version: 2.0.4-SNAPSHOT
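For an sbt project, the snapshot coordinates above would translate roughly to the following `build.sbt` lines. This is a sketch only: the resolver line assumes the snapshot is published to the Sonatype OSS snapshots repository, which the thread does not state.

```scala
// build.sbt (sketch). Assumption: the snapshot is hosted on Sonatype OSS snapshots
resolvers += Resolver.sonatypeRepo("snapshots")

libraryDependencies += "za.co.absa.cobrix" % "spark-cobol_2.11" % "2.0.4-SNAPSHOT"
```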

The fix will be part of the next release, 2.0.4, next week.

yruslan added a commit that referenced this issue Feb 24, 2020
…le-col

#251 Fix glob support and divisibility check for large amount of files.
@yruslan yruslan closed this as completed Feb 25, 2020