From 5390505627d89a5a56e42ba237b08504123f325e Mon Sep 17 00:00:00 2001 From: tudor-da Date: Tue, 1 Feb 2022 21:50:25 +0100 Subject: [PATCH] Remove participant-side command deduplication [DPP-848] (#12677) * Remove participant-side command deduplication changelog_begin changelog_end * Addressed review comments --- .../main/scala/com/daml/metrics/Metrics.scala | 23 ---- .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 8 -- ...ticipant_side_command_deduplication.sha256 | 1 + ...participant_side_command_deduplication.sql | 5 + ...ticipant_side_command_deduplication.sha256 | 1 + ...participant_side_command_deduplication.sql | 5 + .../platform/apiserver/ApiServices.scala | 7 +- .../apiserver/SpannedIndexService.scala | 22 +--- .../apiserver/StandaloneApiServer.scala | 8 +- .../apiserver/TimedIndexService.scala | 28 +---- .../services/ApiSubmissionService.scala | 63 +--------- .../SubmissionConfiguration.scala | 23 ---- .../index/LedgerBackedIndexService.scala | 16 --- .../index/MeteredReadOnlyLedger.scala | 37 +----- .../platform/index/ReadOnlySqlLedger.scala | 35 +----- .../scala/platform/store/BaseLedger.scala | 22 +--- .../scala/platform/store/ReadOnlyLedger.scala | 44 +------ .../appendonlydao/DeduplicationKeyMaker.scala | 23 ---- .../store/appendonlydao/JdbcLedgerDao.scala | 58 +-------- .../store/appendonlydao/LedgerDao.scala | 49 +------- .../appendonlydao/MeteredLedgerDao.scala | 36 +----- .../scala/platform/store/backend/DbDto.scala | 2 - .../store/backend/StorageBackend.scala | 11 -- .../store/backend/StorageBackendFactory.scala | 1 - .../store/backend/UpdateToDbDto.scala | 11 +- .../DeduplicationStorageBackendTemplate.scala | 49 -------- .../store/backend/common/Schema.scala | 7 -- .../h2/H2DeduplicationStorageBackend.scala | 49 -------- .../backend/h2/H2ResetStorageBackend.scala | 1 - .../backend/h2/H2StorageBackendFactory.scala | 4 - .../OracleDeduplicationStorageBackend.scala | 52 -------- .../oracle/OracleResetStorageBackend.scala | 1 - .../oracle/OracleStorageBackendFactory.scala | 4 - .../PostgresDeduplicationStorageBackend.scala | 29 ----- .../PostgresResetStorageBackend.scala | 1 - .../PostgresStorageBackendFactory.scala | 4 - .../backend/StorageBackendProvider.scala | 2 - .../store/backend/StorageBackendSuite.scala | 1 - .../StorageBackendTestsDeduplication.scala | 97 -------------- ...dbcLedgerDaoCommandDeduplicationSpec.scala | 104 --------------- .../services/ApiSubmissionServiceSpec.scala | 119 +----------------- .../DbDtoToStringsForInterningSpec.scala | 3 - .../store/backend/UpdateToDbDtoSpec.scala | 19 +-- .../store/dao/DeduplicationKeyMakerSpec.scala | 80 ------------ .../state/index/v2/IndexService.scala | 1 - .../index/v2/IndexSubmissionService.scala | 28 ----- .../participant/state/index/v2/package.scala | 9 -- .../state/v2/metrics/TimedWriteService.scala | 2 - .../state/kvutils/app/Config.scala | 19 +-- .../state/kvutils/app/Runner.scala | 1 - .../api/KeyValueParticipantStateWriter.scala | 1 - ...WriteServiceWithDeduplicationSupport.scala | 3 - .../participant/state/v2/WriteService.scala | 5 - .../sandbox/config/SandboxConfig.scala | 8 +- .../platform/ledger/ConfigConverter.scala | 1 - .../ledger/sandbox/BridgeWriteService.scala | 2 - .../ledger/sandbox/SandboxOnXRunner.scala | 1 - .../scala/platform/sandboxnext/Runner.scala | 3 +- 59 files changed, 47 insertions(+), 1204 deletions(-) create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sql create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sql delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/configuration/SubmissionConfiguration.scala delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DeduplicationKeyMaker.scala delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/backend/common/DeduplicationStorageBackendTemplate.scala delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2DeduplicationStorageBackend.scala delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleDeduplicationStorageBackend.scala delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDeduplicationStorageBackend.scala delete mode 100644 ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala delete mode 100644 ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCommandDeduplicationSpec.scala delete mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/store/dao/DeduplicationKeyMakerSpec.scala delete mode 100644 ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexSubmissionService.scala diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index 6bed79aece55..7211cca445ca 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -31,8 +31,6 @@ final class Metrics(val registry: MetricRegistry) { val failedCommandInterpretations: Meter = registry.meter(Prefix :+ "failed_command_interpretations") - val deduplicatedCommands: Meter = - registry.meter(Prefix :+ "deduplicated_commands") val delayedSubmissions: Meter = registry.meter(Prefix :+ "delayed_submissions") val validSubmissions: Meter = @@ -367,11 +365,6 @@ final class Metrics(val registry: MetricRegistry) { val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages") val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive") val getLfPackage: Timer = registry.timer(Prefix :+ "get_lf_package") - val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command") - val removeExpiredDeduplicationData: Timer = - registry.timer(Prefix :+ "remove_expired_deduplication_data") - val stopDeduplicatingCommand: Timer = - registry.timer(Prefix :+ "stop_deduplicating_command") val prune: Timer = registry.timer(Prefix :+ "prune") val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering") @@ -429,11 +422,6 @@ final class Metrics(val registry: MetricRegistry) { val listKnownParties: Timer = registry.timer(Prefix :+ "list_known_parties") val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages") val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive") - val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command") - val removeExpiredDeduplicationData: Timer = - registry.timer(Prefix :+ "remove_expired_deduplication_data") - val stopDeduplicatingCommand: Timer = - registry.timer(Prefix :+ "stop_deduplicating_command") val prune: Timer = registry.timer(Prefix :+ "prune") private val createDbMetrics: String => DatabaseMetrics = @@ -497,15 +485,6 @@ final class Metrics(val registry: MetricRegistry) { "store_package_entry" ) // FIXME Base name conflicts with storePackageEntry val loadPackageEntries: DatabaseMetrics = createDbMetrics("load_package_entries") - val deduplicateCommandDbMetrics: DatabaseMetrics = createDbMetrics( - "deduplicate_command" - ) // FIXME Base name conflicts with deduplicateCommand - val removeExpiredDeduplicationDataDbMetrics: DatabaseMetrics = createDbMetrics( - "remove_expired_deduplication_data" - ) // FIXME Base name conflicts with removeExpiredDeduplicationData - val stopDeduplicatingCommandDbMetrics: DatabaseMetrics = createDbMetrics( - "stop_deduplicating_command" - ) // FIXME Base name conflicts with stopDeduplicatingCommand val pruneDbMetrics: DatabaseMetrics = createDbMetrics( "prune" ) // FIXME Base name conflicts with prune @@ -679,8 +658,6 @@ final class Metrics(val registry: MetricRegistry) { val partyEntries: Timer = registry.timer(Prefix :+ "party_entries") val lookupConfiguration: Timer = registry.timer(Prefix :+ "lookup_configuration") val configurationEntries: Timer = registry.timer(Prefix :+ "configuration_entries") - val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command") - val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command") val prune: Timer = registry.timer(Prefix :+ "prune") val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering") diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index 2086ee864e89..6954909e104d 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -5cfddb9932a7ef4ea3ba7439cf9861110181fdfd97709df0fd21af8a89e9ce76 +005e1b4b85ad740cbf402f7d95eb88a15baefbe8d0c3d885639ee9a907399193 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 0efb816f2e10..e20aff8b12a2 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -124,14 +124,6 @@ CREATE INDEX idx_party_entries ON party_entries (submission_id); CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset); CREATE INDEX idx_party_entries_party_id_and_ledger_offset ON party_entries(party_id, ledger_offset); ---------------------------------------------------------------------------------------------------- --- Submissions table ---------------------------------------------------------------------------------------------------- -CREATE TABLE participant_command_submissions ( - deduplication_key VARCHAR PRIMARY KEY NOT NULL, - deduplicate_until BIGINT NOT NULL -); - --------------------------------------------------------------------------------------------------- -- Completions table --------------------------------------------------------------------------------------------------- diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sha256 new file mode 100644 index 000000000000..2eff548c3e08 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sha256 @@ -0,0 +1 @@ +f0518b9fdf84752d0b47a57aa9725a2b4ef820bd44ba79f2a82482d089e1c5fc diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sql new file mode 100644 index 000000000000..7b69adec9d15 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V11__drop_participant_side_command_deduplication.sql @@ -0,0 +1,5 @@ +-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Participant-side deduplication not supported anymore +DROP TABLE participant_command_submissions PURGE; diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sha256 new file mode 100644 index 000000000000..ce47abbfa1a4 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sha256 @@ -0,0 +1 @@ +e61171ce37fbf7e81ef7c24bd4773bd5ca0536109f5048e9140f69f80dc893f8 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sql new file mode 100644 index 000000000000..fbea45f11730 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V121__drop_participant_side_command_deduplication.sql @@ -0,0 +1,5 @@ +-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Participant-side deduplication not supported anymore +DROP TABLE participant_command_submissions; diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala index 8bce30de9f17..66d71d78db54 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala @@ -35,7 +35,6 @@ import com.daml.platform.configuration.{ CommandConfiguration, InitialLedgerConfiguration, PartyConfiguration, - SubmissionConfiguration, } import com.daml.platform.server.api.services.domain.CommandCompletionService import com.daml.platform.server.api.services.grpc.{GrpcHealthService, GrpcTransactionService} @@ -79,7 +78,6 @@ private[daml] object ApiServices { initialLedgerConfiguration: Option[InitialLedgerConfiguration], commandConfig: CommandConfiguration, partyConfig: PartyConfiguration, - submissionConfig: SubmissionConfiguration, optTimeServiceBackend: Option[TimeServiceBackend], servicesExecutionContext: ExecutionContext, metrics: Metrics, @@ -104,7 +102,6 @@ private[daml] object ApiServices { private val completionsService: IndexCompletionsService = indexService private val partyManagementService: IndexPartyManagementService = indexService private val configManagementService: IndexConfigManagementService = indexService - private val submissionService: IndexSubmissionService = indexService private val meteringStore: MeteringStore = indexService private val configurationInitializer = new LedgerConfigurationInitializer( @@ -260,7 +257,6 @@ private[daml] object ApiServices { val apiSubmissionService = ApiSubmissionService.create( ledgerId, writeService, - submissionService, partyManagementService, timeProvider, timeProviderType, @@ -269,8 +265,7 @@ private[daml] object ApiServices { commandExecutor, checkOverloaded, ApiSubmissionService.Configuration( - partyConfig.implicitPartyAllocation, - submissionConfig.enableDeduplication, + partyConfig.implicitPartyAllocation ), metrics, errorsVersionsSwitcher, diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala index fa37133d2078..3840b5f92667 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala @@ -6,13 +6,7 @@ package com.daml.platform.apiserver import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf -import com.daml.ledger.api.domain.{ - CommandId, - ConfigurationEntry, - LedgerId, - LedgerOffset, - TransactionId, -} +import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -181,20 +175,6 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In ): Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed] = delegate.configurationEntries(startExclusive) - override def deduplicateCommand( - commandId: CommandId, - submitter: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] = - delegate.deduplicateCommand(commandId, submitter, submittedAt, deduplicateUntil) - - override def stopDeduplicatingCommand( - commandId: CommandId, - submitter: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] = - delegate.stopDeduplicatingCommand(commandId, submitter) - override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext ): Future[Unit] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala index 1adc49bc25c1..17d8a81c5b28 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala @@ -21,11 +21,7 @@ import com.daml.lf.data.Ref import com.daml.lf.engine.Engine import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics -import com.daml.platform.configuration.{ - CommandConfiguration, - PartyConfiguration, - SubmissionConfiguration, -} +import com.daml.platform.configuration.{CommandConfiguration, PartyConfiguration} import com.daml.platform.services.time.TimeProviderType import com.daml.platform.usermanagement.UserManagementConfig import com.daml.ports.{Port, PortFiles} @@ -47,7 +43,6 @@ object StandaloneApiServer { config: ApiServerConfig, commandConfig: CommandConfiguration, partyConfig: PartyConfiguration, - submissionConfig: SubmissionConfiguration, optWriteService: Option[state.WriteService], authService: AuthService, healthChecks: HealthChecks, @@ -112,7 +107,6 @@ object StandaloneApiServer { initialLedgerConfiguration = config.initialLedgerConfiguration, commandConfig = commandConfig, partyConfig = partyConfig, - submissionConfig = submissionConfig, optTimeServiceBackend = timeServiceBackend, servicesExecutionContext = servicesExecutionContext, metrics = metrics, diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala index b6a84add4dec..2815b930190b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala @@ -7,13 +7,7 @@ import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.domain -import com.daml.ledger.api.domain.{ - CommandId, - ConfigurationEntry, - LedgerId, - LedgerOffset, - TransactionId, -} +import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -202,26 +196,6 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met delegate.configurationEntries(startExclusive), ) - override def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] = - Timed.future( - metrics.daml.services.index.deduplicateCommand, - delegate.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil), - ) - - override def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] = - Timed.future( - metrics.daml.services.index.stopDeduplicateCommand, - delegate.stopDeduplicatingCommand(commandId, submitters), - ) - override def prune( pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean, diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 0644916a9d27..1afff1977358 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -17,7 +17,7 @@ import com.daml.error.{ } import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands} import com.daml.ledger.api.messages.command.submission.SubmitRequest -import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator} +import com.daml.ledger.api.SubmissionIdGenerator import com.daml.ledger.configuration.Configuration import com.daml.ledger.participant.state.index.v2._ import com.daml.ledger.participant.state.{v2 => state} @@ -45,7 +45,6 @@ import io.grpc.{Status, StatusRuntimeException} import scala.annotation.nowarn import scala.jdk.FutureConverters.CompletionStageOps import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} private[apiserver] object ApiSubmissionService { @@ -53,7 +52,6 @@ private[apiserver] object ApiSubmissionService { def create( ledgerId: LedgerId, writeService: state.WriteService, - submissionService: IndexSubmissionService, partyManagementService: IndexPartyManagementService, timeProvider: TimeProvider, timeProviderType: TimeProviderType, @@ -71,7 +69,6 @@ private[apiserver] object ApiSubmissionService { new GrpcCommandSubmissionService( service = new ApiSubmissionService( writeService, - submissionService, partyManagementService, timeProvider, timeProviderType, @@ -94,15 +91,13 @@ private[apiserver] object ApiSubmissionService { ) final case class Configuration( - implicitPartyAllocation: Boolean, - enableDeduplication: Boolean, + implicitPartyAllocation: Boolean ) } private[apiserver] final class ApiSubmissionService private[services] ( writeService: state.WriteService, - submissionService: IndexSubmissionService, partyManagementService: IndexPartyManagementService, timeProvider: TimeProvider, timeProviderType: TimeProviderType, @@ -137,16 +132,8 @@ private[apiserver] final class ApiSubmissionService private[services] ( val evaluatedCommand = ledgerConfigurationSubscription .latestConfiguration() match { case Some(ledgerConfiguration) => - if (writeService.isApiDeduplicationEnabled && configuration.enableDeduplication) { - deduplicateAndRecordOnLedger( - seedService.nextSeed(), - request.commands, - ledgerConfiguration, - ) - } else { - evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration) - .transform(handleSubmissionResult) - } + evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration) + .transform(handleSubmissionResult) case None => Future.failed( errorFactories.missingLedgerConfig(Status.Code.UNAVAILABLE)(definiteAnswer = @@ -157,47 +144,6 @@ private[apiserver] final class ApiSubmissionService private[services] ( evaluatedCommand.andThen(logger.logErrorsOnCall[Unit]) } - private def deduplicateAndRecordOnLedger( - seed: crypto.Hash, - commands: ApiCommands, - ledgerConfig: Configuration, - )(implicit - loggingContext: LoggingContext, - telemetryContext: TelemetryContext, - contextualizedErrorLogger: ContextualizedErrorLogger, - ): Future[Unit] = - Future - .fromTry( - DeduplicationPeriod.deduplicateUntil( - commands.submittedAt, - commands.deduplicationPeriod, - ) - ) - .flatMap(deduplicateUntil => - submissionService - .deduplicateCommand( - commands.commandId, - commands.actAs.toList, - commands.submittedAt, - deduplicateUntil, - ) - .flatMap { - case CommandDeduplicationNew => - evaluateAndSubmit(seed, commands, ledgerConfig) - .transform(handleSubmissionResult) - .recoverWith { case NonFatal(originalCause) => - submissionService - .stopDeduplicatingCommand(commands.commandId, commands.actAs.toList) - .transform(_ => Failure(originalCause)) - } - case _: CommandDeduplicationDuplicate => - metrics.daml.commands.deduplicatedCommands.mark() - Future.failed( - errorFactories.duplicateCommandException(None) - ) - } - ) - private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit loggingContext: LoggingContext ): Try[Unit] = { @@ -370,5 +316,4 @@ private[apiserver] final class ApiSubmissionService private[services] ( ) override def close(): Unit = () - } diff --git a/ledger/participant-integration-api/src/main/scala/platform/configuration/SubmissionConfiguration.scala b/ledger/participant-integration-api/src/main/scala/platform/configuration/SubmissionConfiguration.scala deleted file mode 100644 index 5bf977f8db9a..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/configuration/SubmissionConfiguration.scala +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.configuration - -/** Configuration of the Ledger API Command Submission Service - * @param enableDeduplication - * Specifies whether the participant-side command deduplication should be turned on - * when available. By default, the command deduplication is turned on. However, on - * ledgers where the deduplication is implemented by the committer this parameter - * has no impact. - */ -case class SubmissionConfiguration( - enableDeduplication: Boolean -) - -object SubmissionConfiguration { - - lazy val default: SubmissionConfiguration = - SubmissionConfiguration( - enableDeduplication = true - ) -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala index aa76c65e2515..117b64d6a937 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala @@ -10,7 +10,6 @@ import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.domain import com.daml.ledger.api.domain.ConfigurationEntry.Accepted import com.daml.ledger.api.domain.{ - CommandId, LedgerId, LedgerOffset, PackageEntry, @@ -321,21 +320,6 @@ private[platform] final class LedgerBackedIndexService( toAbsolute(offset) -> config.toDomain }) - /** Deduplicate commands */ - override def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] = - ledger.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil) - - override def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] = - ledger.stopDeduplicatingCommand(commandId, submitters) - /** Participant pruning command */ override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala index c1210a4a7760..d960793194b3 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala @@ -6,7 +6,7 @@ package com.daml.platform.index import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails} +import com.daml.ledger.api.domain.{LedgerId, PartyDetails} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -18,11 +18,8 @@ import com.daml.ledger.api.v1.transaction_service.{ } import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.v2.{ - CommandDeduplicationResult, - MeteringStore, - PackageDetails, -} +import com.daml.ledger.participant.state.index.v2.MeteringStore +import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp @@ -167,34 +164,6 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M )(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed] = ledger.configurationEntries(startExclusive) - override def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] = - Timed.future( - metrics.daml.index.deduplicateCommand, - ledger.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil), - ) - - override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit - loggingContext: LoggingContext - ): Future[Unit] = - Timed.future( - metrics.daml.index.removeExpiredDeduplicationData, - ledger.removeExpiredDeduplicationData(currentTime), - ) - - override def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] = - Timed.future( - metrics.daml.index.stopDeduplicatingCommand, - ledger.stopDeduplicatingCommand(commandId, submitters), - ) - override def prune( pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean, diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala index 0420fdd3b94b..f922ff2ca6ee 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala @@ -3,10 +3,7 @@ package com.daml.platform.index -import akka.Done -import akka.actor.Cancellable import akka.stream._ -import akka.stream.scaladsl.{Keep, Sink, Source} import com.daml.error.definitions.IndexErrors.IndexDbException import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.health.HealthStatus @@ -14,7 +11,6 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.ContractStore import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.lf.data.Ref -import com.daml.lf.data.Time.Timestamp import com.daml.lf.engine.ValueEnricher import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics @@ -38,7 +34,7 @@ import com.daml.resources.ProgramResource.StartupException import com.daml.timer.RetryStrategy import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} private[platform] object ReadOnlySqlLedger { @@ -192,8 +188,7 @@ private[index] abstract class ReadOnlySqlLedger( contractStore: ContractStore, pruneBuffers: PruneBuffers, dispatcher: Dispatcher[Offset], -)(implicit mat: Materializer, loggingContext: LoggingContext) - extends BaseLedger( +) extends BaseLedger( ledgerId, ledgerDao, ledgerDaoTransactionsReader, @@ -201,31 +196,5 @@ private[index] abstract class ReadOnlySqlLedger( pruneBuffers, dispatcher, ) { - - // Periodically remove all expired deduplication cache entries. - // The current approach is not ideal for multiple ReadOnlySqlLedgers sharing - // the same database (as is the case for a horizontally scaled ledger API server). - // In that case, an external process periodically clearing expired entries would be better. - // - // Deduplication entries are added by the submission service, which might use - // a different clock than the current clock (e.g., horizontally scaled ledger API server). - // This is not an issue, because applications are not expected to submit towards the end - // of the deduplication time window. - private val (deduplicationCleanupKillSwitch, deduplicationCleanupDone) = - Source - .tick[Unit](0.millis, 10.minutes, ()) - .mapAsync[Unit](1)(_ => ledgerDao.removeExpiredDeduplicationData(Timestamp.now())) - .viaMat(KillSwitches.single)(Keep.right[Cancellable, UniqueKillSwitch]) - .toMat(Sink.ignore)(Keep.both[UniqueKillSwitch, Future[Done]]) - .run() - override def currentHealth(): HealthStatus = ledgerDao.currentHealth() - - override def close(): Unit = { - deduplicationCleanupKillSwitch.shutdown() - - Await.result(deduplicationCleanupDone, 10.seconds) - - super.close() - } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala index fa2b2ad5a1a6..b7ae7bdf258b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala @@ -7,7 +7,7 @@ import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.domain -import com.daml.ledger.api.domain.{CommandId, LedgerId} +import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -20,7 +20,7 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2 -import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, ContractStore} +import com.daml.ledger.participant.state.index.v2.ContractStore import com.daml.lf.archive.Decode import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -180,24 +180,6 @@ private[platform] abstract class BaseLedger( ): Source[(Offset, ConfigurationEntry), NotUsed] = dispatcher.startingAt(startExclusive, RangeSource(ledgerDao.getConfigurationEntries)) - override def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] = - ledgerDao.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil) - - override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit - loggingContext: LoggingContext - ): Future[Unit] = - ledgerDao.removeExpiredDeduplicationData(currentTime) - - override def stopDeduplicatingCommand(commandId: CommandId, submitters: List[Ref.Party])(implicit - loggingContext: LoggingContext - ): Future[Unit] = - ledgerDao.stopDeduplicatingCommand(commandId, submitters) - override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext ): Future[Unit] = { diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala index 8cb55d44fb21..9908ed720349 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala @@ -6,7 +6,7 @@ package com.daml.platform.store import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails} +import com.daml.ledger.api.domain.{LedgerId, PartyDetails} import com.daml.ledger.api.health.ReportsHealth import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering -import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} +import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp import com.daml.lf.language.Ast @@ -125,46 +125,6 @@ private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable startExclusive: Offset )(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed] - /** Deduplicates commands. - * Returns CommandDeduplicationNew if this is the first time the command is submitted - * Returns CommandDeduplicationDuplicate if the command was submitted before - * - * Note: The deduplication cache is used by the submission service, - * it does not modify any on-ledger data. - */ - def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] - - /** Stops deduplicating the given command. - * - * Note: The deduplication cache is used by the submission service, - * it does not modify any on-ledger data. - */ - def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] - - /** Remove all expired deduplication entries. This method has to be called - * periodically to ensure that the deduplication cache does not grow unboundedly. - * - * @param currentTime The current time. This should use the same source of time as - * the `deduplicateUntil` argument of [[deduplicateCommand]]. - * @return when DAO has finished removing expired entries. Clients do not - * need to wait for the operation to finish, it is safe to concurrently - * call deduplicateCommand(). - * - * Note: The deduplication cache is used by the submission service, - * it does not modify any on-ledger data. - */ - def removeExpiredDeduplicationData( - currentTime: Timestamp - )(implicit loggingContext: LoggingContext): Future[Unit] - /** Performs participant ledger pruning up to and including the specified offset. */ def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DeduplicationKeyMaker.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DeduplicationKeyMaker.scala deleted file mode 100644 index 7cb90dec07d3..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DeduplicationKeyMaker.scala +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.appendonlydao - -import com.daml.ledger.api.domain -import com.daml.lf.data.Ref - -import java.security.MessageDigest -import scalaz.syntax.tag._ - -// TODO append-only: move to store -object DeduplicationKeyMaker { - def make(commandId: domain.CommandId, submitters: List[Ref.Party]): String = - commandId.unwrap + "%" + hashSubmitters(submitters.sorted(Ordering.String).distinct) - - private def hashSubmitters(submitters: List[Ref.Party]): String = { - MessageDigest - .getInstance("SHA-256") - .digest(submitters.mkString.getBytes) - .mkString - } -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index e2e97b6ab11c..0a3c6b3df60f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -7,18 +7,12 @@ import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive import com.daml.error.DamlContextualizedErrorLogger -import com.daml.ledger.api.domain import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering -import com.daml.ledger.participant.state.index.v2.{ - CommandDeduplicationDuplicate, - CommandDeduplicationNew, - CommandDeduplicationResult, - PackageDetails, -} +import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.archive.ArchiveParser import com.daml.lf.data.Ref @@ -34,11 +28,7 @@ import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store._ import com.daml.platform.store.appendonlydao.events._ import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd -import com.daml.platform.store.backend.{ - DeduplicationStorageBackend, - ParameterStorageBackend, - ReadStorageBackend, -} +import com.daml.platform.store.backend.{ParameterStorageBackend, ReadStorageBackend} import com.daml.platform.store.cache.LedgerEndCache import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry} import com.daml.platform.store.interning.StringInterning @@ -64,7 +54,6 @@ private class JdbcLedgerDao( participantId: Ref.ParticipantId, readStorageBackend: ReadStorageBackend, parameterStorageBackend: ParameterStorageBackend, - deduplicationStorageBackend: DeduplicationStorageBackend, errorFactories: ErrorFactories, materializer: Materializer, ) extends LedgerDao { @@ -366,47 +355,6 @@ private class JdbcLedgerDao( } } - override def deduplicateCommand( - commandId: domain.CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] = - dbDispatcher.executeSql(metrics.daml.index.db.deduplicateCommandDbMetrics) { conn => - val key = DeduplicationKeyMaker.make(commandId, submitters) - // Insert a new deduplication entry, or update an expired entry - val updated = deduplicationStorageBackend.upsertDeduplicationEntry( - key = key, - submittedAt = submittedAt, - deduplicateUntil = deduplicateUntil, - )(conn) - - if (updated == 1) { - // New row inserted, this is the first time the command is submitted - CommandDeduplicationNew - } else { - // Deduplication row already exists - CommandDeduplicationDuplicate(deduplicationStorageBackend.deduplicatedUntil(key)(conn)) - } - } - - override def removeExpiredDeduplicationData( - currentTime: Timestamp - )(implicit loggingContext: LoggingContext): Future[Unit] = - dbDispatcher.executeSql(metrics.daml.index.db.removeExpiredDeduplicationDataDbMetrics)( - deduplicationStorageBackend.removeExpiredDeduplicationData(currentTime) - ) - - override def stopDeduplicatingCommand( - commandId: domain.CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] = { - val key = DeduplicationKeyMaker.make(commandId, submitters) - dbDispatcher.executeSql(metrics.daml.index.db.stopDeduplicatingCommandDbMetrics)( - deduplicationStorageBackend.stopDeduplicatingCommand(key) - ) - } - /** Prunes the events and command completions tables. * * @param pruneUpToInclusive Offset up to which to prune archived history inclusively. @@ -652,7 +600,6 @@ private[platform] object JdbcLedgerDao { participantId, dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning), dbSupport.storageBackendFactory.createParameterStorageBackend, - dbSupport.storageBackendFactory.createDeduplicationStorageBackend, errorFactories, materializer = materializer, ), @@ -697,7 +644,6 @@ private[platform] object JdbcLedgerDao { participantId, dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning), dbSupport.storageBackendFactory.createParameterStorageBackend, - dbSupport.storageBackendFactory.createDeduplicationStorageBackend, errorFactories, materializer = materializer, ), diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala index afeedac63714..4d10eaa1cf2d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala @@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.ledger.api.domain.{CommandId, LedgerId, ParticipantId, PartyDetails} +import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.health.ReportsHealth import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering -import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} +import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -163,50 +163,6 @@ private[platform] trait LedgerReadDao extends ReportsHealth { endInclusive: Offset, )(implicit loggingContext: LoggingContext): Source[(Offset, PackageLedgerEntry), NotUsed] - /** Deduplicates commands. - * - * @param commandId The command Id - * @param submitters The submitting parties - * @param submittedAt The time when the command was submitted - * @param deduplicateUntil The time until which the command should be deduplicated - * @return whether the command is a duplicate or not - */ - def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] - - /** Remove all expired deduplication entries. This method has to be called - * periodically to ensure that the deduplication cache does not grow unboundedly. - * - * @param currentTime The current time. This should use the same source of time as - * the `deduplicateUntil` argument of [[deduplicateCommand]]. - * - * @return when DAO has finished removing expired entries. Clients do not - * need to wait for the operation to finish, it is safe to concurrently - * call deduplicateCommand(). - */ - def removeExpiredDeduplicationData( - currentTime: Timestamp - )(implicit loggingContext: LoggingContext): Future[Unit] - - /** Stops deduplicating the given command. This method should be called after - * a command is rejected by the submission service, or after a transaction is - * rejected by the ledger. Without removing deduplication entries for failed - * commands, applications would have to wait for the end of the (long) deduplication - * window before they could send a retry. - * - * @param commandId The command Id - * @param submitters The submitting parties - * @return - */ - def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] - /** Prunes participant events and completions in archived history and remembers largest * pruning offset processed thus far. * @@ -223,7 +179,6 @@ private[platform] trait LedgerReadDao extends ReportsHealth { to: Option[Timestamp], applicationId: Option[Ref.ApplicationId], )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] - } // TODO sandbox-classic clean-up: This interface and its implementation is only used in the JdbcLedgerDao suite diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala index 3b7b06e7240e..5a33008a3dde 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala @@ -6,15 +6,12 @@ package com.daml.platform.store.appendonlydao import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.ledger.api.domain.{CommandId, LedgerId, ParticipantId, PartyDetails} +import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.v2.{ - CommandDeduplicationResult, - MeteringStore, - PackageDetails, -} +import com.daml.ledger.participant.state.index.v2.MeteringStore +import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref import com.daml.lf.data.Ref.ApplicationId @@ -98,33 +95,6 @@ private[platform] class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: override val completions: LedgerDaoCommandCompletionsReader = ledgerDao.completions - override def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] = - Timed.future( - metrics.daml.index.db.deduplicateCommand, - ledgerDao.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil), - ) - - override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit - loggingContext: LoggingContext - ): Future[Unit] = - Timed.future( - metrics.daml.index.db.removeExpiredDeduplicationData, - ledgerDao.removeExpiredDeduplicationData(currentTime), - ) - - override def stopDeduplicatingCommand(commandId: CommandId, submitters: List[Ref.Party])(implicit - loggingContext: LoggingContext - ): Future[Unit] = - Timed.future( - metrics.daml.index.db.stopDeduplicatingCommand, - ledgerDao.stopDeduplicatingCommand(commandId, submitters), - ) - override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext ): Future[Unit] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala index d11119d319b3..1d3b915dd76b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala @@ -133,8 +133,6 @@ object DbDto { deduplication_start: Option[Long], ) extends DbDto - final case class CommandDeduplication(deduplication_key: String) extends DbDto - final case class StringInterningDto( internalId: Int, externalString: String, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 2a87d8caa553..056c7be4ad0d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -165,17 +165,6 @@ trait PackageStorageBackend { )(connection: Connection): Vector[(Offset, PackageLedgerEntry)] } -trait DeduplicationStorageBackend { - def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Timestamp - def upsertDeduplicationEntry( - key: String, - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(connection: Connection)(implicit loggingContext: LoggingContext): Int - def removeExpiredDeduplicationData(currentTime: Timestamp)(connection: Connection): Unit - def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit -} - trait CompletionStorageBackend { def commandCompletions( startExclusive: Offset, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala index 974dd4557e55..d8dbf2c86f6d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala @@ -16,7 +16,6 @@ trait StorageBackendFactory { def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend - def createDeduplicationStorageBackend: DeduplicationStorageBackend def createCompletionStorageBackend(stringInterning: StringInterning): CompletionStorageBackend def createContractStorageBackend( ledgerEndCache: LedgerEndCache, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala index 2e1f6c9cbf1a..8e8a894474eb 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala @@ -5,7 +5,6 @@ package com.daml.platform.store.backend import java.util.UUID import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset} -import com.daml.ledger.api.domain import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.CompletionInfo @@ -15,7 +14,7 @@ import com.daml.lf.engine.Blinding import com.daml.lf.ledger.EventId import com.daml.lf.transaction.Transaction.ChildrenRecursion import com.daml.platform.index.index.StatusDetails -import com.daml.platform.store.appendonlydao.{DeduplicationKeyMaker, JdbcLedgerDao} +import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.daml.platform.store.appendonlydao.events._ object UpdateToDbDto { @@ -34,13 +33,7 @@ object UpdateToDbDto { rejection_status_message = Some(u.reasonTemplate.message), rejection_status_details = Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray), - ), - DbDto.CommandDeduplication( - DeduplicationKeyMaker.make( - domain.CommandId(u.completionInfo.commandId), - u.completionInfo.actAs, - ) - ), + ) ) case u: ConfigurationChanged => diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/DeduplicationStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/DeduplicationStorageBackendTemplate.scala deleted file mode 100644 index 88feeab66b93..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/DeduplicationStorageBackendTemplate.scala +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.backend.common - -import java.sql.Connection -import anorm.RowParser -import com.daml.lf.data.Time.Timestamp -import com.daml.platform.store.Conversions.timestampFromMicros -import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation -import com.daml.platform.store.backend.DeduplicationStorageBackend - -private[backend] trait DeduplicationStorageBackendTemplate extends DeduplicationStorageBackend { - private case class ParsedCommandData(deduplicateUntil: Timestamp) - - private val CommandDataParser: RowParser[ParsedCommandData] = - timestampFromMicros("deduplicate_until") - .map(ParsedCommandData) - - override def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Timestamp = - SQL""" - select deduplicate_until - from participant_command_submissions - where deduplication_key = $deduplicationKey - """ - .as(CommandDataParser.single)(connection) - .deduplicateUntil - - override def removeExpiredDeduplicationData( - currentTime: Timestamp - )(connection: Connection): Unit = { - SQL""" - delete from participant_command_submissions - where deduplicate_until < ${currentTime.micros} - """ - .execute()(connection) - () - } - - override def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit = { - SQL""" - delete from participant_command_submissions - where deduplication_key = $deduplicationKey - """ - .execute()(connection) - () - } - -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala index bcba1935d112..af901b4a3835 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala @@ -283,11 +283,6 @@ private[backend] object AppendOnlySchema { "deduplication_start" -> fieldStrategy.bigintOptional(_ => _.deduplication_start), ) - val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] = - fieldStrategy.delete("participant_command_submissions")( - "deduplication_key" -> fieldStrategy.string(_ => _.deduplication_key) - ) - val stringInterningTable: Table[DbDto.StringInterningDto] = fieldStrategy.insert("string_interning")( "internal_id" -> fieldStrategy.int(_ => _.internalId), @@ -323,7 +318,6 @@ private[backend] object AppendOnlySchema { packages.executeUpdate, partyEntries.executeUpdate, commandCompletions.executeUpdate, - commandSubmissionDeletes.executeUpdate, stringInterningTable.executeUpdate, createFilter.executeUpdate, transactionMetering.executeUpdate, @@ -350,7 +344,6 @@ private[backend] object AppendOnlySchema { packages.prepareData(collect[Package], stringInterning), partyEntries.prepareData(collect[PartyEntry], stringInterning), commandCompletions.prepareData(collect[CommandCompletion], stringInterning), - commandSubmissionDeletes.prepareData(collect[CommandDeduplication], stringInterning), stringInterningTable.prepareData(collect[StringInterningDto], stringInterning), createFilter.prepareData(collect[CreateFilter], stringInterning), transactionMetering.prepareData(collect[TransactionMetering], stringInterning), diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2DeduplicationStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2DeduplicationStorageBackend.scala deleted file mode 100644 index 475f84e824a6..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2DeduplicationStorageBackend.scala +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.backend.h2 - -import java.sql.Connection - -import com.daml.lf.data.Time.Timestamp -import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation -import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate - -import scala.util.control.NonFatal - -object H2DeduplicationStorageBackend extends DeduplicationStorageBackendTemplate { - private val logger = ContextualizedLogger.get(this.getClass) - - override def upsertDeduplicationEntry( - key: String, - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(connection: Connection)(implicit loggingContext: LoggingContext): Int = { - - // Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication - // upsert is performed simultaneously from multiple threads, the query fails with - // JdbcSQLIntegrityConstraintViolationException: Unique index or primary key violation - // Simple retry helps - def retry[T](op: => T): T = - try { - op - } catch { - case NonFatal(e) => - logger.debug(s"Caught exception while upserting a deduplication entry: $e") - op - } - retry( - SQL""" - merge into participant_command_submissions pcs - using dual on deduplication_key = $key - when not matched then - insert (deduplication_key, deduplicate_until) - values ($key, ${deduplicateUntil.micros}) - when matched and pcs.deduplicate_until < ${submittedAt.micros} then - update set deduplicate_until=${deduplicateUntil.micros} - """ - .executeUpdate()(connection) - ) - } -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala index f1fc308c45dd..430b1230f468 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala @@ -18,7 +18,6 @@ object H2ResetStorageBackend extends ResetStorageBackend { truncate table package_entries; truncate table parameters; truncate table participant_command_completions; - truncate table participant_command_submissions; truncate table participant_events_divulgence; truncate table participant_events_create; truncate table participant_events_consuming_exercise; diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala index e334b1b96592..22095798a74d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala @@ -14,7 +14,6 @@ import com.daml.platform.store.backend.{ ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, - DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, PartyStorageBackend, @@ -32,9 +31,6 @@ object H2StorageBackendFactory extends StorageBackendFactory with CommonStorageB override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(H2QueryStrategy, ledgerEndCache) - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = - H2DeduplicationStorageBackend - override def createCompletionStorageBackend( stringInterning: StringInterning ): CompletionStorageBackend = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleDeduplicationStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleDeduplicationStorageBackend.scala deleted file mode 100644 index 263858b7c241..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleDeduplicationStorageBackend.scala +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.backend.oracle - -import java.sql.Connection - -import com.daml.lf.data.Time.Timestamp -import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation -import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate - -import scala.util.control.NonFatal - -object OracleDeduplicationStorageBackend extends DeduplicationStorageBackendTemplate { - private val logger = ContextualizedLogger.get(this.getClass) - - override def upsertDeduplicationEntry( - key: String, - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(connection: Connection)(implicit loggingContext: LoggingContext): Int = { - - // Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication - // upsert is performed simultaneously from multiple threads, the query fails with - // SQLIntegrityConstraintViolationException: ORA-00001: unique constraint (INDEXDB.SYS_C007590) violated - // Simple retry helps - def retry[T](op: => T): T = - try { - op - } catch { - case NonFatal(e) => - logger.debug(s"Caught exception while upserting a deduplication entry: $e") - op - } - retry( - SQL""" - merge into participant_command_submissions pcs - using dual - on (pcs.deduplication_key = $key) - when matched then - update set pcs.deduplicate_until=${deduplicateUntil.micros} - where pcs.deduplicate_until < ${submittedAt.micros} - when not matched then - insert (pcs.deduplication_key, pcs.deduplicate_until) - values ($key, ${deduplicateUntil.micros}) - """ - .executeUpdate()(connection) - ) - } - -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala index 8b89bf66c68e..af3a20292593 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala @@ -17,7 +17,6 @@ object OracleResetStorageBackend extends ResetStorageBackend { "package_entries", "parameters", "participant_command_completions", - "participant_command_submissions", "participant_events_divulgence", "participant_events_create", "participant_events_consuming_exercise", diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala index 262189882ca4..68fbffa69d2b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala @@ -15,7 +15,6 @@ import com.daml.platform.store.backend.{ ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, - DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, PartyStorageBackend, @@ -33,9 +32,6 @@ object OracleStorageBackendFactory extends StorageBackendFactory with CommonStor override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(OracleQueryStrategy, ledgerEndCache) - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = - OracleDeduplicationStorageBackend - override def createCompletionStorageBackend( stringInterning: StringInterning ): CompletionStorageBackend = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDeduplicationStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDeduplicationStorageBackend.scala deleted file mode 100644 index 359a825730eb..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDeduplicationStorageBackend.scala +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.backend.postgresql - -import java.sql.Connection - -import com.daml.lf.data.Time.Timestamp -import com.daml.logging.LoggingContext -import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation -import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate - -object PostgresDeduplicationStorageBackend extends DeduplicationStorageBackendTemplate { - - override def upsertDeduplicationEntry( - key: String, - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(connection: Connection)(implicit loggingContext: LoggingContext): Int = - SQL""" - insert into participant_command_submissions as pcs (deduplication_key, deduplicate_until) - values ($key, ${deduplicateUntil.micros}) - on conflict (deduplication_key) - do update - set deduplicate_until=${deduplicateUntil.micros} - where pcs.deduplicate_until < ${submittedAt.micros} - """ - .executeUpdate()(connection) -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala index af42c22fdd69..d36c00f9f0c0 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala @@ -17,7 +17,6 @@ object PostgresResetStorageBackend extends ResetStorageBackend { truncate table package_entries cascade; truncate table parameters cascade; truncate table participant_command_completions cascade; - truncate table participant_command_submissions cascade; truncate table participant_events_divulgence cascade; truncate table participant_events_create cascade; truncate table participant_events_consuming_exercise cascade; diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala index 94c6c8954fce..ac2958417826 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala @@ -15,7 +15,6 @@ import com.daml.platform.store.backend.{ ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, - DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, PartyStorageBackend, @@ -35,9 +34,6 @@ object PostgresStorageBackendFactory override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(PostgresQueryStrategy, ledgerEndCache) - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = - PostgresDeduplicationStorageBackend - override def createCompletionStorageBackend( stringInterning: StringInterning ): CompletionStorageBackend = diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala index e024793e634e..8077c154adad 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala @@ -79,7 +79,6 @@ case class TestBackend( configuration: ConfigurationStorageBackend, party: PartyStorageBackend, packageBackend: PackageStorageBackend, - deduplication: DeduplicationStorageBackend, completion: CompletionStorageBackend, contract: ContractStorageBackend, event: EventStorageBackend, @@ -104,7 +103,6 @@ object TestBackend { configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache), party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache), packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache), - deduplication = storageBackendFactory.createDeduplicationStorageBackend, completion = storageBackendFactory.createCompletionStorageBackend(stringInterning), contract = storageBackendFactory.createContractStorageBackend(ledgerEndCache, stringInterning), diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala index 3e3cf33f173d..de9c3261c188 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala @@ -16,7 +16,6 @@ trait StorageBackendSuite with StorageBackendTestsPruning with StorageBackendTestsDBLockForSuite with StorageBackendTestsIntegrity - with StorageBackendTestsDeduplication with StorageBackendTestsTimestamps with StorageBackendTestsStringInterning with StorageBackendTestsUserManagement diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala deleted file mode 100644 index 3418e1828f40..000000000000 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.backend - -import com.daml.lf.data.Time.Timestamp -import org.scalatest.Inside -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -private[backend] trait StorageBackendTestsDeduplication - extends Matchers - with Inside - with StorageBackendSpec { - this: AnyFlatSpec => - - behavior of "DeduplicationStorageBackend" - - import StorageBackendTestValues._ - - it should "only allow one upsertDeduplicationEntry to insert a new entry" in { - val key = "deduplication key" - val submittedAt = Timestamp.assertFromLong(0L) - val deduplicateUntil = submittedAt.addMicros(1000L) - val n = 8 - - executeSql(backend.parameter.initializeParameters(someIdentityParams)) - val insertedRows = executeParallelSql( - Vector.fill(n)(c => - backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil)(c) - ) - ) - val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key)) - - insertedRows.count(_ == 1) shouldBe 1 // One of the calls inserts a new row - insertedRows.count(_ == 0) shouldBe (n - 1) // All other calls don't write anything - foundDeduplicateUntil shouldBe deduplicateUntil - } - - it should "only allow one upsertDeduplicationEntry to update an existing expired entry" in { - val key = "deduplication key" - val submittedAt = Timestamp.assertFromLong(0L) - val deduplicateUntil = submittedAt.addMicros(1000L) - // Second submission is after the deduplication window of the first one - val submittedAt2 = Timestamp.assertFromLong(2000L) - val deduplicateUntil2 = submittedAt2.addMicros(1000L) - val n = 8 - - executeSql(backend.parameter.initializeParameters(someIdentityParams)) - val insertedRows = - executeSql(backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil)) - val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key)) - val updatedRows = executeParallelSql( - Vector.fill(n)(c => - backend.deduplication.upsertDeduplicationEntry( - key, - submittedAt2, - deduplicateUntil2, - )(c) - ) - ) - val foundDeduplicateUntil2 = executeSql(backend.deduplication.deduplicatedUntil(key)) - - insertedRows shouldBe 1 // First call inserts a new row - updatedRows.count( - _ == 1 - ) shouldBe 1 // One of the subsequent calls updates the now expired row - updatedRows.count(_ == 0) shouldBe (n - 1) // All other calls don't write anything - foundDeduplicateUntil shouldBe deduplicateUntil - foundDeduplicateUntil2 shouldBe deduplicateUntil2 - } - - it should "not update or insert anything if there is an existing active entry" in { - val key = "deduplication key" - val submittedAt = Timestamp.assertFromLong(0L) - val deduplicateUntil = submittedAt.addMicros(5000L) - // Second submission is within the deduplication window of the first one - val submittedAt2 = Timestamp.assertFromLong(1000L) - val deduplicateUntil2 = submittedAt2.addMicros(5000L) - - executeSql(backend.parameter.initializeParameters(someIdentityParams)) - val insertedRows = executeSql( - backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) - ) - val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key)) - val updatedRows = executeSql( - backend.deduplication.upsertDeduplicationEntry(key, submittedAt2, deduplicateUntil2) - ) - val foundDeduplicateUntil2 = executeSql(backend.deduplication.deduplicatedUntil(key)) - - insertedRows shouldBe 1 // First call inserts a new row - updatedRows shouldBe 0 // Second call doesn't write anything - foundDeduplicateUntil shouldBe deduplicateUntil - foundDeduplicateUntil2 shouldBe deduplicateUntil - } - -} diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCommandDeduplicationSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCommandDeduplicationSpec.scala deleted file mode 100644 index f45f0b0b6405..000000000000 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCommandDeduplicationSpec.scala +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.dao - -import java.time.Instant -import java.util.UUID - -import com.daml.ledger.api.domain.CommandId -import com.daml.ledger.participant.state.index.v2.{ - CommandDeduplicationDuplicate, - CommandDeduplicationNew, -} -import com.daml.lf.data.Time.Timestamp -import org.scalatest.flatspec.AsyncFlatSpec -import org.scalatest.matchers.should.Matchers - -private[dao] trait JdbcLedgerDaoCommandDeduplicationSpec { - this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite => - - behavior of "JdbcLedgerDao (command deduplication)" - - it should "correctly deduplicate a command" in { - val commandId: CommandId = CommandId(UUID.randomUUID.toString) - for { - original <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(5000)) - duplicate <- ledgerDao.deduplicateCommand(commandId, List(alice), t(500), t(5500)) - } yield { - original shouldBe CommandDeduplicationNew - duplicate shouldBe CommandDeduplicationDuplicate(t(5000)) - } - } - - it should "correctly deduplicate commands with multiple submitters" in { - val commandId: CommandId = CommandId(UUID.randomUUID.toString) - for { - original <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(0), t(5000)) - duplicate1 <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(500), t(5500)) - duplicate2 <- ledgerDao.deduplicateCommand(commandId, List(bob, alice), t(500), t(5500)) - duplicate3 <- ledgerDao.deduplicateCommand( - commandId, - List(alice, bob, alice), - t(500), - t(5500), - ) - } yield { - original shouldBe CommandDeduplicationNew - duplicate1 shouldBe CommandDeduplicationDuplicate(t(5000)) - duplicate2 shouldBe CommandDeduplicationDuplicate(t(5000)) - duplicate3 shouldBe CommandDeduplicationDuplicate(t(5000)) - } - } - - it should "not deduplicate a command after it expired" in { - val commandId: CommandId = CommandId(UUID.randomUUID.toString) - for { - original1 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(100)) - original2 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(101), t(200)) - } yield { - original1 shouldBe CommandDeduplicationNew - original2 shouldBe CommandDeduplicationNew - } - } - - it should "not deduplicate a command after its deduplication was stopped" in { - val commandId: CommandId = CommandId(UUID.randomUUID.toString) - for { - original1 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(10000)) - _ <- ledgerDao.stopDeduplicatingCommand(commandId, List(alice)) - original2 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(1), t(10001)) - } yield { - original1 shouldBe CommandDeduplicationNew - original2 shouldBe CommandDeduplicationNew - } - } - - it should "not deduplicate commands with different command ids" in { - val commandId1: CommandId = CommandId(UUID.randomUUID.toString) - val commandId2: CommandId = CommandId(UUID.randomUUID.toString) - for { - original1 <- ledgerDao.deduplicateCommand(commandId1, List(alice, bob), t(0), t(1000)) - original2 <- ledgerDao.deduplicateCommand(commandId2, List(alice, bob), t(0), t(1000)) - } yield { - original1 shouldBe CommandDeduplicationNew - original2 shouldBe CommandDeduplicationNew - } - } - - it should "not deduplicate commands with different submitters" in { - val commandId: CommandId = CommandId(UUID.randomUUID.toString) - for { - original1 <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(0), t(1000)) - original2 <- ledgerDao.deduplicateCommand(commandId, List(alice, charlie), t(0), t(1000)) - } yield { - original1 shouldBe CommandDeduplicationNew - original2 shouldBe CommandDeduplicationNew - } - } - - private[this] val t0 = Instant.now() - private[this] def t(ms: Long): Timestamp = { - Timestamp.assertFromInstant(t0.plusMillis(ms)) - } -} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index 249c92685206..0c1f7ece58ad 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -9,11 +9,7 @@ import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} -import com.daml.ledger.participant.state.index.v2.{ - CommandDeduplicationNew, - IndexPartyManagementService, - IndexSubmissionService, -} +import com.daml.ledger.participant.state.index.v2.IndexPartyManagementService import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteService} import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf @@ -331,91 +327,6 @@ class ApiSubmissionServiceSpec } } - behavior of "command deduplication" - - it should "use deduplication if enabled" in { - val partyManagementService = mock[IndexPartyManagementService] - val writeService = mock[state.WriteService] - val indexSubmissionService = mock[IndexSubmissionService] - val mockCommandExecutor = mock[CommandExecutor] - - when( - mockCommandExecutor.execute( - any[Commands], - any[Hash], - any[Configuration], - )(any[ExecutionContext], any[LoggingContext]) - ).thenReturn( - Future.failed( - new RuntimeException - ) // we don't care about the result, deduplication already happened - ) - - val service = - newSubmissionService( - writeService, - partyManagementService, - implicitPartyAllocation = true, - mockIndexSubmissionService = indexSubmissionService, - commandExecutor = mockCommandExecutor, - ) - - val submitRequest = newSubmitRequest() - service - .submit(submitRequest) - .transform(_ => { - verify(indexSubmissionService).deduplicateCommand( - any[CommandId], - any[List[Ref.Party]], - any[Timestamp], - any[Timestamp], - )(any[LoggingContext]) - Success(succeed) - }) - } - - it should "not use deduplication when disabled" in { - val partyManagementService = mock[IndexPartyManagementService] - val writeService = mock[state.WriteService] - val indexSubmissionService = mock[IndexSubmissionService] - val mockCommandExecutor = mock[CommandExecutor] - - when( - mockCommandExecutor.execute( - any[Commands], - any[Hash], - any[Configuration], - )(any[ExecutionContext], any[LoggingContext]) - ).thenReturn( - Future.failed( - new RuntimeException - ) // we don't care about the result, deduplication already happened - ) - - val service = - newSubmissionService( - writeService, - partyManagementService, - implicitPartyAllocation = true, - deduplicationEnabled = false, - mockIndexSubmissionService = indexSubmissionService, - commandExecutor = mockCommandExecutor, - ) - - val submitRequest = newSubmitRequest() - service - .submit(submitRequest) - .transform(_ => { - verify(indexSubmissionService, never).deduplicateCommand( - any[CommandId], - any[List[Ref.Party]], - any[Timestamp], - any[Timestamp], - )(any[LoggingContext]) - Success(succeed) - }) - } - it should "rate-limit when configured to do so" in { val grpcError = RpcStatus.of(Status.Code.ABORTED.value(), s"Quota Exceeded", Seq.empty) @@ -424,8 +335,6 @@ class ApiSubmissionServiceSpec mock[state.WriteService], mock[IndexPartyManagementService], implicitPartyAllocation = true, - deduplicationEnabled = false, - mockIndexSubmissionService = mock[IndexSubmissionService], commandExecutor = mock[CommandExecutor], checkOverloaded = _ => Some(SubmissionResult.SynchronousError(grpcError)), ) @@ -444,10 +353,6 @@ class ApiSubmissionServiceSpec } object ApiSubmissionServiceSpec { - - import ArgumentMatchersSugar._ - import MockitoSugar._ - val commandId = new AtomicInteger() private def newSubmitRequest() = { @@ -474,8 +379,6 @@ object ApiSubmissionServiceSpec { partyManagementService: IndexPartyManagementService, implicitPartyAllocation: Boolean, commandExecutor: CommandExecutor = null, - deduplicationEnabled: Boolean = true, - mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService], useSelfServiceErrorCodes: Boolean = true, checkOverloaded: TelemetryContext => Option[SubmissionResult] = _ => None, )(implicit @@ -487,33 +390,15 @@ object ApiSubmissionServiceSpec { Some(Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO)) } - when(writeService.isApiDeduplicationEnabled).thenReturn(deduplicationEnabled) - when( - mockIndexSubmissionService.deduplicateCommand( - any[CommandId], - anyList[Ref.Party], - any[Timestamp], - any[Timestamp], - )(any[LoggingContext]) - ).thenReturn(Future.successful(CommandDeduplicationNew)) - when( - mockIndexSubmissionService.stopDeduplicatingCommand( - any[CommandId], - anyList[Ref.Party], - )(any[LoggingContext]) - ).thenReturn(Future.unit) - new ApiSubmissionService( writeService = writeService, - submissionService = mockIndexSubmissionService, partyManagementService = partyManagementService, timeProvider = null, timeProviderType = null, ledgerConfigurationSubscription = ledgerConfigurationSubscription, seedService = SeedService.WeakRandom, commandExecutor = commandExecutor, - configuration = ApiSubmissionService - .Configuration(implicitPartyAllocation, enableDeduplication = true), + configuration = ApiSubmissionService.Configuration(implicitPartyAllocation), metrics = new Metrics(new MetricRegistry), errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( enableSelfServiceErrorCodes = useSelfServiceErrorCodes diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/DbDtoToStringsForInterningSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/DbDtoToStringsForInterningSpec.scala index c935af486d6b..6d889c56ae71 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/DbDtoToStringsForInterningSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/DbDtoToStringsForInterningSpec.scala @@ -150,9 +150,6 @@ class DbDtoToStringsForInterningSpec extends AnyFlatSpec with Matchers { deduplication_duration_nanos = Some(1), deduplication_start = Some(1), ), - DbDto.CommandDeduplication( - deduplication_key = "74" - ), DbDto.ConfigurationEntry( ledger_offset = "75", recorded_at = 1, diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala index 76650183bc87..c7a65ed01c96 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala @@ -5,7 +5,6 @@ package com.daml.platform.store.backend import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset} -import com.daml.ledger.api.domain import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.offset.Offset @@ -20,7 +19,7 @@ import com.daml.logging.LoggingContext import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent import com.daml.platform.store.appendonlydao.events._ -import com.daml.platform.store.appendonlydao.{DeduplicationKeyMaker, JdbcLedgerDao} +import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.google.protobuf.ByteString import com.google.rpc.status.{Status => StatusProto} import io.grpc.Status @@ -257,13 +256,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { deduplication_duration_seconds = None, deduplication_duration_nanos = None, deduplication_start = None, - ), - DbDto.CommandDeduplication( - DeduplicationKeyMaker.make( - domain.CommandId(completionInfo.commandId), - completionInfo.actAs, - ) - ), + ) ) } @@ -1285,13 +1278,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { deduplication_duration_seconds = expectedDeduplicationDurationSeconds, deduplication_duration_nanos = expectedDeduplicationDurationNanos, deduplication_start = None, - ), - DbDto.CommandDeduplication( - DeduplicationKeyMaker.make( - domain.CommandId(completionInfo.commandId), - completionInfo.actAs, - ) - ), + ) ) } } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/dao/DeduplicationKeyMakerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/dao/DeduplicationKeyMakerSpec.scala deleted file mode 100644 index 95706d6fb61d..000000000000 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/dao/DeduplicationKeyMakerSpec.scala +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.store.dao - -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec -import com.daml.ledger.api.domain.CommandId -import com.daml.lf.data.Ref -import com.daml.platform.store.appendonlydao.DeduplicationKeyMaker -import org.scalatest.prop.TableDrivenPropertyChecks - -import java.util.UUID -import scala.util.Random -import scalaz.syntax.tag._ - -class DeduplicationKeyMakerSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks { - - val commandId: CommandId = CommandId(Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) - - "DeduplicationKeyMaker" should { - "make a deduplication key starting with a command ID in plain-text" in { - DeduplicationKeyMaker.make(commandId, List(aParty())) should startWith(commandId.unwrap) - } - - "make different keys for different sets of submitters" in { - val aCommonParty = aParty() - val cases = Table( - ("Submitters for key1", "Submitters for key2"), - (List(aParty()), List(aParty())), - (List(aCommonParty, aParty()), List(aCommonParty, aParty())), - (List(aParty(), aParty()), List(aParty(), aParty())), - ) - - forAll(cases) { case (key1Submitters, key2Submitters) => - val key1 = DeduplicationKeyMaker.make(commandId, key1Submitters) - val key2 = DeduplicationKeyMaker.make(commandId, key2Submitters) - - key1 shouldNot equal(key2) - } - } - - "make a deduplication key with a limited length for a large number of submitters" in { - val submitters = (1 to 50).map(_ => aParty()).toList - - /** The motivation for the MaxKeyLength is to avoid problems with putting deduplication key in a database - * index (e.g. for Postgres the limit for the index row size is 2712). - * The value 200 is set arbitrarily to provide some space for other data. - */ - val MaxKeyLength = 200 - DeduplicationKeyMaker.make(commandId, submitters).length should be < MaxKeyLength - } - - "make the same deduplication key for submitters of different order" in { - val submitter1 = aParty() - val submitter2 = aParty() - val submitter3 = aParty() - - val key1 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter2, submitter3)) - val key2 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter3, submitter2)) - - key1 shouldBe key2 - } - - "make the same deduplication key for duplicated submitters" in { - val submitter1 = aParty() - val submitter2 = aParty() - - val key1 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter2)) - val key2 = DeduplicationKeyMaker.make( - commandId, - List(submitter1, submitter1, submitter2, submitter2, submitter2), - ) - - key1 shouldBe key2 - } - - def aParty(): Ref.Party = Ref.Party.assertFromString(Random.alphanumeric.take(100).mkString) - } -} diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala index e1af45b5a50a..2a987a0b40c7 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala @@ -16,7 +16,6 @@ trait IndexService with IndexPartyManagementService with IndexConfigManagementService with IndexParticipantPruningService - with IndexSubmissionService with MeteringStore // with IndexTimeService //TODO: this needs some further discussion as the TimeService is actually optional with ReportsHealth diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexSubmissionService.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexSubmissionService.scala deleted file mode 100644 index 947737b95c42..000000000000 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexSubmissionService.scala +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.participant.state.index.v2 - -import com.daml.ledger.api.domain.CommandId -import com.daml.lf.data.Ref -import com.daml.lf.data.Time.Timestamp -import com.daml.logging.LoggingContext - -import scala.concurrent.Future - -/** Serves as a backend to implement - * [[com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc]] - */ -trait IndexSubmissionService { - def deduplicateCommand( - commandId: CommandId, - submitters: List[Ref.Party], - submittedAt: Timestamp, - deduplicateUntil: Timestamp, - )(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] - - def stopDeduplicatingCommand( - commandId: CommandId, - submitters: List[Ref.Party], - )(implicit loggingContext: LoggingContext): Future[Unit] -} diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/package.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/package.scala index db6f31032223..c2eb3ebb4823 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/package.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/package.scala @@ -107,13 +107,4 @@ package v2 { knownSince: Timestamp, sourceDescription: Option[String], ) - - sealed abstract class CommandDeduplicationResult extends Product with Serializable - - /** This is the first time the command was submitted. */ - case object CommandDeduplicationNew extends CommandDeduplicationResult - - /** This command was submitted before. */ - final case class CommandDeduplicationDuplicate(deduplicateUntil: Timestamp) - extends CommandDeduplicationResult } diff --git a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedWriteService.scala b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedWriteService.scala index 6d1e2b3c3d52..ed7c0ad3c05b 100644 --- a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedWriteService.scala +++ b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedWriteService.scala @@ -95,6 +95,4 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends override def currentHealth(): HealthStatus = delegate.currentHealth() - - override def isApiDeduplicationEnabled: Boolean = delegate.isApiDeduplicationEnabled } diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala index cc529d4b2762..f26d31d3af51 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala @@ -18,11 +18,7 @@ import com.daml.lf.language.LanguageVersion import com.daml.metrics.MetricsReporter import com.daml.platform.apiserver.SeedService.Seeding import com.daml.platform.configuration.Readers._ -import com.daml.platform.configuration.{ - CommandConfiguration, - IndexConfiguration, - SubmissionConfiguration, -} +import com.daml.platform.configuration.{CommandConfiguration, IndexConfiguration} import com.daml.platform.usermanagement.UserManagementConfig import com.daml.ports.Port import io.netty.handler.ssl.ClientAuth @@ -34,7 +30,6 @@ final case class Config[Extra]( mode: Mode, ledgerId: String, commandConfig: CommandConfiguration, - submissionConfig: SubmissionConfiguration, tlsConfig: Option[TlsConfiguration], participants: Seq[ParticipantConfig], maxInboundMessageSize: Int, @@ -78,7 +73,6 @@ object Config { mode = Mode.Run, ledgerId = UUID.randomUUID().toString, commandConfig = CommandConfiguration.default, - submissionConfig = SubmissionConfiguration.default, tlsConfig = None, participants = Vector.empty, maxInboundMessageSize = DefaultMaxInboundMessageSize, @@ -417,17 +411,6 @@ object Config { s" Default is ${CommandConfiguration.DefaultTrackerRetentionPeriod}." ) - opt[Unit]("disable-deduplication-unsafe") - .optional() - .hidden() - .action((_, config) => - config - .copy(submissionConfig = config.submissionConfig.copy(enableDeduplication = false)) - ) - .text( - "Disable participant-side command deduplication." - ) - opt[Duration]("max-deduplication-duration") .optional() .hidden() diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index d6b026a8e4c6..a12a550797a7 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -230,7 +230,6 @@ final class Runner[T <: ReadWriteService, Extra]( ledgerId = config.ledgerId, config = apiServerConfig, commandConfig = config.commandConfig, - submissionConfig = config.submissionConfig, partyConfig = configProvider.partyConfig(config), optWriteService = Some(writeService), authService = configProvider.authService(config), diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriter.scala index 9a2e70768663..3c9ef062e1a6 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriter.scala @@ -31,7 +31,6 @@ class KeyValueParticipantStateWriter( ) extends WriteService { private val logger: ContextualizedLogger = ContextualizedLogger.get(getClass) - override def isApiDeduplicationEnabled: Boolean = false private val keyValueSubmission = new KeyValueSubmission(metrics) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/WriteServiceWithDeduplicationSupport.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/WriteServiceWithDeduplicationSupport.scala index 80e33bac0d12..1f1ec6764aa6 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/WriteServiceWithDeduplicationSupport.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/WriteServiceWithDeduplicationSupport.scala @@ -109,9 +109,6 @@ class WriteServiceWithDeduplicationSupport( telemetryContext: TelemetryContext, ): CompletionStage[SubmissionResult] = delegate.submitConfiguration(maxRecordTime, submissionId, config) - - override def isApiDeduplicationEnabled: Boolean = delegate.isApiDeduplicationEnabled - } object WriteServiceWithDeduplicationSupport { diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/WriteService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/WriteService.scala index 1c50b65625e0..cd14e21a8674 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/WriteService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/WriteService.scala @@ -105,9 +105,4 @@ trait WriteService loggingContext: LoggingContext, telemetryContext: TelemetryContext, ): CompletionStage[SubmissionResult] - - /** Indicates whether command deduplication should be enabled when using this [[WriteService]] - * This is temporary until we fully transition from [[com.daml.ledger.participant.state.v1.WriteService]] to [[WriteService]] - */ - def isApiDeduplicationEnabled: Boolean = false } diff --git a/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala b/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala index 534847d30777..49bbe4e462ce 100644 --- a/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala +++ b/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala @@ -12,11 +12,7 @@ import com.daml.lf.data.Ref import com.daml.metrics.MetricsReporter import com.daml.platform.apiserver.SeedService.Seeding import com.daml.platform.common.LedgerIdMode -import com.daml.platform.configuration.{ - CommandConfiguration, - InitialLedgerConfiguration, - SubmissionConfiguration, -} +import com.daml.platform.configuration.{CommandConfiguration, InitialLedgerConfiguration} import com.daml.platform.services.time.TimeProviderType import com.daml.ports.Port import java.io.File @@ -42,7 +38,6 @@ final case class SandboxConfig( delayBeforeSubmittingLedgerConfiguration: Duration, timeModel: LedgerTimeModel, commandConfig: CommandConfiguration, - submissionConfig: SubmissionConfiguration, tlsConfig: Option[TlsConfiguration], // TODO sandbox: Remove CLI option scenario: Option[String], @@ -146,7 +141,6 @@ object SandboxConfig { maxSkew = Duration.ofSeconds(120L), ).get, commandConfig = CommandConfiguration.default, - submissionConfig = SubmissionConfiguration.default, tlsConfig = None, scenario = None, implicitPartyAllocation = true, diff --git a/ledger/sandbox-on-x/src/classic/scala/com/daml/platform/ledger/ConfigConverter.scala b/ledger/sandbox-on-x/src/classic/scala/com/daml/platform/ledger/ConfigConverter.scala index 2cb3e45de9e5..d375dfbda9fd 100644 --- a/ledger/sandbox-on-x/src/classic/scala/com/daml/platform/ledger/ConfigConverter.scala +++ b/ledger/sandbox-on-x/src/classic/scala/com/daml/platform/ledger/ConfigConverter.scala @@ -69,7 +69,6 @@ object ConfigConverter { maybeLedgerId.getOrElse(LedgerIdGenerator.generateRandomId(ledgerName).unwrap) }, commandConfig = sandboxConfig.commandConfig, - submissionConfig = sandboxConfig.submissionConfig, tlsConfig = sandboxConfig.tlsConfig, participants = Seq( singleCombinedParticipant diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala index 835fdc1e13f7..91622a634ad5 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala @@ -37,8 +37,6 @@ class BridgeWriteService( private[this] val logger = ContextualizedLogger.get(getClass) - override def isApiDeduplicationEnabled: Boolean = false - override def close(): Unit = { logger.info("Shutting down BridgeWriteService.") queue.complete() diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala index 33908eeb8c4a..130f146713cd 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala @@ -231,7 +231,6 @@ object SandboxOnXRunner { ledgerId = config.ledgerId, config = apiServerConfig, commandConfig = config.commandConfig, - submissionConfig = config.submissionConfig, partyConfig = PartyConfiguration(config.extra.implicitPartyAllocation), optWriteService = Some(writeService), authService = config.extra.authService, diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index a54d421d63bc..01024e61ab95 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -43,7 +43,7 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.MetricsReporting import com.daml.platform.apiserver._ import com.daml.platform.common.LedgerIdMode -import com.daml.platform.configuration.{PartyConfiguration, ServerRole, SubmissionConfiguration} +import com.daml.platform.configuration.{PartyConfiguration, ServerRole} import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer} import com.daml.platform.sandbox.banner.Banner import com.daml.platform.sandbox.config.SandboxConfig @@ -266,7 +266,6 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { partyConfig = PartyConfiguration.default.copy( implicitPartyAllocation = config.implicitPartyAllocation ), - submissionConfig = SubmissionConfiguration.default, optWriteService = Some(writeServiceWithDeduplicationSupport), authService = authService, healthChecks = healthChecks,