diff --git a/beam/README.md b/beam/README.md index 57150a0208a..948c95cfc0f 100644 --- a/beam/README.md +++ b/beam/README.md @@ -8,7 +8,7 @@ Current interpreter implementation supports the static repl. It compiles the cod You have to first build the Beam interpreter by enable the **beam** profile as follows: ``` -mvn clean package -Pbeam -DskipTests +mvn clean package -Pbeam -DskipTests -Pscala-2.10 ``` ### Notice diff --git a/beam/pom.xml b/beam/pom.xml index c02695c460d..166652793fe 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -35,7 +35,7 @@ 2.3.0 1.6.2 - 0.2.0-incubating + 2.0.0 4.1.1.Final @@ -211,6 +211,14 @@ ${beam.beam.version} jar + + + org.apache.beam + beam-runners-flink_${scala.binary.version} + ${beam.beam.version} + + + ${project.groupId} diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index cbcd5e37d51..d992b8ee5b5 100644 --- a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -44,18 +44,10 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.ArrayList; -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.*; -import org.apache.spark.SparkContext; import org.apache.beam.runners.direct.*; import org.apache.beam.sdk.runners.*; import org.apache.beam.sdk.options.*; -import org.apache.beam.runners.spark.*; -import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.flink.*; -import org.apache.beam.runners.flink.examples.WordCount.Options; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -89,12 +81,12 @@ public class MinimalWordCount { }; static final List SENTENCES = Arrays.asList(SENTENCES_ARRAY); public static void main(String[] args) { - Options options = PipelineOptionsFactory.create().as(Options.class); + PipelineOptions options = 
PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of())) .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { - @Override + @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { @@ -105,7 +97,7 @@ public class MinimalWordCount { })) .apply(Count.<String> perElement()) .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() { - @Override + @ProcessElement public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0) throws Exception { s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());