Skip to content

Commit

Permalink
resources: Release sequenced resources in parallel. (#7991)
Browse files Browse the repository at this point in the history
* resources: Release sequenced resources in parallel.

This isn't used much, but has been bothering me for a while. While we
acquire the resources in parallel, we used to release them sequentially.
This reimplements `sequence` so they're released all at once.

CHANGELOG_BEGIN
CHANGELOG_END

* resources: Drop an unnecessary `.map`.

* resources: Fix the Scaladoc for `sequence`.
  • Loading branch information
SamirTalwar authored Nov 18, 2020
1 parent 5110115 commit 87eee30
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 23 deletions.
1 change: 1 addition & 0 deletions libs-scala/resources/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ da_scala_test_suite(
deps = [
":resources",
":resources-test-lib",
"//libs-scala/timer-utils",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import com.daml.resources.HasExecutionContext.executionContext

import scala.collection.generic.CanBuildFrom
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -90,43 +91,61 @@ final class ResourceFactories[Context: HasExecutionContext] {
fromFuture(Future.failed(exception))

/**
* Sequences a [[TraversableOnce]] of [[Resource]]s into a [[Resource]] of the [[TraversableOnce]] of their values.
* Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] of the [[Traversable]] of their values.
*
* @param seq The [[TraversableOnce]] of [[Resource]]s.
* @param bf The projection from a [[TraversableOnce]] of resources into one of their values.
* @param seq The [[Traversable]] of [[Resource]]s.
* @param bf The projection from a [[Traversable]] of resources into one of their values.
* @param context The asynchronous task execution engine.
* @tparam T The value type.
* @tparam C The [[TraversableOnce]] actual type.
* @tparam C The [[Traversable]] actual type.
* @tparam U The return type.
* @return A [[Resource]] with a sequence of the values of the sequenced [[Resource]]s as its underlying value.
*/
def sequence[T, C[X] <: TraversableOnce[X]](seq: C[R[T]])(
implicit bf: CanBuildFrom[C[R[T]], T, C[T]],
def sequence[T, C[X] <: Traversable[X], U](seq: C[R[T]])(
implicit bf: CanBuildFrom[C[R[T]], T, U],
context: Context,
): R[C[T]] =
seq
): R[U] = new R[U] {
private val resource = seq
.foldLeft(successful(bf()))((builderResource, elementResource) =>
for {
builder <- builderResource // Consider the builder in the accumulator resource
element <- elementResource // Consider the value in the actual resource element
} yield builder += element) // Append the element to the builder
.map(_.result()) // Yield a resource of collection resulting from the builder

override def asFuture: Future[U] =
resource.asFuture

override def release(): Future[Unit] =
Future.sequence(seq.map(_.release())).map(_ => ())
}

/**
* Sequences a [[TraversableOnce]] of [[Resource]]s into a [[Resource]] with no underlying value.
* Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] with no underlying value.
*
* @param seq The [[TraversableOnce]] of [[Resource]]s.
* @param seq The [[Traversable]] of [[Resource]]s.
* @param context The asynchronous task execution engine.
* @tparam T The value type.
* @tparam C The [[TraversableOnce]] actual type.
* @tparam C The [[Traversable]] actual type.
* @return A [[Resource]] sequencing the [[Resource]]s and no underlying value.
*/
def sequenceIgnoringValues[T, C[X] <: TraversableOnce[X]](seq: C[Resource[Context, T]])(
def sequenceIgnoringValues[T, C[X] <: Traversable[X]](seq: C[R[T]])(
implicit context: Context
): Resource[Context, Unit] =
seq
.foldLeft(unit)((builderResource, elementResource) =>
for {
_ <- builderResource
_ <- elementResource
} yield ())
): R[Unit] =
sequence(seq)(new UnitCanBuildFrom, context)

final class UnitCanBuildFrom[T, C[X] <: Traversable[X]] extends CanBuildFrom[C[R[T]], T, Unit] {
override def apply(from: C[R[T]]): mutable.Builder[T, Unit] = apply()

override def apply(): mutable.Builder[T, Unit] = UnitBuilder
}

object UnitBuilder extends mutable.Builder[Any, Unit] {
override def +=(elem: Any): this.type = this

override def clear(): Unit = ()

override def result(): Unit = ()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.util.{Timer, TimerTask}

import com.daml.resources.FailingResourceOwner.FailingResourceFailedToOpen
import com.daml.resources.{Resource => AbstractResource}
import com.daml.timer.Delayed
import com.github.ghik.silencer.silent
import org.scalatest.{AsyncWordSpec, Matchers}

Expand All @@ -25,7 +26,7 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
TestContext.`TestContext has ExecutionContext`
}

private implicit val context: TestContext = new TestContext(executionContext)
private implicit val context: TestContext = new TestContext(ExecutionContext.global)

"a resource owner" should {
"acquire and release a resource" in {
Expand Down Expand Up @@ -588,14 +589,14 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
"many resources in a sequence" should {
"be able to be sequenced" in {
val acquireOrder = mutable.Buffer[Int]()
val releaseOrder = mutable.Buffer[Int]()
val released = mutable.Set[Int]()
val owners = (1 to 10).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] = {
acquireOrder += value
Resource(Future(value))(v =>
Future {
releaseOrder += v
released += v
})
}
})
Expand All @@ -616,7 +617,77 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
_ <- resource.release()
} yield {
withClue("after releasing,") {
releaseOrder should be(10.to(1, step = -1))
released.toSet should be((1 to 10).toSet)
}
}
}

"sequence, ignoring values if asked" in {
val acquired = mutable.Set[Int]()
val released = mutable.Set[Int]()
val owners = (1 to 10).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] =
Resource(Future {
acquired += value
value
})(v =>
Future {
released += v
})
})
val resources = owners.map(_.acquire())

val resource = for {
values <- Resource.sequenceIgnoringValues(resources)
} yield {
withClue("after sequencing,") {
acquired.toSet should be((1 to 10).toSet)
values should be(())
}
()
}

for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
withClue("after releasing,") {
released.toSet should be((1 to 10).toSet)
}
}
}

"release in parallel" in {
val releaseOrder = mutable.Buffer[Int]()
val owners = (1 to 4).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] = {
Resource(Future(value)) { v =>
Delayed.by((v * 200).milliseconds) {
releaseOrder += v
()
}
}
}
})
val resources = owners.map(_.acquire())

val resource = for {
values <- Resource.sequence(resources)
} yield {
withClue("after sequencing,") {
values should be(1 to 4)
}
()
}

for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
withClue("after releasing,") {
releaseOrder should be(1 to 4)
}
}
}
Expand Down

0 comments on commit 87eee30

Please sign in to comment.