From b87bcf5a996dbf40b2d4d9d2804e422235e47d6a Mon Sep 17 00:00:00 2001
From: mingmxu
Date: Wed, 16 Aug 2017 09:47:25 -0700
Subject: [PATCH] [ZEPPELIN-2865] upgrade Beam interpreter to latest version

### What is this PR for?
Upgrade the Beam interpreter to use the latest version of Apache Beam (2.0.0).

### What type of PR is it?
[Improvement]

### Todos
*

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2865

### How should this be tested?
* Start the Zeppelin server.
* The interpreter prefix is `%beam`; write your code with the required imports and the runner.

Refer to `docs/interpreter/beam.md` for an example.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? no
* Are there breaking changes for older versions? no
* Does this need documentation? yes, updated `docs/interpreter/beam.md` and `README.md`

Author: mingmxu

Closes #2541 from XuMingmin/ZEPPELIN-2865 and squashes the following commits:

520f0fd7 [mingmxu] restore the notice message of scala-2.10
93b3e24d [mingmxu] upgrade to Apache Beam 2.0.0
---
 beam/README.md           |  2 +-
 beam/pom.xml             | 10 +++++++++-
 docs/interpreter/beam.md | 14 +++-----------
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/beam/README.md b/beam/README.md
index 57150a0208a..948c95cfc0f 100644
--- a/beam/README.md
+++ b/beam/README.md
@@ -8,7 +8,7 @@ Current interpreter implementation supports the static repl. It compiles the cod
 You have to first build the Beam interpreter by enable the **beam** profile as follows:
 
 ```
-mvn clean package -Pbeam -DskipTests
+mvn clean package -Pbeam -DskipTests -Pscala-2.10
 ```
 
 ### Notice
diff --git a/beam/pom.xml b/beam/pom.xml
index c02695c460d..166652793fe 100644
--- a/beam/pom.xml
+++ b/beam/pom.xml
@@ -35,7 +35,7 @@
     2.3.0
     1.6.2
-    <beam.beam.version>0.2.0-incubating</beam.beam.version>
+    <beam.beam.version>2.0.0</beam.beam.version>
     4.1.1.Final
@@ -211,6 +211,14 @@
       <version>${beam.beam.version}</version>
       <type>jar</type>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-flink_${scala.binary.version}</artifactId>
+      <version>${beam.beam.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md
index cbcd5e37d51..d992b8ee5b5 100644
--- a/docs/interpreter/beam.md
+++ b/docs/interpreter/beam.md
@@ -44,18 +44,10 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.ArrayList;
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.*;
-import org.apache.spark.SparkContext;
 import org.apache.beam.runners.direct.*;
 import org.apache.beam.sdk.runners.*;
 import org.apache.beam.sdk.options.*;
-import org.apache.beam.runners.spark.*;
-import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.flink.*;
-import org.apache.beam.runners.flink.examples.WordCount.Options;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -89,12 +81,12 @@ public class MinimalWordCount {
   };
   static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
   public static void main(String[] args) {
-    Options options = PipelineOptionsFactory.create().as(Options.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(FlinkRunner.class);
     Pipeline p = Pipeline.create(options);
     p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
      .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
-       @Override
+       @ProcessElement
        public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) {
           if (!word.isEmpty()) {
@@ -105,7 +97,7 @@ public class MinimalWordCount {
       }))
       .apply(Count.<String> perElement())
       .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
-        @Override
+        @ProcessElement
         public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0) throws Exception {
          s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
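
For context on the doc changes above: the `@Override` → `@ProcessElement` hunks reflect the Beam 2.x `DoFn` API, where the per-element method is identified by the `@ProcessElement` annotation rather than by overriding `processElement`, and pipeline options are created as plain `PipelineOptions` now that `WordCount.Options` (from the removed `flink.examples` import) is gone. The snippet below is a minimal, self-contained sketch of that style against Beam 2.0.0; it is not part of this patch. The class name `WordCountSketch` and the use of the default DirectRunner (instead of `FlinkRunner` as in `docs/interpreter/beam.md`) are illustrative assumptions.

```
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

// Hypothetical class name, for illustration only.
public class WordCountSketch {
  public static void main(String[] args) {
    // No runner is set, so Beam falls back to the DirectRunner (requires
    // beam-runners-direct-java on the classpath); the Zeppelin doc example
    // instead calls options.setRunner(FlinkRunner.class).
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(Arrays.asList("hello beam", "hello zeppelin")))
     // Beam 2.x style: the element-wise method carries @ProcessElement
     // instead of overriding DoFn#processElement.
     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) {
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
       }
     }))
     .apply(Count.<String>perElement())
     .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
         // Print each "word<TAB>count" pair; the doc example collects
         // these into a list for display in the notebook instead.
         System.out.println(c.element().getKey() + "\t" + c.element().getValue());
       }
     }));

    p.run().waitUntilFinish();
  }
}
```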