diff --git a/README.md b/README.md index 0b947ae2..52547044 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,18 @@ Labels: `cluster_name, group, is_simple_consumer` The highest (maximum) lag in time for a given consumer group. +**`kafka_consumergroup_group_sum_lag`** + +Labels: `cluster_name, group` + +The sum of the difference between the last produced offset and the last consumed offset of all partitions for this group. + +**`kafka_consumergroup_group_topic_sum_lag`** + +Labels: `cluster_name, group, topic` + +The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group. + **`kafka_partition_latest_offset`** Labels: `cluster_name, topic, partition` @@ -101,7 +113,6 @@ Labels: `cluster_name, topic, partition` The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels. - ### Labels Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included. diff --git a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala index 99a621ea..2eef8cb6 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala @@ -229,12 +229,21 @@ object ConsumerGroupCollector { GroupPartitionLag(gtp, offsetLag, timeLag) } - for((group, values) <- groupLag.groupBy(_.gtp.id)) { - val maxOffsetLag = values.maxBy(_.offsetLag) - val maxTimeLag = values.maxBy(_.timeLag) + for((group, groupValues) <- groupLag.groupBy(_.gtp.id)) { + val maxOffsetLag = groupValues.maxBy(_.offsetLag) + val maxTimeLag = groupValues.maxBy(_.timeLag) reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group, maxOffsetLag.offsetLag) reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group, maxTimeLag.timeLag) + + val sumOffsetLag = groupValues.map(_.offsetLag).sum + reporter ! Metrics.GroupValueMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group, sumOffsetLag) + + for((topic, topicValues) <- groupValues.groupBy(_.gtp.topic)) { + val topicOffsetLag = topicValues.map(_.offsetLag).sum + + reporter ! Metrics.GroupTopicValueMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic, topicOffsetLag) + } } } @@ -269,12 +278,18 @@ object ConsumerGroupCollector { groups.foreach { group => reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group) reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group) + reporter ! Metrics.GroupRemoveMetricMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group) } gtps.foreach { gtp => reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.LastGroupOffsetMetric, config.cluster.name, gtp) reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.OffsetLagMetric, config.cluster.name, gtp) reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.TimeLagMetric, config.cluster.name, gtp) } + + for { + (group, gtps) <- gtps.groupBy(_.id) + topic <- gtps.map(_.topic).distinct + } reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic) } } } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala index 15ea4114..d06fc7c7 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala @@ -13,8 +13,7 @@ object Metrics { def topicPartition: Domain.TopicPartition override def labels: List[String] = List( - clusterName, - topicPartition.topic, + clusterName, topicPartition.topic, topicPartition.partition.toString ) } @@ -54,6 +53,22 @@ object Metrics { final case class GroupPartitionValueMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition, value: Double) extends GroupPartitionMessage with MetricValue final case class GroupPartitionRemoveMetricMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition) extends GroupPartitionMessage with RemoveMetric + sealed trait GroupTopicMessage extends Message with Metric { + def definition: GaugeDefinition + def clusterName: String + def group: String + def topic: String + override def labels: List[String] = + List( + clusterName, + group, + topic + ) + } + + final case class GroupTopicValueMessage(definition: GaugeDefinition, clusterName: String, group: String, topic: String, value: Double) extends GroupTopicMessage with MetricValue + final case class GroupTopicRemoveMetricMessage(definition: GaugeDefinition, clusterName: String, group: String, topic: String) extends GroupTopicMessage with RemoveMetric + val topicPartitionLabels = List("cluster_name", "topic", "partition") val LatestOffsetMetric = GaugeDefinition( @@ -82,6 +97,12 @@ object Metrics { groupLabels ) + val SumGroupOffsetLagMetric = GaugeDefinition( + "kafka_consumergroup_group_sum_lag", + "Sum of group offset lag", + groupLabels + ) + val groupPartitionLabels = List("cluster_name", "group", "topic", "partition", "member_host", "consumer_id", "client_id") val LastGroupOffsetMetric = GaugeDefinition( @@ -102,6 +123,14 @@ object Metrics { groupPartitionLabels ) + val groupTopicLabels = List("cluster_name", "group", "topic") + + val SumGroupTopicOffsetLagMetric = GaugeDefinition( + "kafka_consumergroup_group_topic_sum_lag", + "Sum of group offset lag across topic partitions", + groupTopicLabels + ) + val definitions = List( LatestOffsetMetric, EarliestOffsetMetric, @@ -109,6 +138,8 @@ object Metrics { MaxGroupTimeLagMetric, LastGroupOffsetMetric, OffsetLagMetric, - TimeLagMetric + TimeLagMetric, + SumGroupOffsetLagMetric, + SumGroupTopicOffsetLagMetric ) } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala index 71a4c9b6..0b00164a 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala @@ -46,7 +46,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi val metrics = reporter.receiveAll() - "report 7 metrics" in { metrics.length shouldBe 7 } + "report 9 metrics" in { metrics.length shouldBe 9 } "earliest offset metric" in { metrics should contain( @@ -78,6 +78,14 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi "max group time lag metric" in { metrics should contain(GroupValueMessage(MaxGroupTimeLagMetric, config.cluster.name, groupId, value = 0.02)) } + + "sum group offset lag metric" in { + metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, config.cluster.name, groupId, value = 20)) + } + + "sum topic offset lag metric" in { + metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, config.cluster.name, groupId, topic, value = 20)) + } } "ConsumerGroupCollector should calculate max group metrics and send" - { @@ -126,6 +134,57 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi } } + "ConsumerGroupCollector should sum the group offset lag metrics and send" - { + val reporter = TestInbox[MetricsSink.Message]() + + val lookupTable = Table(20) + lookupTable.addPoint(Point(100, 100)) + + val state = ConsumerGroupCollector.CollectorState( + topicPartitionTables = TopicPartitionTable(config.lookupTableSize, Map( + topicPartition0 -> lookupTable.copy(), + topicPartition1 -> lookupTable.copy(), + topicPartition2 -> lookupTable.copy(), + topic2Partition0 -> lookupTable.copy() + )), + ) + + val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state) + val testKit = BehaviorTestKit(behavior) + + val newEarliestOffsets = PartitionOffsets( + topicPartition0 -> Point(offset = 0, time = 100), + topicPartition1 -> Point(offset = 0, time = 100), + topicPartition2 -> Point(offset = 0, time = 100), + topic2Partition0 -> Point(offset = 0, time = 100) + ) + val newLatestOffsets = PartitionOffsets( + topicPartition0 -> Point(offset = 100, time = 200), + topicPartition1 -> Point(offset = 100, time = 200), + topicPartition2 -> Point(offset = 100, time = 200), + topic2Partition0 -> Point(offset = 100, time = 200) + ) + val newLastGroupOffsets = GroupOffsets( + gtp0 -> Some(Point(offset = 10, time = 200)), + gtp1 -> Some(Point(offset = 20, time = 200)), + gtp2 -> Some(Point(offset = 30, time = 200)), + gt2p0 -> Some(Point(offset = 40, time = 200)) + ) + + testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets)) + + val metrics = reporter.receiveAll() + + "sum of offset lag metric" in { + metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, clusterName, groupId, value = 300)) + } + + "sum of offset lag by topic metric" in { + metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic, value = 240)) + metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic2, value = 60)) + } + } + "ConsumerGroupCollector should report group offset, lag, and time lag as NaN when no group offsets found" - { val reporter = TestInbox[MetricsSink.Message]() @@ -188,6 +247,12 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi metrics should contain( Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200)) } + + "topic offset lag metric" in { + metrics.collectFirst { + case GroupTopicValueMessage(`SumGroupTopicOffsetLagMetric`, config.cluster.name, `groupId`, `topic`, value) if value.isNaN => true + }.nonEmpty shouldBe true + } } "ConsumerGroupCollector should evict data when group metadata changes" - { @@ -252,6 +317,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember)) metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId)) metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId)) + metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic)) } "remove metrics for topic partitions no longer being reported" - { @@ -284,6 +350,8 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember)) metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId)) metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId)) + metrics should contain(GroupRemoveMetricMessage(SumGroupOffsetLagMetric, clusterName, groupId)) + metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic)) } "topic partition in topic partition table removed" in {