[SPARK-2419][Streaming][Docs] Updates to the streaming programming guide
Updated the main streaming programming guide, and also added source-specific guides for Kafka, Flume, Kinesis.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Jacek Laskowski <jacek@japila.pl>

Closes apache#2254 from tdas/streaming-doc-fix and squashes the following commits:

e45c6d7 [Jacek Laskowski] More fixes from an old PR
5125316 [Tathagata Das] Fixed links
dc02f26 [Tathagata Das] Refactored streaming kinesis guide and made many other changes.
acbc3e3 [Tathagata Das] Fixed links between streaming guides.
cb7007f [Tathagata Das] Added Streaming + Flume integration guide.
9bd9407 [Tathagata Das] Updated streaming programming guide with additional information from SPARK-2419.
tdas committed Sep 4, 2014
1 parent 996b743 commit a522407
Showing 5 changed files with 622 additions and 239 deletions.
132 changes: 132 additions & 0 deletions docs/streaming-flume-integration.md
@@ -0,0 +1,132 @@
---
layout: global
title: Spark Streaming + Flume Integration Guide
---

[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.

## Approach 1: Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.

#### General Requirements
Choose a machine in your cluster such that

- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine.

- Flume can be configured to push data to a port on that machine.

Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able to push data.

#### Configuring Flume
Configure the Flume agent to send data to an Avro sink by adding the following to its configuration file.

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

See [Flume's documentation](https://flume.apache.org/documentation.html) for more information about
configuring Flume agents.

#### Configuring Spark Streaming Application
1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

groupId = org.apache.spark
artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

2. **Programming:** In the streaming application code, import `FlumeUtils` and create the input DStream as follows.

<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.flume.*;

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);

See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
</div>

Note that the hostname should be the same as the one used by the resource manager in the
cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch
the receiver on the correct machine. A minimal end-to-end sketch is shown after these steps.

3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
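For context, here is a minimal end-to-end sketch of the push-based approach in Scala. The application name, hostname, and port below are placeholders; the hostname and port must match the Avro sink configuration above.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._

    object FlumeEventCountSketch {
      def main(args: Array[String]) {
        // Batch interval of 2 seconds (illustrative)
        val conf = new SparkConf().setAppName("FlumeEventCountSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // The receiver listens on the chosen machine and port for Avro events pushed by Flume
        val flumeStream = FlumeUtils.createStream(ssc, "flume-sink-host", 9999)

        // Count the events received in each batch
        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }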

## Approach 2 (Experimental): Pull-based Approach using a Custom Sink
Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.
- Flume pushes data into the sink, and the data stays buffered.
- Spark Streaming uses transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming.
This ensures stronger reliability and fault-tolerance than the previous approach. However, it requires configuring Flume to run a custom sink. Here are the configuration steps.

#### General Requirements
Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.

#### Configuring Flume
Configuring Flume on the chosen machine requires the following two steps.

1. **Sink JARs**: Add the following JARs to Flume's classpath (see [Flume's documentation](https://flume.apache.org/documentation.html) to see how) on the machine designated to run the custom sink.

(i) *Custom sink JAR*: Download the JAR corresponding to the following artifact (or [direct link](http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}/{{site.SPARK_VERSION_SHORT}}/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar)).

groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

(ii) *Scala library JAR*: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact details (or, [direct link](http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/{{site.SCALA_VERSION}}/scala-library-{{site.SCALA_VERSION}}.jar)).

groupId = org.scala-lang
artifactId = scala-library
version = {{site.SCALA_VERSION}}

2. **Configuration file**: On that machine, configure the Flume agent to send data to the custom sink by adding the following to its configuration file.

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel

Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink.

See [Flume's documentation](https://flume.apache.org/documentation.html) for more information about
configuring Flume agents.

#### Configuring Spark Streaming Application
1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).

2. **Programming:** In the streaming application code, import `FlumeUtils` and create the input DStream as follows.

<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.flume.*;

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
</div>

See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).

Note that each input DStream can be configured to receive data from multiple sinks. A sketch showing how the received `SparkFlumeEvent`s can be processed appears after these steps.

3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
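As a rough sketch (the sink hostname and port are placeholders), the records pulled from the custom sink are `SparkFlumeEvent`s whose bodies can be decoded for further processing:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._

    object FlumePollingSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("FlumePollingSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Pull events from the Spark sink running inside the Flume agent
        val flumeStream = FlumeUtils.createPollingStream(ssc, "sink-host", 9999)

        // Each record is a SparkFlumeEvent wrapping an Avro event; decode its body as UTF-8 text
        val lines = flumeStream.map(event => new String(event.event.getBody.array(), "UTF-8"))
        lines.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }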



42 changes: 42 additions & 0 deletions docs/streaming-kafka-integration.md
@@ -0,0 +1,42 @@
---
layout: global
title: Spark Streaming + Kafka Integration Guide
---
[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka.

1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

groupId = org.apache.spark
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

2. **Programming:** In the streaming application code, import `KafkaUtils` and create the input DStream as follows.

<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.kafka.*;

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);

See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
</div>
</div>

*Points to remember:*

- Topic partitions in Kafka do not correlate to partitions of the RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in `KafkaUtils.createStream()` only increases the number of threads used to consume topics within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.

- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers (a sketch of this pattern appears after these steps).

3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
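For illustration, here is a hedged sketch of the multiple-receiver pattern mentioned above: several Kafka input DStreams are created and unioned so that data is received in parallel across receivers. The ZooKeeper quorum, consumer group, topic name, and receiver count are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka._

    object KafkaParallelReceiverSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("KafkaParallelReceiverSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        val zkQuorum = "zk-host:2181"        // placeholder ZooKeeper quorum
        val group = "my-consumer-group"      // placeholder consumer group id
        val topicMap = Map("my-topic" -> 1)  // topic -> number of consumer threads per receiver

        // Create several receivers for the same topic and union the resulting streams
        val numReceivers = 3
        val kafkaStreams = (1 to numReceivers).map { _ =>
          KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        }
        val lines = ssc.union(kafkaStreams)

        // Simple word count over the unioned stream
        lines.flatMap(_.split(" ")).map(word => (word, 1L)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }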
110 changes: 110 additions & 0 deletions docs/streaming-kinesis-integration.md
@@ -0,0 +1,110 @@
---
layout: global
title: Spark Streaming + Kinesis Integration
---
[Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale.
The Kinesis input DStream and receiver use the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load balancing, fault tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
Here we explain how to configure Spark Streaming to receive data from Kinesis.

#### Configuring Kinesis

A Kinesis stream can be set up at one of the valid Kinesis endpoints with one or more shards by following this
[guide](http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html).


#### Configuring Spark Streaming Application

1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**

2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream as follows.

<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the next subsection for instructions to run the example.

</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.kinesis.*;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);

See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.

</div>
</div>

`[endpoint URL]`: A list of valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

`[checkpoint interval]`: The interval at which the Kinesis client library is going to save its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

`[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see later section and Amazon Kinesis API documentation for more details). A minimal sketch putting these parameters together appears after these steps.

*Points to remember:*

- The application name used in the streaming context must be unique for a given account and region. Changing the app name or stream name could lead to Kinesis errors, as only a single logical application can process a single stream.
- A single Kinesis input DStream can receive from many Kinesis shards by spinning up multiple KinesisRecordProcessor threads. Note that there is no correlation between the number of shards in Kinesis and the number of partitions in the generated RDDs that are used for processing the data.
- You never need more KinesisReceivers than the number of shards in your stream, as each receiver spins up at least one KinesisRecordProcessor thread.
- Horizontal scaling is achieved by adding additional Kinesis input DStreams (in separate processes), up to the number of current shards for the given stream.

3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

- A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL. It is used to store KCL's checkpoint information.

- If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.
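To tie the pieces above together, here is a minimal Scala sketch. The stream name and endpoint URL are placeholders, and it assumes (as in the bundled KinesisWordCountASL example) that `createStream` also takes a storage level argument; records arrive as `Array[Byte]` and are decoded here as UTF-8 text.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis._

    object KinesisSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("KinesisSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Placeholders: replace with your stream name and the endpoint URL of its region
        val streamName = "my-kinesis-stream"
        val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"

        // Checkpoint the KCL position at the batch interval and start from the latest records
        val kinesisStream = KinesisUtils.createStream(
          ssc, streamName, endpointUrl, Seconds(2),
          InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)

        // Records arrive as Array[Byte]; decode them as UTF-8 text and count words
        val words = kinesisStream.flatMap(bytes => new String(bytes, "UTF-8").split(" "))
        words.map(word => (word, 1L)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }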

#### Running the Example
To run the example,
- Download the Spark source and follow the [instructions](building-with-maven.html) to build Spark with the *-Pkinesis-asl* profile.

mvn -Pkinesis-asl -DskipTests clean package

- Set up a Kinesis stream (see earlier section). Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.

- Set up the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_KEY` with your AWS credentials.

- In the Spark root directory, run the example as
<div class="codetabs">
<div data-lang="scala" markdown="1">

bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]

</div>
<div data-lang="java" markdown="1">

bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]

</div>
</div>

This will wait for data to be received from Kinesis.

- To generate random string data, in another terminal, run the associated Kinesis data producer.

bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10

This will push random words to the Kinesis stream, which should then be received and processed by the running example.

#### Kinesis Checkpointing
The Kinesis receiver periodically checkpoints the position of the stream that has been read, so that the system can recover from failures and continue processing where it left off. Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.

- If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST). This is configurable.

- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored). In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON, which will read up to 24 hours (Kinesis limit) of previous stream data.

- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records; the impact depends on the checkpoint frequency.