Skip to content

Commit

Permalink
Async package management (digital-asset#3806)
Browse files Browse the repository at this point in the history
* Add package_entries table

* Change PublicPackageUpload event to cover list of packages.

Add PublicPackageUploadRejected.

* Produce new package update events in KeyValueConsumption

* Update signature of uploadPackages

* Cleanup InMemoryKVParticipantState. Add submissionId to uploadPackages.

* Fix up InMemoryKVParticipantStateIT

* Initial ledger dao changes for package entries

Drop the participant_id as we never expect to see
entries of other participants. This should be done
for party_entries as well.

* Drop UploadPackagesResult

* Implement getPackageEntries and refactor callers

* Add maxRecordTime to uploadPackages

* First cut at updating ApiPackageManagementService

* Update tests, wire through the packageEntries

* Don't extend IndexPackagesService in InMemoryPackageStore

It does not implement the full interface and it isn't used
directly as one anyway.

* Drop maximum_record_time from package_management_service

Adding maximum record times touches the whole stack. Leaving
this change to another PR.

* Wire through the removal of maximum_record_time.

And remove dead code from InMemoryKVParticipantState

* Remove checking for duplicate package uploads

This aligns with the behaviour of WriteService.

* Reformat

* Fix PackageManagementService after adding of submission_id to the service
  • Loading branch information
Jussi Mäki authored Dec 13, 2019
1 parent 0c741a6 commit ce70ad4
Show file tree
Hide file tree
Showing 34 changed files with 636 additions and 475 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import DA.Ledger.Types
import Data.ByteString(ByteString)
import Data.Functor
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as TL
import Network.GRPC.HighLevel.Generated
import qualified Com.Digitalasset.Ledger.Api.V1.Admin.PackageManagementService as LL

Expand Down Expand Up @@ -57,7 +58,7 @@ uploadDarFile bytes =
withGRPCClient config $ \client -> do
service <- LL.packageManagementServiceClient client
let LL.PackageManagementService {packageManagementServiceUploadDarFile=rpc} = service
let request = LL.UploadDarFileRequest bytes
let request = LL.UploadDarFileRequest bytes TL.empty {- let server allocate submission id -}
rpc (ClientNormalRequest request timeout mdm)
>>= unwrapWithInvalidArgument
<&> fmap (\LL.UploadDarFileResponse{} -> ())
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ message UploadDarFileRequest {
// ``daml_lf.proto``.
// Required
bytes dar_file = 1;

// Unique submission identifier.
// Optional, defaults to a random identifier.
string submission_id = 2;
}

// An empty message that is received when the upload operation succeeded.
message UploadDarFileResponse {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

package com.daml.ledger.api.server.damlonx.reference.v2

import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.participant.state.v1.SubmissionId
import com.daml.ledger.api.server.damlonx.reference.v2.cli.Cli
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.digitalasset.daml.lf.archive.DarReader
Expand Down Expand Up @@ -50,10 +52,11 @@ object ReferenceServer extends App {
val authService = AuthServiceWildcard

config.archiveFiles.foreach { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file)
} yield ledger.uploadPackages(dar.all, None)
} yield ledger.uploadPackages(submissionId, dar.all, None)
}

val participantF: Future[(AutoCloseable, AutoCloseable)] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,21 @@ object domain {
reason: String
) extends PartyEntry
}

sealed abstract class PackageEntry() extends Product with Serializable

