
Fix HadoopGeoTiffRDD Window Reading Error #2103

Closed
jbouffard opened this issue Mar 28, 2017 · 1 comment · Fixed by #2105

jbouffard commented Mar 28, 2017

An RDD[(org.apache.hadoop.fs.Path, GridBounds)] is produced instead of an RDD[(ProjectedExtent, Tile)] when doing windowed reads with HadoopGeoTiffRDD with the numPartitions option set; the read then fails with java.io.NotSerializableException: org.apache.hadoop.fs.Path when those windows are shuffled. There may be issues with other combinations of options, but this has not been confirmed.

Example code where this error appears:

scala> import geotrellis.spark.io.hadoop._
import geotrellis.spark.io.hadoop._

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> val conf = new SparkConf().setAppName("test").setMaster("local")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@5bf89627

scala> implicit val sc = new SparkContext(conf)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/28 16:17:09 INFO SparkContext: Running Spark version 2.1.0
17/03/28 16:17:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/28 16:17:10 WARN Utils: Your hostname, hay resolves to a loopback address: 127.0.1.1; using 172.26.109.161 instead (on interface wlp58s0)
17/03/28 16:17:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/28 16:17:10 INFO SecurityManager: Changing view acls to: jacob
17/03/28 16:17:10 INFO SecurityManager: Changing modify acls to: jacob
17/03/28 16:17:10 INFO SecurityManager: Changing view acls groups to: 
17/03/28 16:17:10 INFO SecurityManager: Changing modify acls groups to: 
17/03/28 16:17:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jacob); groups with view permissions: Set(); users  with modify permissions: Set(jacob); groups with modify permissions: Set()
17/03/28 16:17:10 INFO Utils: Successfully started service 'sparkDriver' on port 46085.
17/03/28 16:17:10 INFO SparkEnv: Registering MapOutputTracker
17/03/28 16:17:10 INFO SparkEnv: Registering BlockManagerMaster
17/03/28 16:17:10 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/03/28 16:17:10 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/03/28 16:17:10 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-e2b3f205-1d82-4771-88a3-5c87df1abd90
17/03/28 16:17:10 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
17/03/28 16:17:11 INFO SparkEnv: Registering OutputCommitCoordinator
17/03/28 16:17:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/03/28 16:17:11 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.26.109.161:4040
17/03/28 16:17:11 INFO Executor: Starting executor ID driver on host localhost
17/03/28 16:17:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44966.
17/03/28 16:17:11 INFO NettyBlockTransferService: Server created on 172.26.109.161:44966
17/03/28 16:17:11 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/03/28 16:17:11 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.26.109.161, 44966, None)
17/03/28 16:17:11 INFO BlockManagerMasterEndpoint: Registering block manager 172.26.109.161:44966 with 912.3 MB RAM, BlockManagerId(driver, 172.26.109.161, 44966, None)
17/03/28 16:17:11 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.26.109.161, 44966, None)
17/03/28 16:17:11 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.26.109.161, 44966, None)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@31c3a288

scala> val path = new Path("../econic.tif")
path: org.apache.hadoop.fs.Path = ../econic.tif

scala> val rdd = HadoopGeoTiffRDD.spatialMultiband(path, HadoopGeoTiffRDD.Options(maxTileSize=Some(256), numPartitions=Some(500)))
17/03/28 16:19:11 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 275.4 KB, free 912.0 MB)
17/03/28 16:19:11 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.0 KB, free 912.0 MB)
17/03/28 16:19:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.109.161:44966 (size: 23.0 KB, free: 912.3 MB)
17/03/28 16:19:11 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at HadoopGeoTiffRDD.scala:95
rdd: org.apache.spark.rdd.RDD[(geotrellis.vector.ProjectedExtent, geotrellis.raster.MultibandTile)] = MapPartitionsRDD[7] at map at HadoopGeoTiffRDD.scala:153

scala> rdd.first()
17/03/28 16:19:17 INFO FileInputFormat: Total input paths to process : 1
17/03/28 16:19:17 INFO SparkContext: Starting job: first at <console>:44
17/03/28 16:19:17 INFO DAGScheduler: Registering RDD 3 (repartition at HadoopGeoTiffRDD.scala:149)
17/03/28 16:19:17 INFO DAGScheduler: Got job 0 (first at <console>:44) with 1 output partitions
17/03/28 16:19:17 INFO DAGScheduler: Final stage: ResultStage 1 (first at <console>:44)
17/03/28 16:19:17 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/03/28 16:19:17 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/03/28 16:19:17 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at repartition at HadoopGeoTiffRDD.scala:149), which has no missing parents
17/03/28 16:19:17 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.4 KB, free 912.0 MB)
17/03/28 16:19:17 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 912.0 MB)
17/03/28 16:19:17 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.109.161:44966 (size: 2.5 KB, free: 912.3 MB)
17/03/28 16:19:17 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
17/03/28 16:19:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at repartition at HadoopGeoTiffRDD.scala:149)
17/03/28 16:19:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/03/28 16:19:17 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6014 bytes)
17/03/28 16:19:17 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/28 16:19:18 INFO NewHadoopRDD: Input split: file:/home/jacob/Documents/econic.tif:0+266468
17/03/28 16:19:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.hadoop.fs.Path
	- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
	- root object (class "scala.Tuple2", (file:/home/jacob/Documents/econic.tif,GridBounds(0,0,255,255)))
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:135)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:233)
	at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/03/28 16:19:18 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.fs.Path
	- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
	- root object (class "scala.Tuple2", (file:/home/jacob/Documents/econic.tif,GridBounds(0,0,255,255))); not retrying
17/03/28 16:19:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/03/28 16:19:18 INFO TaskSchedulerImpl: Cancelling stage 0
17/03/28 16:19:18 INFO DAGScheduler: ShuffleMapStage 0 (repartition at HadoopGeoTiffRDD.scala:149) failed in 0.472 s due to Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.fs.Path
	- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
	- root object (class "scala.Tuple2", (file:/home/jacob/Documents/econic.tif,GridBounds(0,0,255,255)))
17/03/28 16:19:18 INFO DAGScheduler: Job 0 failed: first at <console>:44, took 0.630601 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.fs.Path
	- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
	- root object (class "scala.Tuple2", (file:/home/jacob/Documents/econic.tif,GridBounds(0,0,255,255)))
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
  at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1366)
  ... 42 elided
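
The trace shows the failure happening when the intermediate window RDD is shuffled by the repartition at HadoopGeoTiffRDD.scala:149: the records are (org.apache.hadoop.fs.Path, GridBounds) pairs, and Path is not java-serializable. Below is a minimal sketch of one possible fix, shuffling on the path's URI string rather than the Path itself; the repartitionWindows helper is hypothetical and not part of the GeoTrellis API.

import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import geotrellis.raster.GridBounds

// Hypothetical helper: key the window RDD on the path's string form so the
// shuffle only moves serializable values, then rebuild the Path afterwards.
def repartitionWindows(
  windows: RDD[(Path, GridBounds)],
  numPartitions: Option[Int]
): RDD[(Path, GridBounds)] =
  numPartitions match {
    case Some(n) =>
      windows
        .map { case (path, bounds) => (path.toString, bounds) } // String serializes fine
        .repartition(n)
        .map { case (uri, bounds) => (new Path(uri), bounds) }
    case None =>
      windows
  }

In the meantime, a possible (untested) workaround is to drop numPartitions from the Options and repartition the resulting tile RDD instead, e.g. HadoopGeoTiffRDD.spatialMultiband(path, HadoopGeoTiffRDD.Options(maxTileSize = Some(256))).repartition(500), since the (ProjectedExtent, MultibandTile) records serialize without issue.
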
jbouffard (Contributor, Author) commented:

@lossyrob Could you please assign me this issue?
