Skip to content

Commit

Permalink
Fix concurrent gauge creation (#7116)
Browse files Browse the repository at this point in the history
* Added failing test.
CHANGELOG_BEGIN
CHANGELOG_END

* Fixed test.

* Added missing header.

* Reformatted.

* Fixed concurrency issue for CacheMetrics as well.

* Reworded test case description a bit.

* Code tidying.

* Use da_scala_test_suite instead.
  • Loading branch information
miklos-da authored Aug 13, 2020
1 parent bed52a0 commit 2f4aed4
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
12 changes: 12 additions & 0 deletions ledger/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
"da_scala_test_suite",
)

da_scala_library(
Expand All @@ -23,3 +24,14 @@ da_scala_library(
"@maven//:io_dropwizard_metrics_metrics_jvm",
],
)

da_scala_test_suite(
name = "metrics-tests",
size = "small",
srcs = glob(["src/test/scala/**/*.scala"]),
deps = [
":metrics",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_scalatest_scalatest_2_12",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ final class CacheMetrics(
def registerWeightGauge(weightGauge: Gauge[Long]): Unit =
register(prefix :+ "weight", () => weightGauge)

private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = {
registry.remove(name)
registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]]
}
private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] =
registry.synchronized {
registry.remove(name)
registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]]
}
}
11 changes: 7 additions & 4 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import com.codahale.metrics._

final class Metrics(val registry: MetricRegistry) {

private def gauge[T](name: MetricName, metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = {
registry.remove(name)
registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]]
}
private[metrics] def gauge[T](
name: MetricName,
metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] =
registry.synchronized {
registry.remove(name)
registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]]
}

object test {
private val Prefix: MetricName = MetricName("test")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.metrics

import com.codahale.metrics.MetricRegistry
import org.scalatest.{AsyncWordSpec, Matchers}

import scala.concurrent.{ExecutionContext, Future}

class CacheMetricsSpec extends AsyncWordSpec with Matchers {
"gauge registrations" should {
"succeed on multiple threads in parallel for the same metric registry" in {
val cacheMetrics = new CacheMetrics(new MetricRegistry, MetricName.DAML)
implicit val executionContext: ExecutionContext = ExecutionContext.global
val instances =
(1 to 1000).map(_ =>
Future {
cacheMetrics.registerSizeGauge(() => 1L)
cacheMetrics.registerWeightGauge(() => 2L)
})
Future.sequence(instances).map { _ =>
succeed
}
}
}
}
24 changes: 24 additions & 0 deletions ledger/metrics/src/test/scala/com/daml/metrics/MetricsSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.metrics

import com.codahale.metrics.MetricRegistry
import org.scalatest.{AsyncWordSpec, Matchers}

import scala.concurrent.{ExecutionContext, Future}

class MetricsSpec extends AsyncWordSpec with Matchers {
"gauge registration" should {
"succeed on multiple threads in parallel for the same metric name" in {
val metrics = new Metrics(new MetricRegistry)
implicit val executionContext: ExecutionContext = ExecutionContext.global
val metricName = MetricName.DAML :+ "a" :+ "test"
val instances =
(1 to 1000).map(_ => Future(metrics.gauge[Double](metricName, () => () => 1.0)))
Future.sequence(instances).map { _ =>
succeed
}
}
}
}

0 comments on commit 2f4aed4

Please sign in to comment.