From 44991a8646a4f22ead071b698789d964f977d413 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 27 Jan 2021 11:19:08 +0100 Subject: [PATCH 1/3] resources: Extract out `Resource.nest` into its own class. --- .../resources/NestedResource.scala | 51 +++++++++++++++++++ .../com/digitalasset/resources/Resource.scala | 18 +++---- .../resources/ResourceFactories.scala | 48 ++--------------- 3 files changed, 61 insertions(+), 56 deletions(-) create mode 100644 libs-scala/resources/src/main/scala/com/digitalasset/resources/NestedResource.scala diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/NestedResource.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/NestedResource.scala new file mode 100644 index 000000000000..95e26a840253 --- /dev/null +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/NestedResource.scala @@ -0,0 +1,51 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.resources + +import java.util.concurrent.atomic.AtomicBoolean + +import com.daml.resources.HasExecutionContext.executionContext + +import scala.concurrent.{Future, Promise} +import scala.util.{Failure, Success} + +/** Nests release operation for a [[Resource]]'s future. */ +private[resources] final class NestedResource[Context: HasExecutionContext, T](future: Future[T])( + releaseResource: T => Future[Unit], + releaseSubResources: () => Future[Unit], +)(implicit context: Context) + extends Resource[Context, T] { + lazy val asFuture: Future[T] = future.transformWith { + case Success(value) => Future.successful(value) + case Failure(throwable) => + release().flatMap(_ => Future.failed(throwable)) // Release everything on failure + } + + private val released: AtomicBoolean = new AtomicBoolean(false) // Short-circuits to a promise + private val releasePromise: Promise[Unit] = Promise() // Will be the release return handle + + def release(): Future[Unit] = + if (released.compareAndSet(false, true)) + // If `release` is called twice, we wait for `releasePromise` to complete instead + // `released` is set atomically to ensure we don't end up with two concurrent releases + future + .transformWith { + case Success(value) => + releaseResource(value).flatMap(_ => releaseSubResources()) // Release all + case Failure(_) => + releaseSubResources() // Only sub-release as the future will take care of itself + } + .transform( // Finally, complete `releasePromise` to allow other releases to complete + value => { + releasePromise.success(()) + value + }, + exception => { + releasePromise.success(()) + exception + }, + ) + else // A release is already in progress or completed; we wait for that instead + releasePromise.future +} diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala index 45b881ce7411..93ac288167a4 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/Resource.scala @@ -17,8 +17,6 @@ abstract class Resource[Context: HasExecutionContext, +A] { private type R[+T] = Resource[Context, T] - private val Resource = new ResourceFactories[Context] - /** Every [[Resource]] has an underlying [[Future]] representation. */ def asFuture: Future[A] @@ -32,7 +30,7 @@ abstract class Resource[Context: HasExecutionContext, +A] { */ def map[B](f: A => B)(implicit context: Context): R[B] = // A mapped Resource is a mapped future plus a nesting of an empty release operation and the actual one - Resource.nest(asFuture.map(f))(_ => Future.unit, release _) + new NestedResource(asFuture.map(f))(_ => Future.unit, release _) /** Just like [[Future]]s, [[Resource]]s can be chained. Both component [[Resource]]s will be released correctly * upon failure and explicit release. @@ -50,7 +48,7 @@ abstract class Resource[Context: HasExecutionContext, +A] { case Failure(_) => Future.unit // Already released by future failure } val future = nextFuture.flatMap(_.asFuture) - Resource.nest(future)( + new NestedResource(future)( nextRelease, release _, ) // Nest next resource release and this resource release @@ -65,7 +63,7 @@ abstract class Resource[Context: HasExecutionContext, +A] { else Future.failed(new ResourceAcquisitionFilterException()) ) - Resource.nest(future)(_ => Future.unit, release _) + new NestedResource(future)(_ => Future.unit, release _) } /** A nested resource can be flattened. @@ -76,10 +74,8 @@ abstract class Resource[Context: HasExecutionContext, +A] { /** Just like [[Future]]s, an attempted [[Resource]] computation can be transformed. */ def transformWith[B](f: Try[A] => R[B])(implicit context: Context): R[B] = - Resource - .nest(asFuture.transformWith(f.andThen(Future.successful)))( - nested => nested.release(), - release _, - ) - .flatten + new NestedResource(asFuture.transformWith(f.andThen(Future.successful)))( + nested => nested.release(), + release _, + ).flatten } diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala index a9a4e93fb612..dedd793a7a66 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala @@ -3,64 +3,22 @@ package com.daml.resources -import java.util.concurrent.atomic.AtomicBoolean - import com.daml.resources.HasExecutionContext.executionContext import scala.collection.compat._ -import scala.concurrent.{Future, Promise} -import scala.util.{Failure, Success} +import scala.concurrent.Future +import scala.language.higherKinds final class ResourceFactories[Context: HasExecutionContext] { private type R[+T] = Resource[Context, T] - /** Nests release operation for a [[Resource]]'s future. - */ - private[resources] def nest[T](future: Future[T])( - releaseResource: T => Future[Unit], - releaseSubResources: () => Future[Unit], - )(implicit context: Context): R[T] = new R[T] { - final lazy val asFuture: Future[T] = future.transformWith { - case Success(value) => Future.successful(value) - case Failure(throwable) => - release().flatMap(_ => Future.failed(throwable)) // Release everything on failure - } - - private val released: AtomicBoolean = new AtomicBoolean(false) // Short-circuits to a promise - private val releasePromise: Promise[Unit] = Promise() // Will be the release return handle - - def release(): Future[Unit] = - if (released.compareAndSet(false, true)) - // If `release` is called twice, we wait for `releasePromise` to complete instead - // `released` is set atomically to ensure we don't end up with two concurrent releases - future - .transformWith { - case Success(value) => - releaseResource(value).flatMap(_ => releaseSubResources()) // Release all - case Failure(_) => - releaseSubResources() // Only sub-release as the future will take care of itself - } - .transform( // Finally, complete `releasePromise` to allow other releases to complete - value => { - releasePromise.success(()) - value - }, - exception => { - releasePromise.success(()) - exception - }, - ) - else // A release is already in progress or completed; we wait for that instead - releasePromise.future - } - /** Builds a [[Resource]] from a [[Future]] and some release logic. */ def apply[T](future: Future[T])(releaseResource: T => Future[Unit])(implicit context: Context ): R[T] = - nest(future)(releaseResource, () => Future.unit) + new NestedResource(future)(releaseResource, () => Future.unit) /** Wraps a simple [[Future]] in a [[Resource]] that doesn't need to be released. */ From 5b690f662246a8bd5c623532186f41ef498736d1 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 27 Jan 2021 11:29:18 +0100 Subject: [PATCH 2/3] resources: Avoid needing a context for "pure" resources. CHANGELOG_BEGIN CHANGELOG_END --- .../digitalasset/resources/PureResource.scala | 14 ++++++++++++++ .../resources/ResourceFactories.scala | 10 +++++----- .../resources/ResourceOwnerSpec.scala | 16 +++++++++++++++- 3 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 libs-scala/resources/src/main/scala/com/digitalasset/resources/PureResource.scala diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/PureResource.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/PureResource.scala new file mode 100644 index 000000000000..465a7ecdf10d --- /dev/null +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/PureResource.scala @@ -0,0 +1,14 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.resources + +import scala.concurrent.Future + +/** Represents a pure success or failure as a resource. */ +private[resources] final class PureResource[Context: HasExecutionContext, T](future: Future[T]) + extends Resource[Context, T] { + override def asFuture: Future[T] = future + + override def release(): Future[Unit] = Future.unit +} diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala index dedd793a7a66..e19e4382de77 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala @@ -22,22 +22,22 @@ final class ResourceFactories[Context: HasExecutionContext] { /** Wraps a simple [[Future]] in a [[Resource]] that doesn't need to be released. */ - def fromFuture[T](future: Future[T])(implicit context: Context): R[T] = - apply(future)(_ => Future.unit) + def fromFuture[T](future: Future[T]): R[T] = + new PureResource(future) /** Produces a [[Resource]] that has already succeeded with the [[Unit]] value. */ - def unit(implicit context: Context): R[Unit] = + def unit: R[Unit] = fromFuture(Future.unit) /** Produces a [[Resource]] that has already succeeded with a given value. */ - def successful[T](value: T)(implicit context: Context): R[T] = + def successful[T](value: T): R[T] = fromFuture(Future.successful(value)) /** Produces a [[Resource]] that has already failed with a given exception. */ - def failed[T](exception: Throwable)(implicit context: Context): R[T] = + def failed[T](exception: Throwable): R[T] = fromFuture(Future.failed(exception)) /** Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] of the [[Traversable]] of their values. diff --git a/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceOwnerSpec.scala b/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceOwnerSpec.scala index 201880bfd16c..0f8ddb55065f 100644 --- a/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceOwnerSpec.scala +++ b/libs-scala/resources/src/test/suite/scala/com/digitalasset/resources/ResourceOwnerSpec.scala @@ -81,7 +81,21 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers { } } - "treat releases idempotently, only releasing once regardless of the number of calls" in { + "treat single releases idempotently, only releasing once regardless of the number of calls" in { + val owner = TestResourceOwner(7) + val resource = owner.acquire() + + for { + _ <- resource.asFuture + _ <- resource.release() + // if `TestResourceOwner`'s release function is called twice, it'll fail + _ <- resource.release() + } yield { + owner.hasBeenAcquired should be(false) + } + } + + "treat nested releases idempotently, only releasing once regardless of the number of calls" in { val ownerA = TestResourceOwner(7) val ownerB = TestResourceOwner("eight") From b76ba9a5076ed808fdb3736452bf79b56c5ad0ba Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 27 Jan 2021 11:42:48 +0100 Subject: [PATCH 3/3] resources: Turns out the higher kinds import fails in Scala 2.13. --- .../scala/com/digitalasset/resources/ResourceFactories.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala index e19e4382de77..286329dc7442 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceFactories.scala @@ -7,7 +7,6 @@ import com.daml.resources.HasExecutionContext.executionContext import scala.collection.compat._ import scala.concurrent.Future -import scala.language.higherKinds final class ResourceFactories[Context: HasExecutionContext] {