VectorPipe (VP) is a library for working with OpenStreetMap (OSM) vector data. Powered by Geotrellis and Apache Spark.
OSM provides a wealth of data which has broad coverage and a deep history. This comes at the price of very large size which can make accessing the power of OSM difficult. VectorPipe can help by making OSM processing in Apache Spark possible, leveraging large computing clusters to churn through the large volume of, say, an OSM full history file.
For those cases where an application needs to process incoming changes, VP
also provides streaming Spark DataSource
s for changesets, OsmChange files,
and Augmented diffs generated by Overpass.
For ease of use, the output of VP imports is a Spark DataFrame containing
columns of JTS Geometry
objects, enabled by the user-defined types provided
by GeoMesa. That package also
provides functions for manipulating those geometries via Spark SQL directives.
The final important contribution is a set of functions for exporting
geometries to vector tiles. This leans on the geotrellis-vectortile
package.
The fastest way to get started with VectorPipe is to invoke spark-shell
and
load the package jars from the Bintray repository:
spark-shell --packages com.azavea:vectorpipe_2.11:1.0.0 --repositories http://dl.bintray.com/azavea/maven
This will download the required components and set up a REPL with VectorPipe available. At which point, you may issue
// Make JTS types available to Spark
import org.locationtech.geomesa.spark.jts._
spark.withJTS
import vectorpipe._
and begin using the package.
Your local machine is probably insufficient for dealing with very large OSM
files. We recommend the use of Amazon's Elastic Map Reduce (EMR) service to
provision substantial clusters of computing resources. You'll want to supply
Spark, Hive, and Hadoop to your cluster, with Spark version 2.3. Creating a
cluster with EMR version between 5.13 and 5.19 should suffice. From there,
ssh
into the master node and run spark-shell
as above for an interactive
environment, or use spark-submit
for batch jobs. (You may submit Steps to
the EMR cluster using spark-submit
as well.)
Batch analysis can be performed in a few different ways. Perhaps the fastest way is to procure an OSM PBF file from a source such as GeoFabrik, which supplies various extracts of OSM, including the full planet worth of data.
VectorPipe does not provide the means to directly read these OSM PBF files,
however, and a conversion to a useful file format will thus be needed. We
suggest using osm2orc
to convert your
source file to the ORC format which can be read natively via Spark:
val df = spark.read.orc(path)
The resulting DataFrame
can be processed with VectorPipe.
It is also possible to read from a cache of OsmChange files directly rather than convert the PBF file:
import vectorpipe.sources.Source
val df = spark.read
.format(Source.Changes)
.options(Map[String, String](
Source.BaseURI -> "https://download.geofabrik.de/europe/isle-of-man-updates/",
Source.StartSequence -> "2080",
Source.EndSequence -> "2174",
Source.BatchSize -> "1"))
.load
.persist // recommended to avoid rereading
(Note that the start and end sequence will shift over time for Geofabrik. Please navigate to the base URI to determine these values, otherwise timeouts may occur.) This may issue errors, but should complete. This is much slower than using ORC files and is much touchier, but it stands as an option.
[It is also possible to build a dataframe from a stream of changesets in a
similar manner as above. Changesets carry additional metadata regarding the
author of the changes, but none of the geometric information. These tables
can be joined on changeset
.]
In either case, a useful place to start is to convert the incoming dataframe into a more usable format. We recommend calling
val geoms = OSM.toGeometry(df)
which will produce a frame consisting of "top-level" entities, which is to say
nodes that don't participate in a way, ways that don't participate in
relations, and a subset of the relations from the OSM data. The resulting
dataframe will represent these entities with JTS geometries in the geom
column.
The toGeometry
function keeps elements that fit one of the following
descriptions:
- points from tagged nodes (including tags that really ought to be dropped—e.g.
source=*
); - polygons derived from ways with tags that cause them to be considered as areas;
- lines from ways lacking area tags;
- multipolygons from multipolygon or boundary relations; and
- multilinestrings from route relations.
It is also possible to filter the results based on information in the tags. For instance, all buildings can be found as
import vectorpipe.functions.osm._
val buildings = geoms.filter(isBuilding('tags))
Again, the JTS user defined types allow for easier manipulation of and calculation from geometric types. See here for a list of functions that operate on geometries.
While most users will rely solely on the features exposed by the OSM
object,
finer-grained control of the output of the process—say, if one does not need
relations, for example—is available through the vectorpipe.internal
package.
There is a significant caveat here: there are two schemas that are
found in the system when working with imported OSM dataframes. The difference
is in the type of a sub-field of the members
list. This can cause errors of
the form
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Byte
when using the internal
package methods.
These type problems can be fixed by calling
vectorpipe.functions.osm.ensureCompressedMembers
on the input OSM data frame
before passing to any relation-generating functions, such as
reconstructRelationGeometries
. Top-level functions in the OSM
object
handle this conversion for you. Note that this only affects the data frames
carrying the initially imported OSM data.
If you are intending to contribute to VectorPipe, you may need to work with a development version. If that is the case, instead of loading from Bintray, you will need to build a fat jar using
./sbt assembly
and following that,
spark-shell --jars target/scala_2.11/vectorpipe.jar
When developing with IntelliJ IDEA, the sbt plugin will see Spark dependencies
as provided, which will prevent them from being indexed properly, resulting in
errors / warnings within the IDE. To fix this, create idea.sbt
at the root of
the project:
import Dependencies._
lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
libraryDependencies ++= Seq(
sparkSql % Compile
)
)