Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resources: Less context. #8643

Merged
merged 3 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
resources: Extract out Resource.nest into its own class.
  • Loading branch information
SamirTalwar committed Jan 27, 2021
commit 44991a8646a4f22ead071b698789d964f977d413
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.resources

import java.util.concurrent.atomic.AtomicBoolean

import com.daml.resources.HasExecutionContext.executionContext

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

/** Nests release operation for a [[Resource]]'s future. */
private[resources] final class NestedResource[Context: HasExecutionContext, T](future: Future[T])(
releaseResource: T => Future[Unit],
releaseSubResources: () => Future[Unit],
)(implicit context: Context)
extends Resource[Context, T] {
lazy val asFuture: Future[T] = future.transformWith {
case Success(value) => Future.successful(value)
case Failure(throwable) =>
release().flatMap(_ => Future.failed(throwable)) // Release everything on failure
}

private val released: AtomicBoolean = new AtomicBoolean(false) // Short-circuits to a promise
private val releasePromise: Promise[Unit] = Promise() // Will be the release return handle

def release(): Future[Unit] =
if (released.compareAndSet(false, true))
// If `release` is called twice, we wait for `releasePromise` to complete instead
// `released` is set atomically to ensure we don't end up with two concurrent releases
future
.transformWith {
case Success(value) =>
releaseResource(value).flatMap(_ => releaseSubResources()) // Release all
case Failure(_) =>
releaseSubResources() // Only sub-release as the future will take care of itself
}
.transform( // Finally, complete `releasePromise` to allow other releases to complete
value => {
releasePromise.success(())
value
},
exception => {
releasePromise.success(())
exception
},
)
else // A release is already in progress or completed; we wait for that instead
releasePromise.future
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ abstract class Resource[Context: HasExecutionContext, +A] {

private type R[+T] = Resource[Context, T]

private val Resource = new ResourceFactories[Context]

/** Every [[Resource]] has an underlying [[Future]] representation.
*/
def asFuture: Future[A]
Expand All @@ -32,7 +30,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
*/
def map[B](f: A => B)(implicit context: Context): R[B] =
// A mapped Resource is a mapped future plus a nesting of an empty release operation and the actual one
Resource.nest(asFuture.map(f))(_ => Future.unit, release _)
new NestedResource(asFuture.map(f))(_ => Future.unit, release _)

/** Just like [[Future]]s, [[Resource]]s can be chained. Both component [[Resource]]s will be released correctly
* upon failure and explicit release.
Expand All @@ -50,7 +48,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
case Failure(_) => Future.unit // Already released by future failure
}
val future = nextFuture.flatMap(_.asFuture)
Resource.nest(future)(
new NestedResource(future)(
nextRelease,
release _,
) // Nest next resource release and this resource release
Expand All @@ -65,7 +63,7 @@ abstract class Resource[Context: HasExecutionContext, +A] {
else
Future.failed(new ResourceAcquisitionFilterException())
)
Resource.nest(future)(_ => Future.unit, release _)
new NestedResource(future)(_ => Future.unit, release _)
}

/** A nested resource can be flattened.
Expand All @@ -76,10 +74,8 @@ abstract class Resource[Context: HasExecutionContext, +A] {
/** Just like [[Future]]s, an attempted [[Resource]] computation can be transformed.
*/
def transformWith[B](f: Try[A] => R[B])(implicit context: Context): R[B] =
Resource
.nest(asFuture.transformWith(f.andThen(Future.successful)))(
nested => nested.release(),
release _,
)
.flatten
new NestedResource(asFuture.transformWith(f.andThen(Future.successful)))(
nested => nested.release(),
release _,
).flatten
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,22 @@

package com.daml.resources

import java.util.concurrent.atomic.AtomicBoolean

import com.daml.resources.HasExecutionContext.executionContext

import scala.collection.compat._
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.Future
import scala.language.higherKinds

final class ResourceFactories[Context: HasExecutionContext] {

private type R[+T] = Resource[Context, T]

/** Nests release operation for a [[Resource]]'s future.
*/
private[resources] def nest[T](future: Future[T])(
releaseResource: T => Future[Unit],
releaseSubResources: () => Future[Unit],
)(implicit context: Context): R[T] = new R[T] {
final lazy val asFuture: Future[T] = future.transformWith {
case Success(value) => Future.successful(value)
case Failure(throwable) =>
release().flatMap(_ => Future.failed(throwable)) // Release everything on failure
}

private val released: AtomicBoolean = new AtomicBoolean(false) // Short-circuits to a promise
private val releasePromise: Promise[Unit] = Promise() // Will be the release return handle

def release(): Future[Unit] =
if (released.compareAndSet(false, true))
// If `release` is called twice, we wait for `releasePromise` to complete instead
// `released` is set atomically to ensure we don't end up with two concurrent releases
future
.transformWith {
case Success(value) =>
releaseResource(value).flatMap(_ => releaseSubResources()) // Release all
case Failure(_) =>
releaseSubResources() // Only sub-release as the future will take care of itself
}
.transform( // Finally, complete `releasePromise` to allow other releases to complete
value => {
releasePromise.success(())
value
},
exception => {
releasePromise.success(())
exception
},
)
else // A release is already in progress or completed; we wait for that instead
releasePromise.future
}

/** Builds a [[Resource]] from a [[Future]] and some release logic.
*/
def apply[T](future: Future[T])(releaseResource: T => Future[Unit])(implicit
context: Context
): R[T] =
nest(future)(releaseResource, () => Future.unit)
new NestedResource(future)(releaseResource, () => Future.unit)

/** Wraps a simple [[Future]] in a [[Resource]] that doesn't need to be released.
*/
Expand Down