Skip to content

Commit

Permalink
STORM-2490: support user defined output fields
Browse files Browse the repository at this point in the history
  • Loading branch information
vesense committed Apr 26, 2017
1 parent 4d8efae commit ce58ae5
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ protected int run(String[] args) throws Exception {
// (or it will cause not serializable exception).
Prefix prefix = new Prefix("Hello lambda:");
String suffix = ":so cool!";
int tag = 999;

builder.setSpout("spout1", () -> UUID.randomUUID().toString());
builder.setBolt("bolt1", (tuple, collector) -> {
String[] parts = tuple.getStringByField("lambda").split("\\-");
collector.emit(new Values(prefix + parts[0] + suffix));
}).shuffleGrouping("spout1");
collector.emit(new Values(prefix + parts[0] + suffix, tag));
}, "strValue", "intValue").shuffleGrouping("spout1");
builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");

Config conf = new Config();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,29 @@
package org.apache.storm.lambda;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
public class LambdaBiConsumerBolt extends BaseBasicBolt {

private SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer;

public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) {
private String[] fields;

public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String[] fields) {
this.biConsumer = biConsumer;
this.fields = fields;
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
biConsumer.accept(input, collector);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(fields));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.storm.lambda;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class LambdaConsumerBolt extends AbstractLambdaBolt {
public class LambdaConsumerBolt extends BaseBasicBolt {

private SerializableConsumer<Tuple> consumer;

Expand All @@ -33,4 +35,8 @@ public void execute(Tuple input, BasicOutputCollector collector) {
consumer.accept(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt dosen't emit to downstream bolts
}
}
4 changes: 2 additions & 2 deletions storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import java.util.Map;

public class LambdaSpout extends BaseRichSpout {
private SerializableSupplier<Object> supplier;
private SerializableSupplier<?> supplier;
private SpoutOutputCollector collector;

public LambdaSpout(SerializableSupplier<Object> supplier) {
public LambdaSpout(SerializableSupplier<?> supplier) {
this.supplier = supplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,12 @@ public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
* @param fields fields for tuple that should be emitted to downstream bolts
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) throws IllegalArgumentException {
return setBolt(id, biConsumer, null);
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String... fields) throws IllegalArgumentException {
return setBolt(id, biConsumer, null, fields);
}

/**
Expand All @@ -344,12 +345,13 @@ public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputC
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
* @param fields fields for tuple that should be emitted to downstream bolts
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint) throws IllegalArgumentException {
return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint);
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint, String... fields) throws IllegalArgumentException {
return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelism_hint);
}

/**
Expand Down Expand Up @@ -427,7 +429,7 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
return setSpout(id, supplier, null);
}

Expand All @@ -441,7 +443,7 @@ public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier)
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier, Number parallelism_hint) throws IllegalArgumentException {
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelism_hint) throws IllegalArgumentException {
return setSpout(id, new LambdaSpout(supplier), parallelism_hint);
}

Expand Down

0 comments on commit ce58ae5

Please sign in to comment.