diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 816f5dd1160..5ee1a6118fe 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -75,6 +75,14 @@
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 373630eee5b..b7e31368936 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,6 +23,11 @@
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -35,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -45,6 +51,7 @@
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.internal.CommitMetadata;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -63,6 +70,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
//Initial delay for the commit and subscription refresh timers
public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
// Storm
protected SpoutOutputCollector collector;
@@ -94,12 +102,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Triggers when a subscription should be refreshed
private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
+ // Metadata information to commit to Kafka. It is unique per spout per topology.
+ private transient String commitMetadata;
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
}
- //This constructor is here for testing
+ @VisibleForTesting
KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.kafkaSpoutConfig = kafkaSpoutConfig;
@@ -129,12 +139,23 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
+ setCommitMetadata(context);
tupleListener.open(conf, context);
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
+ private void setCommitMetadata(TopologyContext context) {
+ try {
+ commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+ context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+ } catch (JsonProcessingException e) {
+ LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+ throw new RuntimeException(e);
+ }
+ }
+
private boolean isAtLeastOnceProcessing() {
return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
}
@@ -143,7 +164,7 @@ private boolean isAtLeastOnceProcessing() {
private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
private Collection<TopicPartition> previousAssignment = new HashSet<>();
-
+
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
previousAssignment = partitions;
@@ -167,7 +188,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
private void initialize(Collection<TopicPartition> partitions) {
if (isAtLeastOnceProcessing()) {
- // remove from acked all partitions that are no longer assigned to this spout
+ // remove offsetManagers for all partitions that are no longer assigned to this spout
offsetManagers.keySet().retainAll(partitions);
retryService.retainAll(partitions);
@@ -180,14 +201,14 @@ private void initialize(Collection<TopicPartition> partitions) {
Set<TopicPartition> newPartitions = new HashSet<>(partitions);
newPartitions.removeAll(previousAssignment);
- for (TopicPartition tp : newPartitions) {
- final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
- final long fetchOffset = doSeek(tp, committedOffset);
- LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]",
- fetchOffset, tp, firstPollOffsetStrategy, committedOffset);
+ for (TopicPartition newTp : newPartitions) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp);
+ final long fetchOffset = doSeek(newTp, committedOffset);
+ LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
+ fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
- if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(tp)) {
- offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
+ if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
+ offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
}
}
LOG.info("Initialization complete");
@@ -196,24 +217,62 @@ private void initialize(Collection<TopicPartition> partitions) {
/**
* Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
*/
- private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
- if (committedOffset != null) { // offset was committed for this TopicPartition
- if (firstPollOffsetStrategy.equals(EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy.equals(LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
+ LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
+ newTp, firstPollOffsetStrategy, committedOffset);
+
+ if (committedOffset != null) {
+ // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
+ if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+ // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
+ kafkaConsumer.seek(newTp, committedOffset.offset());
} else {
- // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
- kafkaConsumer.seek(tp, committedOffset.offset());
+ // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies
+ // (only when the topology is first deployed).
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(Collections.singleton(newTp));
+ } else {
+ // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed.
+ kafkaConsumer.seek(newTp, committedOffset.offset());
+ }
}
- } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
+ } else {
+ // no offset commits have ever been done for this consumer group and topic-partition,
+ // so start at the beginning or end depending on FirstPollOffsetStrategy
if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+ kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
} else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ kafkaConsumer.seekToEnd(Collections.singleton(newTp));
}
}
- return kafkaConsumer.position(tp);
+ return kafkaConsumer.position(newTp);
+ }
+ }
+
+ /**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied.
+ *
+ * @param tp topic-partition
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+ private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ try {
+ if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
+ return true;
+ }
+
+ final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+ return committedMetadata.getTopologyId().equals(context.getStormId());
+ } catch (IOException e) {
+ LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
+ + "for this topic-partition was done using an earlier version of Storm. "
+ + "Defaulting to behavior compatible with earlier version", committedOffset);
+ LOG.trace("", e);
+ return false;
}
}
@@ -259,12 +318,12 @@ private PollablePartitionsInfo getPollablePartitionsInfo() {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap());
}
-
+
Set<TopicPartition> assignment = kafkaConsumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.emptyMap());
}
-
+
Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
@@ -311,7 +370,7 @@ private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePar
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
- LOG.debug("Polled [{}] records from Kafka.",
+ LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
@@ -382,7 +441,11 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
- if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+ && committedOffset.offset() > kafkaConsumer.position(tp)) {
+ // Ensures that after a topology with this id is started, the consumer fetch
+ // position never falls behind the committed offset (STORM-2844)
throw new IllegalStateException("Attempting to emit a message that has already been committed.");
}
@@ -437,7 +500,7 @@ private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions)
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
- final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+ final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
@@ -467,7 +530,7 @@ private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions)
kafkaConsumer.seek(tp, committedOffset);
waitingToEmit = null;
}
-
+
final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
@@ -615,20 +678,20 @@ public Map<String, Object> getComponentConfiguration() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
-
+
private static class PollablePartitionsInfo {
private final Set<TopicPartition> pollablePartitions;
//The subset of earliest retriable offsets that are on pollable partitions
private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;
-
+
public PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
this.pollablePartitions = pollablePartitions;
this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
.filter(entry -> pollablePartitions.contains(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}
-
+
public boolean shouldPoll() {
return !this.pollablePartitions.isEmpty();
}
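
[Reviewer sketch, not part of the patch] The two halves of the new commit-metadata handshake in KafkaSpout: the spout serializes a CommitMetadata once in open() and attaches the JSON to every offset commit; when seeking it parses the stored metadata back and compares topology ids. The names consumer and context, and the literal JSON, are illustrative stand-ins for the runtime values; exception handling is condensed.

    // Write side: the JSON built by setCommitMetadata() rides along with each
    // commit, exactly as findNextCommitOffset(commitMetadata) now arranges.
    String commitMetadata = "{\"topologyId\":\"topo-1-1\",\"taskId\":7,\"threadName\":\"Thread-12\"}";
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition("myTopic", 0), new OffsetAndMetadata(42L, commitMetadata)));

    // Read side: the essence of isOffsetCommittedByThisTopology(). Metadata that
    // fails to parse (e.g. written by a pre-STORM-2844 spout) yields false, so
    // FirstPollOffsetStrategy is applied exactly as in earlier versions.
    boolean ownCommit;
    try {
        CommitMetadata meta = new ObjectMapper().readValue(
            consumer.committed(new TopicPartition("myTopic", 0)).metadata(), CommitMetadata.class);
        ownCommit = meta.getTopologyId().equals(context.getStormId());
    } catch (IOException e) {
        ownCommit = false;
    }
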
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6c792ab6a8a..1c99f24ef2f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -110,26 +110,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
}
/**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
- * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST.
- * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST.
+ * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+ * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
+ * <br/>
 * <ul>
- * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
- * commits</li>
- * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
- * previous commits</li>
- * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as EARLIEST.</li>
- * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as LATEST.</li>
+ * <li>EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
+ * of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
+ * regardless of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed it behaves as EARLIEST.</li>
+ * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed it behaves as LATEST.</li>
 * </ul>
- *
 */
- public static enum FirstPollOffsetStrategy {
+ public enum FirstPollOffsetStrategy {
EARLIEST,
LATEST,
UNCOMMITTED_EARLIEST,
- UNCOMMITTED_LATEST
+ UNCOMMITTED_LATEST;
+
+ @Override
+ public String toString() {
+ return "FirstPollOffsetStrategy{" + super.toString() + "}";
+ }
}
/**
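
[Reviewer sketch, not part of the patch] For reference, a hedged usage example showing how a topology selects one of the strategies documented above via the existing KafkaSpoutConfig builder; the broker address and topic name are placeholders:

    KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
        .builder("localhost:9092", "myTopic")
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .build();
    // With the toString() override added above, log lines now read e.g.
    // "... with [FirstPollOffsetStrategy{UNCOMMITTED_EARLIEST}] ..."
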
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index b802a52b830..1626fee0028 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -30,7 +30,7 @@ public class KafkaSpoutMessageId implements Serializable {
* true if the record was emitted using a form of collector.emit(...). false
* when skipping null tuples as configured by the user in KafkaSpoutConfig
*/
- private boolean emitted;
+ private boolean emitted;
public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
this(consumerRecord, true);
@@ -88,27 +88,14 @@ public void setEmitted(boolean emitted) {
this.emitted = emitted;
}
- /**
- * Gets metadata for this message which may be committed to Kafka.
- * @param currThread The calling thread
- * @return The metadata
- */
- public String getMetadata(Thread currThread) {
- return "{"
- + "topic-partition=" + topicPart
- + ", offset=" + offset
- + ", numFails=" + numFails
- + ", thread='" + currThread.getName() + "'"
- + '}';
- }
-
@Override
public String toString() {
return "{"
- + "topic-partition=" + topicPart
- + ", offset=" + offset
- + ", numFails=" + numFails
- + '}';
+ + "topic-partition=" + topicPart
+ + ", offset=" + offset
+ + ", numFails=" + numFails
+ + ", emitted=" + emitted
+ + '}';
}
@Override
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
new file mode 100644
index 00000000000..b7fd1a6edca
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Object representing metadata committed to Kafka.
+ */
+public class CommitMetadata {
+ private final String topologyId;
+ private final int taskId;
+ private final String threadName;
+
+ /** Kafka metadata. */
+ @JsonCreator
+ public CommitMetadata(@JsonProperty("topologyId") String topologyId,
+ @JsonProperty("taskId") int taskId,
+ @JsonProperty("threadName") String threadName) {
+
+ this.topologyId = topologyId;
+ this.taskId = taskId;
+ this.threadName = threadName;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public String getThreadName() {
+ return threadName;
+ }
+
+ @Override
+ public String toString() {
+ return "CommitMetadata{"
+ + "topologyId='" + topologyId + '\''
+ + ", taskId=" + taskId
+ + ", threadName='" + threadName + '\''
+ + '}';
+ }
+}
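
[Reviewer sketch, not part of the patch] A small round-trip example (exception handling elided, field values illustrative) showing that the @JsonCreator/@JsonProperty annotations above let Jackson rebuild the object from the string stored with the Kafka offset commit:

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new CommitMetadata("topo-1-1513898553", 7, "Thread-12"));
    // json: {"topologyId":"topo-1-1513898553","taskId":7,"threadName":"Thread-12"}
    CommitMetadata parsed = mapper.readValue(json, CommitMetadata.class);
    assert parsed.getTopologyId().equals("topo-1-1513898553");
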
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index e7711b0bb92..ec6f2a1b7a5 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -17,12 +17,13 @@
package org.apache.storm.kafka.spout.internal;
import com.google.common.annotations.VisibleForTesting;
+
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeSet;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
@@ -33,28 +34,26 @@
* Manages acked and committed offsets for a TopicPartition. This class is not thread safe
*/
public class OffsetManager {
-
private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
+
private final TopicPartition tp;
- /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
- * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
- private final long initialFetchOffset;
- // Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset.
- private long committedOffset;
// Emitted Offsets List
private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
// Acked messages sorted by ascending order of offset
private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
+ // Committed offset, i.e. the offset where processing will resume upon spout restart. Initially it is set to fetchOffset.
+ private long committedOffset;
+ // True if this OffsetManager has made at least one commit to Kafka
+ private boolean committed;
/**
* Creates a new OffsetManager.
- * @param tp The TopicPartition
+ * @param tp The TopicPartition
* @param initialFetchOffset The initial fetch offset for the given TopicPartition
*/
public OffsetManager(TopicPartition tp, long initialFetchOffset) {
this.tp = tp;
- this.initialFetchOffset = initialFetchOffset;
this.committedOffset = initialFetchOffset;
LOG.debug("Instantiated {}", this.toString());
}
@@ -95,18 +94,20 @@ public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
*
* The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API.
*
+ * @param commitMetadata Metadata information to commit to Kafka. It is constant per KafkaSpout instance per topology
* @return the next OffsetAndMetadata to commit, or null if no offset is
* ready to commit.
*/
- public OffsetAndMetadata findNextCommitOffset() {
+ public OffsetAndMetadata findNextCommitOffset(final String commitMetadata) {
+ boolean found = false;
long currOffset;
long nextCommitOffset = committedOffset;
- KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
currOffset = currAckedMsg.offset();
- if (currOffset == nextCommitOffset) { // found the next offset to commit
- nextCommitMsg = currAckedMsg;
+ if (currOffset == nextCommitOffset) {
+ // found the next offset to commit
+ found = true;
nextCommitOffset = currOffset + 1;
} else if (currOffset > nextCommitOffset) {
if (emittedOffsets.contains(nextCommitOffset)) {
@@ -128,7 +129,6 @@ public OffsetAndMetadata findNextCommitOffset() {
if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
currOffset, nextCommitOffset);
- nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset + 1;
} else {
LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
@@ -144,11 +144,11 @@ public OffsetAndMetadata findNextCommitOffset() {
}
OffsetAndMetadata nextCommitOffsetAndMetadata = null;
- if (nextCommitMsg != null) {
- nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
- nextCommitMsg.getMetadata(Thread.currentThread()));
+ if (found) {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata);
+
LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed."
- + " Processing will resume at [{}] if the spout restarts",
+ + " Processing will resume at offset [{}] upon spout restart",
tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset());
} else {
LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp);
@@ -160,18 +160,19 @@ public OffsetAndMetadata findNextCommitOffset() {
/**
* Marks an offset as committed. This method has side effects - it sets the
* internal state in such a way that future calls to
- * {@link #findNextCommitOffset()} will return offsets greater than or equal to the
+ * {@link #findNextCommitOffset(String)} will return offsets greater than or equal to the
* offset specified, if any.
*
- * @param committedOffset The committed offset. All lower offsets are expected to have been committed.
+ * @param committedOffsetAndMeta The committed offset. All lower offsets are expected to have been committed.
* @return Number of offsets committed in this commit
*/
- public long commit(OffsetAndMetadata committedOffset) {
+ public long commit(OffsetAndMetadata committedOffsetAndMeta) {
+ committed = true;
final long preCommitCommittedOffset = this.committedOffset;
long numCommittedOffsets = 0;
- this.committedOffset = committedOffset.offset();
+ this.committedOffset = committedOffsetAndMeta.offset();
for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
- if (iterator.next().offset() < committedOffset.offset()) {
+ if (iterator.next().offset() < committedOffsetAndMeta.offset()) {
iterator.remove();
numCommittedOffsets++;
} else {
@@ -180,7 +181,7 @@ public long commit(OffsetAndMetadata committedOffset) {
}
for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
- if (iterator.next() < committedOffset.offset()) {
+ if (iterator.next() < committedOffsetAndMeta.offset()) {
iterator.remove();
} else {
break;
@@ -190,28 +191,25 @@ public long commit(OffsetAndMetadata committedOffset) {
LOG.trace("{}", this);
LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
- + " Processing will resume at offset [{}] if the spout restarts.",
+ + " Processing will resume at [{}] upon spout restart",
numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset);
return numCommittedOffsets;
}
- public long getCommittedOffset() {
- return committedOffset;
- }
-
- public boolean isEmpty() {
- return ackedMsgs.isEmpty();
- }
-
- public boolean contains(ConsumerRecord record) {
- return contains(new KafkaSpoutMessageId(record));
+ /**
+ * Checks if this OffsetManager has committed to Kafka.
+ *
+ * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise
+ */
+ public boolean hasCommitted() {
+ return committed;
}
public boolean contains(KafkaSpoutMessageId msgId) {
return ackedMsgs.contains(msgId);
}
-
+
@VisibleForTesting
boolean containsEmitted(long offset) {
return emittedOffsets.contains(offset);
@@ -221,7 +219,6 @@ boolean containsEmitted(long offset) {
public final String toString() {
return "OffsetManager{"
+ "topic-partition=" + tp
- + ", fetchOffset=" + initialFetchOffset
+ ", committedOffset=" + committedOffset
+ ", emittedOffsets=" + emittedOffsets
+ ", ackedMsgs=" + ackedMsgs
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
new file mode 100644
index 00000000000..5254320fa8c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaSpoutAbstractTest {
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ final TopologyContext topologyContext = mock(TopologyContext.class);
+ final Map<String, Object> conf = new HashMap<>();
+ final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ final long commitOffsetPeriodMs;
+
+ KafkaConsumer<String, String> consumerSpy;
+ KafkaSpout<String, String> spout;
+
+ @Captor
+ ArgumentCaptor