Skip to content

Commit

Permalink
Merge branch 'STORM-2343' of https://github.com/srdo/storm
Browse files Browse the repository at this point in the history
  • Loading branch information
Sriharsha Chintalapani committed May 1, 2017
2 parents a4afacd + d98a5b8 commit d39e3a7
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -79,7 +80,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
private transient Timer refreshSubscriptionTimer; // Triggers when a subscription should be refreshed
private transient TopologyContext context;

Expand Down Expand Up @@ -250,9 +251,13 @@ private boolean commit() {

private boolean poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
final boolean poll = !waitingToEmit()
&& (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);

final int readyMessageCount = retryService.readyMessageCount();
final boolean poll = !waitingToEmit() &&
//Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
//Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
(numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
consumerAutoCommitMode);

if (!poll) {
if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
Expand Down Expand Up @@ -290,15 +295,11 @@ private ConsumerRecords<K, V> pollKafkaBroker() {
}

private void doSeekRetriableTopicPartitions() {
final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();

for (TopicPartition rtp : retriableTopicPartitions) {
final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1); // Seek to last committed offset
}
for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
//Seek directly to the earliest retriable message for each retriable topic partition
kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}

Expand All @@ -318,7 +319,6 @@ private void emit() {
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);

if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,9 @@ public Builder<K,V> setMaxPartitionFectchBytes(int bytes) {

/**
* The maximum number of records a poll will return.
* Will only work with Kafka 0.10.0 and above.
*/
public Builder<K,V> setMaxPollRecords(int records) {
//to avoid issues with 0.9 versions that technically still work
// with this we do not use ConsumerConfig.MAX_POLL_RECORDS_CONFIG
return setProp("max.poll.records", records);
return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
}

//Security Related Configs
Expand Down Expand Up @@ -330,11 +327,12 @@ public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}

/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
* of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
* of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
* Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
* @param maxUncommittedOffsets max number of records that can be pending commit
*/
public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.storm.utils.Time;

/**
* Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
* nextRetry = Min(nextRetry, currentTime + maxDelay)
*/
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
Expand All @@ -54,21 +57,28 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
/**
 * Orders {@link RetrySchedule} entries by ascending next retry time.
 * Entries with the same retry time are ordered by hash code, so that two schedules with
 * identical timestamps are not treated as duplicates by a sorted set using this comparator.
 * NOTE(review): two distinct entries with equal timestamps AND equal hash codes still compare as equal.
 */
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
    @Override
    public int compare(RetrySchedule entry1, RetrySchedule entry2) {
        int result = Long.compare(entry1.nextRetryTimeNanos(), entry2.nextRetryTimeNanos());

        if (result == 0) {
            //TreeSet uses the comparator instead of equals() for the Set contract
            //Ensure that we can save two retry schedules with the same timestamp
            //Integer.compare is used because subtracting hash codes can overflow and flip the sign
            result = Integer.compare(entry1.hashCode(), entry2.hashCode());
        }
        return result;
    }
}

private class RetrySchedule {
private final KafkaSpoutMessageId msgId;
private long nextRetryTimeNanos;

public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
this.msgId = msgId;
this.nextRetryTimeNanos = nextRetryTime;
this.nextRetryTimeNanos = nextRetryTimeNanos;
LOG.debug("Created {}", this);
}

public void setNextRetryTime() {
public void setNextRetryTimeNanos() {
nextRetryTimeNanos = nextTime(msgId);
LOG.debug("Updated {}", this);
}
Expand All @@ -81,7 +91,7 @@ public boolean retry(long currentTimeNanos) {
public String toString() {
return "RetrySchedule{" +
"msgId=" + msgId +
", nextRetryTime=" + nextRetryTimeNanos +
", nextRetryTimeNanos=" + nextRetryTimeNanos +
'}';
}

Expand All @@ -96,39 +106,35 @@ public long nextRetryTimeNanos() {

public static class TimeInterval implements Serializable {
private final long lengthNanos;
private final long length;
private final TimeUnit timeUnit;
private final long length;

/**
* @param length length of the time interval in the units specified by {@link TimeUnit}
* @param timeUnit unit in which {@code length} is expressed
*/
public TimeInterval(long length, TimeUnit timeUnit) {
this.length = length;
this.timeUnit = timeUnit;
this.lengthNanos = timeUnit.toNanos(length);
this.timeUnit = timeUnit;
this.length = length;
}

public static TimeInterval seconds(long length) {
return new TimeInterval(length, TimeUnit.SECONDS);
}

public static TimeInterval milliSeconds(long length) {
return new TimeInterval(length, TimeUnit.MILLISECONDS);
}

public static TimeInterval microSeconds(long length) {
return new TimeInterval(length, TimeUnit.MICROSECONDS);
}

public long lengthNanos() {
return lengthNanos;
}

public long length() {
return length;
}


public TimeUnit timeUnit() {
return timeUnit;
}
Expand Down Expand Up @@ -165,26 +171,27 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval
}

@Override
public Set<TopicPartition> retriableTopicPartitions() {
final Set<TopicPartition> tps = new HashSet<>();
final long currentTimeNanos = System.nanoTime();
public Map<TopicPartition, Long> earliestRetriableOffsets() {
final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
tpToEarliestRetriableOffset.merge(tpForMessage, msgId.offset(), Math::min);
} else {
break; // Stop searching as soon as passed current time
}
}
LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
return tps;
LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
return tpToEarliestRetriableOffset;
}

@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
if (toRetryMsgs.contains(msgId)) {
final long currentTimeNanos = System.nanoTime();
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
Expand Down Expand Up @@ -265,13 +272,27 @@ public boolean schedule(KafkaSpoutMessageId msgId) {
return true;
}
}

/**
 * Counts the retry schedules at the head of {@code retrySchedules} that report themselves
 * as ready for retry at the current time.
 * Counting stops at the first schedule that is not ready, which relies on
 * {@code retrySchedules} iterating in ascending retry-time order.
 *
 * @return the number of leading schedules that are ready to be retried
 */
@Override
public int readyMessageCount() {
    final long nowNanos = Time.nanoTime();
    final Iterator<RetrySchedule> schedules = retrySchedules.iterator();
    int ready = 0;
    // Advance while entries are ready; the first not-ready entry ends the scan
    while (schedules.hasNext() && schedules.next().retry(nowNanos)) {
        ready++;
    }
    return ready;
}

// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
final long currentTimeNanos = System.nanoTime();
final long currentTimeNanos = Time.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
? currentTimeNanos + initialDelay.lengthNanos
: (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
: currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -54,10 +55,11 @@ public interface KafkaSpoutRetryService extends Serializable {
boolean retainAll(Collection<TopicPartition> topicPartitions);

/**
* @return set of topic partitions that have offsets that are ready to be retried, i.e.,
* for which a tuple has failed and has retry time less than current time
* @return The earliest retriable offset for each TopicPartition that has
* offsets ready to be retried, i.e. for which a tuple has failed
* and has retry time less than current time
*/
Set<TopicPartition> retriableTopicPartitions();
Map<TopicPartition, Long> earliestRetriableOffsets();

/**
* Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
Expand All @@ -75,4 +77,6 @@ public interface KafkaSpoutRetryService extends Serializable {
* Returns false if this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);

int readyMessageCount();
}
Loading

0 comments on commit d39e3a7

Please sign in to comment.