[ZEPPELIN-2865] upgrade Beam interpreter to latest version
### What is this PR for?
Upgrade the Beam interpreter to Apache Beam 2.0.0 (previously 0.2.0-incubating).

### What type of PR is it?
[Improvement]

### Todos
*

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2865

### How should this be tested?
* Start the Zeppelin server.
* In a notebook paragraph, start with the `%beam` interpreter prefix, then write your code with the required imports and the runner.

Refer to `docs/interpreter/beam.md` for an example.
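
One thing to check when testing older notebooks: the documentation example now marks `DoFn` processing methods with `@ProcessElement` instead of `@Override`. The sketch below illustrates the general idea of annotation-driven dispatch using only the JDK, so it runs without Beam on the classpath. The `ProcessElement` annotation, `ExtractWords`, and `run` here are hypothetical local stand-ins, not Beam classes, and this is not a description of how Beam itself is implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AnnotationDispatchSketch {

    /** Hypothetical marker annotation; a local stand-in, NOT Beam's @ProcessElement. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ProcessElement {}

    /** A user function: there is no base-class method to override, only a marked method. */
    public static class ExtractWords {
        @ProcessElement
        public void processElement(String line, List<String> out) {
            // Same word-splitting regex as the documentation example.
            for (String word : line.split("[^a-zA-Z']+")) {
                if (!word.isEmpty()) {
                    out.add(word);
                }
            }
        }
    }

    /** Finds the first public method carrying the marker and invokes it once per input. */
    public static List<String> run(Object fn, List<String> inputs) throws Exception {
        Method target = null;
        for (Method m : fn.getClass().getMethods()) {
            if (m.isAnnotationPresent(ProcessElement.class)) {
                target = m;
                break;
            }
        }
        if (target == null) {
            throw new IllegalArgumentException("no @ProcessElement method found");
        }
        List<String> out = new ArrayList<>();
        for (String input : inputs) {
            target.invoke(fn, input, out);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        List<String> words = run(new ExtractWords(), Arrays.asList("Hello, world", "it's Beam"));
        System.out.println(words);
    }
}
```

With this style a method that is not annotated is simply never found, so a leftover `@Override` on a method that no longer exists in the base class is the kind of error to look for when an old paragraph stops compiling.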

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes, updated `docs/interpreter/beam.md` and `README.md`

Author: mingmxu <mingmxu@ebay.com>

Closes apache#2541 from XuMingmin/ZEPPELIN-2865 and squashes the following commits:

520f0fd [mingmxu] restore the notice message of scala-2.10
93b3e24 [mingmxu] upgrade to Apache Beam 2.0.0
mingmxu authored and 1ambda committed Aug 20, 2017
1 parent 7b5db04 commit b87bcf5
Showing 3 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion beam/README.md
@@ -8,7 +8,7 @@ Current interpreter implementation supports the static repl. It compiles the cod
You have to first build the Beam interpreter by enable the **beam** profile as follows:

```
-mvn clean package -Pbeam -DskipTests
+mvn clean package -Pbeam -DskipTests -Pscala-2.10
```

### Notice
10 changes: 9 additions & 1 deletion beam/pom.xml
@@ -35,7 +35,7 @@
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.6.2</beam.spark.version>
-<beam.beam.version>0.2.0-incubating</beam.beam.version>
+<beam.beam.version>2.0.0</beam.beam.version>

<!-- library versions -->
<netty.version>4.1.1.Final</netty.version>
@@ -211,6 +211,14 @@
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>

+<dependency>
+<groupId>org.apache.beam</groupId>
+<artifactId>beam-runners-flink_${scala.binary.version}</artifactId>
+<version>${beam.beam.version}</version>
+<exclusions>
+</exclusions>
+</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
14 changes: 3 additions & 11 deletions docs/interpreter/beam.md
@@ -44,18 +44,10 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.*;
-import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
-import org.apache.beam.runners.spark.*;
-import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
-import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -89,12 +81,12 @@ public class MinimalWordCount {
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
-Options options = PipelineOptionsFactory.create().as(Options.class);
+PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
-@Override
+@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
@@ -105,7 +97,7 @@ public class MinimalWordCount {
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
-@Override
+@ProcessElement
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());