From a3899b75a79781602fa58b90de6c8aa784af5332 Mon Sep 17 00:00:00 2001 From: Hugo Louro Date: Sun, 17 Dec 2017 19:59:38 -0800 Subject: [PATCH] STORM-2844: KafkaSpout Throws IllegalStateException After Committing to Kafka When First Poll Strategy Set to EARLIEST - Check if commits to Kafka were committed by this topology to correctly enforce FirstPollOffsetStrategy - Cache if OffsetManager has committed to avoid JSON deserialization for every tuple and speedup lookups - CommitMetadata includes topology id, and task id and threadName - Update FirstPollOffsetStrategy javadoc - Refactor and add unit tests --- external/storm-kafka-client/pom.xml | 8 + .../apache/storm/kafka/spout/KafkaSpout.java | 125 +++++-- .../storm/kafka/spout/KafkaSpoutConfig.java | 32 +- .../kafka/spout/KafkaSpoutMessageId.java | 25 +- .../kafka/spout/internal/CommitMetadata.java | 63 ++++ .../kafka/spout/internal/OffsetManager.java | 71 ++-- .../kafka/spout/KafkaSpoutAbstractTest.java | 179 +++++++++ .../storm/kafka/spout/KafkaSpoutEmitTest.java | 6 +- .../spout/KafkaSpoutReactivationTest.java | 3 - .../kafka/spout/KafkaSpoutRebalanceTest.java | 4 - .../spout/KafkaSpoutSingleTopicTest.java | 346 ++++++++++++++++++ ...tTopologyDeployActivateDeactivateTest.java | 116 ++++++ .../kafka/spout/MaxUncommittedOffsetTest.java | 8 +- .../spout/internal/OffsetManagerTest.java | 35 +- 14 files changed, 896 insertions(+), 125 deletions(-) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 816f5dd1160..5ee1a6118fe 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -75,6 +75,14 @@ + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + org.mockito diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 373630eee5b..b7e31368936 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -23,6 +23,11 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -35,6 +40,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -45,6 +51,7 @@ import org.apache.kafka.common.errors.InterruptException; import 
org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.kafka.spout.internal.CommitMetadata; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; @@ -63,6 +70,7 @@ public class KafkaSpout extends BaseRichSpout { //Initial delay for the commit and subscription refresh timers public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); // Storm protected SpoutOutputCollector collector; @@ -94,12 +102,14 @@ public class KafkaSpout extends BaseRichSpout { // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; + // Metadata information to commit to Kafka. It is unique per spout per topology. + private transient String commitMetadata; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); } - //This constructor is here for testing + @VisibleForTesting KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory) { this.kafkaConsumerFactory = kafkaConsumerFactory; this.kafkaSpoutConfig = kafkaSpoutConfig; @@ -129,12 +139,23 @@ public void open(Map conf, TopologyContext context, SpoutOutputC offsetManagers = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = Collections.emptyListIterator(); + setCommitMetadata(context); tupleListener.open(conf, context); LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } + private void setCommitMetadata(TopologyContext context) { + try { + commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( + context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); + } catch (JsonProcessingException e) { + LOG.error("Failed to create Kafka commit metadata due to JSON serialization error",e); + throw new RuntimeException(e); + } + } + private boolean isAtLeastOnceProcessing() { return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE; } @@ -143,7 +164,7 @@ private boolean isAtLeastOnceProcessing() { private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { private Collection previousAssignment = new HashSet<>(); - + @Override public void onPartitionsRevoked(Collection partitions) { previousAssignment = partitions; @@ -167,7 +188,7 @@ public void onPartitionsAssigned(Collection partitions) { private void initialize(Collection partitions) { if (isAtLeastOnceProcessing()) { - // remove from acked all partitions that are no longer assigned to this spout + // remove offsetManagers for all partitions that are no longer assigned to this spout offsetManagers.keySet().retainAll(partitions); retryService.retainAll(partitions); @@ -180,14 +201,14 @@ private void initialize(Collection partitions) { Set newPartitions = new HashSet<>(partitions); newPartitions.removeAll(previousAssignment); - for (TopicPartition tp : newPartitions) { - final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); - final long fetchOffset = doSeek(tp, committedOffset); - LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]", - 
fetchOffset, tp, firstPollOffsetStrategy, committedOffset); + for (TopicPartition newTp : newPartitions) { + final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp); + final long fetchOffset = doSeek(newTp, committedOffset); + LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]", + fetchOffset, newTp, firstPollOffsetStrategy, committedOffset); // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off - if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(tp)) { - offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); + if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) { + offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset)); } } LOG.info("Initialization complete"); @@ -196,24 +217,62 @@ private void initialize(Collection partitions) { /** * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. */ - private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { - if (committedOffset != null) { // offset was committed for this TopicPartition - if (firstPollOffsetStrategy.equals(EARLIEST)) { - kafkaConsumer.seekToBeginning(Collections.singleton(tp)); - } else if (firstPollOffsetStrategy.equals(LATEST)) { - kafkaConsumer.seekToEnd(Collections.singleton(tp)); + private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { + LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]", + newTp, firstPollOffsetStrategy, committedOffset); + + if (committedOffset != null) { + // offset was previously committed for this consumer group and topic-partition, either by this or another topology. + if (isOffsetCommittedByThisTopology(newTp, committedOffset)) { + // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply. + kafkaConsumer.seek(newTp, committedOffset.offset()); } else { - // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. - kafkaConsumer.seek(tp, committedOffset.offset()); + // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies + // (only when the topology is first deployed). + if (firstPollOffsetStrategy.equals(EARLIEST)) { + kafkaConsumer.seekToBeginning(Collections.singleton(newTp)); + } else if (firstPollOffsetStrategy.equals(LATEST)) { + kafkaConsumer.seekToEnd(Collections.singleton(newTp)); + } else { + // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed. 
+ kafkaConsumer.seek(newTp, committedOffset.offset()); + } } - } else { // no commits have ever been done, so start at the beginning or end depending on the strategy + } else { + // no offset commits have ever been done for this consumer group and topic-partition, + // so start at the beginning or end depending on FirstPollOffsetStrategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { - kafkaConsumer.seekToBeginning(Collections.singleton(tp)); + kafkaConsumer.seekToBeginning(Collections.singleton(newTp)); } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { - kafkaConsumer.seekToEnd(Collections.singleton(tp)); + kafkaConsumer.seekToEnd(Collections.singleton(newTp)); } } - return kafkaConsumer.position(tp); + return kafkaConsumer.position(newTp); + } + } + + /** + * Checks If {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. + * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied + * + * @param tp topic-partition + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ + private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) { + try { + if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) { + return true; + } + + final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); + return committedMetadata.getTopologyId().equals(context.getStormId()); + } catch (IOException e) { + LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit " + + "for this topic-partition was done using an earlier version of Storm. " + + "Defaulting to behavior compatible with earlier version", committedOffset); + LOG.trace("",e); + return false; } } @@ -259,12 +318,12 @@ private PollablePartitionsInfo getPollablePartitionsInfo() { LOG.debug("Not polling. Tuples waiting to be emitted."); return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap()); } - + Set assignment = kafkaConsumer.assignment(); if (!isAtLeastOnceProcessing()) { return new PollablePartitionsInfo(assignment, Collections.emptyMap()); } - + Map earliestRetriableOffsets = retryService.earliestRetriableOffsets(); Set pollablePartitions = new HashSet<>(); final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); @@ -311,7 +370,7 @@ private ConsumerRecords pollKafkaBroker(PollablePartitionsInfo pollablePar final ConsumerRecords consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); - LOG.debug("Polled [{}] records from Kafka.", + LOG.debug("Polled [{}] records from Kafka", numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. @@ -382,7 +441,11 @@ private boolean emitOrRetryTuple(ConsumerRecord record) { } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record); } else { - if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) { + final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); + if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset) + && committedOffset.offset() > kafkaConsumer.position(tp)) { + // Ensures that after a topology with this id is started, the consumer fetch + // position never falls behind the committed offset (STORM-2844) throw new IllegalStateException("Attempting to emit a message that has already been committed."); } @@ -437,7 +500,7 @@ private void commitOffsetsForAckedTuples(Set assignedPartitions) final Map nextCommitOffsets = new HashMap<>(); for (Map.Entry tpOffset : assignedOffsetManagers.entrySet()) { - final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata); if (nextCommitOffset != null) { nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); } @@ -467,7 +530,7 @@ private void commitOffsetsForAckedTuples(Set assignedPartitions) kafkaConsumer.seek(tp, committedOffset); waitingToEmit = null; } - + final OffsetManager offsetManager = assignedOffsetManagers.get(tp); offsetManager.commit(tpOffset.getValue()); LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp); @@ -615,20 +678,20 @@ public Map getComponentConfiguration() { private String getTopicsString() { return kafkaSpoutConfig.getSubscription().getTopicsString(); } - + private static class PollablePartitionsInfo { private final Set pollablePartitions; //The subset of earliest retriable offsets that are on pollable partitions private final Map pollableEarliestRetriableOffsets; - + public PollablePartitionsInfo(Set pollablePartitions, Map earliestRetriableOffsets) { this.pollablePartitions = pollablePartitions; this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream() .filter(entry -> pollablePartitions.contains(entry.getKey())) .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); } - + public boolean shouldPoll() { return !this.pollablePartitions.isEmpty(); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 6c792ab6a8a..1c99f24ef2f 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -110,26 +110,30 @@ public KafkaSpoutConfig(Builder builder) { } /** - * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer - * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST.
- * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST.<br/>
+ * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+ * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
+ * <br/>
 * <ul>
- * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
- * commits</li>
- * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
- * previous commits</li>
- * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as EARLIEST.</li>
- * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as LATEST.</li>
+ * <li>EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
+ * of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
+ * regardless of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed it behaves as EARLIEST.</li>
+ * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed it behaves as LATEST.</li>
 * </ul>
- * */ - public static enum FirstPollOffsetStrategy { + public enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, - UNCOMMITTED_LATEST + UNCOMMITTED_LATEST; + + @Override + public String toString() { + return "FirstPollOffsetStrategy{" + super.toString() + "}"; + } } /** diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index b802a52b830..1626fee0028 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -30,7 +30,7 @@ public class KafkaSpoutMessageId implements Serializable { * true if the record was emitted using a form of collector.emit(...). false * when skipping null tuples as configured by the user in KafkaSpoutConfig */ - private boolean emitted; + private boolean emitted; public KafkaSpoutMessageId(ConsumerRecord consumerRecord) { this(consumerRecord, true); @@ -88,27 +88,14 @@ public void setEmitted(boolean emitted) { this.emitted = emitted; } - /** - * Gets metadata for this message which may be committed to Kafka. - * @param currThread The calling thread - * @return The metadata - */ - public String getMetadata(Thread currThread) { - return "{" - + "topic-partition=" + topicPart - + ", offset=" + offset - + ", numFails=" + numFails - + ", thread='" + currThread.getName() + "'" - + '}'; - } - @Override public String toString() { return "{" - + "topic-partition=" + topicPart - + ", offset=" + offset - + ", numFails=" + numFails - + '}'; + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + ", emitted=" + emitted + + '}'; } @Override diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java new file mode 100644 index 00000000000..b7fd1a6edca --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Object representing metadata committed to Kafka. + */ +public class CommitMetadata { + private final String topologyId; + private final int taskId; + private final String threadName; + + /** Kafka metadata. 
*/ + @JsonCreator + public CommitMetadata(@JsonProperty("topologyId") String topologyId, + @JsonProperty("taskId") int taskId, + @JsonProperty("threadName") String threadName) { + + this.topologyId = topologyId; + this.taskId = taskId; + this.threadName = threadName; + } + + public String getTopologyId() { + return topologyId; + } + + public int getTaskId() { + return taskId; + } + + public String getThreadName() { + return threadName; + } + + @Override + public String toString() { + return "CommitMetadata{" + + "topologyId='" + topologyId + '\'' + + ", taskId=" + taskId + + ", threadName='" + threadName + '\'' + + '}'; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index e7711b0bb92..ec6f2a1b7a5 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -17,12 +17,13 @@ package org.apache.storm.kafka.spout.internal; import com.google.common.annotations.VisibleForTesting; + import java.util.Comparator; import java.util.Iterator; import java.util.NavigableSet; import java.util.NoSuchElementException; import java.util.TreeSet; -import org.apache.kafka.clients.consumer.ConsumerRecord; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId; @@ -33,28 +34,26 @@ * Manages acked and committed offsets for a TopicPartition. This class is not thread safe */ public class OffsetManager { - private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class); + private final TopicPartition tp; - /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. - * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ - private final long initialFetchOffset; - // Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. - private long committedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); // Acked messages sorted by ascending order of offset private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); + // Committed offset, i.e. the offset where processing will resume upon spout restart. Initially it is set to fetchOffset. + private long committedOffset; + // True if this OffsetManager has made at least one commit to Kafka + private boolean committed; /** * Creates a new OffsetManager. - * @param tp The TopicPartition + * @param tp The TopicPartition * @param initialFetchOffset The initial fetch offset for the given TopicPartition */ public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; - this.initialFetchOffset = initialFetchOffset; this.committedOffset = initialFetchOffset; LOG.debug("Instantiated {}", this.toString()); } @@ -95,18 +94,20 @@ public long getNthUncommittedOffsetAfterCommittedOffset(int index) { *
* The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API. * + * @param commitMetadata Metadata information to commit to Kafka. It is constant per KafkaSpout instance per topology * @return the next OffsetAndMetadata to commit, or null if no offset is * ready to commit. */ - public OffsetAndMetadata findNextCommitOffset() { + public OffsetAndMetadata findNextCommitOffset(final String commitMetadata) { + boolean found = false; long currOffset; long nextCommitOffset = committedOffset; - KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == nextCommitOffset) { // found the next offset to commit - nextCommitMsg = currAckedMsg; + if (currOffset == nextCommitOffset) { + // found the next offset to commit + found = true; nextCommitOffset = currOffset + 1; } else if (currOffset > nextCommitOffset) { if (emittedOffsets.contains(nextCommitOffset)) { @@ -128,7 +129,6 @@ public OffsetAndMetadata findNextCommitOffset() { if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset", currOffset, nextCommitOffset); - nextCommitMsg = currAckedMsg; nextCommitOffset = currOffset + 1; } else { LOG.debug("Topic-partition [{}] has non-sequential offset [{}]." @@ -144,11 +144,11 @@ public OffsetAndMetadata findNextCommitOffset() { } OffsetAndMetadata nextCommitOffsetAndMetadata = null; - if (nextCommitMsg != null) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, - nextCommitMsg.getMetadata(Thread.currentThread())); + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata); + LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed." - + " Processing will resume at [{}] if the spout restarts", + + " Processing will resume at offset [{}] upon spout restart", tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset()); } else { LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp); @@ -160,18 +160,19 @@ public OffsetAndMetadata findNextCommitOffset() { /** * Marks an offset as committed. This method has side effects - it sets the * internal state in such a way that future calls to - * {@link #findNextCommitOffset()} will return offsets greater than or equal to the + * {@link #findNextCommitOffset(String)} will return offsets greater than or equal to the * offset specified, if any. * - * @param committedOffset The committed offset. All lower offsets are expected to have been committed. + * @param committedOffsetAndMeta The committed offset. All lower offsets are expected to have been committed. 
* @return Number of offsets committed in this commit */ - public long commit(OffsetAndMetadata committedOffset) { + public long commit(OffsetAndMetadata committedOffsetAndMeta) { + committed = true; final long preCommitCommittedOffset = this.committedOffset; long numCommittedOffsets = 0; - this.committedOffset = committedOffset.offset(); + this.committedOffset = committedOffsetAndMeta.offset(); for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext();) { - if (iterator.next().offset() < committedOffset.offset()) { + if (iterator.next().offset() < committedOffsetAndMeta.offset()) { iterator.remove(); numCommittedOffsets++; } else { @@ -180,7 +181,7 @@ public long commit(OffsetAndMetadata committedOffset) { } for (Iterator iterator = emittedOffsets.iterator(); iterator.hasNext();) { - if (iterator.next() < committedOffset.offset()) { + if (iterator.next() < committedOffsetAndMeta.offset()) { iterator.remove(); } else { break; @@ -190,28 +191,25 @@ public long commit(OffsetAndMetadata committedOffset) { LOG.trace("{}", this); LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]." - + " Processing will resume at offset [{}] if the spout restarts.", + + " Processing will resume at [{}] upon spout restart", numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset); return numCommittedOffsets; } - public long getCommittedOffset() { - return committedOffset; - } - - public boolean isEmpty() { - return ackedMsgs.isEmpty(); - } - - public boolean contains(ConsumerRecord record) { - return contains(new KafkaSpoutMessageId(record)); + /** + * Checks if this OffsetManager has committed to Kafka. + * + * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise + */ + public boolean hasCommitted() { + return committed; } public boolean contains(KafkaSpoutMessageId msgId) { return ackedMsgs.contains(msgId); } - + @VisibleForTesting boolean containsEmitted(long offset) { return emittedOffsets.contains(offset); @@ -221,7 +219,6 @@ boolean containsEmitted(long offset) { public final String toString() { return "OffsetManager{" + "topic-partition=" + tp - + ", fetchOffset=" + initialFetchOffset + ", committedOffset=" + committedOffset + ", emittedOffsets=" + emittedOffsets + ", ackedMsgs=" + ackedMsgs diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java new file mode 100644 index 00000000000..5254320fa8c --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public abstract class KafkaSpoutAbstractTest { + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + + final TopologyContext topologyContext = mock(TopologyContext.class); + final Map conf = new HashMap<>(); + final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + final long commitOffsetPeriodMs; + + KafkaConsumer consumerSpy; + KafkaSpout spout; + + @Captor + ArgumentCaptor> commitCapture; + private Time.SimulatedTime simulatedTime; + private KafkaSpoutConfig spoutConfig; + + /** + * This constructor should be called by the subclass' default constructor with the desired value + * @param commitOffsetPeriodMs commit offset period to be used in commit and verification of messages committed + */ + protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) { + this.commitOffsetPeriodMs = commitOffsetPeriodMs; + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + spoutConfig = createSpoutConfig(); + + consumerSpy = createConsumerSpy(); + + spout = new KafkaSpout<>(spoutConfig, createConsumerFactory()); + + simulatedTime = new Time.SimulatedTime(); + } + + private KafkaConsumerFactory createConsumerFactory() { + + return new KafkaConsumerFactory() { + @Override + public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { + return consumerSpy; + } + + }; + } + + KafkaConsumer createConsumerSpy() { + return spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + } + + @After + public void tearDown() throws Exception { + simulatedTime.close(); + } + + abstract KafkaSpoutConfig createSpoutConfig(); + + void prepareSpout(int messageCount) throws Exception { + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + } + + /** + * Helper method to in sequence do: + *
+ * <ul>
+ * <li>spout.nextTuple()</li>
+ * <li>verify messageId</li>
+ * <li>spout.ack(msgId)</li>
+ * <li>reset(collector) to be able to reuse mock</li>
  • + * + * @param offset offset of message to be verified + * @return {@link ArgumentCaptor} of the messageId verified + */ + ArgumentCaptor nextTuple_verifyEmitted_ack_resetCollector(int offset) { + spout.nextTuple(); + + ArgumentCaptor messageId = verifyMessageEmitted(offset); + + spout.ack(messageId.getValue()); + + reset(collector); + + return messageId; + } + + // offset and messageId are used interchangeably + ArgumentCaptor verifyMessageEmitted(int offset) { + final ArgumentCaptor messageId = ArgumentCaptor.forClass(Object.class); + + verify(collector).emit( + eq(SingleTopicKafkaSpoutConfiguration.STREAM), + eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, + Integer.toString(offset), + Integer.toString(offset))), + messageId.capture()); + + return messageId; + } + + void commitAndVerifyAllMessagesCommitted(long msgCount) { + // reset commit timer such that commit happens on next call to nextTuple() + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(msgCount); + } + + /* + * Asserts that commitSync has been called once, + * that there are only commits on one topic, + * and that the committed offset covers messageCount messages + */ + void verifyAllMessagesCommitted(long messageCount) { + verify(consumerSpy).commitSync(commitCapture.capture()); + + final Map commits = commitCapture.getValue(); + assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); + + OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); + + reset(consumerSpy); + } +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 390851636db..317723db4f3 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -23,12 +23,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -88,7 +90,7 @@ public void testNextTupleEmitsAtMostOneTuple() { } @Test - public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() { + public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException { //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded //Emit maxUncommittedOffsets messages, and fail all of them. 
Then ensure that the spout will retry them when the retry backoff has passed @@ -168,7 +170,7 @@ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() { Optional failedMessagePartitionTwo = messageIds.getAllValues().stream() .filter(messageId -> messageId.partition() == partitionTwo.partition()) .max((msgId, msgId2) -> (int)(msgId.offset() - msgId2.offset())); - + spout.fail(failedMessagePartitionTwo.get()); reset(collectorMock); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java index d25dd57a79c..2273117e91f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -18,8 +18,6 @@ import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -27,7 +25,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index d4ae7730650..676cb3d85a8 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -58,10 +58,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import java.util.HashSet; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRecord; - public class KafkaSpoutRebalanceTest { @Captor diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java new file mode 100644 index 00000000000..33c71bc97ff --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.tuple.Values; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.utils.Time; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; + +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.hamcrest.Matchers; + +public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { + private final int maxPollRecords = 10; + private final int maxRetries = 3; + + public KafkaSpoutSingleTopicTest() { + super(2_000); + } + + @Override + KafkaSpoutConfig createSpoutConfig() { + return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( + KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), + Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))) + .setOffsetCommitPeriodMs(commitOffsetPeriodMs) + .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), + maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) + .build(); + } + + @Test + public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception { + final int messageCount = maxPollRecords * 2; + prepareSpout(messageCount); + + //Emit all messages and fail the first one while acking the rest + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + ArgumentCaptor messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); + List messageIds = messageIdCaptor.getAllValues(); + for (int i = 1; i < messageIds.size(); i++) { + spout.ack(messageIds.get(i)); + } + KafkaSpoutMessageId failedTuple = messageIds.get(0); + spout.fail(failedTuple); + + //Advance the time and replay the failed tuple. + reset(collector); + spout.nextTuple(); + ArgumentCaptor failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); + + assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); + + /* Ack the tuple, and commit. + * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll. 
+ */ + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); + spout.ack(failedIdReplayCaptor.getValue()); + spout.nextTuple(); + verify(consumerSpy).commitSync(commitCapture.capture()); + + Map capturedCommit = commitCapture.getValue(); + TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount)); + + /* Verify that the following acked (now committed) tuples are not emitted again + * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, + * this verifies that the spout keeps the consumer position ahead of the committed offset when committing + */ + reset(collector); + //Just do a few polls to check that nothing more is emitted + for(int i = 0; i < 3; i++) { + spout.nextTuple(); + } + verify(collector, never()).emit(anyString(), anyList(), anyObject()); + } + + @Test + public void testShouldContinueWithSlowDoubleAcks() throws Exception { + final int messageCount = 20; + prepareSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit some more messages + for(int i = 0; i < messageCount / 2; i++) { + spout.nextTuple(); + } + + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit any remaining messages + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + //Verify that all messages are emitted, ack all the messages + ArgumentCaptor messageIds = ArgumentCaptor.forClass(Object.class); + verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), + anyList(), + messageIds.capture()); + for(Object id : messageIds.getAllValues()) { + spout.ack(id); + } + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } + + @Test + public void testShouldEmitAllMessages() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + //Emit all messages and check that they are emitted. 
Ack the messages too + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + ArgumentCaptor messageId = ArgumentCaptor.forClass(Object.class); + verify(collector).emit( + eq(SingleTopicKafkaSpoutConfiguration.STREAM), + eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, + Integer.toString(i), + Integer.toString(i))), + messageId.capture()); + spout.ack(messageId.getValue()); + reset(collector); + } + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } + + @Test + public void testShouldReplayInOrderFailedMessages() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + //play and ack 1 tuple + ArgumentCaptor messageIdAcked = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdAcked.capture()); + spout.ack(messageIdAcked.getValue()); + reset(collector); + + //play and fail 1 tuple + ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); + spout.fail(messageIdFailed.getValue()); + reset(collector); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + ArgumentCaptor remainingMessageIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( + eq(SingleTopicKafkaSpoutConfiguration.STREAM), + anyList(), + remainingMessageIds.capture()); + for(Object id : remainingMessageIds.getAllValues()) { + spout.ack(id); + } + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } + + @Test + public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToFail = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToFail.capture()); + reset(collector); + + //play 2nd tuple + ArgumentCaptor messageIdToAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToAck.capture()); + reset(collector); + + //ack 2nd tuple + spout.ack(messageIdToAck.getValue()); + //fail 1st tuple + spout.fail(messageIdToFail.getValue()); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( + eq(SingleTopicKafkaSpoutConfiguration.STREAM), + anyList(), + remainingIds.capture()); + for(Object id : remainingIds.getAllValues()) { + spout.ack(id); + } + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } + + @Test + public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { + //The spout must reemit retriable tuples, even if they fail out of order. 
+ //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries. + final int messageCount = 10; + prepareSpout(messageCount); + + //play all tuples + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture()); + reset(collector); + //Fail tuple 5 and 3, call nextTuple, then fail tuple 2 + List capturedMessageIds = messageIds.getAllValues(); + spout.fail(capturedMessageIds.get(5)); + spout.fail(capturedMessageIds.get(3)); + spout.nextTuple(); + spout.fail(capturedMessageIds.get(2)); + + //Check that the spout will reemit all 3 failed tuples and no other tuples + ArgumentCaptor reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); + Set expectedReemitIds = new HashSet<>(); + expectedReemitIds.add(capturedMessageIds.get(5)); + expectedReemitIds.add(capturedMessageIds.get(3)); + expectedReemitIds.add(capturedMessageIds.get(2)); + assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds)); + } + + @Test + public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception { + //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted + final int messageCount = 1; + prepareSpout(messageCount); + + //Emit and fail the same tuple until we've reached retry limit + for (int i = 0; i <= maxRetries; i++) { + ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture()); + KafkaSpoutMessageId msgId = messageIdFailed.getValue(); + spout.fail(msgId); + assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1)); + reset(collector); + } + + //Verify that the tuple is not emitted again + spout.nextTuple(); + verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject()); + } + + @Test + public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception { + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + + //Nothing is assigned yet, should emit nothing + spout.nextTuple(); + verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); + + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS); + + //The new partition should be discovered and the message should be emitted + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); + } +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java new file mode 100644 index 00000000000..a96e28430f0 --- /dev/null +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.junit.Test; + +import java.util.regex.Pattern; + +import static org.mockito.Mockito.when; + +public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest { + + public KafkaSpoutTopologyDeployActivateDeactivateTest() { + super(2_000); + } + + @Override + KafkaSpoutConfig createSpoutConfig() { + return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( + KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), + Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))) + .setOffsetCommitPeriodMs(commitOffsetPeriodMs) + .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) + .build(); + } + + @Test + public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception { + final int messageCount = 2; + prepareSpout(messageCount); + + nextTuple_verifyEmitted_ack_resetCollector(0); + + //Commits offsets during deactivation + spout.deactivate(); + + verifyAllMessagesCommitted(1); + + consumerSpy = createConsumerSpy(); + + spout.activate(); + + nextTuple_verifyEmitted_ack_resetCollector(1); + + commitAndVerifyAllMessagesCommitted(messageCount); + } + + @Test + public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception { + when(topologyContext.getStormId()).thenReturn("topology-1"); + + final int messageCount = 2; + prepareSpout(messageCount); + + nextTuple_verifyEmitted_ack_resetCollector(0); + + //Commits offsets during deactivation + spout.deactivate(); + + verifyAllMessagesCommitted(1); + + // Restart topology with the same topology id, which mimics the behavior of partition reassignment + setUp(); + // Initialize spout using the same populated data (i.e same kafkaUnitRule) + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + + nextTuple_verifyEmitted_ack_resetCollector(1); + + commitAndVerifyAllMessagesCommitted(messageCount); + } + + @Test + public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception { + when(topologyContext.getStormId()).thenReturn("topology-1"); + + final int messageCount = 2; + prepareSpout(messageCount); + + nextTuple_verifyEmitted_ack_resetCollector(0); + + //Commits offsets during deactivation + spout.deactivate(); + + verifyAllMessagesCommitted(1); + + // Restart topology with a different topology id + setUp(); + 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index c76fc15cdbf..d92a3a74d47 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -31,9 +31,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -80,7 +83,8 @@ public void setUp() {
         //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
         assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
         MockitoAnnotations.initMocks(this);
-        this.spout = new KafkaSpout<>(spoutConfig);
+        spout = new KafkaSpout<>(spoutConfig);
+        new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
     }
 
     private void prepareSpout(int msgCount) throws Exception {
@@ -118,6 +122,7 @@ public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffse
             spout.ack(messageId);
         }
         Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
         spout.nextTuple();
 
         //Now check that the spout will emit another maxUncommittedOffsets messages
@@ -281,5 +286,4 @@ public void testNextTupleWillAllowRetryForTuplesBelowEmitLimit() throws Exceptio
             assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch, plus the first failed tuple from the second batch", thirdRunOffsets, everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
         }
     }
-
 }
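The OffsetManagerTest changes that follow pass a metadata string into findNextCommitOffset so that the returned OffsetAndMetadata carries it verbatim. The COMMIT_METADATA constant used there is plain JSON; a string with that shape can be produced with Jackson as sketched below. The CommitMetadata constructor argument order is inferred from the JSON field order in the constant, so treat it as an assumption:

    // Sketch: building a commit-metadata JSON string shaped like the test constant below.
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.storm.kafka.spout.internal.CommitMetadata;

    final class CommitMetadataJsonExample {
        public static void main(String[] args) throws JsonProcessingException {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new CommitMetadata("tp1", 3, "Thread-20"));
            // Expected shape: {"topologyId":"tp1","taskId":3,"threadName":"Thread-20"}
            System.out.println(json);
        }
    }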
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index 46b3c9be7f6..9972d4cb6da 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -18,7 +18,10 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.util.NoSuchElementException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -29,6 +32,7 @@
 import org.junit.rules.ExpectedException;
 
 public class OffsetManagerTest {
+    private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";
 
     @Rule
     public ExpectedException expect = ExpectedException.none();
@@ -55,12 +59,12 @@ public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAc
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
 
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
 
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
 
-        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
-            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
     }
 
     @Test
@@ -70,17 +74,17 @@ public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked(
         manager.addToEmitMsgs(initialFetchOffset + 6);
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
 
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue()));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA), is(nullValue()));
 
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
 
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
-            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
     }
 
     @Test
     public void testFindNextCommittedOffsetWithNoAcks() {
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
     }
 
" */ emitAndAckMessage(getMessageId(initialFetchOffset)); - OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA); assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1)); } @@ -99,7 +103,7 @@ public void testFindNextCommitOffsetWithOneAck() { public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() { emitAndAckMessage(getMessageId(initialFetchOffset + 1)); emitAndAckMessage(getMessageId(initialFetchOffset)); - OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA); assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2)); } @@ -108,7 +112,7 @@ public void testFindNextCommitOffsetWithAckedOffsetGap() { emitAndAckMessage(getMessageId(initialFetchOffset + 2)); manager.addToEmitMsgs(initialFetchOffset + 1); emitAndAckMessage(getMessageId(initialFetchOffset)); - OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA); assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); } @@ -122,7 +126,7 @@ public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() { */ emitAndAckMessage(getMessageId(initialFetchOffset + 2)); emitAndAckMessage(getMessageId(initialFetchOffset)); - OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA); assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist", nextCommitOffset.offset(), is(initialFetchOffset + 3)); } @@ -131,7 +135,7 @@ public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() { public void testFindNextCommitOffsetWithUnackedOffsetGap() { manager.addToEmitMsgs(initialFetchOffset + 1); emitAndAckMessage(getMessageId(initialFetchOffset)); - OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA); assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); } @@ -139,7 +143,7 @@ public void testFindNextCommitOffsetWithUnackedOffsetGap() { public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() { OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10); emitAndAckMessage(getMessageId(0)); - OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(); + OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(COMMIT_METADATA); assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue())); } @@ -181,7 +185,12 @@ public void testGetNthUncommittedOffsetAfterCommittedOffset() { expect.expect(NoSuchElementException.class); manager.getNthUncommittedOffsetAfterCommittedOffset(5); - } + @Test + public void testCommittedFlagSetOnCommit() throws Exception { + assertFalse(manager.hasCommitted()); + manager.commit(mock(OffsetAndMetadata.class)); + assertTrue(manager.hasCommitted()); + } }