forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-2419][Streaming][Docs] Updates to the streaming programming guide
Updated the main streaming programming guide, and also added source-specific guides for Kafka, Flume, Kinesis. Author: Tathagata Das <tathagata.das1565@gmail.com> Author: Jacek Laskowski <jacek@japila.pl> Closes apache#2254 from tdas/streaming-doc-fix and squashes the following commits: e45c6d7 [Jacek Laskowski] More fixes from an old PR 5125316 [Tathagata Das] Fixed links dc02f26 [Tathagata Das] Refactored streaming kinesis guide and made many other changes. acbc3e3 [Tathagata Das] Fixed links between streaming guides. cb7007f [Tathagata Das] Added Streaming + Flume integration guide. 9bd9407 [Tathagata Das] Updated streaming programming guide with additional information from SPARK-2419.
- Loading branch information
Showing
5 changed files
with
622 additions
and
239 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
--- | ||
layout: global | ||
title: Spark Streaming + Flume Integration Guide | ||
--- | ||
|
||
[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. | ||
|
||
## Approach 1: Flume-style Push-based Approach | ||
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. | ||
|
||
#### General Requirements | ||
Choose a machine in your cluster such that | ||
|
||
- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine. | ||
|
||
- Flume can be configured to push data to a port on that machine. | ||
|
||
Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able push data. | ||
|
||
#### Configuring Flume | ||
Configure Flume agent to send data to an Avro sink by having the following in the configuration file. | ||
|
||
agent.sinks = avroSink | ||
agent.sinks.avroSink.type = avro | ||
agent.sinks.avroSink.channel = memoryChannel | ||
agent.sinks.avroSink.hostname = <chosen machine's hostname> | ||
agent.sinks.avroSink.port = <chosen port on the machine> | ||
|
||
See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about | ||
configuring Flume agents. | ||
|
||
#### Configuring Spark Streaming Application | ||
1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). | ||
|
||
groupId = org.apache.spark | ||
artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} | ||
version = {{site.SPARK_VERSION_SHORT}} | ||
|
||
2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows. | ||
|
||
<div class="codetabs"> | ||
<div data-lang="scala" markdown="1"> | ||
import org.apache.spark.streaming.flume._ | ||
|
||
val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) | ||
|
||
See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala). | ||
</div> | ||
<div data-lang="java" markdown="1"> | ||
import org.apache.spark.streaming.flume.*; | ||
|
||
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = | ||
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]); | ||
|
||
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java). | ||
</div> | ||
</div> | ||
|
||
Note that the hostname should be the same as the one used by the resource manager in the | ||
cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch | ||
the receiver in the right machine. | ||
|
||
3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). | ||
|
||
## Approach 2 (Experimental): Pull-based Approach using a Custom Sink | ||
Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following. | ||
- Flume pushes data into the sink, and the data stays buffered. | ||
- Spark Streaming uses transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming. | ||
This ensures that better reliability and fault-tolerance than the previous approach. However, this requires configuring Flume to run a custom sink. Here are the configuration steps. | ||
|
||
#### General Requirements | ||
Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink. | ||
|
||
#### Configuring Flume | ||
Configuring Flume on the chosen machine requires the following two steps. | ||
|
||
1. **Sink JARs**: Add the following JARs to Flume's classpath (see [Flume's documentation](https://flume.apache.org/documentation.html) to see how) in the machine designated to run the custom sink . | ||
|
||
(i) *Custom sink JAR*: Download the JAR corresponding to the following artifact (or [direct link](http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}/{{site.SPARK_VERSION_SHORT}}/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar)). | ||
|
||
groupId = org.apache.spark | ||
artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}} | ||
version = {{site.SPARK_VERSION_SHORT}} | ||
|
||
(ii) *Scala library JAR*: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/{{site.SCALA_VERSION}}/scala-library-{{site.SCALA_VERSION}}.jar)). | ||
|
||
groupId = org.scala-lang | ||
artifactId = scala-library | ||
version = {{site.SCALA_VERSION}} | ||
|
||
2. **Configuration file**: On that machine, configure Flume agent to send data to an Avro sink by having the following in the configuration file. | ||
|
||
agent.sinks = spark | ||
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink | ||
agent.sinks.spark.hostname = <hostname of the local machine> | ||
agent.sinks.spark.port = <port to listen on for connection from Spark> | ||
agent.sinks.spark.channel = memoryChannel | ||
|
||
Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink. | ||
|
||
See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about | ||
configuring Flume agents. | ||
|
||
#### Configuring Spark Streaming Application | ||
1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide). | ||
|
||
2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows. | ||
|
||
<div class="codetabs"> | ||
<div data-lang="scala" markdown="1"> | ||
import org.apache.spark.streaming.flume._ | ||
|
||
val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]) | ||
</div> | ||
<div data-lang="java" markdown="1"> | ||
import org.apache.spark.streaming.flume.*; | ||
|
||
JavaReceiverInputDStream<SparkFlumeEvent>flumeStream = | ||
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]); | ||
</div> | ||
</div> | ||
|
||
See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala). | ||
|
||
Note that each input DStream can be configured to receive data from multiple sinks. | ||
|
||
3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
--- | ||
layout: global | ||
title: Spark Streaming + Kafka Integration Guide | ||
--- | ||
[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka. | ||
|
||
1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). | ||
|
||
groupId = org.apache.spark | ||
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} | ||
version = {{site.SPARK_VERSION_SHORT}} | ||
|
||
2. **Programming:** In the streaming application code, import `KafkaUtils` and create input DStream as follows. | ||
|
||
<div class="codetabs"> | ||
<div data-lang="scala" markdown="1"> | ||
import org.apache.spark.streaming.kafka._ | ||
|
||
val kafkaStream = KafkaUtils.createStream( | ||
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]) | ||
|
||
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala). | ||
</div> | ||
<div data-lang="java" markdown="1"> | ||
import org.apache.spark.streaming.kafka.*; | ||
|
||
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream( | ||
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]); | ||
|
||
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java). | ||
</div> | ||
</div> | ||
|
||
*Points to remember:* | ||
|
||
- Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the `KafkaUtils.createStream()` only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that. | ||
|
||
- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers. | ||
|
||
3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
--- | ||
layout: global | ||
title: Spark Streaming + Kinesis Integration | ||
--- | ||
[Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale. | ||
The Kinesis input DStream and receiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). | ||
The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concept of Workers, Checkpoints, and Shard Leases. | ||
Here we explain how to configure Spark Streaming to receive data from Kinesis. | ||
|
||
#### Configuring Kinesis | ||
|
||
A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards per the following | ||
[guide](http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html). | ||
|
||
|
||
#### Configuring Spark Streaming Application | ||
|
||
1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). | ||
|
||
groupId = org.apache.spark | ||
artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} | ||
version = {{site.SPARK_VERSION_SHORT}} | ||
|
||
**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** | ||
|
||
2. **Programming:** In the streaming application code, import `KinesisUtils` and create input DStream as follows. | ||
|
||
<div class="codetabs"> | ||
<div data-lang="scala" markdown="1"> | ||
import org.apache.spark.streaming.kinesis._ | ||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream | ||
|
||
val kinesisStream = KinesisUtils.createStream( | ||
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]) | ||
|
||
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the next subsection for instructions to run the example. | ||
|
||
</div> | ||
<div data-lang="java" markdown="1"> | ||
import org.apache.spark.streaming.flume.*; | ||
|
||
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream( | ||
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]); | ||
|
||
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) | ||
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example. | ||
|
||
</div> | ||
</div> | ||
|
||
`[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region). | ||
|
||
`[checkpoint interval]`: The interval at which the Kinesis client library is going to save its position in the stream. For starters, set it to the same as the batch interval of the streaming application. | ||
|
||
`[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see later section and Amazon Kinesis API documentation for more details). | ||
|
||
*Points to remember:* | ||
|
||
- The name used in the context of the streaming application must be unique for a given account and region. Changing the app name or stream name could lead to Kinesis errors as only a single logical application can process a single stream. | ||
- A single Kinesis input DStream can receive many Kinesis shards by spinning up multiple KinesisRecordProcessor threads. Note that there is no correlation between number of shards in Kinesis and the number of partitions in the generated RDDs that is used for processing the data. | ||
- You never need more KinesisReceivers than the number of shards in your stream as each will spin up at least one KinesisRecordProcessor thread. | ||
- Horizontal scaling is achieved by autoscaling additional Kinesis input DStreams (separate processes) up to the number of current shards for a given stream, of course. | ||
|
||
3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). | ||
|
||
- A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL. It is used to store KCL's checkpoint information. | ||
|
||
- If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch. | ||
|
||
#### Running the Example | ||
To run the example, | ||
- Download Spark source and follow the [instructions](building-with-maven.html) to build Spark with profile *-Pkinesis-asl*. | ||
|
||
mvn -Pkinesis-asl -DskipTests clean package | ||
|
||
- Set up Kinesis stream (see earlier section). Note the name of the Kinesis stream, and the endpoint URL corresponding to the region the stream is based on. | ||
|
||
- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials. | ||
|
||
- In the Spark root directory, run the example as | ||
<div class="codetabs"> | ||
<div data-lang="scala" markdown="1"> | ||
|
||
bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL] | ||
|
||
</div> | ||
<div data-lang="java" markdown="1"> | ||
|
||
bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL] | ||
|
||
</div> | ||
</div> | ||
|
||
This will wait for data to be received from Kinesis. | ||
|
||
- To generate random string data, in another terminal, run the associated Kinesis data producer. | ||
|
||
bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10 | ||
|
||
This will push random words to the Kinesis stream, which should then be received and processed by the running example. | ||
|
||
#### Kinesis Checkpointing | ||
The Kinesis receiver checkpoints the position of the stream that has been read periodically, so that the system can recover from failures and continue processing where it had left off. Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy. | ||
|
||
- If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable. | ||
|
||
- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored). In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data. | ||
|
||
- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency. |
Oops, something went wrong.