object PackageEntry {

final case class PackageUploadAccepted(
submissionId: String,
recordTime: Instant
) extends PackageEntry

final case class PackageUploadRejected(
submissionId: String,
recordTime: Instant,
reason: String
) extends PackageEntry
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

package com.daml.ledger.participant.state.index.v2

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.daml.lf.data.Ref.PackageId
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.daml.lf.language.Ast.Package
import com.digitalasset.ledger.api.domain.{LedgerOffset, PackageEntry}

import scala.concurrent.Future

/**
* Serves as a backend to implement
* [[com.digitalasset.ledger.api.v1.package_service.PackageServiceGrpc.PackageService]]
* PackageService and PackageManagementService.
*/
trait IndexPackagesService {
def listLfPackages(): Future[Map[PackageId, PackageDetails]]
Expand All @@ -20,4 +23,6 @@ trait IndexPackagesService {

/** Like [[getLfArchive]], but already parsed. */
def getLfPackage(packageId: PackageId): Future[Option[Package]]

def packageEntries(beginOffset: LedgerOffset.Absolute): Source[PackageEntry, NotUsed]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.daml.ledger.participant.state.kvutils
import java.io._
import java.time.Clock
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, CompletionStage}

import akka.NotUsed
Expand Down Expand Up @@ -74,10 +73,6 @@ object InMemoryKVParticipantState {
/** A periodically emitted heartbeat that is committed to the ledger. */
final case class CommitHeartbeat(recordTime: Timestamp) extends Commit

final case class AddPackageUploadRequest(
submissionId: String,
cf: CompletableFuture[UploadPackagesResult])

final case class AddPotentialResponse(idx: Int)

}
Expand Down Expand Up @@ -122,9 +117,6 @@ class InMemoryKVParticipantState(
// Namespace prefix for DAML state.
private val NS_DAML_STATE = ByteString.copyFromUtf8("DS")

// For an in-memory ledger, an atomic integer is enough to guarantee uniqueness
private val submissionIdSource = new AtomicInteger()

/** Interval for heartbeats. Heartbeats are committed to [[State.commitLog]]
* and sent as [[Update.Heartbeat]] to [[stateUpdates]] consumers.
*/
Expand Down Expand Up @@ -158,55 +150,6 @@ class InMemoryKVParticipantState(
stateRef = newState
}

/** Akka actor that matches the requests for party allocation
* with asynchronous responses delivered within the log entries.
*/
class ResponseMatcher extends Actor {
var packageRequests: Map[String, CompletableFuture[UploadPackagesResult]] = Map.empty

@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def receive: Receive = {
case AddPackageUploadRequest(submissionId, cf) =>
packageRequests += (submissionId -> cf); ()

case AddPotentialResponse(idx) =>
assert(idx >= 0 && idx < stateRef.commitLog.size)

stateRef.commitLog(idx) match {
case CommitSubmission(entryId, _) =>
stateRef.store
.get(entryId.getEntryId)
.flatMap { blob =>
KeyValueConsumption.logEntryToAsyncResponse(
entryId,
Envelope.open(blob) match {
case Right(Envelope.LogEntryMessage(logEntry)) =>
logEntry
case _ =>
sys.error(s"Envolope did not contain log entry")
},
participantId
)
}
.foreach {
case KeyValueConsumption.PackageUploadResponse(submissionId, result) =>
packageRequests
.getOrElse(
submissionId,
sys.error(
s"packageUpload response: $submissionId could not be matched with a request!"))
.complete(result)
packageRequests -= submissionId
}
case _ => ()
}
}
}

/** Instance of the [[ResponseMatcher]] to which we send messages used for request-response matching. */
private val matcherActorRef =
system.actorOf(Props(new ResponseMatcher), s"response-matcher-$ledgerId")

/** Akka actor that receives submissions sequentially and
* commits them one after another to the state, e.g. appending
* a new ledger commit entry, and applying it to the key-value store.
Expand Down Expand Up @@ -286,7 +229,6 @@ class InMemoryKVParticipantState(

// Wake up consumers.
dispatcher.signalNewHead(stateRef.commitLog.size)
matcherActorRef ! AddPotentialResponse(stateRef.commitLog.size - 1)
}
}
}
Expand Down Expand Up @@ -454,21 +396,29 @@ class InMemoryKVParticipantState(
private def generateRandomParty(): Ref.Party =
Ref.Party.assertFromString(s"party-${UUID.randomUUID().toString.take(8)}")

/*
/** Upload DAML-LF packages to the ledger */
*/

/** Upload a collection of DAML-LF packages to the ledger. */
override def uploadPackages(
submissionId: SubmissionId,
archives: List[Archive],
sourceDescription: Option[String]): CompletionStage[UploadPackagesResult] = {
val sId = submissionIdSource.getAndIncrement().toString
val cf = new CompletableFuture[UploadPackagesResult]
matcherActorRef ! AddPackageUploadRequest(sId, cf)
commitActorRef ! CommitSubmission(
allocateEntryId,
Envelope.enclose(
KeyValueSubmission
.archivesToSubmission(sId, archives, sourceDescription.getOrElse(""), participantId))
)
cf
}
sourceDescription: Option[String]): CompletionStage[SubmissionResult] =
CompletableFuture.completedFuture({
commitActorRef ! CommitSubmission(
allocateEntryId,
Envelope.enclose(
KeyValueSubmission
.archivesToSubmission(
submissionId,
archives,
sourceDescription.getOrElse(""),
participantId))
)
SubmissionResult.Acknowledged
})

/** Retrieve the static initial conditions of the ledger, containing
* the ledger identifier and the initial ledger record time.
Expand All @@ -485,21 +435,6 @@ class InMemoryKVParticipantState(
val _ = Await.ready(gracefulStop(commitActorRef, 5.seconds, PoisonPill), 6.seconds)
}

private def getLogEntry(state: State, entryId: Proto.DamlLogEntryId): Proto.DamlLogEntry = {
Envelope.open(
state.store
.getOrElse(
entryId.getEntryId,
sys.error(s"getLogEntry: Cannot find ${Pretty.prettyEntryId(entryId)}!")
)
) match {
case Right(Envelope.LogEntryMessage(logEntry)) =>
logEntry
case _ =>
sys.error(s"getLogEntry: Envelope did not contain log entry")
}
}

private def getDamlState(state: State, key: Proto.DamlStateKey): Option[Proto.DamlStateValue] =
state.store
.get(NS_DAML_STATE.concat(KeyValueCommitting.packDamlStateKey(key)))
Expand Down
Loading

0 comments on commit ce70ad4

Please sign in to comment.