Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Add metric to represent a consumer group's total offset lag per topic (
Browse files Browse the repository at this point in the history
…#93)

* Add metric of a consumer group's total offset lag and total lag per topic
* Update name of group/topic aggregate metric
  • Loading branch information
dylanmei authored and seglo committed Nov 4, 2019
1 parent e2ef346 commit de1bb6c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 8 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ Labels: `cluster_name, group, is_simple_consumer`

The highest (maximum) lag in time for a given consumer group.

**`kafka_consumergroup_group_sum_lag`**

Labels: `cluster_name, group`

The sum of the differences between the last produced offset and the last consumed offset, taken across all partitions for this group.

**`kafka_consumergroup_group_topic_sum_lag`**

Labels: `cluster_name, group, topic`

The sum of the differences between the last produced offset and the last consumed offset, taken across all partitions in this topic for this group.

**`kafka_partition_latest_offset`**

Labels: `cluster_name, topic, partition`
Expand All @@ -101,7 +113,6 @@ Labels: `cluster_name, topic, partition`

The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels.


### Labels

Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,21 @@ object ConsumerGroupCollector {
GroupPartitionLag(gtp, offsetLag, timeLag)
}

for((group, values) <- groupLag.groupBy(_.gtp.id)) {
val maxOffsetLag = values.maxBy(_.offsetLag)
val maxTimeLag = values.maxBy(_.timeLag)
// Aggregate the per-partition lag entries by `gtp.id`; the key is reported as the group label.
for((group, groupValues) <- groupLag.groupBy(_.gtp.id)) {
// Entries holding the largest offset lag and the largest time lag within this group.
val maxOffsetLag = groupValues.maxBy(_.offsetLag)
val maxTimeLag = groupValues.maxBy(_.timeLag)

reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group, maxOffsetLag.offsetLag)
reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group, maxTimeLag.timeLag)

// Total offset lag over every entry of the group.
val sumOffsetLag = groupValues.map(_.offsetLag).sum
reporter ! Metrics.GroupValueMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group, sumOffsetLag)

// Break the group's entries down by topic and report one offset-lag sum per (group, topic).
for((topic, topicValues) <- groupValues.groupBy(_.gtp.topic)) {
val topicOffsetLag = topicValues.map(_.offsetLag).sum

reporter ! Metrics.GroupTopicValueMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic, topicOffsetLag)
}
}
}

Expand Down Expand Up @@ -269,12 +278,18 @@ object ConsumerGroupCollector {
groups.foreach { group =>
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group)
}
gtps.foreach { gtp =>
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.LastGroupOffsetMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.OffsetLagMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.TimeLagMetric, config.cluster.name, gtp)
}

// Send one remove message for the per-topic sum-lag gauge for every distinct
// (group, topic) pair present in `gtps`.
gtps.groupBy(_.id).foreach { case (group, groupGtps) =>
  groupGtps.map(_.topic).distinct.foreach { topic =>
    reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic)
  }
}
}
}
}
37 changes: 34 additions & 3 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ object Metrics {
def topicPartition: Domain.TopicPartition
override def labels: List[String] =
List(
clusterName,
topicPartition.topic,
clusterName, topicPartition.topic,
topicPartition.partition.toString
)
}
Expand Down Expand Up @@ -54,6 +53,22 @@ object Metrics {
final case class GroupPartitionValueMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition, value: Double) extends GroupPartitionMessage with MetricValue
final case class GroupPartitionRemoveMetricMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition) extends GroupPartitionMessage with RemoveMetric

/**
 * A metric message identified by cluster name, consumer group and topic.
 *
 * Label values are produced in the order `clusterName`, `group`, `topic`.
 */
sealed trait GroupTopicMessage extends Message with Metric {
  def definition: GaugeDefinition
  def clusterName: String
  def group: String
  def topic: String

  override def labels: List[String] = clusterName :: group :: topic :: Nil
}

/** A [[GroupTopicMessage]] that carries a `value` for the gauge described by `definition`. */
final case class GroupTopicValueMessage(
  definition: GaugeDefinition,
  clusterName: String,
  group: String,
  topic: String,
  value: Double
) extends GroupTopicMessage with MetricValue

/** A [[GroupTopicMessage]] marked with `RemoveMetric` for the gauge described by `definition`. */
final case class GroupTopicRemoveMetricMessage(
  definition: GaugeDefinition,
  clusterName: String,
  group: String,
  topic: String
) extends GroupTopicMessage with RemoveMetric

val topicPartitionLabels = List("cluster_name", "topic", "partition")

val LatestOffsetMetric = GaugeDefinition(
Expand Down Expand Up @@ -82,6 +97,12 @@ object Metrics {
groupLabels
)

// Gauge `kafka_consumergroup_group_sum_lag`: the offset lag of a consumer group
// summed over all of its partitions. Uses the same label names as the other
// group-level gauges (`groupLabels`).
val SumGroupOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_sum_lag",
"Sum of group offset lag",
groupLabels
)

