From 2f4aed450692907d85d4d2d671b1c41e0a9a3e16 Mon Sep 17 00:00:00 2001 From: Miklos <57664299+miklos-da@users.noreply.github.com> Date: Thu, 13 Aug 2020 18:59:56 +0200 Subject: [PATCH] Fix concurrent gauge creation (#7116) * Added failing test. CHANGELOG_BEGIN CHANGELOG_END * Fixed test. * Added missing header. * Reformatted. * Fixed concurrency issue for CacheMetrics as well. * Reworded test case description a bit. * Code tidying. * Use da_scala_test_suite instead. --- ledger/metrics/BUILD.bazel | 12 +++++++++ .../scala/com/daml/metrics/CacheMetrics.scala | 9 ++++--- .../main/scala/com/daml/metrics/Metrics.scala | 11 +++++--- .../com/daml/metrics/CacheMetricsSpec.scala | 27 +++++++++++++++++++ .../scala/com/daml/metrics/MetricsSpec.scala | 24 +++++++++++++++++ 5 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 ledger/metrics/src/test/scala/com/daml/metrics/CacheMetricsSpec.scala create mode 100644 ledger/metrics/src/test/scala/com/daml/metrics/MetricsSpec.scala diff --git a/ledger/metrics/BUILD.bazel b/ledger/metrics/BUILD.bazel index 601d0ef5c313..95c581a3a282 100644 --- a/ledger/metrics/BUILD.bazel +++ b/ledger/metrics/BUILD.bazel @@ -4,6 +4,7 @@ load( "//bazel_tools:scala.bzl", "da_scala_library", + "da_scala_test_suite", ) da_scala_library( @@ -23,3 +24,14 @@ da_scala_library( "@maven//:io_dropwizard_metrics_metrics_jvm", ], ) + +da_scala_test_suite( + name = "metrics-tests", + size = "small", + srcs = glob(["src/test/scala/**/*.scala"]), + deps = [ + ":metrics", + "@maven//:io_dropwizard_metrics_metrics_core", + "@maven//:org_scalatest_scalatest_2_12", + ], +) diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/CacheMetrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/CacheMetrics.scala index 9241227842d7..afdb848dc96b 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/CacheMetrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/CacheMetrics.scala @@ -23,8 +23,9 @@ final class CacheMetrics( def registerWeightGauge(weightGauge: Gauge[Long]): Unit = register(prefix :+ "weight", () => weightGauge) - private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = { - registry.remove(name) - registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]] - } + private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = + registry.synchronized { + registry.remove(name) + registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]] + } } diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index 365e29ba3ae5..b26f7e4d901c 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -8,10 +8,13 @@ import com.codahale.metrics._ final class Metrics(val registry: MetricRegistry) { - private def gauge[T](name: MetricName, metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = { - registry.remove(name) - registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]] - } + private[metrics] def gauge[T]( + name: MetricName, + metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = + registry.synchronized { + registry.remove(name) + registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]] + } object test { private val Prefix: MetricName = MetricName("test") diff --git a/ledger/metrics/src/test/scala/com/daml/metrics/CacheMetricsSpec.scala b/ledger/metrics/src/test/scala/com/daml/metrics/CacheMetricsSpec.scala new file mode 100644 index 000000000000..5c5e050e6c3f --- /dev/null +++ b/ledger/metrics/src/test/scala/com/daml/metrics/CacheMetricsSpec.scala @@ -0,0 +1,27 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.metrics + +import com.codahale.metrics.MetricRegistry +import org.scalatest.{AsyncWordSpec, Matchers} + +import scala.concurrent.{ExecutionContext, Future} + +class CacheMetricsSpec extends AsyncWordSpec with Matchers { + "gauge registrations" should { + "succeed on multiple threads in parallel for the same metric registry" in { + val cacheMetrics = new CacheMetrics(new MetricRegistry, MetricName.DAML) + implicit val executionContext: ExecutionContext = ExecutionContext.global + val instances = + (1 to 1000).map(_ => + Future { + cacheMetrics.registerSizeGauge(() => 1L) + cacheMetrics.registerWeightGauge(() => 2L) + }) + Future.sequence(instances).map { _ => + succeed + } + } + } +} diff --git a/ledger/metrics/src/test/scala/com/daml/metrics/MetricsSpec.scala b/ledger/metrics/src/test/scala/com/daml/metrics/MetricsSpec.scala new file mode 100644 index 000000000000..65db7c555b37 --- /dev/null +++ b/ledger/metrics/src/test/scala/com/daml/metrics/MetricsSpec.scala @@ -0,0 +1,24 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.metrics + +import com.codahale.metrics.MetricRegistry +import org.scalatest.{AsyncWordSpec, Matchers} + +import scala.concurrent.{ExecutionContext, Future} + +class MetricsSpec extends AsyncWordSpec with Matchers { + "gauge registration" should { + "succeed on multiple threads in parallel for the same metric name" in { + val metrics = new Metrics(new MetricRegistry) + implicit val executionContext: ExecutionContext = ExecutionContext.global + val metricName = MetricName.DAML :+ "a" :+ "test" + val instances = + (1 to 1000).map(_ => Future(metrics.gauge[Double](metricName, () => () => 1.0))) + Future.sequence(instances).map { _ => + succeed + } + } + } +}