Skip to content

Commit

Permalink
upgrade to Apache Beam 2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mingmxu committed Aug 16, 2017
1 parent 340b326 commit 93b3e24
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
5 changes: 1 addition & 4 deletions beam/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ Current interpreter implementation supports the static repl. It compiles the cod
You first have to build the Beam interpreter by enabling the **beam** profile as follows:

```
mvn clean package -Pbeam -DskipTests
mvn clean package -Pbeam -DskipTests -Pscala-2.10
```

### Notice
- The Flink runner ships with binaries compiled for Scala 2.10, so currently only Scala 2.10 is supported.

### Technical overview

* Upon starting an interpreter, an instance of `JavaCompiler` is created.
Expand Down
10 changes: 9 additions & 1 deletion beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.6.2</beam.spark.version>
<beam.beam.version>0.2.0-incubating</beam.beam.version>
<beam.beam.version>2.0.0</beam.beam.version>

<!-- library versions -->
<netty.version>4.1.1.Final</netty.version>
Expand Down Expand Up @@ -211,6 +211,14 @@
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_${scala.binary.version}</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
</exclusions>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
14 changes: 3 additions & 11 deletions docs/interpreter/beam.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,10 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -89,12 +81,12 @@ public class MinimalWordCount {
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
Options options = PipelineOptionsFactory.create().as(Options.class);
PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
Expand All @@ -105,7 +97,7 @@ public class MinimalWordCount {
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
@ProcessElement
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
Expand Down

0 comments on commit 93b3e24

Please sign in to comment.