Skip to content

Commit

Permalink
resources: Customizable contexts. (#7678)
Browse files Browse the repository at this point in the history
* resources: Move builders into //ledger/ledger-resources.

Keep the actual constructors in a trait, but instantiate it when working
with ledger code.

This allows us to later introduce an extra "context" type parameter to
ResourceOwner.

* resources-akka: Move the builders in to //ledger/ledger-resources.

* resources: Introduce an abstract `Context` parameter for owners.

This replaces the concrete `ExecutionContext`. While it _can_ be an
execution context, it really doesn't matter as long as we can get at one
somehow.

This is being introduced so we can wrap the context in a container,
either for type tagging or to include extra information.

Because our current context _is_ `ExecutionContext`, and an implicit is
provided to extract it, we can end up with two ways to get the same
value. We use shadowing to prevent this. This problem should go away in
the near future when a new context type is added.

CHANGELOG_BEGIN
- [Integration Kit] The `ResourceOwner` type is now parameterized by a
  `Context`, which is filled in by the corresponding `Context` class in
  the _ledger-resources_ dependency. This allows us to pass extra
  information through resource acquisition.
CHANGELOG_END

* ledger-resources: Move `ResourceOwner` here from `resources`.

* ledger-resources: Remove dependencies from outside //ledger.

* ledger-resource: Wrap the acquisition execution context in `Context`.

So we can add a logging context to it.

* resources: Pass the Context, not the ExecutionContext, to Resource.

* Avoid importing `HasExecutionContext`.

* ledger-resources: Publish to Maven Central.

* resources: Make the small changes suggested by @stefanobaghino-da.

Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* ledger-resources: Pull out a trait for test resource contexts.

Saves a few lines of code.

* Restore some imports that were accidentally wildcarded.

* resources: Replace an `implicit def` with a couple of imports.

* participant-integration-api: Simplify the JdbcLedgerDaoBackend tests.

Try and use the right execution context where possible.

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
  • Loading branch information
SamirTalwar and stefanobaghino-da authored Oct 20, 2020
1 parent 3665025 commit 7f679b9
Show file tree
Hide file tree
Showing 133 changed files with 1,030 additions and 788 deletions.
1 change: 1 addition & 0 deletions daml-script/runner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ da_scala_library(
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/sandbox-classic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.Materializer
import com.daml.daml_lf_dev.DamlLf
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.ResourceContext
import com.daml.lf.PureCompiledPackages
import com.daml.lf.archive.{Dar, DarReader, Decode}
import com.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName}
Expand Down Expand Up @@ -58,7 +59,8 @@ object TestMain extends StrictLogging {
implicit val sequencer: ExecutionSequencerFactory =
new AkkaExecutionSequencerPool("ScriptTestPool")(system)
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
implicit val executionContext: ExecutionContext = system.dispatcher
implicit val resourceContext: ResourceContext = ResourceContext(executionContext)

val (participantParams, participantCleanup) = config.participantConfig match {
case Some(file) =>
Expand Down
3 changes: 3 additions & 0 deletions daml-script/test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ da_scala_library(
"//ledger/ledger-api-common",
"//ledger/ledger-on-memory",
"//ledger/ledger-on-memory:ledger-on-memory-app",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
Expand Down Expand Up @@ -179,6 +180,7 @@ da_scala_test_suite(
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-integration-api:participant-integration-api-tests-lib",
"//ledger/participant-state",
Expand All @@ -191,6 +193,7 @@ da_scala_test_suite(
"//ledger/test-common",
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:com_auth0_java_jwt",
"@maven//:com_typesafe_akka_akka_http_core_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,70 +3,65 @@

package com.daml.lf.engine.script.test

import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.Materializer
import io.grpc.Channel
import java.io.File
import java.nio.file.Files

import org.scalatest._

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal
import scalaz.{-\/, \/-}
import scalaz.syntax.traverse._
import spray.json._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.Materializer
import com.daml.bazeltools.BazelRunfiles._
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
import com.daml.lf.archive.{Dar, DarReader}
import com.daml.lf.archive.Decode
import com.daml.lf.data.Ref._
import com.daml.lf.engine.script.{
ApiParameters,
Participant,
Participants,
Runner,
ScriptLedgerClient,
ScriptTimeMode,
}
import com.daml.lf.iface.EnvironmentInterface
import com.daml.lf.iface.reader.InterfaceReader
import com.daml.lf.language.Ast.Package
import com.daml.lf.speedy.SError._
import com.daml.lf.speedy.SValue
import com.daml.lf.speedy.SValue._
import com.daml.lf.value.json.ApiCodecCompressed
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.http.HttpService
import com.daml.jwt.{JwtSigner, HMAC256Verifier}
import com.daml.jwt.domain.DecodedJwt
import com.daml.jwt.{HMAC256Verifier, JwtSigner}
import com.daml.ledger.api.auth.{AuthServiceJWT, AuthServiceJWTCodec, AuthServiceJWTPayload}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
import com.daml.ledger.api.testing.utils.{
OwnedResource,
SuiteResource,
SuiteResourceManagementAroundAll,
Resource => TestResource
}
import com.daml.ledger.api.auth.{AuthServiceJWT, AuthServiceJWTCodec, AuthServiceJWTPayload}
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.archive.{Dar, DarReader, Decode}
import com.daml.lf.data.Ref._
import com.daml.lf.engine.script._
import com.daml.lf.iface.EnvironmentInterface
import com.daml.lf.iface.reader.InterfaceReader
import com.daml.lf.language.Ast.Package
import com.daml.lf.speedy.SError._
import com.daml.lf.speedy.SValue
import com.daml.lf.speedy.SValue._
import com.daml.lf.value.json.ApiCodecCompressed
import com.daml.platform.apiserver.services.GrpcClientResource
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.sandbox.{AbstractSandboxFixture, SandboxServer}
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.sandbox.services.TestCommands
import com.daml.platform.sandbox.{AbstractSandboxFixture, SandboxServer}
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.Channel
import org.scalatest._
import scalaz.syntax.traverse._
import scalaz.{-\/, \/-}
import spray.json._

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal

trait JsonApiFixture
extends AbstractSandboxFixture
with SuiteResource[(SandboxServer, Channel, ServerBinding)] {
self: Suite =>

override protected def darFile = new File(rlocation("daml-script/test/script-test.dar"))

protected val darFileNoLedger = new File(rlocation("daml-script/test/script-test-no-ledger.dar"))

protected def server: SandboxServer = suiteResource.value._1

override protected def serverPort: Port = server.port
override protected def channel: Channel = suiteResource.value._2
override protected def config: SandboxConfig =
Expand Down Expand Up @@ -119,16 +114,16 @@ trait JsonApiFixture

override protected lazy val suiteResource
: TestResource[(SandboxServer, Channel, ServerBinding)] = {
implicit val ec: ExecutionContext = system.dispatcher
new OwnedResource[(SandboxServer, Channel, ServerBinding)](
implicit val context: ResourceContext = ResourceContext(system.dispatcher)
new OwnedResource[ResourceContext, (SandboxServer, Channel, ServerBinding)](
for {
jdbcUrl <- database
.fold[ResourceOwner[Option[String]]](ResourceOwner.successful(None))(_.map(info =>
Some(info.jdbcUrl)))
server <- SandboxServer.owner(config.copy(jdbcUrl = jdbcUrl))
channel <- GrpcClientResource.owner(server.port)
httpService <- new ResourceOwner[ServerBinding] {
override def acquire()(implicit ec: ExecutionContext): Resource[ServerBinding] = {
override def acquire()(implicit context: ResourceContext): Resource[ServerBinding] = {
Resource[ServerBinding] {
Files.write(jsonAccessTokenFile, getToken(List(), false).getBytes())
val config = new HttpService.DefaultStartSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.daml.ledger.on.memory.{ExtraConfig, Owner}
import com.daml.ledger.participant.state.kvutils.app.{ParticipantConfig, ParticipantRunMode}
import com.daml.ledger.participant.state.kvutils.{app => kvutils}
import com.daml.ledger.participant.state.v1
import com.daml.ledger.resources.ResourceContext
import com.daml.lf.engine.script._
import com.daml.ports.Port
import org.scalatest.Suite
Expand Down Expand Up @@ -66,8 +67,8 @@ trait MultiParticipantFixture
managementServiceTimeout = ParticipantConfig.defaultManagementServiceTimeout,
)
override protected lazy val suiteResource = {
implicit val ec: ExecutionContext = system.dispatcher
new OwnedResource[(Port, Port)](
implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
new OwnedResource[ResourceContext, (Port, Port)](
for {
_ <- Owner(
kvutils.Config
Expand Down
1 change: 1 addition & 0 deletions extractor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ da_scala_test_suite(
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/sandbox-classic",
Expand Down
2 changes: 2 additions & 0 deletions language-support/java/codegen/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ da_scala_test(
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-resources",
"//ledger/ledger-resources:ledger-resources-test-lib",
"//ledger/participant-integration-api",
"//ledger/participant-integration-api:participant-integration-api-tests-lib",
"//ledger/participant-state",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import java.time.temporal.ChronoField
import java.time.{Instant, LocalDate, ZoneOffset}

import com.daml.ledger.javaapi.data.{Unit => DamlUnit}
import com.daml.ledger.resources.TestResourceContext
import com.daml.lf.data.Numeric
import org.scalatest.{AsyncFlatSpec, Matchers}
import wolpertinger.color.Grey
import wolpertinger.{Color, Wolpertinger}

import scala.collection.JavaConverters._

class CodegenLedgerTest extends AsyncFlatSpec with Matchers {
class CodegenLedgerTest extends AsyncFlatSpec with Matchers with TestResourceContext {

import TestUtil._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

package com.daml

import com.daml.ledger.resources.TestResourceContext
import org.scalatest.{AsyncFlatSpec, Matchers}
import stakeholders.{ExplicitObservers, ImplicitObservers, MixedObservers, OnlySignatories}

class StakeholdersTest extends AsyncFlatSpec with Matchers {
class StakeholdersTest extends AsyncFlatSpec with Matchers with TestResourceContext {

import TestUtil._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.daml.ledger.api.v1.TransactionServiceOuterClass.{
import com.daml.ledger.api.v1.{CommandServiceGrpc, TransactionServiceGrpc}
import com.daml.ledger.javaapi.data
import com.daml.ledger.javaapi.data._
import com.daml.ledger.resources.ResourceContext
import com.daml.platform.apiserver.services.GrpcClientResource
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.sandbox
Expand All @@ -30,7 +31,7 @@ import io.grpc.Channel
import org.scalatest.Assertion

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.language.implicitConversions

object TestUtil {
Expand All @@ -41,8 +42,7 @@ object TestUtil {
val LedgerID = "ledger-test"

def withClient(testCode: Channel => Assertion)(
implicit executionContext: ExecutionContext
): Future[Assertion] = {
implicit resourceContext: ResourceContext): Future[Assertion] = {
val config = sandbox.DefaultConfig.copy(
port = Port.Dynamic,
damlPackages = List(testDalf),
Expand All @@ -54,7 +54,7 @@ object TestUtil {
server <- SandboxServer.owner(config)
channel <- GrpcClientResource.owner(server.port)
} yield channel
channelOwner.use(channel => Future(testCode(channel)))
channelOwner.use(channel => Future(testCode(channel))(resourceContext.executionContext))
}

// unfortunately this is needed to help with passing functions to rxjava methods like Flowable#map
Expand Down
1 change: 1 addition & 0 deletions language-support/scala/codegen-sample-app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ da_scala_test(
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/sandbox-classic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
package com.daml.ledger.api.testing.utils

import com.daml.resources
import com.daml.resources.AbstractResourceOwner

import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext}
import scala.reflect.ClassTag

final class OwnedResource[T: ClassTag](
owner: resources.ResourceOwner[T],
final class OwnedResource[Context, T: ClassTag](
owner: AbstractResourceOwner[Context, T],
acquisitionTimeout: FiniteDuration = 30.seconds,
releaseTimeout: FiniteDuration = 30.seconds,
)(implicit executionContext: ExecutionContext)
)(implicit context: Context)
extends ManagedResource[T] {
private var resource: resources.Resource[T] = _
private var resource: resources.Resource[Context, T] = _

override def construct(): T = {
resource = owner.acquire()
Expand Down
1 change: 1 addition & 0 deletions ledger/daml-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ da_scala_library(
"//daml-lf/data",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/sandbox-classic",
Expand Down
3 changes: 2 additions & 1 deletion ledger/daml-on-sql/src/main/scala/on/sql/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.on.sql

import com.daml.ledger.resources.ResourceContext
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.sandbox.{GlobalLogLevel, SandboxServer}
import com.daml.resources.ProgramResource
Expand All @@ -16,5 +17,5 @@ object Main {
}

private[sql] def run(config: SandboxConfig): Unit =
new ProgramResource(SandboxServer.owner(Name, config)).run()
new ProgramResource(SandboxServer.owner(Name, config)).run(ResourceContext.apply)
}
2 changes: 2 additions & 0 deletions ledger/ledger-api-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ da_scala_library(
"//ledger-api/rs-grpc-bridge",
"//ledger/ledger-api-auth-client",
"//ledger/ledger-api-domain",
"//ledger/ledger-resources",
"//libs-scala/concurrent",
"//libs-scala/grpc-utils",
"//libs-scala/ports",
Expand Down Expand Up @@ -72,6 +73,7 @@ da_scala_test_suite(
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-resources",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/sandbox",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package com.daml.ledger.client
import java.net.{InetAddress, InetSocketAddress}

import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.ManagedChannel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future

object GrpcChannel {

Expand All @@ -35,7 +35,7 @@ object GrpcChannel {
configuration,
)

override def acquire()(implicit executionContext: ExecutionContext): Resource[ManagedChannel] =
override def acquire()(implicit context: ResourceContext): Resource[ManagedChannel] =
Resource(Future(GrpcChannel(builder, configuration)))(channel =>
Future {
channel.shutdownNow()
Expand Down
2 changes: 2 additions & 0 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ da_scala_library(
"//ledger/ledger-api-akka",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-resources",
"//ledger/metrics",
"//libs-scala/concurrent",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.daml.platform.akkastreams.dispatcher

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.resources.ResourceOwner
import com.daml.ledger.resources.ResourceOwner

/**
* A fanout signaller, representing a stream of external updates,
Expand Down
Loading

0 comments on commit 7f679b9

Please sign in to comment.