Resolved Path java serialization issue in HadoopGeoTiffRDD #2105

Merged 1 commit on Mar 31, 2017
@@ -84,10 +84,10 @@ object HadoopGeoTiffRDD
    * Creates a RDD[(K, V)] whose K and V depends on the type of the GeoTiff that is going to be read in.
    *
    * @param path Hdfs GeoTiff path.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def apply[I, K, V](path: Path, pathToKey: (Path, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {
+  def apply[I, K, V](path: Path, uriToKey: (URI, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {
     val conf = configuration(path, options)
     options.maxTileSize match {
       case Some(tileSize) =>
@@ -99,7 +99,7 @@ object HadoopGeoTiffRDD
             classOf[TiffTags]
           ).mapValues { tiffTags => (tiffTags.cols, tiffTags.rows) }

-        apply[I, K, V](pathsAndDimensions, pathToKey, options)
+        apply[I, K, V](pathsAndDimensions, uriToKey, options)
       case None =>
         sc.newAPIHadoopRDD(
           conf,
@@ -109,7 +109,7 @@ object HadoopGeoTiffRDD
         ).mapPartitions(
           _.map { case (p, bytes) =>
             val (k, v) = rr.readFully(ByteBuffer.wrap(bytes), options)
-            pathToKey(p, k) -> v
+            uriToKey(p.toUri, k) -> v
           },
           preservesPartitioning = true
         )
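The heart of the fix is visible in this hunk: the executor-side closure now hands the key function a java.net.URI instead of an org.apache.hadoop.fs.Path. Spark serializes these closures when shipping tasks to executors, and URI implements java.io.Serializable while Path did not at the time (it only gained Serializable later, in the Hadoop 3 line), so user code keying off Path could fail with a NotSerializableException. A minimal, self-contained sketch of the difference (the HDFS URI below is illustrative):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.net.URI

object SerializationCheck {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())

    // java.net.URI is Serializable, so values of this type can ride
    // along inside a Spark task closure.
    out.writeObject(new URI("hdfs://namenode/data/tile.tif"))

    // org.apache.hadoop.fs.Path (Hadoop 2.x) is not Serializable; the
    // analogous call would throw java.io.NotSerializableException:
    //   out.writeObject(new org.apache.hadoop.fs.Path("hdfs://namenode/data/tile.tif"))

    println("URI serialized successfully")
  }
}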
@@ -123,16 +123,16 @@ object HadoopGeoTiffRDD
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
   def apply[K, V](path: Path, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (K, V)]): RDD[(K, V)] =
-    apply[K, K, V](path, (_: Path, key: K) => key, options)
+    apply[K, K, V](path, (_: URI, key: K) => key, options)

   /**
    * Creates a RDD[(K, V)] whose K and V depends on the type of the GeoTiff that is going to be read in.
    *
    * @param pathsToDimensions RDD keyed by GeoTiff path with (cols, rows) tuple as value.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def apply[I, K, V](pathsToDimensions: RDD[(Path, (Int, Int))], pathToKey: (Path, I) => K, options: Options)
+  def apply[I, K, V](pathsToDimensions: RDD[(Path, (Int, Int))], uriToKey: (URI, I) => K, options: Options)
     (implicit rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {

     val conf = new SerializableConfiguration(pathsToDimensions.sparkContext.hadoopConfiguration)
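The SerializableConfiguration wrapper in the trailing context line addresses the same class of problem for Hadoop's Configuration, which is likewise not java.io.Serializable. A sketch of the general wrapper pattern (not necessarily GeoTrellis's exact implementation) that delegates Java serialization to Configuration's Writable methods:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// @transient keeps default Java serialization away from the wrapped
// Configuration; writeObject/readObject round-trip it through the
// Writable interface (write/readFields) instead.
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // ObjectOutputStream implements DataOutput
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in) // ObjectInputStream implements DataInput
  }
}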
@@ -159,7 +159,7 @@ object HadoopGeoTiffRDD
         }

         val (k, v) = rr.readWindow(reader, pixelWindow, options)
-        pathToKey(path, k) -> v
+        uriToKey(path.toUri, k) -> v
       }
     }

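For callers, only the first parameter of the key function changes type, from Path to URI; its role is unchanged. A hypothetical usage sketch (the path, the Options.DEFAULT value, and the in-scope implicit SparkContext and RasterReader instances are assumptions) that keys each tile by source file name plus extent:

import java.net.URI
import org.apache.hadoop.fs.Path
import geotrellis.raster.Tile
import geotrellis.vector.ProjectedExtent

val tilesPath = new Path("hdfs://namenode/geotiffs")

// The key function now receives the file's URI rather than its Path.
val keyed = HadoopGeoTiffRDD.apply[ProjectedExtent, (String, ProjectedExtent), Tile](
  tilesPath,
  (uri: URI, pe: ProjectedExtent) => (uri.getPath.split("/").last, pe),
  HadoopGeoTiffRDD.Options.DEFAULT
)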
@@ -168,11 +168,11 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[SinglebandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def singleband[I, K](path: Path, pathToKey: (Path, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, Tile)]): RDD[(K, Tile)] =
-    apply[I, K, Tile](path, pathToKey, options)
+  def singleband[I, K](path: Path, uriToKey: (URI, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, Tile)]): RDD[(K, Tile)] =
+    apply[I, K, Tile](path, uriToKey, options)

   /**
    * Creates RDDs with the [(K, V)] values where V is a [[Tile]].
@@ -189,12 +189,12 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[MultibandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */

-  def multiband[I, K](path: Path, pathToKey: (Path, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, MultibandTile)]): RDD[(K, MultibandTile)] =
-    apply[I, K, MultibandTile](path, pathToKey, options)
+  def multiband[I, K](path: Path, uriToKey: (URI, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, MultibandTile)]): RDD[(K, MultibandTile)] =
+    apply[I, K, MultibandTile](path, uriToKey, options)

   /**
    * Creates RDDs with the [(K, V)] values where V is a [[MultibandTile]].
@@ -230,11 +230,11 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[SinglebandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def spatial(path: Path, pathToKey: (Path, ProjectedExtent) => ProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
-    singleband[ProjectedExtent, ProjectedExtent](path, pathToKey, options)
+  def spatial(path: Path, uriToKey: (URI, ProjectedExtent) => ProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
+    singleband[ProjectedExtent, ProjectedExtent](path, uriToKey, options)

   /**
    * Creates RDDs with the [(K, V)] values being [[ProjectedExtent]] and [[MultibandTile]], respectively.
@@ -260,11 +260,11 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[MultibandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def spatialMultiband(path: Path, pathToKey: (Path, ProjectedExtent) => ProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(ProjectedExtent, MultibandTile)] =
-    multiband[ProjectedExtent, ProjectedExtent](path, pathToKey, options)
+  def spatialMultiband(path: Path, uriToKey: (URI, ProjectedExtent) => ProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(ProjectedExtent, MultibandTile)] =
+    multiband[ProjectedExtent, ProjectedExtent](path, uriToKey, options)

   /**
    * Creates RDDs with the [(K, V)] values being [[TemporalProjectedExtent]] and [[Tile]], respectively.
@@ -290,11 +290,11 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[SinglebandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def temporal(path: Path, pathToKey: (Path, TemporalProjectedExtent) => TemporalProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(TemporalProjectedExtent, Tile)] =
-    singleband[TemporalProjectedExtent, TemporalProjectedExtent](path, pathToKey, options)
+  def temporal(path: Path, uriToKey: (URI, TemporalProjectedExtent) => TemporalProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(TemporalProjectedExtent, Tile)] =
+    singleband[TemporalProjectedExtent, TemporalProjectedExtent](path, uriToKey, options)

   /**
    * Creates RDDs with the [(K, V)] values being [[TemporalProjectedExtent]] and [[MultibandTile]], respectively.
@@ -320,9 +320,9 @@ object HadoopGeoTiffRDD
    * It assumes that the provided files are [[MultibandGeoTiff]]s.
    *
    * @param path Hadoop path to recursively search for GeoTiffs.
-   * @param pathToKey function to transform input key basing on the Path information.
+   * @param uriToKey function to transform input key basing on the URI information.
    * @param options An instance of [[Options]] that contains any user defined or default settings.
    */
-  def temporalMultiband(path: Path, pathToKey: (Path, TemporalProjectedExtent) => TemporalProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(TemporalProjectedExtent, MultibandTile)] =
-    multiband[TemporalProjectedExtent, TemporalProjectedExtent](path, pathToKey, options)
+  def temporalMultiband(path: Path, uriToKey: (URI, TemporalProjectedExtent) => TemporalProjectedExtent, options: Options)(implicit sc: SparkContext): RDD[(TemporalProjectedExtent, MultibandTile)] =
+    multiband[TemporalProjectedExtent, TemporalProjectedExtent](path, uriToKey, options)
 }
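The updated spec below builds TemporalProjectedExtent keys from file names via the new uriToKey parameter. A condensed sketch of the same idea (the directory, the "tile-20150101.tif" naming scheme, Options.DEFAULT, the TemporalProjectedExtent(ProjectedExtent, ZonedDateTime) constructor, and the in-scope implicits are assumptions):

import java.net.URI
import java.time.{LocalDate, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs.Path
import geotrellis.spark.TemporalProjectedExtent
import geotrellis.vector.ProjectedExtent

val temporalRdd =
  HadoopGeoTiffRDD.singleband[ProjectedExtent, TemporalProjectedExtent](
    new Path("hdfs://namenode/geotiffs"),
    (uri: URI, pe: ProjectedExtent) => {
      // Pull a yyyyMMdd date out of a file name like "tile-20150101.tif".
      val digits = uri.getPath.split("/").last.filter(_.isDigit).take(8)
      val time = LocalDate.parse(digits, DateTimeFormatter.BASIC_ISO_DATE)
        .atStartOfDay(ZoneId.of("UTC"))
      TemporalProjectedExtent(pe, time)
    },
    HadoopGeoTiffRDD.Options.DEFAULT
  )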
@@ -135,8 +135,8 @@ class HadoopGeoTiffRDDSpec
       val actual =
         HadoopGeoTiffRDD.singleband[ProjectedExtent, TemporalProjectedExtent](
           path = tilesDir,
-          pathToKey = (p: Path, key: ProjectedExtent) => {
-            val n = pattern.findAllIn(p.toUri.getPath.split("/").last)
+          uriToKey = (u: URI, key: ProjectedExtent) => {
+            val n = pattern.findAllIn(u.getPath.split("/").last)
             n.next()
             val gr = n.group(1)
             val zdt = LocalDateTime.of(gr.substring(0, 4).toInt, gr.substring(4, 6).toInt, 1, 0, 0, 0).atZone(ZoneId.of("UTC"))