Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resources: Release sequenced resources in parallel. #7991

Merged
merged 3 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs-scala/resources/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ da_scala_test_suite(
deps = [
":resources",
":resources-test-lib",
"//libs-scala/timer-utils",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import com.daml.resources.HasExecutionContext.executionContext

import scala.collection.generic.CanBuildFrom
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -90,43 +91,61 @@ final class ResourceFactories[Context: HasExecutionContext] {
fromFuture(Future.failed(exception))

/**
* Sequences a [[TraversableOnce]] of [[Resource]]s into a [[Resource]] of the [[TraversableOnce]] of their values.
* Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] of the [[Traversable]] of their values.
*
* @param seq The [[TraversableOnce]] of [[Resource]]s.
* @param bf The projection from a [[TraversableOnce]] of resources into one of their values.
* @param seq The [[Traversable]] of [[Resource]]s.
* @param bf The projection from a [[Traversable]] of resources into one of their values.
* @param context The asynchronous task execution engine.
* @tparam T The value type.
* @tparam C The [[TraversableOnce]] actual type.
* @tparam C The [[Traversable]] actual type.
* @tparam U The return type.
* @return A [[Resource]] with a sequence of the values of the sequenced [[Resource]]s as its underlying value.
*/
def sequence[T, C[X] <: TraversableOnce[X]](seq: C[R[T]])(
implicit bf: CanBuildFrom[C[R[T]], T, C[T]],
def sequence[T, C[X] <: Traversable[X], U](seq: C[R[T]])(
implicit bf: CanBuildFrom[C[R[T]], T, U],
context: Context,
): R[C[T]] =
seq
): R[U] = new R[U] {
private val resource = seq
.foldLeft(successful(bf()))((builderResource, elementResource) =>
for {
builder <- builderResource // Consider the builder in the accumulator resource
element <- elementResource // Consider the value in the actual resource element
} yield builder += element) // Append the element to the builder
.map(_.result()) // Yield a resource of collection resulting from the builder

override def asFuture: Future[U] =
resource.asFuture

override def release(): Future[Unit] =
Future.sequence(seq.map(_.release())).map(_ => ())
}

/**
* Sequences a [[TraversableOnce]] of [[Resource]]s into a [[Resource]] with no underlying value.
* Sequences a [[Traversable]] of [[Resource]]s into a [[Resource]] with no underlying value.
*
* @param seq The [[TraversableOnce]] of [[Resource]]s.
* @param seq The [[Traversable]] of [[Resource]]s.
* @param context The asynchronous task execution engine.
* @tparam T The value type.
* @tparam C The [[TraversableOnce]] actual type.
* @tparam C The [[Traversable]] actual type.
* @return A [[Resource]] sequencing the [[Resource]]s and no underlying value.
*/
def sequenceIgnoringValues[T, C[X] <: TraversableOnce[X]](seq: C[Resource[Context, T]])(
def sequenceIgnoringValues[T, C[X] <: Traversable[X]](seq: C[R[T]])(
implicit context: Context
): Resource[Context, Unit] =
seq
.foldLeft(unit)((builderResource, elementResource) =>
for {
_ <- builderResource
_ <- elementResource
} yield ())
): R[Unit] =
sequence(seq)(new UnitCanBuildFrom, context)

final class UnitCanBuildFrom[T, C[X] <: Traversable[X]] extends CanBuildFrom[C[R[T]], T, Unit] {
override def apply(from: C[R[T]]): mutable.Builder[T, Unit] = apply()

override def apply(): mutable.Builder[T, Unit] = UnitBuilder
}

object UnitBuilder extends mutable.Builder[Any, Unit] {
override def +=(elem: Any): this.type = this

override def clear(): Unit = ()

override def result(): Unit = ()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.util.{Timer, TimerTask}

import com.daml.resources.FailingResourceOwner.FailingResourceFailedToOpen
import com.daml.resources.{Resource => AbstractResource}
import com.daml.timer.Delayed
import com.github.ghik.silencer.silent
import org.scalatest.{AsyncWordSpec, Matchers}

Expand All @@ -25,7 +26,7 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
TestContext.`TestContext has ExecutionContext`
}

private implicit val context: TestContext = new TestContext(executionContext)
private implicit val context: TestContext = new TestContext(ExecutionContext.global)

"a resource owner" should {
"acquire and release a resource" in {
Expand Down Expand Up @@ -588,14 +589,14 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
"many resources in a sequence" should {
"be able to be sequenced" in {
val acquireOrder = mutable.Buffer[Int]()
val releaseOrder = mutable.Buffer[Int]()
val released = mutable.Set[Int]()
val owners = (1 to 10).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] = {
acquireOrder += value
Resource(Future(value))(v =>
Future {
releaseOrder += v
released += v
})
}
})
Expand All @@ -616,7 +617,77 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
_ <- resource.release()
} yield {
withClue("after releasing,") {
releaseOrder should be(10.to(1, step = -1))
released.toSet should be((1 to 10).toSet)
}
}
}

"sequence, ignoring values if asked" in {
val acquired = mutable.Set[Int]()
val released = mutable.Set[Int]()
val owners = (1 to 10).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] =
Resource(Future {
acquired += value
value
})(v =>
Future {
released += v
})
})
val resources = owners.map(_.acquire())

val resource = for {
values <- Resource.sequenceIgnoringValues(resources)
} yield {
withClue("after sequencing,") {
acquired.toSet should be((1 to 10).toSet)
values should be(())
}
()
}

for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
withClue("after releasing,") {
released.toSet should be((1 to 10).toSet)
}
}
}

"release in parallel" in {
val releaseOrder = mutable.Buffer[Int]()
val owners = (1 to 4).map(value =>
new AbstractResourceOwner[TestContext, Int] {
override def acquire()(implicit context: TestContext): Resource[Int] = {
Resource(Future(value)) { v =>
Delayed.by((v * 200).milliseconds) {
releaseOrder += v
()
}
}
}
})
val resources = owners.map(_.acquire())

val resource = for {
values <- Resource.sequence(resources)
} yield {
withClue("after sequencing,") {
values should be(1 to 4)
}
()
}

for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
withClue("after releasing,") {
releaseOrder should be(1 to 4)
}
}
}
Expand Down