
Commit 69e596a
Merge branch 'kafka-metrics-apache-master' of https://github.com/omkreddy/storm into asfgit-master
srdo committed Jan 3, 2018
2 parents d644e29 + 9b2b2db commit 69e596a
Showing 7 changed files with 245 additions and 3 deletions.
@@ -39,6 +39,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang.Validate;
@@ -56,6 +57,7 @@
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -104,6 +106,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
    private transient TopologyContext context;
    // Metadata information to commit to Kafka. It is unique per spout per topology.
    private transient String commitMetadata;
    private transient KafkaOffsetMetric kafkaOffsetMetric;

    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
@@ -142,10 +145,29 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        setCommitMetadata(context);

        tupleListener.open(conf, context);
        if (canRegisterMetrics()) {
            registerMetric();
        }

        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
    }

    private void registerMetric() {
        LOG.info("Registering Spout Metrics");
        kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
        context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
    }

    private boolean canRegisterMetrics() {
        try {
            // beginningOffsets(Collection) only exists in kafka-clients 0.10.1.0+,
            // so probe for it reflectively before registering the offset metric.
            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
        } catch (NoSuchMethodException e) {
            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
            return false;
        }
        return true;
    }

    private void setCommitMetadata(TopologyContext context) {
        try {
            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
@@ -707,4 +729,9 @@ public boolean shouldPoll() {
            return !this.pollablePartitions.isEmpty();
        }
    }

    @VisibleForTesting
    KafkaOffsetMetric getKafkaOffsetMetric() {
        return kafkaOffsetMetric;
    }
}
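Note: the "kafkaOffset" metric registered above only surfaces if the topology also registers a metrics consumer. A minimal sketch of that wiring, using the LoggingMetricsConsumer bundled with Storm (the wrapper class name is invented for illustration):

import org.apache.storm.Config;
import org.apache.storm.metric.LoggingMetricsConsumer;

public class KafkaSpoutMetricsWiring {
    public static Config topologyConf() {
        Config conf = new Config();
        // A single consumer instance receives every metric registered via
        // TopologyContext.registerMetric, including "kafkaOffset", once per
        // metrics time bucket, and writes it to the worker's metrics log.
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
        return conf;
    }
}

Any class implementing IMetricsConsumer can be registered the same way; a sketch of a custom consumer follows the KafkaOffsetMetric source below.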
@@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -70,6 +69,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
    public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);

    public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;

    // Kafka consumer configuration
    private final Map<String, Object> kafkaProps;
    private final Subscription subscription;
@@ -86,6 +88,7 @@
    private final boolean emitNullTuples;
    private final ProcessingGuarantee processingGuarantee;
    private final boolean tupleTrackingEnforced;
    private final int metricsTimeBucketSizeInSecs;

    /**
     * Creates a new KafkaSpoutConfig using a Builder.
@@ -107,6 +110,7 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
        this.emitNullTuples = builder.emitNullTuples;
        this.processingGuarantee = builder.processingGuarantee;
        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
        this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
    }

    /**
@@ -177,6 +181,7 @@ public static class Builder<K, V> {
        private boolean emitNullTuples = false;
        private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
        private boolean tupleTrackingEnforced = false;
        private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;

        public Builder(String bootstrapServers, String... topics) {
            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -395,6 +400,15 @@ public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
            return this;
        }

        /**
         * The time period into which metrics data is bucketed.
         * @param metricsTimeBucketSizeInSecs time in seconds
         */
        public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
            this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
            return this;
        }

        public KafkaSpoutConfig<K, V> build() {
            return new KafkaSpoutConfig<>(this);
        }
@@ -533,6 +547,10 @@ public boolean isEmitNullTuples() {
        return emitNullTuples;
    }

    public int getMetricsTimeBucketSizeInSecs() {
        return metricsTimeBucketSizeInSecs;
    }

    @Override
    public String toString() {
        return "KafkaSpoutConfig{"
@@ -545,6 +563,8 @@ public String toString() {
+ ", translator=" + translator
+ ", retryService=" + retryService
+ ", tupleListener=" + tupleListener
+ ", processingGuarantee=" + processingGuarantee
+ ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
+ '}';
}
}
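Putting the new builder option together, a minimal sketch that constructs a spout reporting offset metrics every 30 seconds instead of the default 60 (broker address, topic name, and the wrapper class are placeholders):

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class MetricsBucketExample {
    public static KafkaSpout<String, String> buildSpout() {
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("broker1:9092", "my-topic")
                        .setMetricsTimeBucketSizeInSecs(30) // default is 60
                        .build();
        return new KafkaSpout<>(spoutConfig);
    }
}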
@@ -46,6 +46,7 @@ public class OffsetManager {
    private long committedOffset;
    // True if this OffsetManager has made at least one commit to Kafka
    private boolean committed;
    private long latestEmittedOffset;

    /**
     * Creates a new OffsetManager.
@@ -63,7 +64,8 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N)
    }

    public void addToEmitMsgs(long offset) {
        this.emittedOffsets.add(offset); // O(Log N)
        this.latestEmittedOffset = Math.max(latestEmittedOffset, offset);
    }

    public int getNumUncommittedOffsets() {
@@ -215,13 +217,22 @@ boolean containsEmitted(long offset) {
        return emittedOffsets.contains(offset);
    }

    public long getLatestEmittedOffset() {
        return latestEmittedOffset;
    }

    public long getCommittedOffset() {
        return committedOffset;
    }

    @Override
    public final String toString() {
        return "OffsetManager{"
            + "topic-partition=" + tp
            + ", committedOffset=" + committedOffset
            + ", emittedOffsets=" + emittedOffsets
            + ", ackedMsgs=" + ackedMsgs
            + ", latestEmittedOffset=" + latestEmittedOffset
            + '}';
    }

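Because addToEmitMsgs keeps the max of the stored and incoming offsets, latestEmittedOffset is a monotonic high-water mark even when the retry service re-emits older offsets. A small illustration, assuming OffsetManager's (TopicPartition, initialFetchOffset) constructor and using an invented topic name:

import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;

public class LatestEmittedOffsetDemo {
    public static void main(String[] args) {
        OffsetManager om = new OffsetManager(new TopicPartition("orders", 0), 0L);
        om.addToEmitMsgs(5L);
        om.addToEmitMsgs(3L); // a re-emitted older offset must not move the mark backwards
        if (om.getLatestEmittedOffset() != 5L) {
            throw new AssertionError("expected the high-water mark to stay at 5");
        }
    }
}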
@@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.kafka.spout.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is used to compute partition- and topic-level offset metrics.
 * <p>
 * Partition level metrics are:
 * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
 * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
 * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
 * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
 * topicName/partition_{number}/spoutLag //the delta between latestTimeOffset and latestCompletedOffset
 * topicName/partition_{number}/recordsInPartition //total number of records in the partition
 * </p>
 * <p>
 * Topic level metrics are:
 * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
 * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
 * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
 * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
 * topicName/totalSpoutLag //total spout lag of all the associated partitions of this spout
 * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
 * </p>
 */
public class KafkaOffsetMetric implements IMetric {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    private final Supplier<KafkaConsumer> consumerSupplier;

    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
        this.offsetManagerSupplier = offsetManagerSupplier;
        this.consumerSupplier = consumerSupplier;
    }

    @Override
    public Object getValueAndReset() {

        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
        KafkaConsumer kafkaConsumer = consumerSupplier.get();

        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
            return null;
        }

        Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
        Set<TopicPartition> topicPartitions = offsetManagers.keySet();

        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
        //map to hold partition level and topic level metrics
        Map<String, Long> result = new HashMap<>();

        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetManager offsetManager = entry.getValue();

            long latestTimeOffset = endOffsets.get(topicPartition);
            long earliestTimeOffset = beginningOffsets.get(topicPartition);

            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
            long latestCompletedOffset = offsetManager.getCommittedOffset();
            long spoutLag = latestTimeOffset - latestCompletedOffset;
            long recordsInPartition = latestTimeOffset - earliestTimeOffset;

            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
            result.put(metricPath + "/" + "spoutLag", spoutLag);
            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
            result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);

            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
            if (topicMetrics == null) {
                topicMetrics = new TopicMetrics();
                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
            }

            topicMetrics.totalSpoutLag += spoutLag;
            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
            topicMetrics.totalRecordsInPartitions += recordsInPartition;
        }

        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
            String topic = e.getKey();
            TopicMetrics topicMetrics = e.getValue();
            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
            result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
            result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
            result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
            result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
        }

        LOG.debug("Metrics Tick: value : {}", result);
        return result;
    }

    private class TopicMetrics {
        long totalSpoutLag = 0;
        long totalEarliestTimeOffset = 0;
        long totalLatestTimeOffset = 0;
        long totalLatestEmittedOffset = 0;
        long totalLatestCompletedOffset = 0;
        long totalRecordsInPartitions = 0;
    }
}
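getValueAndReset() returns a flat Map<String, Long> keyed by the paths described in the javadoc above, e.g. "myTopic/partition_0/spoutLag" or "myTopic/totalSpoutLag". Below is a sketch of a custom consumer that picks only the lag entries out of the "kafkaOffset" data points; the callbacks follow Storm's IMetricsConsumer interface, the class itself is hypothetical, and the exact prepare signature may differ slightly between Storm versions:

import java.util.Collection;
import java.util.Map;

import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpoutLagLogger implements IMetricsConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutLagLogger.class);

    @Override
    public void prepare(Map stormConf, Object registrationArgument,
            TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint dataPoint : dataPoints) {
            if (!"kafkaOffset".equals(dataPoint.name)) {
                continue;
            }
            // The value is the Map<String, Long> built in getValueAndReset() above.
            Map<String, Long> offsets = (Map<String, Long>) dataPoint.value;
            offsets.forEach((path, value) -> {
                if (path.endsWith("/spoutLag") || path.endsWith("/totalSpoutLag")) {
                    LOG.info("task {}: {} = {}", taskInfo.srcTaskId, path, value);
                }
            });
        }
    }

    @Override
    public void cleanup() {
    }
}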
@@ -45,6 +45,7 @@ public void testBasic() {
        expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        assertEquals(expected, conf.getKafkaProps());
        assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
    }

    @Test
@@ -75,4 +76,13 @@ public void testWillRespectExplicitAutoOffsetResetPolicy() {
        assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
            conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
    }

    @Test
    public void testMetricsTimeBucketSizeInSecs() {
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setMetricsTimeBucketSizeInSecs(100)
            .build();

        assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
    }
}
@@ -47,6 +47,7 @@
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.junit.Assert.assertEquals;

import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -344,4 +345,36 @@ public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
        spout.nextTuple();
        verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
    }

    @Test
    public void testOffsetMetrics() throws Exception {
        final int messageCount = 10;
        prepareSpout(messageCount);

        Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalEarliestTimeOffset").longValue());
        // the offset of the last available message + 1.
        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestTimeOffset").longValue());
        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalRecordsInPartitions").longValue());
        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestEmittedOffset").longValue());
        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestCompletedOffset").longValue());
        //totalSpoutLag = totalLatestTimeOffset - totalLatestCompletedOffset
        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag").longValue());

        //Emit all messages and check that they are emitted. Ack the messages too
        for (int i = 0; i < messageCount; i++) {
            nextTuple_verifyEmitted_ack_resetCollector(i);
        }

        commitAndVerifyAllMessagesCommitted(messageCount);

        offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalEarliestTimeOffset").longValue());
        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestTimeOffset").longValue());
        // offset of the last emitted message
        assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestEmittedOffset").longValue());
        // offset where processing will resume upon spout restart
        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestCompletedOffset").longValue());
        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag").longValue());
    }
}