
Commit

Merge branch 'Apache_master_STORM-2844_ISEEarliest' of https://github.com/hmcl/storm-apache into asfgit-master
srdo committed Dec 26, 2017
2 parents 5e696ff + a3899b7 commit f998e26
Showing 14 changed files with 896 additions and 125 deletions.
8 changes: 8 additions & 0 deletions external/storm-kafka-client/pom.xml
@@ -75,6 +75,14 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,6 +23,11 @@
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;

+ import com.fasterxml.jackson.core.JsonProcessingException;
+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.google.common.annotations.VisibleForTesting;
+
+ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -35,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -45,6 +51,7 @@
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+ import org.apache.storm.kafka.spout.internal.CommitMetadata;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -63,6 +70,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
//Initial delay for the commit and subscription refresh timers
public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

// Storm
protected SpoutOutputCollector collector;
@@ -94,12 +102,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Triggers when a subscription should be refreshed
private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
+ // Metadata information to commit to Kafka. It is unique per spout per topology.
+ private transient String commitMetadata;

public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
}

- //This constructor is here for testing
+ @VisibleForTesting
KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.kafkaSpoutConfig = kafkaSpoutConfig;
@@ -129,12 +139,23 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
+ setCommitMetadata(context);

tupleListener.open(conf, context);

LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
+
+ private void setCommitMetadata(TopologyContext context) {
+ try {
+ commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+ context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+ } catch (JsonProcessingException e) {
+ LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+ throw new RuntimeException(e);
+ }
+ }
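
Note: the CommitMetadata class imported above is added elsewhere in this commit, in a hunk not rendered on this page. A minimal sketch consistent with how it is used here — the Jackson annotations and exact field names are assumptions, not the committed source:

package org.apache.storm.kafka.spout.internal;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Sketch only: immutable value object serialized into the metadata string of each offset commit.
public class CommitMetadata {
    private final String topologyId;
    private final int taskId;
    private final String threadName;

    // Jackson requires an annotated creator to rebuild the immutable object from JSON.
    @JsonCreator
    public CommitMetadata(@JsonProperty("topologyId") String topologyId,
                          @JsonProperty("taskId") int taskId,
                          @JsonProperty("threadName") String threadName) {
        this.topologyId = topologyId;
        this.taskId = taskId;
        this.threadName = threadName;
    }

    public String getTopologyId() {
        return topologyId;
    }

    public int getTaskId() {
        return taskId;
    }

    public String getThreadName() {
        return threadName;
    }
}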

private boolean isAtLeastOnceProcessing() {
return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
}
@@ -143,7 +164,7 @@ private boolean isAtLeastOnceProcessing() {
private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {

private Collection<TopicPartition> previousAssignment = new HashSet<>();

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
previousAssignment = partitions;
@@ -167,7 +188,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

private void initialize(Collection<TopicPartition> partitions) {
if (isAtLeastOnceProcessing()) {
- // remove from acked all partitions that are no longer assigned to this spout
+ // remove offsetManagers for all partitions that are no longer assigned to this spout
offsetManagers.keySet().retainAll(partitions);
retryService.retainAll(partitions);

@@ -180,14 +201,14 @@ private void initialize(Collection<TopicPartition> partitions) {

Set<TopicPartition> newPartitions = new HashSet<>(partitions);
newPartitions.removeAll(previousAssignment);
- for (TopicPartition tp : newPartitions) {
- final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
- final long fetchOffset = doSeek(tp, committedOffset);
- LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]",
- fetchOffset, tp, firstPollOffsetStrategy, committedOffset);
+ for (TopicPartition newTp : newPartitions) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp);
+ final long fetchOffset = doSeek(newTp, committedOffset);
+ LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
+ fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
- if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(tp)) {
- offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
+ if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
+ offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
}
}
LOG.info("Initialization complete");
@@ -196,24 +217,62 @@ private void initialize(Collection<TopicPartition> partitions) {
/**
* Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
*/
- private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
- if (committedOffset != null) { // offset was committed for this TopicPartition
- if (firstPollOffsetStrategy.equals(EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy.equals(LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
+ LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
+ newTp, firstPollOffsetStrategy, committedOffset);
+
+ if (committedOffset != null) {
+ // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
+ if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+ // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
+ kafkaConsumer.seek(newTp, committedOffset.offset());
} else {
- // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
- kafkaConsumer.seek(tp, committedOffset.offset());
+ // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies
+ // (only when the topology is first deployed).
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(Collections.singleton(newTp));
+ } else {
+ // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed.
+ kafkaConsumer.seek(newTp, committedOffset.offset());
+ }
+ }
- } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
+ } else {
+ // no offset commits have ever been done for this consumer group and topic-partition,
+ // so start at the beginning or end depending on FirstPollOffsetStrategy
if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+ kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
} else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ kafkaConsumer.seekToEnd(Collections.singleton(newTp));
}
}
- return kafkaConsumer.position(tp);
+ return kafkaConsumer.position(newTp);
}
}
+
+ /**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied.
+ *
+ * @param tp topic-partition
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+ private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ try {
+ if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
+ return true;
+ }
+
+ final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+ return committedMetadata.getTopologyId().equals(context.getStormId());
+ } catch (IOException e) {
+ LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
+ + "for this topic-partition was done using an earlier version of Storm. "
+ + "Defaulting to behavior compatible with earlier version", committedOffset);
+ LOG.trace("", e);
+ return false;
+ }
+ }
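
To make the mechanism concrete, here is a standalone sketch of the round trip between setCommitMetadata and isOffsetCommittedByThisTopology, reusing the hypothetical CommitMetadata sketch above; the topology id, task id, and thread name are made-up values:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.storm.kafka.spout.internal.CommitMetadata;

public class CommitMetadataRoundTripExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // What a spout task serializes once in open() and attaches to every offset commit.
        String commitMetadata = mapper.writeValueAsString(
            new CommitMetadata("my-topology-1-1514246000", 3, "Thread-42"));

        // Offset and metadata string are committed to Kafka together.
        OffsetAndMetadata committed = new OffsetAndMetadata(100L, commitMetadata);

        // On a later rebalance or redeploy, the spout deserializes the metadata and
        // compares topology ids: a match means the commit came from this topology,
        // so FirstPollOffsetStrategy is skipped and polling resumes at the offset.
        CommitMetadata read = mapper.readValue(committed.metadata(), CommitMetadata.class);
        boolean committedByThisTopology = "my-topology-1-1514246000".equals(read.getTopologyId());
        System.out.println(committedByThisTopology); // prints: true
    }
}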

@@ -259,12 +318,12 @@ private PollablePartitionsInfo getPollablePartitionsInfo() {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap());
}

Set<TopicPartition> assignment = kafkaConsumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.emptyMap());
}

Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
@@ -311,7 +370,7 @@ private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePar
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka.",
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
@@ -382,7 +441,11 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
- if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+ && committedOffset.offset() > kafkaConsumer.position(tp)) {
+ // Ensures that after a topology with this id is started, the consumer fetch
+ // position never falls behind the committed offset (STORM-2844)
throw new IllegalStateException("Attempting to emit a message that has already been committed.");
}

@@ -437,7 +500,7 @@ private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions)

final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
- final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+ final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
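
The OffsetManager hunk is likewise not rendered on this page, but findNextCommitOffset now takes the spout's commitMetadata string. A plausible sketch of the changed method, where findNextContiguousAckedOffset is a hypothetical stand-in for the existing logic that walks acked offsets:

// Inside OffsetManager (sketch only, not the committed source):
public OffsetAndMetadata findNextCommitOffset(final String commitMetadata) {
    Long nextCommitOffset = findNextContiguousAckedOffset(); // hypothetical helper
    // Tag the commit with this spout's metadata JSON so the topology can later
    // recognize its own commits (see isOffsetCommittedByThisTopology above).
    return nextCommitOffset == null
        ? null
        : new OffsetAndMetadata(nextCommitOffset, commitMetadata);
}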
@@ -467,7 +530,7 @@ private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions)
kafkaConsumer.seek(tp, committedOffset);
waitingToEmit = null;
}

final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
@@ -615,20 +678,20 @@ public Map<String, Object> getComponentConfiguration() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}

private static class PollablePartitionsInfo {

private final Set<TopicPartition> pollablePartitions;
//The subset of earliest retriable offsets that are on pollable partitions
private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;

public PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
this.pollablePartitions = pollablePartitions;
this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
.filter(entry -> pollablePartitions.contains(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}

public boolean shouldPoll() {
return !this.pollablePartitions.isEmpty();
}
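
A self-contained illustration of the stream filter in the PollablePartitionsInfo constructor above, with made-up partitions and offsets:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class PollableFilterExample {
    public static void main(String[] args) {
        // Only partition 0 is pollable in this example.
        Set<TopicPartition> pollablePartitions =
            new HashSet<>(Arrays.asList(new TopicPartition("my-topic", 0)));

        Map<TopicPartition, Long> earliestRetriableOffsets = new HashMap<>();
        earliestRetriableOffsets.put(new TopicPartition("my-topic", 0), 5L);
        earliestRetriableOffsets.put(new TopicPartition("my-topic", 1), 9L); // dropped by the filter

        // Same filter as the constructor: keep only retriable offsets
        // that sit on pollable partitions.
        Map<TopicPartition, Long> pollableEarliestRetriableOffsets =
            earliestRetriableOffsets.entrySet().stream()
                .filter(entry -> pollablePartitions.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(pollableEarliestRetriableOffsets); // prints: {my-topic-0=5}
    }
}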

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -110,26 +110,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
}

/**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
- * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
- * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
+ * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+ * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
+ * <br/>
* <ul>
- * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
- * commits</li>
- * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
- * previous commits</li>
- * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as EARLIEST.</li>
- * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
- * committed, it behaves as LATEST.</li>
+ * <li>EARLIEST - the kafka spout polls records starting at the first offset of the partition, regardless
+ * of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
+ * regardless of previous commits. This setting only takes effect on topology deployment.</li>
+ * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed, it behaves as EARLIEST.</li>
+ * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed, it behaves as LATEST.</li>
* </ul>
*
*/
- public static enum FirstPollOffsetStrategy {
+ public enum FirstPollOffsetStrategy {
EARLIEST,
LATEST,
UNCOMMITTED_EARLIEST,
- UNCOMMITTED_LATEST
+ UNCOMMITTED_LATEST;
+
+ @Override
+ public String toString() {
+ return "FirstPollOffsetStrategy{" + super.toString() + "}";
+ }
}
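
For context, a short usage sketch showing how a topology selects one of these strategies through the config builder; the broker address and topic are placeholders:

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

public class FirstPollStrategyExample {
    public static void main(String[] args) {
        // UNCOMMITTED_EARLIEST is the default; set explicitly here for clarity.
        KafkaSpoutConfig<String, String> spoutConfig =
            KafkaSpoutConfig.builder("localhost:9092", "my-topic")
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build();

        KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
    }
}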

/**

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -30,7 +30,7 @@ public class KafkaSpoutMessageId implements Serializable {
* true if the record was emitted using a form of collector.emit(...). false
* when skipping null tuples as configured by the user in KafkaSpoutConfig
*/
private boolean emitted;

public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
this(consumerRecord, true);
@@ -88,27 +88,14 @@ public void setEmitted(boolean emitted) {
this.emitted = emitted;
}

- /**
- * Gets metadata for this message which may be committed to Kafka.
- * @param currThread The calling thread
- * @return The metadata
- */
- public String getMetadata(Thread currThread) {
- return "{"
- + "topic-partition=" + topicPart
- + ", offset=" + offset
- + ", numFails=" + numFails
- + ", thread='" + currThread.getName() + "'"
- + '}';
- }

@Override
public String toString() {
return "{"
+ "topic-partition=" + topicPart
+ ", offset=" + offset
+ ", numFails=" + numFails
+ '}';
+ "topic-partition=" + topicPart
+ ", offset=" + offset
+ ", numFails=" + numFails
+ ", emitted=" + emitted
+ '}';
}

@Override