val groupPartitionLabels = List("cluster_name", "group", "topic", "partition", "member_host", "consumer_id", "client_id")

val LastGroupOffsetMetric = GaugeDefinition(
Expand All @@ -102,13 +123,23 @@ object Metrics {
groupPartitionLabels
)

// Label names for gauges reported per (cluster, group, topic).
val groupTopicLabels = List("cluster_name", "group", "topic")

// Gauge `kafka_consumergroup_group_topic_sum_lag`: the offset lag of a consumer
// group summed over the partitions of a single topic.
val SumGroupTopicOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_topic_sum_lag",
"Sum of group offset lag across topic partitions",
groupTopicLabels
)

val definitions = List(
LatestOffsetMetric,
EarliestOffsetMetric,
MaxGroupOffsetLagMetric,
MaxGroupTimeLagMetric,
LastGroupOffsetMetric,
OffsetLagMetric,
TimeLagMetric
TimeLagMetric,
SumGroupOffsetLagMetric,
SumGroupTopicOffsetLagMetric
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi

val metrics = reporter.receiveAll()

"report 7 metrics" in { metrics.length shouldBe 7 }
"report 9 metrics" in { metrics.length shouldBe 9 }

"earliest offset metric" in {
metrics should contain(
Expand Down Expand Up @@ -78,6 +78,14 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
"max group time lag metric" in {
metrics should contain(GroupValueMessage(MaxGroupTimeLagMetric, config.cluster.name, groupId, value = 0.02))
}

"sum group offset lag metric" in {
metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, config.cluster.name, groupId, value = 20))
}

"sum topic offset lag metric" in {
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, config.cluster.name, groupId, topic, value = 20))
}
}

"ConsumerGroupCollector should calculate max group metrics and send" - {
Expand Down Expand Up @@ -126,6 +134,57 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
}
}

// Checks that an OffsetsSnapshot produces the group-level and the group/topic-level
// offset-lag sums.
"ConsumerGroupCollector should sum the group offset lag metrics and send" - {
val reporter = TestInbox[MetricsSink.Message]()

val lookupTable = Table(20)
lookupTable.addPoint(Point(100, 100))

// Seed the collector state with a lookup table copy for each of the four partitions.
val state = ConsumerGroupCollector.CollectorState(
topicPartitionTables = TopicPartitionTable(config.lookupTableSize, Map(
topicPartition0 -> lookupTable.copy(),
topicPartition1 -> lookupTable.copy(),
topicPartition2 -> lookupTable.copy(),
topic2Partition0 -> lookupTable.copy()
)),
)

val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 0, time = 100),
topicPartition1 -> Point(offset = 0, time = 100),
topicPartition2 -> Point(offset = 0, time = 100),
topic2Partition0 -> Point(offset = 0, time = 100)
)
// Every partition has a latest offset of 100.
val newLatestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 100, time = 200),
topicPartition1 -> Point(offset = 100, time = 200),
topicPartition2 -> Point(offset = 100, time = 200),
topic2Partition0 -> Point(offset = 100, time = 200)
)
// Group offsets of 10/20/30/40 against a latest offset of 100 give lags of
// 90/80/70/60, which is what the expected sums below add up to.
val newLastGroupOffsets = GroupOffsets(
gtp0 -> Some(Point(offset = 10, time = 200)),
gtp1 -> Some(Point(offset = 20, time = 200)),
gtp2 -> Some(Point(offset = 30, time = 200)),
gt2p0 -> Some(Point(offset = 40, time = 200))
)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

// 90 + 80 + 70 + 60 = 300 across all four partitions of the group.
"sum of offset lag metric" in {
metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, clusterName, groupId, value = 300))
}

// NOTE(review): presumably gtp0..gtp2 belong to `topic` (90 + 80 + 70 = 240) and
// gt2p0 belongs to `topic2` (60); the fixtures come from TestData — confirm there.
"sum of offset lag by topic metric" in {
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic, value = 240))
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic2, value = 60))
}
}

"ConsumerGroupCollector should report group offset, lag, and time lag as NaN when no group offsets found" - {
val reporter = TestInbox[MetricsSink.Message]()

Expand Down Expand Up @@ -188,6 +247,12 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(
Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200))
}

// Passes when at least one per-topic sum-lag message for this cluster, group and
// topic carries a NaN value.
"topic offset lag metric" in {
  val reportedNaN = metrics.exists {
    case GroupTopicValueMessage(`SumGroupTopicOffsetLagMetric`, config.cluster.name, `groupId`, `topic`, value) => value.isNaN
    case _ => false
  }
  reportedNaN shouldBe true
}
}

"ConsumerGroupCollector should evict data when group metadata changes" - {
Expand Down Expand Up @@ -252,6 +317,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"remove metrics for topic partitions no longer being reported" - {
Expand Down Expand Up @@ -284,6 +350,8 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(SumGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"topic partition in topic partition table removed" in {
Expand Down

0 comments on commit de1bb6c

Please sign in to comment.