Add persist(vectorTiles, zoom) method to Pipeline to allow override #140
Conversation
The default implementation is `vectorpipe.vectortile.export.saveVectorTiles`. One implication of this change is that users who want to override `persist` must still provide a `baseOutputURI`, even if their implementation does not use it.
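A minimal sketch of the proposed shape, reconstructed from this description rather than taken from the diff (the trait below is a stripped-down stand-in for `vectorpipe.vectortile.Pipeline`, and the import paths assume a GeoTrellis 3.x layout):

```scala
import java.net.URI

import geotrellis.layer.SpatialKey
import geotrellis.vectortile.VectorTile
import org.apache.spark.rdd.RDD
import vectorpipe.vectortile.export.saveVectorTiles

// Stripped-down stand-in for the Pipeline trait, showing only the proposed hook.
trait Pipeline {
  // Base URI the default persist implementation writes to (S3 or Hadoop).
  def baseOutputURI: URI

  // Called once per zoom level with the generated tiles. The default simply
  // delegates to the existing export; implementors can override it to write
  // somewhere else, or to skip writing entirely.
  def persist(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit =
    saveVectorTiles(vectorTiles, zoom, baseOutputURI)
}
```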
Hey, Jon. Thanks for the contribution! I like the idea. It's been a bit since I've been in this code, but it seems like perhaps the …

Let me know what you think.
@jpolchlo Yeah, that idea crossed my mind, but I still find it a little odd that the user would need to provide the output URI to … I'm thinking something like this:

```scala
def apply(input: DataFrame, pipeline: Pipeline, options: Options, outputUri: URI): Unit = {
  val vectorTilesByZoom = apply(input, pipeline, options)
  vectorTilesByZoom.foreach { case (zoom, vectorTiles) =>
    saveVectorTiles(vectorTiles, zoom, outputUri)
  }
}

def apply(input: DataFrame, pipeline: Pipeline, options: Options): Map[Int, RDD[(SpatialKey, VectorTile)]] = {
  // same implementation as above, except it doesn't persist
}
```

Also, the overloaded … What do you think? I have this change ready and can update this PR if it sounds good to you.
As much as it would help solve some weirdnesses to extract the write step out of the pipeline, it does kinda ruin the completeness of the pipeline, and so I'd rather consider some alternative. It occurs to me that your addition of … On a second point, your amendment to …
The `Pipeline.Output` mixin trait provides the existing default output behavior (saving to S3 or Hadoop) using the `baseOutputURI`, which has been moved out of the base `Pipeline` trait. Additionally, the `persist()` method was renamed to `finalize()` to be more general, since users may not want to persist immediately, or even persist at all. I feel this approach provides a good abstraction while making the pipeline implementation clearer, as it can then be written as `case class SomePipeline(baseOutputURI: URI) extends Pipeline with Pipeline.Output`. The Pipeline scaladocs were also updated and fixed to create valid links to the referenced code.
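A rough sketch of how that arrangement could fit together (stripped-down stand-ins rather than the actual diff; the import paths and the placement of `Output` inside the `Pipeline` companion object are assumptions based on the naming above):

```scala
import java.net.URI

import geotrellis.layer.SpatialKey
import geotrellis.vectortile.VectorTile
import org.apache.spark.rdd.RDD
import vectorpipe.vectortile.export.saveVectorTiles

// Stripped-down stand-ins; only the members discussed here are shown.
trait Pipeline {
  // Receives the final RDD of tiles for a zoom level; the base trait makes no
  // assumptions about what implementors do with it.
  def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit
}

object Pipeline {
  // Mixin restoring the previous default output behavior: write each zoom level
  // of tiles to baseOutputURI (S3 or Hadoop) via saveVectorTiles.
  trait Output { self: Pipeline =>
    def baseOutputURI: URI

    override def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit =
      saveVectorTiles(vectorTiles, zoom, baseOutputURI)
  }
}

// Keeping the stock output behavior is then a one-liner, as described above.
case class SomePipeline(baseOutputURI: URI) extends Pipeline with Pipeline.Output
```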
@jpolchlo Hey sorry for such a late response! I forgot about this PR, but now I'm back working with VectorPipe and I think I have a good solution which has addressed your good points.
The `finalize` method is abstract in the base `Pipeline` trait, so it does not make assumptions about what the user wants to do with the final RDD of vector tiles. In my case, I actually want to do some additional processing before persisting, so this generalization makes more sense to me. This allows separation of concerns for "output" by moving the `baseOutputURI` and the default save behavior into the `Pipeline.Output` mixin. Additionally, I reverted the …
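For illustration only, a hypothetical implementation of such a hook that does some extra processing per zoom level before handing off (the `TilePipeline` and `FilteringPipeline` names and the filtering logic are made up for this example; the real trait is `vectorpipe.vectortile.Pipeline`):

```scala
import geotrellis.layer.SpatialKey
import geotrellis.vectortile.VectorTile
import org.apache.spark.rdd.RDD

// Toy stand-in exposing only the hook discussed above.
trait TilePipeline {
  def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit
}

// Hypothetical implementation: post-process the tiles before (or instead of) persisting.
class FilteringPipeline extends TilePipeline {
  def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit = {
    val nonEmpty = vectorTiles.filter { case (_, tile) => tile.layers.nonEmpty }
    println(s"zoom $zoom: ${nonEmpty.count()} non-empty tiles")
    // hand the filtered RDD to custom output code here, or do nothing at all
  }
}
```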
```scala
  // was: val geometryColumn: String
  /**
    * Name of the column containing the JTS geometry.
    */
  def geometryColumn: String
```
I changed `geometryColumn` and `layerMultiplicity` from `val` to `def` to give users more freedom in how they are defined, since traits generally shouldn't define abstract vals. Also changed to make the scaladoc links work properly :p
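For illustration (a toy trait, not the VectorPipe code): an abstract `def` can be satisfied by a constructor parameter, a concrete `val`, or a computed method in the implementing type, which is the extra freedom referred to here:

```scala
// Not VectorPipe code: a toy trait showing why an abstract def is more flexible.
trait GeometrySource {
  def geometryColumn: String // abstract def: implementors choose how to provide it
}

// All of these satisfy the abstract def:
case class FromConstructor(geometryColumn: String) extends GeometrySource // constructor parameter
object FromVal extends GeometrySource { val geometryColumn = "geom" }      // concrete val
object Computed extends GeometrySource {                                   // computed on demand
  def geometryColumn: String = sys.env.getOrElse("GEOM_COLUMN", "geom")
}
```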
```scala
  * The only extra note is that [[vectorpipe.vectortile.Pipeline#reduce reduce]]
  * and [[vectorpipe.vectortile.Pipeline#simplify simplify]] will be called prior
  * to processing the initial zoom level.
  */
```
The GitHub diffs make it look like the scaladocs changed a lot, but I just fixed links and formatting (the indentation changes were a result of my IDE). They all still say the same thing.
```scala
  /**
    * Receive the final RDD of generated vector tiles for a zoom level
    * to finish any additional processing or handling such as persisting to a datastore.
    */
  def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit
```
I couldn't think of a better name for this method. Alternatives I was thinking of were: `finish`, `complete`, `done`.
`finalize` works for me
I think you nailed this. It's a nice, clean solution. Thanks for updating the docs as well.
Overview

- Added a `persist(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int)` method to the Pipeline, with a default implementation simply passing the call to `saveVectorTiles(vectorTiles, zoom, pipeline.baseOutputURI)`.
- Replaced the `saveVectorTiles()` call with `pipeline.persist()` (which by default is `saveVectorTiles`).
- Changed the return type of `VectorPipe.apply()` from `Unit` to `Map[Int, RDD[(SpatialKey, VectorTile)]]`: a map of the generated VectorTile RDDs keyed by the zoom level, for all `layoutLevels` defined in the options.

Reason for changes:
Returning the `RDD[(SpatialKey, VectorTile)]`s is useful for cases when additional processing outside the scope of VectorPipe is needed after the fact, without acting on the RDD and persisting an incomplete state of the data, such as when using GeoTrellis to then rasterize the vectortile geometries. The user can now decide to override the `persist()` method to save to something like MongoDB, or to make it a no-op and use VectorPipe as just another step in a larger Spark job.
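A sketch of the kind of downstream step this enables (the `DownstreamStep` and `continueJob` names are hypothetical; import paths assume a GeoTrellis 3.x layout):

```scala
import geotrellis.layer.SpatialKey
import geotrellis.vectortile.VectorTile
import org.apache.spark.rdd.RDD

object DownstreamStep {
  // Hypothetical follow-on step: with persist() overridden to a no-op, the map returned
  // by VectorPipe.apply can feed the rest of the Spark job instead of being written out.
  def continueJob(tilesByZoom: Map[Int, RDD[(SpatialKey, VectorTile)]]): Unit =
    tilesByZoom.foreach { case (zoom, tiles) =>
      // e.g. rasterize the tile geometries with GeoTrellis, write them to MongoDB,
      // or pass the RDD along to the next stage of the job.
      println(s"zoom $zoom: ${tiles.count()} tiles available for further processing")
    }
}
```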
Checklist

Closes #XXX