Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persist(vectorTiles, zoom) method to Pipeline to allow override #140

Merged
merged 5 commits into from
Sep 17, 2020

Conversation

JonMcPherson
Copy link
Contributor

@JonMcPherson JonMcPherson commented Jul 21, 2020

Overview

  • Adds persist(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int) method to the Pipeline with a default implementation simply passing the call to saveVectorTiles(vectorTiles, zoom, pipeline.baseOutputURI)
  • Replaces call to saveVectorTiles() with pipeline.persist() (which by default is saveVectorTiles)
  • Changes return type of VectorPipe.apply() from Unit to Map[Int, RDD[(SpatialKey, VectorTile)]] as a map the generated VectorTile RDDs keyed by the zoom level for all layoutLevels defined in the options.

Reason for changes:

  • let users override the behavior of saving the generated vector tiles
  • let users access the generated vector tiles as they are when persisted as RDD[(SpatialKey, VectorTile)]

This is useful for cases when additional processing outside the scope of VectorPipe is needed after the fact without acting on the RDD and persisting an incomplete state of the data such as when using GeoTrellis to then rasterize the vectortile geometries. The user can now decide to override the persist() method to save to something like MongoDB, or to be a no-op and use VectorPipe as just another step in a larger Spark job like:

object ThePipeline extends Pipeline {
  // ...
  override def persist(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int, uri: URI): Unit = ()
}
val vectorTiles = VectorPipe(df, ThePipeline, options)
vectorTiles.transform(...).rasterize(...)

Checklist

  • Add entry to CHANGELOG.md

Closes #XXX

@JonMcPherson JonMcPherson changed the title Add persist(vectorTiles, zoom, uri) method to Pipeline to allow override Add persist(vectorTiles, zoom) method to Pipeline to allow override Jul 21, 2020
@JonMcPherson
Copy link
Contributor Author

One implication of this change is that users who want to override persist() as a no-op still need to override the required baseOutputURI even though it would be unused. And now the only reason we have the abstract baseOutputURI is to be used in the default implementation of persist(). This might be an acceptable weirdness for an uncommon use case.

@jpolchlo
Copy link
Contributor

jpolchlo commented Jul 27, 2020

Hey, Jon. Thanks for the contribution! I like the idea.

It's been a bit since I've been in this code, but it seems like perhaps the baseOutputURI could be converted to a parameter to persist() as you've specified it. It always felt a bit odd to have it as a class member, and this is probably why—saving files to some location as the only option for export is not very general. We can talk about proper signatures, but I'll suggest persist: java.net.URI → Unit for a start. In which case, VectorPipe::apply should probably take in the destination URI (which could be maybe have a null value if the user knows the pipeline has a no-op persist implementation).

Let me know what you think.

@jpolchlo jpolchlo self-assigned this Jul 27, 2020
@JonMcPherson
Copy link
Contributor Author

JonMcPherson commented Jul 30, 2020

@jpolchlo Yea that idea cross my mind, but I still find a little odd that the user would need to provide the output URI to VectorPipe::apply only for it to be provided back to them in the call to Pipeline::persist when it is implemented by the user. However, this is fine when the user doesn't override the default persist implementation.
One approach that I think might be the best for both use cases is to instead remove Pipeline.persist and the call to persist from VectorPipe::apply entirely, and instead just let the user do what they want with the returned vector tiles rather than making the assumption for them. The vectorpipe.vectortile.export.saveVectorTiles() util can maybe be used by the user in the same way that vectorpipe.OSM.toGeometry() is used to prepare the input data, except here we have util to assist with what to do with the output data. The VectorPipe::apply method could also be overloaded with the additional case provided the outputUri used to invoke the default export.saveVectorTiles() which would be functionally equivalent to the current implementation except that the user provides the outputUri (like you mentioned) for cases where the default S3/HDFS output is needed immediately after the vectortiles are processed.

I'm thinking something like this:

def apply(input: DataFrame, pipeline: Pipeline, options: Options, outputUri: URI): Unit = {
  val vectorTilesByZoom = apply(input, pipeline, options)
  
  vectorTilesByZoom.foreach { case (zoom, vectorTiles) =>
    saveVectorTiles(vectorTiles, zoom, outputUri)
  }
}

def apply(input: DataFrame, pipeline: Pipeline, options: Options): Map[Int, RDD[(SpatialKey, VectorTile)]] = {
  // same impl except doesn't persist
}

