Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libs-scala/resources: Move Resource and friends to their own package. #4066

Merged
merged 4 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
libs-scala/resources-akka: Make the Akka resources an optional dep.
  • Loading branch information
SamirTalwar committed Jan 16, 2020
commit f617022ac907ba791354642c5c66064410953bb8
1 change: 1 addition & 0 deletions ledger/api-server-damlonx/reference-v2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ da_scala_binary(
"//ledger/participant-state/kvutils",
"//ledger/sandbox",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer}
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory

Expand All @@ -42,8 +43,8 @@ object ReferenceServer extends App {
val resource = for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- ResourceOwner.forActorSystem(() => system).acquire()
_ <- ResourceOwner.forMaterializer(() => materializer).acquire()
_ <- AkkaResourceOwner.forActorSystem(() => system).acquire()
_ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire()
ledger <- ResourceOwner
.forCloseable(() => new InMemoryKVParticipantState(config.participantId))
.acquire()
Expand Down
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ da_scala_library(
"//ledger/participant-state/kvutils",
"//ledger/sandbox",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.digitalasset.platform.indexer.{
IndexerStartupMode,
StandaloneIndexerServer
}
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory
import scopt.OptionParser
Expand Down Expand Up @@ -48,8 +49,8 @@ class Runner[Extra](
val resource = for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- ResourceOwner.forActorSystem(() => system).acquire()
_ <- ResourceOwner.forMaterializer(() => materializer).acquire()
_ <- AkkaResourceOwner.forActorSystem(() => system).acquire()
_ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire()
readerWriter <- ResourceOwner
.forCloseable(() => construct(config.participantId, config.extra))
.acquire()
Expand Down
2 changes: 2 additions & 0 deletions ledger/sandbox/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ da_scala_library(
"//ledger/participant-state-index",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_github_scopt_scopt_2_12",
Expand Down Expand Up @@ -133,6 +134,7 @@ da_scala_library(
"//ledger/participant-state-index",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_github_scopt_scopt_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.sandbox.BuildInfo
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.stores.InMemoryPackageStore
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -89,10 +90,8 @@ final class StandaloneApiServer(
preloadPackages(packageStore)

for {
actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(actorSystemName)).acquire()
materializer <- ResourceOwner
.forMaterializer(() => Materializer(actorSystem))
.acquire()
actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem(actorSystemName)).acquire()
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire()
initialConditions <- ResourceOwner
.forFuture(() => readService.getLedgerInitialConditions().runWith(Sink.head)(materializer))
.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.ReadService
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.indexer.StandaloneIndexerServer._
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand All @@ -24,7 +25,7 @@ class StandaloneIndexerServer(
override def acquire()(implicit executionContext: ExecutionContext): Resource[Unit] =
for {
// ActorSystem name not allowed to contain daml-lf LedgerString characters ".:#/ "
actorSystem <- ResourceOwner
actorSystem <- AkkaResourceOwner
.forActorSystem(() =>
ActorSystem("StandaloneIndexerServer-" + config.participantId.filterNot(".:#/ ".toSet)))
.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import com.digitalasset.platform.sandbox.stores.{
SandboxIndexAndWriteService
}
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -285,10 +286,8 @@ final class SandboxServer(config: => SandboxConfig) extends AutoCloseable {
private def start()(implicit executionContext: ExecutionContext): Resource[SandboxState] = {
val packageStore = loadDamlPackages()
for {
actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(ActorSystemName)).acquire()
materializer <- ResourceOwner
.forMaterializer(() => Materializer(actorSystem))
.acquire()
actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem(ActorSystemName)).acquire()
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire()
} yield {
val apiServerResource = buildAndStartApiServer(
materializer,
Expand Down
30 changes: 30 additions & 0 deletions libs-scala/resources-akka/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2020 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

load("//bazel_tools:scala.bzl", "da_scala_library", "da_scala_test_suite")

da_scala_library(
name = "resources-akka",
srcs = glob(["src/main/scala/**/*.scala"]),
tags = ["maven_coordinates=com.digitalasset:resources-akka:__VERSION__"],
visibility = [
"//visibility:public",
],
deps = [
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
],
)

da_scala_test_suite(
name = "resources-akka-tests",
srcs = glob(["src/test/suite/**/*.scala"]),
deps = [
":resources-akka",
"//libs-scala/resources",
"//libs-scala/resources:resources-test-lib",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
],
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.resources
package com.digitalasset.resources.akka

import akka.stream.Materializer
import com.digitalasset.resources.{Resource, ResourceOwner}

import scala.concurrent.{ExecutionContext, Future}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.resources
package com.digitalasset.resources.akka

import akka.actor.ActorSystem
import com.digitalasset.resources.{Resource, ResourceOwner}

import scala.concurrent.{ExecutionContext, Future}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.resources.akka

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.digitalasset.resources.ResourceOwner

object AkkaResourceOwner {
def forActorSystem(acquire: () => ActorSystem): ResourceOwner[ActorSystem] =
new ActorSystemResourceOwner(acquire)

def forMaterializer(acquire: () => Materializer): ResourceOwner[Materializer] =
new ActorMaterializerResourceOwner(acquire)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.resources.akka

import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.{Done, NotUsed}
import com.digitalasset.resources.ResourceOwner
import org.scalatest.{AsyncWordSpec, Matchers}

import scala.concurrent.{Future, Promise}

class AkkaResourceOwnerSpec extends AsyncWordSpec with Matchers {
"a function returning an ActorSystem" should {
"convert to a ResourceOwner" in {
val testPromise = Promise[Int]()
class TestActor extends Actor {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def receive: Receive = {
case value: Int => testPromise.success(value)
case value => testPromise.failure(new IllegalArgumentException(s"$value"))
}
}

val resource = for {
actorSystem <- AkkaResourceOwner
.forActorSystem(() => ActorSystem("TestActorSystem"))
.acquire()
actor <- ResourceOwner
.successful(actorSystem.actorOf(Props(new TestActor)))
.acquire()
} yield (actorSystem, actor)

for {
resourceFuture <- resource.asFuture
(actorSystem, actor) = resourceFuture
_ = actor ! 7
result <- testPromise.future
_ <- resource.release()
} yield {
result should be(7)
an[IllegalStateException] should be thrownBy actorSystem.actorOf(Props(new TestActor))
}
}
}

"a function returning a Materializer" should {
"convert to a ResourceOwner" in {
val resource = for {
actorSystem <- AkkaResourceOwner
.forActorSystem(() => ActorSystem("TestActorSystem"))
.acquire()
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire()
} yield materializer

for {
materializer <- resource.asFuture
numbers <- Source(1 to 10)
.toMat(Sink.seq)(Keep.right[NotUsed, Future[Seq[Int]]])
.run()(materializer)
_ <- resource.release()
} yield {
numbers should be(1 to 10)
an[IllegalStateException] should be thrownBy Source
.single(0)
.toMat(Sink.ignore)(Keep.right[NotUsed, Future[Done]])
.run()(materializer)
}
}
}
}
4 changes: 0 additions & 4 deletions libs-scala/resources/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ da_scala_library(
"//visibility:public",
],
deps = [
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
],
)
Expand All @@ -34,7 +32,5 @@ da_scala_test_suite(
deps = [
":resources",
":resources-test-lib",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ package com.digitalasset.resources
import java.util.Timer
import java.util.concurrent.{CompletionStage, ExecutorService}

import akka.actor.ActorSystem
import akka.stream.Materializer

import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -63,10 +60,4 @@ object ResourceOwner {

def forTimer(acquire: () => Timer): ResourceOwner[Timer] =
new TimerResourceOwner(acquire)

def forActorSystem(acquire: () => ActorSystem): ResourceOwner[ActorSystem] =
new ActorSystemResourceOwner(acquire)

def forMaterializer(acquire: () => Materializer): ResourceOwner[Materializer] =
new ActorMaterializerResourceOwner(acquire)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.{Executors, RejectedExecutionException}
import java.util.{Timer, TimerTask}

import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.{Done, NotUsed}
import org.scalatest.{AsyncWordSpec, Matchers}

import scala.collection.mutable
Expand Down Expand Up @@ -498,60 +494,6 @@ class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
}
}

"a function returning an ActorSystem" should {
"convert to a ResourceOwner" in {
val testPromise = Promise[Int]()
class TestActor extends Actor {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def receive: Receive = {
case value: Int => testPromise.success(value)
case value => testPromise.failure(new IllegalArgumentException(s"$value"))
}
}

val resource = for {
actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem("TestActorSystem")).acquire()
actor <- ResourceOwner
.successful(actorSystem.actorOf(Props(new TestActor)))
.acquire()
} yield (actorSystem, actor)

for {
resourceFuture <- resource.asFuture
(actorSystem, actor) = resourceFuture
_ = actor ! 7
result <- testPromise.future
_ <- resource.release()
} yield {
result should be(7)
an[IllegalStateException] should be thrownBy actorSystem.actorOf(Props(new TestActor))
}
}
}

"a function returning a Materializer" should {
"convert to a ResourceOwner" in {
val resource = for {
actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem("TestActorSystem")).acquire()
materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire()
} yield materializer

for {
materializer <- resource.asFuture
numbers <- Source(1 to 10)
.toMat(Sink.seq)(Keep.right[NotUsed, Future[Seq[Int]]])
.run()(materializer)
_ <- resource.release()
} yield {
numbers should be(1 to 10)
an[IllegalStateException] should be thrownBy Source
.single(0)
.toMat(Sink.ignore)(Keep.right[NotUsed, Future[Done]])
.run()(materializer)
}
}
}

"many resources in a sequence" should {
"be able to be sequenced" in {
val acquireOrder = mutable.Buffer[Int]()
Expand Down
3 changes: 3 additions & 0 deletions release/artifacts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@
- target: //libs-scala/resources:resources
type: jar-scala
mavenUpload: true
- target: //libs-scala/resources-akka:resources-akka
type: jar-scala
mavenUpload: true
- target: //libs-scala/timer-utils:timer-utils
type: jar-scala
mavenUpload: true