Skip to content

Commit

Permalink
resources: Remove the default factories. (#8645)
Browse files Browse the repository at this point in the history
* resources: Wrapper constructors around the normal Resource types.

* resources: Factor out `ResourceFactories#apply`.

* resources: Remove the global factories.

They're no longer necessary, and could cause confusion. Users should use
their own factory object, not a generic one.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Jan 27, 2021
1 parent 1351b08 commit 694a721
Show file tree
Hide file tree
Showing 20 changed files with 71 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ import com.daml.ledger.api.testtool.infrastructure.Reporter.ColorizedPrintStream
import com.daml.ledger.api.testtool.infrastructure._
import com.daml.ledger.api.testtool.tests.Tests
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.resources.{AbstractResourceOwner, Resource}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object LedgerApiTestTool {

private type ResourceOwner[T] = com.daml.resources.AbstractResourceOwner[ExecutionContext, T]
private type Resource[T] = com.daml.resources.Resource[ExecutionContext, T]
private val Resource = new com.daml.resources.ResourceFactories[ExecutionContext]

private[this] val logger = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

// The suffix that will be appended to all party and command identifiers to ensure
Expand Down Expand Up @@ -212,7 +215,7 @@ object LedgerApiTestTool {
host: String,
port: Int,
tlsConfig: Option[TlsConfiguration],
): AbstractResourceOwner[ExecutionContext, Channel] = {
): ResourceOwner[Channel] = {
logger.info(s"Setting up managed channel to participant at $host:$port...")
val channelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext()
for (ssl <- tlsConfig; sslContext <- ssl.client) {
Expand All @@ -229,7 +232,7 @@ object LedgerApiTestTool {
private def initializeParticipantChannels(
participants: Vector[(String, Int)],
tlsConfig: Option[TlsConfiguration],
)(implicit executionContext: ExecutionContext): Resource[ExecutionContext, Vector[Channel]] = {
)(implicit executionContext: ExecutionContext): Resource[Vector[Channel]] = {
val participantChannelOwners =
for ((host, port) <- participants) yield {
initializeParticipantChannel(host, port, tlsConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

package com.daml.testing.postgresql

import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}

import scala.concurrent.Future

object PostgresResource {
def owner[Context: HasExecutionContext](): AbstractResourceOwner[Context, PostgresDatabase] =
new AbstractResourceOwner[Context, PostgresDatabase] with PostgresAround {
override def acquire()(implicit context: Context): Resource[Context, PostgresDatabase] =
Resource[Context].apply(Future {
ReleasableResource(Future {
connectToPostgresqlServer()
createNewRandomDatabase()
})(_ => Future(disconnectFromPostgresqlServer()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
package com.daml.resources.akka

import akka.stream.Materializer
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}

import scala.concurrent.Future

class ActorMaterializerResourceOwner[Context: HasExecutionContext](
acquireMaterializer: () => Materializer
) extends AbstractResourceOwner[Context, Materializer] {
override def acquire()(implicit context: Context): Resource[Context, Materializer] =
Resource[Context].apply(Future(acquireMaterializer()))(materializer =>
ReleasableResource(Future(acquireMaterializer()))(materializer =>
Future(materializer.shutdown())
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package com.daml.resources.akka

import akka.actor.ActorSystem
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}

import scala.concurrent.Future

class ActorSystemResourceOwner[Context: HasExecutionContext](acquireActorSystem: () => ActorSystem)
extends AbstractResourceOwner[Context, ActorSystem] {
override def acquire()(implicit context: Context): Resource[Context, ActorSystem] =
Resource[Context].apply(Future(acquireActorSystem()))(_.terminate().map(_ => ()))
ReleasableResource(Future(acquireActorSystem()))(_.terminate().map(_ => ()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.resources.grpc

import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}
import io.grpc.{ManagedChannel, ManagedChannelBuilder}

import scala.concurrent.Future
Expand All @@ -14,7 +14,7 @@ private[grpc] final class ManagedChannelResourceOwner[Context: HasExecutionConte
shutdownTimeout: FiniteDuration,
) extends AbstractResourceOwner[Context, ManagedChannel] {
override def acquire()(implicit context: Context): Resource[Context, ManagedChannel] =
Resource[Context].apply(Future(builder.build())) { channel =>
ReleasableResource(Future(builder.build())) { channel =>
Future {
channel.shutdown()
channel.awaitTermination(shutdownTimeout.length, shutdownTimeout.unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.resources.grpc
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit.MILLISECONDS

import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}
import io.netty.channel.nio.NioEventLoopGroup

import scala.concurrent.{Future, Promise}
Expand All @@ -17,7 +17,7 @@ private[grpc] final class NioEventLoopGroupResourceOwner[Context: HasExecutionCo
threadFactory: ThreadFactory,
) extends AbstractResourceOwner[Context, NioEventLoopGroup] {
override def acquire()(implicit context: Context): Resource[Context, NioEventLoopGroup] =
Resource[Context].apply(Future(new NioEventLoopGroup(threadCount, threadFactory))) {
ReleasableResource(Future(new NioEventLoopGroup(threadCount, threadFactory))) {
eventLoopGroup =>
val promise = Promise[Unit]()
val future = eventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.resources.grpc

import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, Resource}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}
import io.grpc.{Server, ServerBuilder}

import scala.concurrent.Future
Expand All @@ -14,7 +14,7 @@ class ServerResourceOwner[Context: HasExecutionContext](
shutdownTimeout: FiniteDuration,
) extends AbstractResourceOwner[Context, Server] {
override def acquire()(implicit context: Context): Resource[Context, Server] =
Resource[Context].apply(Future(builder.build().start())) { server =>
ReleasableResource(Future(builder.build().start())) { server =>
Future {
// Ask to shutdown gracefully, but wait for termination for the specified timeout.
val done = server.shutdown().awaitTermination(shutdownTimeout.length, shutdownTimeout.unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ class CloseableResourceOwner[Context: HasExecutionContext, T <: AutoCloseable](
acquireCloseable: () => T
) extends AbstractResourceOwner[Context, T] {
override def acquire()(implicit context: Context): Resource[Context, T] =
Resource.apply(Future(acquireCloseable()))(closeable => Future(closeable.close()))
ReleasableResource(Future(acquireCloseable()))(closeable => Future(closeable.close()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class ExecutorServiceResourceOwner[Context: HasExecutionContext, T <: ExecutorSe
acquireExecutorService: () => T
) extends AbstractResourceOwner[Context, T] {
override def acquire()(implicit context: Context): Resource[Context, T] =
Resource.apply(Future {
ReleasableResource(Future {
val executorService = acquireExecutorService()
// If we try and release an executor service which is itself being used to power the
// releasing, we end up in a deadlock—the executor can't shut down, and therefore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ class FutureCloseableResourceOwner[Context: HasExecutionContext, T <: AutoClosea
acquireFutureCloseable: () => Future[T]
) extends AbstractResourceOwner[Context, T] {
override def acquire()(implicit context: Context): Resource[Context, T] =
Resource.apply(acquireFutureCloseable())(closeable => Future(closeable.close()))
ReleasableResource(acquireFutureCloseable())(closeable => Future(closeable.close()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import scala.concurrent.Future
class FutureResourceOwner[Context: HasExecutionContext, T](acquireFuture: () => Future[T])
extends AbstractResourceOwner[Context, T] {
override def acquire()(implicit context: Context): Resource[Context, T] =
Resource.fromFuture(acquireFuture())
PureResource(acquireFuture())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

/** Nests release operation for a [[Resource]]'s future. */
private[resources] final class NestedResource[Context: HasExecutionContext, T](future: Future[T])(
private[resources] final class NestedResource[Context: HasExecutionContext, T] private (
future: Future[T]
)(
releaseResource: T => Future[Unit],
releaseSubResources: () => Future[Unit],
)(implicit context: Context)
Expand Down Expand Up @@ -49,3 +51,11 @@ private[resources] final class NestedResource[Context: HasExecutionContext, T](f
else // A release is already in progress or completed; we wait for that instead
releasePromise.future
}

object NestedResource {
def apply[Context: HasExecutionContext, T](future: Future[T])(
releaseResource: T => Future[Unit],
releaseSubResources: () => Future[Unit],
)(implicit context: Context): Resource[Context, T] =
new NestedResource(future)(releaseResource, releaseSubResources)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.resources.ProgramResource._

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
import scala.util.control.{NoStackTrace, NonFatal}

Expand All @@ -26,7 +26,7 @@ final class ProgramResource[Context: HasExecutionContext, T](
newLoggingContext { implicit loggingContext =>
val resource = {
implicit val context: Context = newContext(ExecutionContext.fromExecutor(executorService))
Try(owner.acquire()).fold(Resource.failed, identity)
Try(owner.acquire()).fold(exception => PureResource(Future.failed(exception)), identity)
}

def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ package com.daml.resources
import scala.concurrent.Future

/** Represents a pure success or failure as a resource. */
private[resources] final class PureResource[Context: HasExecutionContext, T](future: Future[T])
extends Resource[Context, T] {
private[resources] final class PureResource[Context: HasExecutionContext, T] private (
future: Future[T]
) extends Resource[Context, T] {
override def asFuture: Future[T] = future

override def release(): Future[Unit] = Future.unit
}

object PureResource {
def apply[Context: HasExecutionContext, T](future: Future[T]): Resource[Context, T] =
new PureResource(future)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.resources

import scala.concurrent.Future

object ReleasableResource {
def apply[Context: HasExecutionContext, T](
future: Future[T]
)(releaseResource: T => Future[Unit])(implicit context: Context): Resource[Context, T] =
NestedResource(future)(releaseResource, () => Future.unit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
*/
def map[B](f: A => B)(implicit context: Context): R[B] =
// A mapped Resource is a mapped future plus a nesting of an empty release operation and the actual one
new NestedResource(asFuture.map(f))(_ => Future.unit, release _)
NestedResource(asFuture.map(f))(_ => Future.unit, release _)

/** Just like [[Future]]s, [[Resource]]s can be chained. Both component [[Resource]]s will be released correctly
* upon failure and explicit release.
Expand All @@ -48,7 +48,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
case Failure(_) => Future.unit // Already released by future failure
}
val future = nextFuture.flatMap(_.asFuture)
new NestedResource(future)(
NestedResource(future)(
nextRelease,
release _,
) // Nest next resource release and this resource release
Expand All @@ -63,7 +63,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
else
Future.failed(new ResourceAcquisitionFilterException())
)
new NestedResource(future)(_ => Future.unit, release _)
NestedResource(future)(_ => Future.unit, release _)
}

/** A nested resource can be flattened.
Expand All @@ -74,7 +74,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
/** Just like [[Future]]s, an attempted [[Resource]] computation can be transformed.
*/
def transformWith[B](f: Try[A] => R[B])(implicit context: Context): R[B] =
new NestedResource(asFuture.transformWith(f.andThen(Future.successful)))(
NestedResource(asFuture.transformWith(f.andThen(Future.successful)))(
nested => nested.release(),
release _,
).flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,30 @@ final class ResourceFactories[Context: HasExecutionContext] {

/** Builds a [[Resource]] from a [[Future]] and some release logic.
*/
def apply[T](future: Future[T])(releaseResource: T => Future[Unit])(implicit
context: Context
): R[T] =
new NestedResource(future)(releaseResource, () => Future.unit)
def apply[T](
future: Future[T]
)(releaseResource: T => Future[Unit])(implicit context: Context): R[T] =
ReleasableResource(future)(releaseResource)

/** Wraps a simple [[Future]] in a [[Resource]] that doesn't need to be released.
*/
def fromFuture[T](future: Future[T]): R[T] =
new PureResource(future)
PureResource(future)

/** Produces a [[Resource]] that has already succeeded with the [[Unit]] value.
*/
def unit: R[Unit] =
fromFuture(Future.unit)
PureResource(Future.unit)

/** Produces a [[Resource]] that has already succeeded with a given value.
*/
def successful[T](value: T): R[T] =
fromFuture(Future.successful(value))
PureResource(Future.successful(value))

/** Produces a [[Resource]] that has already failed with a given exception.
*/
def failed[T](exception: Throwable): R[T] =
fromFuture(Future.failed(exception))
PureResource(Future.failed(exception))

/** Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] of the [[Traversable]] of their values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import scala.concurrent.Future
class TimerResourceOwner[Context: HasExecutionContext](acquireTimer: () => Timer)
extends AbstractResourceOwner[Context, Timer] {
override def acquire()(implicit context: Context): Resource[Context, Timer] =
Resource.apply(Future(acquireTimer()))(timer => Future(timer.cancel()))
ReleasableResource(Future(acquireTimer()))(timer => Future(timer.cancel()))
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class TestResourceOwner[T](acquire: Future[T], release: T => Future[Unit])
if (!acquired.compareAndSet(false, true)) {
throw new TriedToAcquireTwice
}
Resource[TestContext].apply(acquire)(value =>
ReleasableResource(acquire)(value =>
if (acquired.compareAndSet(true, false))
release(value)
else
Expand Down

0 comments on commit 694a721

Please sign in to comment.