Skip to content

Commit

Permalink
Implement package management API in the sandbox (digital-asset#1610)
Browse files Browse the repository at this point in the history
* store archive size in `DarReader`

* rename `SandboxTemplateStore` to `SandboxPackageStore`

* store package info in `SandboxPackageStore`

* introduce package upload / read to write / index services

not tested yet, just a ton of plumbing

* WIP test the package service

* Fix build errors after rebase

* Move packages service to v2

* Ledger API client uses ledger API types

* Fix ReflectionIT

* Correctly handle uploading invalid dar files

* Fix reading DAR entry file sizes

* Improve package management IT

* Improve handling of duplicate packages

* Fix language-support build

* Use unique party and command names

* Rename lfpackage to language

* Rename SandboxPackageStore to InMemoryPackageStore

* Remove getCurrentTime ledger method

* Improve package management IT

* Move InMemoryActiveContracts and InMemoryPackageStore

* Use case object for UploadDarResult.Ok

* Address review comments

* Update release notes

Fixes digital-asset#1311

* Use BazelRunfiles in test
  • Loading branch information
rautenrieth-da authored and mergify[bot] committed Jun 19, 2019
1 parent fda517a commit c14b909
Show file tree
Hide file tree
Showing 51 changed files with 657 additions and 334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import scala.util.{Failure, Success, Try}

class DarReader[A](
readDalfNamesFromManifest: InputStream => Try[Dar[String]],
parseDalf: InputStream => Try[A]) {
// The `Long` is the dalf size in bytes.
parseDalf: (Long, InputStream) => Try[A]) {

import Errors._

Expand Down Expand Up @@ -79,14 +80,16 @@ class DarReader[A](
sequence(names.map(parseOne(f)))

private def parseOne(f: ZipFile)(s: String): Try[A] =
bracket(getZipEntryInputStream(f, s))(close).flatMap(parseDalf)
bracket(getZipEntryInputStream(f, s))({ case (_, is) => close(is) }).flatMap({
case (size, is) => parseDalf(size, is)
})

private def getZipEntryInputStream(f: ZipFile, name: String): Try[InputStream] =
private def getZipEntryInputStream(f: ZipFile, name: String): Try[(Long, InputStream)] =
for {
e <- Try(new ZipEntry(name))
e <- Try(f.getEntry(name))
is <- inputStream(f, e)
bis <- Try(new BufferedInputStream(is))
} yield bis
} yield (e.getSize(), bis)

private def close(is: InputStream): Try[Unit] = Try(is.close())
}
Expand All @@ -111,13 +114,15 @@ object Errors {

object DarReader {
def apply(): DarReader[(Ref.PackageId, DamlLf.ArchivePayload)] =
new DarReader(DarManifestReader.dalfNames, a => Try(Reader.decodeArchiveFromInputStream(a)))
new DarReader(DarManifestReader.dalfNames, {
case (_, a) => Try(Reader.decodeArchiveFromInputStream(a))
})

def apply[A](parseDalf: InputStream => Try[A]): DarReader[A] =
def apply[A](parseDalf: (Long, InputStream) => Try[A]): DarReader[A] =
new DarReader(DarManifestReader.dalfNames, parseDalf)
}

object DarReaderWithVersion
extends DarReader[((Ref.PackageId, DamlLf.ArchivePayload), LanguageMajorVersion)](
DarManifestReader.dalfNames,
a => Try(Reader.readArchiveAndVersion(a)))
{ case (_, a) => Try(Reader.readArchiveAndVersion(a)) })
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object UniversalArchiveReader {
private def parseDalf(is: InputStream) = Try(Reader.decodeArchiveFromInputStream(is))

private def parseDar[A](parseDalf: InputStream => Try[A]): ZipFile => Try[Dar[A]] =
DarReader(parseDalf).readArchive
DarReader { case (_, is) => parseDalf(is) }.readArchive
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.digitalasset.daml.lf.data

import scala.util.{Failure, Success, Try}

private[lf] object TryOps {
private[digitalasset] object TryOps {
def sequence[A](list: List[Try[A]]): Try[List[A]] = {
val zero: Try[List[A]] = Success(List.empty[A])
list.foldRight(zero)((a, as) => map2(a, as)(_ :: _))
Expand Down
6 changes: 6 additions & 0 deletions docs/source/support/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ Release Procedure
- Fixes to the CI/CD release procedure.
See `#1755 <https://github.com/digital-asset/daml/issues/1755>__.`

Sandbox
~~~~~~~

- Introduced a new API for package management.
See `#1311 <https://github.com/digital-asset/daml/issues/1311>`__.

.. _release-0-13-4:

0.13.4 - 2019-06-19
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.digitalasset.ledger.api.v1.TransactionServiceOuterClass.{
}
import com.digitalasset.ledger.api.v1.{CommandServiceGrpc, TransactionServiceGrpc}
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.{DamlPackageContainer, SandboxConfig}
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxServerResource
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import io.grpc.Channel
Expand All @@ -54,7 +54,7 @@ class CodegenLedgerTest extends FlatSpec with Matchers with BazelRunfiles {
def withClient(testCode: Channel => Assertion): Assertion = {
val cfg = SandboxConfig.default.copy(
port = 0,
damlPackageContainer = DamlPackageContainer(List(testDalf)),
damlPackages = List(testDalf),
ledgerIdMode = LedgerIdMode.Static(LedgerId(LedgerID)),
timeProviderType = TimeProviderType.WallClock,
timeModel = TimeModel.reasonableDefault
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ private[codegen] object CodeGenRunner extends StrictLogging {
object ArchiveReader
extends DarReader[DamlLf.Archive](
DarManifestReader.dalfNames,
is => Try(DamlLf.Archive.parseFrom(is))
{ case (_, is) => Try(DamlLf.Archive.parseFrom(is)) }
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.digitalasset.ledger.client.configuration.{

import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.{DamlPackageContainer, SandboxConfig}
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.sample.EventDecoder
import com.digitalasset.sample.MyMain.NameClashRecordVariant.NameClashRecordVariantA
Expand Down Expand Up @@ -105,7 +105,7 @@ class ScalaCodeGenIT

private val serverConfig = SandboxConfig.default.copy(
port = port,
damlPackageContainer = DamlPackageContainer(archives),
damlPackages = archives,
timeProviderType = TimeProviderType.WallClock,
ledgerIdMode = LedgerIdMode.Static(LedgerId(ledgerId)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.digitalasset.ledger.client.services.commands.CommandClient
import com.digitalasset.ledger.client.services.transactions.TransactionClient
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.{DamlPackageContainer, SandboxConfig}
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.google.protobuf.empty.Empty

Expand All @@ -52,7 +52,7 @@ object ExampleMain extends App {

private val serverConfig = SandboxConfig.default.copy(
port = port,
damlPackageContainer = DamlPackageContainer(List(dar)),
damlPackages = List(dar),
timeProviderType = TimeProviderType.WallClock,
ledgerIdMode = LedgerIdMode.Static(LedgerId(ledgerId)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ service PackageManagementService {
// available on the whole ledger. This call might not be supported by some
// ledger implementations. Canton could be an example, where uploading a DAR
// is not sufficient to render it usable, it must be activated first.
// This method will return UNIMPLEMENTED, if DAR package uploading is not
// supported by the backing participant. If DAR file is too big or is
// malformed, the backing participant will respond with INVALID_ARGUMENT.
// This call may:
// - Succeed, if the package was successfully uploaded, or if the same package
// was already uploaded before.
// - Respond with UNIMPLEMENTED, if DAR package uploading is not supported by
// the backing participant.
// - Respond with INVALID_ARGUMENT, if the DAR file is too big or malformed.
// The maximum supported size is implementation specific.
rpc UploadDarFile (UploadDarFileRequest) returns (UploadDarFileResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object ReferenceServer extends App {

//val ledger = new Ledger(timeModel, tsb)
def archivesFromDar(file: File): List[Archive] = {
DarReader[Archive](x => Try(Archive.parseFrom(x)))
DarReader[Archive] { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchive(new ZipFile(file))
.fold(t => throw new RuntimeException(s"Failed to parse DAR from $file", t), dar => dar.all)
}
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ da_scala_library(
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger/ledger-api-domain",
"//ledger/participant-state",
],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.ledger.client.services.admin

import com.digitalasset.ledger.api.v1.admin.package_management_service.{
ListKnownPackagesRequest,
PackageDetails,
UploadDarFileRequest
}
import com.digitalasset.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.google.protobuf.ByteString

import scala.concurrent.{ExecutionContext, Future}

class PackageManagementClient(service: PackageManagementService)(implicit ec: ExecutionContext) {
def listKnownPackages(): Future[Seq[PackageDetails]] = {
service
.listKnownPackages(ListKnownPackagesRequest())
.map(_.packageDetails)
}

def uploadDarFile(darFile: ByteString): Future[Unit] = {
service
.uploadDarFile(UploadDarFileRequest(darFile))
.map(_ => ())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.tests.integration.ledger.api

import java.io.File
import java.nio.file.Files
import java.util.zip.ZipFile

import com.digitalasset.daml.bazeltools.BazelRunfiles
import com.digitalasset.daml.lf.archive.{DarReader, Decode}
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml_lf.DamlLf.Archive

import scala.util.{Random, Try}
import com.digitalasset.ledger.api.testing.utils.{
AkkaBeforeAndAfterAll,
IsStatusException,
SuiteResourceManagementAroundAll
}
import com.digitalasset.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.digitalasset.ledger.api.v1.commands.CreateCommand
import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.digitalasset.ledger.api.v1.value.{Identifier, Record, RecordField}
import com.digitalasset.ledger.client.services.admin.PackageManagementClient
import com.digitalasset.platform.apitesting.LedgerContextExtensions._
import com.digitalasset.platform.apitesting.MultiLedgerFixture
import com.digitalasset.platform.participant.util.ValueConversions._
import io.grpc.Status
import com.google.protobuf.ByteString
import org.scalatest.{AsyncFreeSpec, Matchers}
import org.scalatest.Inspectors._
import org.scalatest.concurrent.AsyncTimeLimitedTests
import scalaz.syntax.traverse._
import scalaz.std.either._
import scalaz.std.list._

import scala.concurrent.Future

class PackageManagementServiceIT
extends AsyncFreeSpec
with AkkaBeforeAndAfterAll
with MultiLedgerFixture
with SuiteResourceManagementAroundAll
with AsyncTimeLimitedTests
with Matchers
with BazelRunfiles {

private val runSuffix = "-" + Random.alphanumeric.take(10).mkString
private val partyNameMangler =
(partyText: String) => partyText + runSuffix + Random.alphanumeric.take(10).mkString
private val commandIdMangler =
(testName: String, nodeId: String) => s"ledger-api-test-tool-$testName-$nodeId-$runSuffix"

override protected def config: Config = Config.default.copy(darFiles = Nil)

private def packageManagementService(stub: PackageManagementService): PackageManagementClient =
new PackageManagementClient(stub)

private case class LoadedPackage(size: Long, archive: Archive, pkg: Ast.Package)

private def loadTestDar: (Array[Byte], List[LoadedPackage], String) = {
val file = new File(rlocation("ledger/sandbox/Test.dar"))

val testDarBytes = Files.readAllBytes(file.toPath)

val testPackages = DarReader {
case (archiveSize, x) => Try(Archive.parseFrom(x)).map(ar => (archiveSize, ar))
}.readArchive(new ZipFile(file))
.fold(t => Left(s"Failed to parse DAR from $file: $t"), dar => Right(dar.all))
.flatMap {
_ traverseU {
case (archiveSize, archive) =>
Try(LoadedPackage(archiveSize, archive, Decode.decodeArchive(archive)._2)).toEither.left
.map(err => s"Could not parse archive $archive.getHash: $err")
}
}
.fold[List[LoadedPackage]](err => fail(err), scala.Predef.identity)

// Guesses the package ID of the test package.
// Note: the test DAR file contains 3 packages: the test package, stdlib, and daml-prim.
// The test package should be by far the smallest one, so we just sort the packages by size
// to avoid having to parse and inspect package details.
val testPackageId = testPackages
.sortBy(_.size)
.headOption
.getOrElse(fail("List of packages is empty"))
.archive
.getHash

(testDarBytes, testPackages, testPackageId)
}

private val (testDarBytes, testPackages, testPackageId) = loadTestDar

"should accept packages" in allFixtures { ctx =>
val client = packageManagementService(ctx.packageManagementService)

// Note: this may be a long running ledger, and the test package may have been uploaded before.
// Do not make any assertions on the initial state of the ledger.
for {
_ <- client.uploadDarFile(ByteString.copyFrom(testDarBytes))
finalPackages <- client.listKnownPackages()
} yield {
forAll(testPackages) { p =>
finalPackages.map(_.packageId).contains(p.archive.getHash) shouldBe true
}
forAll(finalPackages) { p =>
p.packageSize > 0 shouldBe true
}
}
}

"should accept duplicate packages" in allFixtures { ctx =>
val client = packageManagementService(ctx.packageManagementService)
val N = 8

// Package upload is idempotent, submitting duplicate packages should succeed.
// This test *concurrently* uploads the same package N times.
for {
_ <- Future.traverse(1 to N)(i => client.uploadDarFile(ByteString.copyFrom(testDarBytes)))
finalPackages <- client.listKnownPackages()
} yield {
forAll(testPackages) { p =>
finalPackages.map(_.packageId).contains(p.archive.getHash) shouldBe true
}
}
}

"fail with the expected status on an invalid upload" in allFixtures { ctx =>
packageManagementService(ctx.packageManagementService)
.uploadDarFile(ByteString.EMPTY)
.failed map { ex =>
IsStatusException(Status.INVALID_ARGUMENT.getCode)(ex)
}
}

"should accept commands using the uploaded package" in allFixtures { ctx =>
val party = partyNameMangler("operator")
val createArg = Record(fields = List(RecordField("operator", party.asParty)))
def createCmd =
CreateCommand(Some(Identifier(testPackageId, "", "Test", "Dummy")), Some(createArg)).wrap
val filter = TransactionFilter(Map(party -> Filters.defaultInstance))
val client = packageManagementService(ctx.packageManagementService)

for {
_ <- client.uploadDarFile(ByteString.copyFrom(testDarBytes))
createTx <- ctx.testingHelpers.submitAndListenForSingleResultOfCommand(
ctx.testingHelpers
.submitRequestWithId(commandIdMangler("PackageManagementServiceIT_commands", "create"))
.update(
_.commands.commands := List(createCmd),
_.commands.party := party
),
filter
)
createdEv = ctx.testingHelpers.getHead(ctx.testingHelpers.createdEventsIn(createTx))
} yield {
createdEv.templateId.map(_.packageId) shouldBe Some(testPackageId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ReflectionIT
for {
response <- execRequest(ledger, listServices)
} yield {
response.getListServicesResponse.getServiceCount shouldEqual 12
response.getListServicesResponse.getServiceCount shouldEqual 13
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import com.digitalasset.daml.bazeltools.BazelRunfiles._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.api.tls.TlsConfiguration
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.{
CommandConfiguration,
DamlPackageContainer,
SandboxConfig
}
import com.digitalasset.platform.sandbox.config.{CommandConfiguration, SandboxConfig}
import com.digitalasset.platform.services.time.{TimeModel, TimeProviderType}
import scalaz.NonEmptyList

Expand Down Expand Up @@ -118,7 +114,7 @@ object PlatformApplications {
address = None,
port = selectedPort,
None,
damlPackageContainer = DamlPackageContainer(config.darFiles.map(_.toFile)),
damlPackages = config.darFiles.map(_.toFile),
timeProviderType = config.timeProviderType,
timeModel = config.timeModel,
commandConfig = config.commandConfiguration,
Expand Down
Loading

0 comments on commit c14b909

Please sign in to comment.