Also, the overloaded apply could probably just return Unit assuming the user doesn't want to do anything else with the data after the data was written much like the current implementation.

What do you think? I have this change ready and can update this PR if it sounds good to you.

@jpolchlo
Copy link
Contributor

As much as it would help solve some weirdnesses to extract the write step out of the pipeline, it does kinda ruin the completeness of the pipeline, and so I'd rather consider some alternative.

It occurs to me that your addition of Pipeline::persist() is correct. This singles out the baseOutputURI as the odd member that should be killed from Pipeline. I would then suggest that it's up to the pipeline designer to ensure that the output location is communicated clearly when the pipeline is created. Sadly, this makes it harder to specify a default implementation of persist, and then implies that the baseOutputURI is required after all. And we're back to the same crappy starting point. Unless Pipeline has no default persist implementation, and a Pipeline subclass which provides the standard output routine and baseOutputURI is provided (FileSystemOutputPipeline? (ewww)).

On a second point, your amendment to VectorPipe::apply to have two forms makes sense, but I am a bit weirded out by the Map[Int, RDD[...]], and I wonder if an Iterator would confer some improved laziness? I'm concerned about having to do all the computation all at once and hold on to all past zoom levels to fill the map.

The Pipeline.Output mixin trait provides the existing default output behavior saving to S3 or Hadoop using the baseOutputURI which has been moved out of the base Pipeline trait. Additionally, the persist() method was renamed to finalize() to be more general since users may not want to persist immediately or even persist at all. I feel this approach provides a good abstraction while making the pipeline implementation more clear as it could then be implemented like "case class SomePipeline(baseOutputURI: URI) extends Pipeline with Pipeline.Output"

The Pipeline scaladocs were also updated and fixed to create valid links to referenced code.
@JonMcPherson
Copy link
Contributor Author

@jpolchlo Hey sorry for such a late response! I forgot about this PR, but now I'm back working with VectorPipe and I think I have a good solution which has addressed your good points.
From the commit message:

Add Pipeline.Output mixin trait and revert return type back to Unit

The Pipeline.Output mixin trait provides the existing default output behavior saving to S3 or Hadoop using the baseOutputURI which has been moved out of the base Pipeline trait. Additionally, the persist() method was renamed to finalize() to be more general since users may not want to persist immediately or even persist at all. I feel this approach provides a good abstraction while making the pipeline implementation more clear as it could then be implemented like:
case class SomePipeline(baseOutputURI: URI) extends Pipeline with Pipeline.Output

The Pipeline scaladocs were also updated and fixed to create valid links to referenced code.

The finalize method is described as

Receive the final RDD of generated vector tiles for a zoom level to finish any additional processing or handling such as persisting to a datastore.

So it does not make assumptions about what the user wants to do with the final RDD of vector tiles. In my case, I actually want to do some additional processing before persisting, so this generalization makes more sense to me. This allows separation of concern for "output" by moving the baseOutputURI to a mixin trait in the companion object enabling users to mixin the current default output behavior. Of course this would be a breaking change and so a minor version bump would probably make sense I think

Additionally, I reverted the VectorPipe.apply return type change back to Unit since the Pipeline receives the vectortiles for each zoom level anyway, so returning a Map of the RDDs is unnecessary and ugly.

val geometryColumn: String
* Name of the column containing the JTS geometry.
*/
def geometryColumn: String
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed geometryColumn and layerMultiplicity to def from val to give users more freedom in how they are defined since generally traits shouldn't define abstract vals. Also changed to make the scaladoc links work properly :p

* The only extra note is that [[vectorpipe.vectortile.Pipeline#reduce reduce]]
* and [[vectorpipe.vectortile.Pipeline#simplify simplify]] will be called prior
* to processing the initial zoom level.
*/
Copy link
Contributor Author

@JonMcPherson JonMcPherson Sep 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The github diffs make it looks like the scaladocs changed a lot, but I just fixed links and formatting (indentation was a result of my IDE). They all still say the same thing.

* Receive the final RDD of generated vector tiles for a zoom level
* to finish any additional processing or handling such as persisting to a datastore.
*/
def finalize(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int): Unit
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't think of a better name for this method. Alternatives I was thinking of was: finish, complete, done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finalize works for me

Copy link
Contributor

@jpolchlo jpolchlo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you nailed this. It's a nice, clean solution. Thanks for updating the docs as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants