Skip to content

Commit

Permalink
Remove participant-side command deduplication [DPP-848] (digital-asse…
Browse files Browse the repository at this point in the history
…t#12677)

* Remove participant-side command deduplication

changelog_begin
changelog_end

* Addressed review comments
  • Loading branch information
tudor-da authored Feb 1, 2022
1 parent 85f2690 commit 5390505
Show file tree
Hide file tree
Showing 59 changed files with 47 additions and 1,204 deletions.
23 changes: 0 additions & 23 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ final class Metrics(val registry: MetricRegistry) {

val failedCommandInterpretations: Meter =
registry.meter(Prefix :+ "failed_command_interpretations")
val deduplicatedCommands: Meter =
registry.meter(Prefix :+ "deduplicated_commands")
val delayedSubmissions: Meter =
registry.meter(Prefix :+ "delayed_submissions")
val validSubmissions: Meter =
Expand Down Expand Up @@ -367,11 +365,6 @@ final class Metrics(val registry: MetricRegistry) {
val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages")
val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive")
val getLfPackage: Timer = registry.timer(Prefix :+ "get_lf_package")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val removeExpiredDeduplicationData: Timer =
registry.timer(Prefix :+ "remove_expired_deduplication_data")
val stopDeduplicatingCommand: Timer =
registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

Expand Down Expand Up @@ -429,11 +422,6 @@ final class Metrics(val registry: MetricRegistry) {
val listKnownParties: Timer = registry.timer(Prefix :+ "list_known_parties")
val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages")
val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val removeExpiredDeduplicationData: Timer =
registry.timer(Prefix :+ "remove_expired_deduplication_data")
val stopDeduplicatingCommand: Timer =
registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")

private val createDbMetrics: String => DatabaseMetrics =
Expand Down Expand Up @@ -497,15 +485,6 @@ final class Metrics(val registry: MetricRegistry) {
"store_package_entry"
) // FIXME Base name conflicts with storePackageEntry
val loadPackageEntries: DatabaseMetrics = createDbMetrics("load_package_entries")
val deduplicateCommandDbMetrics: DatabaseMetrics = createDbMetrics(
"deduplicate_command"
) // FIXME Base name conflicts with deduplicateCommand
val removeExpiredDeduplicationDataDbMetrics: DatabaseMetrics = createDbMetrics(
"remove_expired_deduplication_data"
) // FIXME Base name conflicts with removeExpiredDeduplicationData
val stopDeduplicatingCommandDbMetrics: DatabaseMetrics = createDbMetrics(
"stop_deduplicating_command"
) // FIXME Base name conflicts with stopDeduplicatingCommand
val pruneDbMetrics: DatabaseMetrics = createDbMetrics(
"prune"
) // FIXME Base name conflicts with prune
Expand Down Expand Up @@ -679,8 +658,6 @@ final class Metrics(val registry: MetricRegistry) {
val partyEntries: Timer = registry.timer(Prefix :+ "party_entries")
val lookupConfiguration: Timer = registry.timer(Prefix :+ "lookup_configuration")
val configurationEntries: Timer = registry.timer(Prefix :+ "configuration_entries")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5cfddb9932a7ef4ea3ba7439cf9861110181fdfd97709df0fd21af8a89e9ce76
005e1b4b85ad740cbf402f7d95eb88a15baefbe8d0c3d885639ee9a907399193
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ CREATE INDEX idx_party_entries ON party_entries (submission_id);
CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset);
CREATE INDEX idx_party_entries_party_id_and_ledger_offset ON party_entries(party_id, ledger_offset);

---------------------------------------------------------------------------------------------------
-- Submissions table
---------------------------------------------------------------------------------------------------
CREATE TABLE participant_command_submissions (
deduplication_key VARCHAR PRIMARY KEY NOT NULL,
deduplicate_until BIGINT NOT NULL
);

---------------------------------------------------------------------------------------------------
-- Completions table
---------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f0518b9fdf84752d0b47a57aa9725a2b4ef820bd44ba79f2a82482d089e1c5fc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Participant-side deduplication not supported anymore
DROP TABLE participant_command_submissions PURGE;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e61171ce37fbf7e81ef7c24bd4773bd5ca0536109f5048e9140f69f80dc893f8
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Participant-side deduplication not supported anymore
DROP TABLE participant_command_submissions;
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import com.daml.platform.configuration.{
CommandConfiguration,
InitialLedgerConfiguration,
PartyConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.server.api.services.domain.CommandCompletionService
import com.daml.platform.server.api.services.grpc.{GrpcHealthService, GrpcTransactionService}
Expand Down Expand Up @@ -79,7 +78,6 @@ private[daml] object ApiServices {
initialLedgerConfiguration: Option[InitialLedgerConfiguration],
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optTimeServiceBackend: Option[TimeServiceBackend],
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
Expand All @@ -104,7 +102,6 @@ private[daml] object ApiServices {
private val completionsService: IndexCompletionsService = indexService
private val partyManagementService: IndexPartyManagementService = indexService
private val configManagementService: IndexConfigManagementService = indexService
private val submissionService: IndexSubmissionService = indexService
private val meteringStore: MeteringStore = indexService

private val configurationInitializer = new LedgerConfigurationInitializer(
Expand Down Expand Up @@ -260,7 +257,6 @@ private[daml] object ApiServices {
val apiSubmissionService = ApiSubmissionService.create(
ledgerId,
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
Expand All @@ -269,8 +265,7 @@ private[daml] object ApiServices {
commandExecutor,
checkOverloaded,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
submissionConfig.enableDeduplication,
partyConfig.implicitPartyAllocation
),
metrics,
errorsVersionsSwitcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ package com.daml.platform.apiserver
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain.{
CommandId,
ConfigurationEntry,
LedgerId,
LedgerOffset,
TransactionId,
}
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
Expand Down Expand Up @@ -181,20 +175,6 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In
): Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed] =
delegate.configurationEntries(startExclusive)

override def deduplicateCommand(
commandId: CommandId,
submitter: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] =
delegate.deduplicateCommand(commandId, submitter, submittedAt, deduplicateUntil)

override def stopDeduplicatingCommand(
commandId: CommandId,
submitter: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
delegate.stopDeduplicatingCommand(commandId, submitter)

override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.engine.Engine
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.{
CommandConfiguration,
PartyConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.configuration.{CommandConfiguration, PartyConfiguration}
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.usermanagement.UserManagementConfig
import com.daml.ports.{Port, PortFiles}
Expand All @@ -47,7 +43,6 @@ object StandaloneApiServer {
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optWriteService: Option[state.WriteService],
authService: AuthService,
healthChecks: HealthChecks,
Expand Down Expand Up @@ -112,7 +107,6 @@ object StandaloneApiServer {
initialLedgerConfiguration = config.initialLedgerConfiguration,
commandConfig = commandConfig,
partyConfig = partyConfig,
submissionConfig = submissionConfig,
optTimeServiceBackend = timeServiceBackend,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{
CommandId,
ConfigurationEntry,
LedgerId,
LedgerOffset,
TransactionId,
}
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
Expand Down Expand Up @@ -202,26 +196,6 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met
delegate.configurationEntries(startExclusive),
)

override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] =
Timed.future(
metrics.daml.services.index.deduplicateCommand,
delegate.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil),
)

override def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
Timed.future(
metrics.daml.services.index.stopDeduplicateCommand,
delegate.stopDeduplicatingCommand(commandId, submitters),
)

override def prune(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.daml.error.{
}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
Expand Down Expand Up @@ -45,15 +45,13 @@ import io.grpc.{Status, StatusRuntimeException}
import scala.annotation.nowarn
import scala.jdk.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

private[apiserver] object ApiSubmissionService {

def create(
ledgerId: LedgerId,
writeService: state.WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
Expand All @@ -71,7 +69,6 @@ private[apiserver] object ApiSubmissionService {
new GrpcCommandSubmissionService(
service = new ApiSubmissionService(
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
Expand All @@ -94,15 +91,13 @@ private[apiserver] object ApiSubmissionService {
)

final case class Configuration(
implicitPartyAllocation: Boolean,
enableDeduplication: Boolean,
implicitPartyAllocation: Boolean
)

}

private[apiserver] final class ApiSubmissionService private[services] (
writeService: state.WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
Expand Down Expand Up @@ -137,16 +132,8 @@ private[apiserver] final class ApiSubmissionService private[services] (
val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
if (writeService.isApiDeduplicationEnabled && configuration.enableDeduplication) {
deduplicateAndRecordOnLedger(
seedService.nextSeed(),
request.commands,
ledgerConfiguration,
)
} else {
evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration)
.transform(handleSubmissionResult)
}
evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration)
.transform(handleSubmissionResult)
case None =>
Future.failed(
errorFactories.missingLedgerConfig(Status.Code.UNAVAILABLE)(definiteAnswer =
Expand All @@ -157,47 +144,6 @@ private[apiserver] final class ApiSubmissionService private[services] (
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
}

private def deduplicateAndRecordOnLedger(
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
Future
.fromTry(
DeduplicationPeriod.deduplicateUntil(
commands.submittedAt,
commands.deduplicationPeriod,
)
)
.flatMap(deduplicateUntil =>
submissionService
.deduplicateCommand(
commands.commandId,
commands.actAs.toList,
commands.submittedAt,
deduplicateUntil,
)
.flatMap {
case CommandDeduplicationNew =>
evaluateAndSubmit(seed, commands, ledgerConfig)
.transform(handleSubmissionResult)
.recoverWith { case NonFatal(originalCause) =>
submissionService
.stopDeduplicatingCommand(commands.commandId, commands.actAs.toList)
.transform(_ => Failure(originalCause))
}
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(None)
)
}
)

private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit
loggingContext: LoggingContext
): Try[Unit] = {
Expand Down Expand Up @@ -370,5 +316,4 @@ private[apiserver] final class ApiSubmissionService private[services] (
)

override def close(): Unit = ()

}

This file was deleted.

Loading

0 comments on commit 5390505

Please sign in to comment.