diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9fe654f71e3..c52309ecf34 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -39,6 +39,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang.Validate; @@ -56,6 +57,7 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -104,6 +106,7 @@ public class KafkaSpout extends BaseRichSpout { private transient TopologyContext context; // Metadata information to commit to Kafka. It is unique per spout per topology. 
private transient String commitMetadata; + private transient KafkaOffsetMetric kafkaOffsetMetric; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); @@ -142,10 +145,29 @@ public void open(Map conf, TopologyContext context, SpoutOutputC setCommitMetadata(context); tupleListener.open(conf, context); + if (canRegisterMetrics()) { + registerMetric(); + } LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } + private void registerMetric() { + LOG.info("Registering Spout Metrics"); + kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer); + context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs()); + } + + private boolean canRegisterMetrics() { + try { + KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class); + } catch (NoSuchMethodException e) { + LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. 
Disabling spout metrics."); + return false; + } + return true; + } + private void setCommitMetadata(TopologyContext context) { try { commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( @@ -707,4 +729,9 @@ public boolean shouldPoll() { return !this.pollablePartitions.isEmpty(); } } + + @VisibleForTesting + KafkaOffsetMetric getKafkaOffsetMetric() { + return kafkaOffsetMetric; + } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 1c99f24ef2f..a063790b2c2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -70,6 +69,9 @@ public class KafkaSpoutConfig implements Serializable { public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class); + public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60; + + // Kafka consumer configuration private final Map kafkaProps; private final Subscription subscription; @@ -86,6 +88,7 @@ public class KafkaSpoutConfig implements Serializable { private final boolean emitNullTuples; private final ProcessingGuarantee processingGuarantee; private final boolean tupleTrackingEnforced; + private final int metricsTimeBucketSizeInSecs; /** * Creates a new KafkaSpoutConfig using a Builder. 
@@ -107,6 +110,7 @@ public KafkaSpoutConfig(Builder builder) { this.emitNullTuples = builder.emitNullTuples; this.processingGuarantee = builder.processingGuarantee; this.tupleTrackingEnforced = builder.tupleTrackingEnforced; + this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs; } /** @@ -177,6 +181,7 @@ public static class Builder { private boolean emitNullTuples = false; private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE; private boolean tupleTrackingEnforced = false; + private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS; public Builder(String bootstrapServers, String... topics) { this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); @@ -395,6 +400,15 @@ public Builder setTupleTrackingEnforced(boolean tupleTrackingEnforced) { return this; } + /** + * The time period that metrics data in bucketed into. + * @param metricsTimeBucketSizeInSecs time in seconds + */ + public Builder setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) { + this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs; + return this; + } + public KafkaSpoutConfig build() { return new KafkaSpoutConfig<>(this); } @@ -533,6 +547,10 @@ public boolean isEmitNullTuples() { return emitNullTuples; } + public int getMetricsTimeBucketSizeInSecs() { + return metricsTimeBucketSizeInSecs; + } + @Override public String toString() { return "KafkaSpoutConfig{" @@ -545,6 +563,8 @@ public String toString() { + ", translator=" + translator + ", retryService=" + retryService + ", tupleListener=" + tupleListener + + ", processingGuarantee=" + processingGuarantee + + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 
ec6f2a1b7a5..c9f9541a7df 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -46,6 +46,7 @@ public class OffsetManager { private long committedOffset; // True if this OffsetManager has made at least one commit to Kafka private boolean committed; + private long latestEmittedOffset; /** * Creates a new OffsetManager. @@ -63,7 +64,8 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N) } public void addToEmitMsgs(long offset) { - this.emittedOffsets.add(offset); // O(Log N) + this.emittedOffsets.add(offset); // O(Log N) + this.latestEmittedOffset = Math.max(latestEmittedOffset, offset); } public int getNumUncommittedOffsets() { @@ -215,6 +217,14 @@ boolean containsEmitted(long offset) { return emittedOffsets.contains(offset); } + public long getLatestEmittedOffset() { + return latestEmittedOffset; + } + + public long getCommittedOffset() { + return committedOffset; + } + @Override public final String toString() { return "OffsetManager{" @@ -222,6 +232,7 @@ public final String toString() { + ", committedOffset=" + committedOffset + ", emittedOffsets=" + emittedOffsets + ", ackedMsgs=" + ackedMsgs + + ", latestEmittedOffset=" + latestEmittedOffset + '}'; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java new file mode 100644 index 00000000000..d6ed209e610 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to compute the partition and topic level offset metrics. + *

+ * Partition level metrics are: + * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition + * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition + * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout + * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout + * topicName/partition_{number}/spoutLag // the delta between latestTimeOffset and latestCompletedOffset + * topicName/partition_{number}/recordsInPartition // total number of records in the partition + *

+ *

+ * Topic level metrics are: + * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout + * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout + * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout + * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout + * topicName/totalSpoutLag // total spout lag of all the associated partitions of this spout + * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout + *

+ */ +public class KafkaOffsetMetric implements IMetric { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); + private final Supplier> offsetManagerSupplier; + private final Supplier consumerSupplier; + + public KafkaOffsetMetric(Supplier> offsetManagerSupplier, Supplier consumerSupplier) { + this.offsetManagerSupplier = offsetManagerSupplier; + this.consumerSupplier = consumerSupplier; + } + + @Override + public Object getValueAndReset() { + + Map offsetManagers = offsetManagerSupplier.get(); + KafkaConsumer kafkaConsumer = consumerSupplier.get(); + + if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { + LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null."); + return null; + } + + Map topicMetricsMap = new HashMap<>(); + Set topicPartitions = offsetManagers.keySet(); + + Map beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions); + Map endOffsets = kafkaConsumer.endOffsets(topicPartitions); + //map to hold partition level and topic level metrics + Map result = new HashMap<>(); + + for (Map.Entry entry : offsetManagers.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + OffsetManager offsetManager = entry.getValue(); + + long latestTimeOffset = endOffsets.get(topicPartition); + long earliestTimeOffset = beginningOffsets.get(topicPartition); + + long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); + long latestCompletedOffset = offsetManager.getCommittedOffset(); + long spoutLag = latestTimeOffset - latestCompletedOffset; + long recordsInPartition = latestTimeOffset - earliestTimeOffset; + + String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); + result.put(metricPath + "/" + "spoutLag", spoutLag); + result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); + result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); + result.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset); + result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); + result.put(metricPath + "/" + "recordsInPartition", recordsInPartition); + + TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); + if (topicMetrics == null) { + topicMetrics = new TopicMetrics(); + topicMetricsMap.put(topicPartition.topic(), topicMetrics); + } + + topicMetrics.totalSpoutLag += spoutLag; + topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; + topicMetrics.totalLatestTimeOffset += latestTimeOffset; + topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; + topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; + topicMetrics.totalRecordsInPartitions += recordsInPartition; + } + + for (Map.Entry e : topicMetricsMap.entrySet()) { + String topic = e.getKey(); + TopicMetrics topicMetrics = e.getValue(); + result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); + result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); + result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); + result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); + result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); + result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions); + } + + LOG.debug("Metrics Tick: value : {}", result); + return result; + } + + private class TopicMetrics { + long totalSpoutLag = 0; + long totalEarliestTimeOffset = 0; + long totalLatestTimeOffset = 0; + long totalLatestEmittedOffset = 0; + long totalLatestCompletedOffset = 0; + long totalRecordsInPartitions = 0; + } +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index a3fc984fe96..9885cf60811 100644 --- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -45,6 +45,7 @@ public void testBasic() { expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); assertEquals(expected, conf.getKafkaProps()); + assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs()); } @Test @@ -75,4 +76,13 @@ public void testWillRespectExplicitAutoOffsetResetPolicy() { assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none")); } + + @Test + public void testMetricsTimeBucketSizeInSecs() { + KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setMetricsTimeBucketSizeInSecs(100) + .build(); + + assertEquals(100, conf.getMetricsTimeBucketSizeInSecs()); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java index 14703329ef6..7842349c510 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.junit.Assert.assertEquals; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -344,4 +345,36 @@ public void testSpoutMustRefreshPartitionsEvenIfNotPolling() 
throws Exception { spout.nextTuple(); verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); } + + @Test + public void testOffsetMetrics() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + Map offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); + assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue()); + // the offset of the last available message + 1. + assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue()); + assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue()); + assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue()); + assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue()); + //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset + assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue()); + + //Emit all messages and check that they are emitted. 
Ack the messages too + for (int i = 0; i < messageCount; i++) { + nextTuple_verifyEmitted_ack_resetCollector(i); + } + + commitAndVerifyAllMessagesCommitted(messageCount); + + offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); + assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue()); + assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue()); + //latest offset + assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue()); + // offset where processing will resume upon spout restart + assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue()); + assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue()); + } } diff --git a/pom.xml b/pom.xml index 543c0d05736..65b1cca8423 100644 --- a/pom.xml +++ b/pom.xml @@ -304,7 +304,7 @@ kafka_2.10 - 0.10.0.0 + 0.10.1.0