Skip to content

Commit

Permalink
Instrument the executor services using OpenTelemetry [PLEN-105] (digi…
Browse files Browse the repository at this point in the history
nicu-da authored Feb 17, 2023
1 parent d3feb57 commit af44452
Showing 16 changed files with 338 additions and 202 deletions.
Original file line number Diff line number Diff line change
@@ -279,8 +279,5 @@ class MainIndexDBMetrics(prefix: MetricName, factory: MetricsFactory)

val connection: MetricName = prefix :+ "connection"

val instrumentedExecutorServiceForDocs = new InstrumentedExecutorServiceForDocs(
connection :+ "<server_role>"
)
}
}
Original file line number Diff line number Diff line change
@@ -37,10 +37,6 @@ class LAPIMetrics(val prefix: MetricName, val factory: MetricsFactory) {
object threadpool {
private val prefix: MetricName = LAPIMetrics.this.prefix :+ "threadpool"

val instrumentedExecutorServiceForDocs = new InstrumentedExecutorServiceForDocs(
prefix :+ "<threadpool>"
)

val apiServices: MetricName = prefix :+ "api-services"

val inMemoryFanOut: MetricName = prefix :+ "in_memory_fan_out"
Original file line number Diff line number Diff line change
@@ -8,10 +8,6 @@ import com.daml.metrics.api.MetricHandle.{Counter, MetricsFactory, Histogram, Ti
import com.daml.metrics.api.dropwizard.DropwizardTimer
import com.daml.metrics.api.{MetricDoc, MetricName}

@MetricDoc.GroupTag(
representative = "daml.parallel_indexer.<stage>.executor",
groupableClass = classOf[InstrumentedExecutorServiceForDocs],
)
@MetricDoc.GroupTag(
representative = "daml.parallel_indexer.<stage>",
groupableClass = classOf[DatabaseMetrics],
@@ -55,7 +51,6 @@ class ParallelIndexerMetrics(prefix: MetricName, factory: MetricsFactory) {

// Bundle of metrics coming from instrumentation of the underlying thread-pool
val executor: MetricName = prefix :+ "executor"
val instrumentedExecutorServiceForDocs = new InstrumentedExecutorServiceForDocs(executor)

@MetricDoc.Tag(
summary = "The batch sizes in the indexer.",
@@ -73,7 +68,6 @@ class ParallelIndexerMetrics(prefix: MetricName, factory: MetricsFactory) {

// Bundle of metrics coming from instrumentation of the underlying thread-pool
val executor: MetricName = prefix :+ "executor"
val instrumentedExecutorServiceForDocs = new InstrumentedExecutorServiceForDocs(executor)
}

// Sequence Mapping stage
Original file line number Diff line number Diff line change
@@ -92,15 +92,13 @@ private[platform] object InMemoryStateUpdater {
InstrumentedExecutors.newWorkStealingExecutor(
metrics.daml.lapi.threadpool.indexBypass.prepareUpdates,
prepareUpdatesParallelism,
metrics.registry,
metrics.executorServiceMetrics,
)
)
updateCachesExecutor <- ResourceOwner.forExecutorService(() =>
InstrumentedExecutors.newFixedThreadPool(
metrics.daml.lapi.threadpool.indexBypass.updateInMemoryState,
1,
metrics.registry,
metrics.executorServiceMetrics,
)
)
Original file line number Diff line number Diff line change
@@ -187,7 +187,6 @@ final class IndexServiceOwner(
InstrumentedExecutors.newWorkStealingExecutor(
metrics.daml.lapi.threadpool.inMemoryFanOut.toString,
threadPoolSize,
metrics.registry,
metrics.executorServiceMetrics,
)
)
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@

package com.daml.platform.indexer.parallel

import com.codahale.metrics.MetricRegistry
import com.daml.executors.InstrumentedExecutors
import com.daml.ledger.resources.ResourceOwner
import com.daml.logging.{ContextualizedLogger, LoggingContext}
@@ -30,18 +29,17 @@ object AsyncSupport {
def asyncPool(
size: Int,
namePrefix: String,
withMetric: (MetricName, MetricRegistry, ExecutorServiceMetrics),
withMetric: (MetricName, ExecutorServiceMetrics),
)(implicit loggingContext: LoggingContext): ResourceOwner[Executor] =
ResourceOwner
.forExecutorService { () =>
val (executorName, metricRegistry, executorServiceMetrics) = withMetric
val (executorName, executorServiceMetrics) = withMetric
InstrumentedExecutors.newFixedThreadPoolWithFactory(
executorName,
size,
new ThreadFactoryBuilder()
.setNameFormat(s"$namePrefix-%d")
.build,
metricRegistry,
executorServiceMetrics,
throwable =>
ContextualizedLogger
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@ object ParallelIndexerFactory {
"input-mapping-pool",
(
metrics.daml.parallelIndexer.inputMapping.executor,
metrics.registry,
metrics.executorServiceMetrics,
),
)
@@ -57,7 +56,6 @@ object ParallelIndexerFactory {
"batching-pool",
(
metrics.daml.parallelIndexer.batching.executor,
metrics.registry,
metrics.executorServiceMetrics,
),
)
@@ -71,7 +69,6 @@ object ParallelIndexerFactory {
"ha-coordinator",
1,
new ThreadFactoryBuilder().setNameFormat("ha-coordinator-%d").build,
metrics.registry,
metrics.executorServiceMetrics,
throwable =>
ContextualizedLogger
Original file line number Diff line number Diff line change
@@ -144,7 +144,6 @@ object DbDispatcher {
logger.error("Uncaught exception in the SQL executor.", e)
)
.build(),
metrics.registry,
metrics.executorServiceMetrics,
)
)
Original file line number Diff line number Diff line change
@@ -241,7 +241,6 @@ object SandboxOnXRunner {
InstrumentedExecutors.newWorkStealingExecutor(
metrics.daml.lapi.threadpool.apiServices,
servicesThreadPoolSize,
metrics.registry,
metrics.executorServiceMetrics,
)
)
1 change: 0 additions & 1 deletion libs-scala/executors/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -14,6 +14,5 @@ da_scala_library(
deps = [
"//libs-scala/scala-utils",
"//observability/metrics",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
Original file line number Diff line number Diff line change
@@ -3,9 +3,8 @@

package com.daml.executors

import java.util.concurrent.{ExecutorService, ThreadFactory, Executors => JavaExecutors}
import java.util.concurrent.{ThreadFactory, Executors => JavaExecutors}

import com.codahale.metrics.{InstrumentedExecutorService, MetricRegistry}
import com.daml.executors.executors.QueueAwareExecutionContextExecutorService
import com.daml.metrics.ExecutorServiceMetrics

@@ -16,13 +15,12 @@ object InstrumentedExecutors {
def newWorkStealingExecutor(
name: String,
parallelism: Int,
registry: MetricRegistry,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newWorkStealingPool(parallelism)
val executorServiceWithMetrics =
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
name,
@@ -33,13 +31,12 @@ object InstrumentedExecutors {
def newFixedThreadPool(
name: String,
nThreads: Int,
registry: MetricRegistry,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newFixedThreadPool(nThreads)
val executorServiceWithMetrics =
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
name,
@@ -51,13 +48,12 @@ object InstrumentedExecutors {
name: String,
nThreads: Int,
threadFactory: ThreadFactory,
registry: MetricRegistry,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newFixedThreadPool(nThreads, threadFactory)
val executorServiceWithMetrics =
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
name,
@@ -68,29 +64,17 @@ object InstrumentedExecutors {
def newCachedThreadPoolWithFactory(
name: String,
threadFactory: ThreadFactory,
registry: MetricRegistry,
executorServiceMetrics: ExecutorServiceMetrics,
errorReporter: Throwable => Unit = ExecutionContext.defaultReporter,
): QueueAwareExecutionContextExecutorService = {
val executorService = JavaExecutors.newCachedThreadPool(threadFactory)
val executorServiceWithMetrics =
addMetricsToExecutorService(name, registry, executorServiceMetrics, executorService)
val executorServiceWithMetrics = executorServiceMetrics
.monitorExecutorService(name, executorService)
new QueueAwareExecutionContextExecutorService(
executorServiceWithMetrics,
name,
errorReporter,
)
}

private def addMetricsToExecutorService(
name: String,
registry: MetricRegistry,
executorServiceMetrics: ExecutorServiceMetrics,
executorService: ExecutorService,
) = new InstrumentedExecutorService(
executorServiceMetrics
.monitorExecutorService(name, executorService),
registry,
name,
)
}
Loading
Oops, something went wrong.

0 comments on commit af44452

Please sign in to comment.