From ce58ae5388e5d8c60a511e239ecb57bdbfc17437 Mon Sep 17 00:00:00 2001 From: vesense Date: Wed, 26 Apr 2017 17:20:52 +0800 Subject: [PATCH] STORM-2490: support user defined output fields --- .../apache/storm/starter/LambdaTopology.java | 5 ++-- .../storm/lambda/AbstractLambdaBolt.java | 30 ------------------- .../storm/lambda/LambdaBiConsumerBolt.java | 14 +++++++-- .../storm/lambda/LambdaConsumerBolt.java | 8 ++++- .../org/apache/storm/lambda/LambdaSpout.java | 4 +-- .../storm/topology/TopologyBuilder.java | 14 +++++---- 6 files changed, 32 insertions(+), 43 deletions(-) delete mode 100644 storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index 66307ef6429..61b02dbbd8c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -44,12 +44,13 @@ protected int run(String[] args) throws Exception { // (or it will cause not serializable exception). Prefix prefix = new Prefix("Hello lambda:"); String suffix = ":so cool!"; + int tag = 999; builder.setSpout("spout1", () -> UUID.randomUUID().toString()); builder.setBolt("bolt1", (tuple, collector) -> { String[] parts = tuple.getStringByField("lambda").split("\\-"); - collector.emit(new Values(prefix + parts[0] + suffix)); - }).shuffleGrouping("spout1"); + collector.emit(new Values(prefix + parts[0] + suffix, tag)); + }, "strValue", "intValue").shuffleGrouping("spout1"); builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); Config conf = new Config(); diff --git a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java deleted file mode 100644 index 3ecc4572461..00000000000 --- a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.lambda; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; - -public abstract class AbstractLambdaBolt extends BaseBasicBolt { - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("lambda")); - } -} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java index 96d48a251a4..7e7de9cf1bf 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java @@ -18,14 +18,20 @@ package org.apache.storm.lambda; import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -public class LambdaBiConsumerBolt extends AbstractLambdaBolt { +public class LambdaBiConsumerBolt extends BaseBasicBolt { private SerializableBiConsumer biConsumer; - public LambdaBiConsumerBolt(SerializableBiConsumer biConsumer) { + private String[] fields; + + public LambdaBiConsumerBolt(SerializableBiConsumer biConsumer, String[] fields) { this.biConsumer = biConsumer; + this.fields = fields; } @Override @@ -33,4 +39,8 @@ public void execute(Tuple input, BasicOutputCollector collector) { biConsumer.accept(input, collector); } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(fields)); + } } diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java index 29bb32e0d48..d9114ed8ebc 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java @@ -18,9 +18,11 @@ package org.apache.storm.lambda; import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; -public class LambdaConsumerBolt extends AbstractLambdaBolt { +public class LambdaConsumerBolt extends BaseBasicBolt { private SerializableConsumer consumer; @@ -33,4 +35,8 @@ public void execute(Tuple input, BasicOutputCollector collector) { consumer.accept(input); } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // this bolt dosen't emit to downstream bolts + } } diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java index 51593b50720..6d0ba3a10c6 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java @@ -27,10 +27,10 @@ import java.util.Map; public class LambdaSpout extends BaseRichSpout { - private SerializableSupplier supplier; + private SerializableSupplier supplier; private SpoutOutputCollector collector; - public LambdaSpout(SerializableSupplier supplier) { + public LambdaSpout(SerializableSupplier supplier) { this.supplier = supplier; } diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 23c55387265..d8d871145bc 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -329,11 +329,12 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt biConsumer) throws IllegalArgumentException { - return setBolt(id, biConsumer, null); + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, String... fields) throws IllegalArgumentException { + return setBolt(id, biConsumer, null, fields); } /** @@ -344,12 +345,13 @@ public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, Number parallelism_hint) throws IllegalArgumentException { - return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint); + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, Number parallelism_hint, String... fields) throws IllegalArgumentException { + return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelism_hint); } /** @@ -427,7 +429,7 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { return setSpout(id, supplier, null); } @@ -441,7 +443,7 @@ public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { return setSpout(id, new LambdaSpout(supplier), parallelism_hint); }