diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala index 91bb6cb0a534..85d2fa6d78f4 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala @@ -67,7 +67,7 @@ final case class RunningTrigger( runner: ActorRef[TriggerRunner.Message] ) -class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConfig]) { +class Server(dar: Option[Dar[(PackageId, Package)]], triggerDao: Option[TriggerDao]) { private var triggers: Map[UUID, RunningTrigger] = Map.empty; private var triggersByToken: Map[Jwt, Set[UUID]] = Map.empty; @@ -76,12 +76,6 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages() dar.foreach(addDar) - // Note(RJR): It feels wrong to use a different execution context from that of the actor - // system but I haven't seen any misbehaviour due to this yet. We can revisit this if there's - // an issue. (Threading through the actor system execution context was a pain which is why - // I went with the global option instead.) - val triggerDao: Option[TriggerDao] = jdbcConfig.map(TriggerDao(_)(ExecutionContext.global)) - private def addDar(dar: Dar[(PackageId, Package)]): Unit = { val darMap = dar.all.toMap darMap.foreach { @@ -128,7 +122,7 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf } private def listRunningTriggers(jwt: Jwt): Either[String, Vector[UUID]] = { - triggerDao match { + val triggerInstances = triggerDao match { case None => Right(triggersByToken.getOrElse(jwt, Set()).toVector) case Some(dao) => @@ -138,6 +132,9 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf case Success(triggerInstances) => Right(triggerInstances) } } + // Note(RJR): We sort UUIDs here using Java's comparison of UUIDs. + // We do not rely on the Postgres ordering which is different. + triggerInstances.map(_.sorted) } private def logTriggerStatus(t: RunningTrigger, msg: String): Unit = { @@ -189,7 +186,8 @@ object Server { dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConfig], ): Behavior[Message] = Behaviors.setup { ctx => - val server = new Server(dar, jdbcConfig) + val triggerDao = jdbcConfig.map(TriggerDao(_)(ctx.system.executionContext)) + val server = new Server(dar, triggerDao) // http doesn't know about akka typed so provide untyped system implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic @@ -522,9 +520,16 @@ object ServiceMain { bindingFuture.map(server => (server, system)) } - def initDatabase(c: JdbcConfig)(implicit ec: ExecutionContext): Either[String, Unit] = { - val triggerDao = TriggerDao(c)(ec) - val transaction = triggerDao.transact(TriggerDao.initialize(triggerDao.logHandler)) + def initDatabase(dao: TriggerDao): Either[String, Unit] = { + val transaction = dao.transact(TriggerDao.initialize(dao.logHandler)) + Try(transaction.unsafeRunSync()) match { + case Failure(err) => Left(err.toString) + case Success(()) => Right(()) + } + } + + def destroyDatabase(dao: TriggerDao): Either[String, Unit] = { + val transaction = dao.transact(TriggerDao.destroy) Try(transaction.unsafeRunSync()) match { case Failure(err) => Left(err.toString) case Success(()) => Right(()) @@ -576,7 +581,8 @@ object ServiceMain { system.log.error("No JDBC configuration for database initialization.") sys.exit(1) case (true, Some(jdbcConfig)) => - initDatabase(jdbcConfig) match { + val dao = TriggerDao(jdbcConfig)(ec) + initDatabase(dao) match { case Left(err) => system.log.error(err) sys.exit(1) diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerDao.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerDao.scala index 6a61a3ef82a9..bda012ab6963 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerDao.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerDao.scala @@ -72,6 +72,15 @@ object TriggerDao { *> createPartyIndex.update.run).void } + // Drop all tables and other objects associated with the database. + // Only used between tests for now. + def destroy: ConnectionIO[Unit] = { + val dropTriggerTable: Fragment = sql"drop table running_triggers" + val dropDalfTable: Fragment = sql"drop table dalfs" + (dropTriggerTable.update.run + *> dropDalfTable.update.run).void + } + def addRunningTrigger(t: RunningTrigger): ConnectionIO[Unit] = { val partyToken = t.jwt.value val fullTriggerName = t.triggerName.toString diff --git a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala index adddd3fec1bf..51f19f4b682c 100644 --- a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala +++ b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala @@ -36,10 +36,10 @@ import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, Tra import com.daml.ledger.client.LedgerClient import com.daml.jwt.JwtSigner import com.daml.jwt.domain.{DecodedJwt, Jwt} -import com.daml.testing.postgresql.PostgresAroundSuite +import com.daml.testing.postgresql.PostgresAroundAll import eu.rekawek.toxiproxy._ -class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with PostgresAroundSuite { +class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with PostgresAroundAll { override implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = scaled(Span(15, Seconds)), interval = scaled(Span(1, Seconds))) @@ -52,6 +52,9 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg } val testPkgId = dar.main._1 + // Lazy because the postgresDatabase is only available once the tests start + private lazy val jdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password") + def submitCmd(client: LedgerClient, party: String, cmd: Command) = { val req = SubmitAndWaitRequest( Some( @@ -80,26 +83,33 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg .fold(e => fail(s"cannot sign a JWT: ${e.shows}"), identity) } - protected def headersWithAuth(party: String) = authorizationHeader(jwt(party)) - - protected def authorizationHeader(token: Jwt): List[Authorization] = + protected def headersWithAuth(party: String): List[Authorization] = { + val token = jwt(party) List(Authorization(OAuth2BearerToken(token.value))) + } - def withHttpService[A]( - triggerDar: Option[Dar[(PackageId, Package)]], - jdbcConfig: Option[JdbcConfig] = None) + def withHttpService[A](triggerDar: Option[Dar[(PackageId, Package)]]) : ((Uri, LedgerClient, Proxy) => Future[A]) => Future[A] = TriggerServiceFixture - .withTriggerService[A](testId, List(darPath), triggerDar, jdbcConfig) + .withTriggerService[A](testId, List(darPath), triggerDar, None) + + def withTriggerServiceAndDb[A](dar: Option[Dar[(PackageId, Package)]]) + : ((Uri, LedgerClient, Proxy) => Future[A]) => Future[Assertion] = + testFn => + for { + _ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, None)(testFn) + _ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, Some(jdbcConfig))( + testFn) + } yield succeed - def startTrigger(uri: Uri, id: String, party: String): Future[HttpResponse] = { + def startTrigger(uri: Uri, triggerName: String, party: String): Future[HttpResponse] = { val req = HttpRequest( method = HttpMethods.POST, uri = uri.withPath(Uri.Path("/v1/start")), headers = headersWithAuth(party), entity = HttpEntity( ContentTypes.`application/json`, - s"""{"triggerName": "$id"}""" + s"""{"triggerName": "$triggerName"}""" ) ) Http().singleRequest(req) @@ -114,7 +124,8 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg Http().singleRequest(req) } - def triggerStatus(uri: Uri, id: String): Future[HttpResponse] = { + def triggerStatus(uri: Uri, triggerInstance: UUID): Future[HttpResponse] = { + val id = triggerInstance.toString val req = HttpRequest( method = HttpMethods.GET, uri = uri.withPath(Uri.Path(s"/v1/status/$id")), @@ -122,10 +133,11 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg Http().singleRequest(req) } - def stopTrigger(uri: Uri, id: String, party: String): Future[HttpResponse] = { + def stopTrigger(uri: Uri, triggerInstance: UUID, party: String): Future[HttpResponse] = { + val id = triggerInstance.toString val req = HttpRequest( method = HttpMethods.DELETE, - headers = headersWithAuth(s"${party}"), + headers = headersWithAuth(party), uri = uri.withPath(Uri.Path(s"/v1/stop/$id")), ) Http().singleRequest(req) @@ -160,19 +172,19 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg } yield result } - def parseTriggerId(resp: HttpResponse): Future[String] = { + def parseTriggerId(resp: HttpResponse): Future[UUID] = { for { JsObject(fields) <- parseResult(resp) Some(JsString(triggerId)) = fields.get("triggerId") - } yield triggerId + } yield UUID.fromString(triggerId) } - def parseTriggerIds(resp: HttpResponse): Future[Vector[String]] = { + def parseTriggerIds(resp: HttpResponse): Future[Vector[UUID]] = { for { JsObject(fields) <- parseResult(resp) Some(JsArray(ids)) = fields.get("triggerIds") triggerIds = ids map { - case JsString(id) => id + case JsString(id) => UUID.fromString(id) case _ => fail("""Non-string element of "triggerIds" field""") } } yield triggerIds @@ -181,7 +193,7 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg def assertTriggerIds( uri: Uri, party: String, - pred: (Vector[String]) => Boolean): Future[Assertion] = { + pred: Vector[UUID] => Boolean): Future[Assertion] = { eventually { val actualTriggerIds = Await.result(for { resp <- listTriggers(uri, party) @@ -204,57 +216,39 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg def assertTriggerStatus( uri: Uri, - id: String, + triggerInstance: UUID, pred: (Vector[String]) => Boolean): Future[Assertion] = { eventually { val actualTriggerStatus = Await.result(for { - resp <- triggerStatus(uri, id) + resp <- triggerStatus(uri, triggerInstance) result <- parseTriggerStatus(resp) } yield result, Duration.Inf) assert(pred(actualTriggerStatus)) } } - it should "initialize database" in { - connectToPostgresqlServer() - createNewDatabase() - val testJdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password") - assert(ServiceMain.initDatabase(testJdbcConfig).isRight) - dropDatabase() - disconnectFromPostgresqlServer() - succeed - } + it should "start up and shut down server" in + withTriggerServiceAndDb(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => + Future(succeed) + } - it should "add running triggers to the database" in { - connectToPostgresqlServer() - createNewDatabase() - val testJdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password") - assert(ServiceMain.initDatabase(testJdbcConfig).isRight) - Await.result( - withHttpService(Some(dar), Some(testJdbcConfig)) { - (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => - for { - // Initially no triggers started for Alice - _ <- assertTriggerIds(uri, "Alice", _ == Vector()) - // Start a trigger for Alice and check it appears in list. - resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice") - trigger1 <- parseTriggerId(resp) - _ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1)) - // Do the same for a second trigger. - resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice") - trigger2 <- parseTriggerId(resp) - expected = Vector(trigger1, trigger2).sorted - _ <- assertTriggerIds(uri, "Alice", _ == expected) - } yield succeed - }, - Duration.Inf - ) - dropDatabase() - disconnectFromPostgresqlServer() - succeed - } + it should "add running triggers" in + withTriggerServiceAndDb(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => + for { + // Initially no triggers started for Alice + _ <- assertTriggerIds(uri, "Alice", _ == Vector()) + // Start a trigger for Alice and check it appears in list. + resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice") + trigger1 <- parseTriggerId(resp) + _ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1)) + // Do the same for a second trigger. + resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice") + trigger2 <- parseTriggerId(resp) + _ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1, trigger2).sorted) + } yield succeed + } - it should "fail to start non-existent trigger" in withHttpService(Some(dar)) { + it should "fail to start non-existent trigger" in withTriggerServiceAndDb(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => val expectedError = StatusCodes.UnprocessableEntity for { @@ -302,12 +296,12 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg // Start another trigger for Bob. resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Bob") bobTrigger2 <- parseTriggerId(resp) - _ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2)) + _ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2).sorted) // Stop Alice's trigger. resp <- stopTrigger(uri, aliceTrigger, "Alice") _ <- assert(resp.status.isSuccess) _ <- assertTriggerIds(uri, "Alice", _.isEmpty) - _ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2)) + _ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2).sorted) // Stop Bob's triggers. resp <- stopTrigger(uri, bobTrigger1, "Bob") _ <- assert(resp.status.isSuccess) @@ -357,7 +351,7 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg } yield succeed } - it should "fail to start a trigger if a ledger client can't be obtained" in withHttpService( + it should "fail to start a trigger if a ledger client can't be obtained" in withTriggerServiceAndDb( Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => // Disable the proxy. This means that the service won't be able to // get a ledger connection. @@ -519,15 +513,20 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg it should "stopping a trigger that can't parse as a UUID gives a 404 response" in withHttpService( None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => + val uuid: String = "No More Mr Nice Guy" + val req = HttpRequest( + method = HttpMethods.DELETE, + uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")), + ) for { - resp <- stopTrigger(uri, "No More Mr Nice Guy", "Alice") + resp <- Http().singleRequest(req) _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) } yield succeed } it should "stopping an unknown trigger gives an error response" in withHttpService(None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => - val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff" + val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff") for { resp <- stopTrigger(uri, uuid, "Alice") _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) diff --git a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceFixture.scala b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceFixture.scala index 1a97235cec57..6194e05a885f 100644 --- a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceFixture.scala +++ b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceFixture.scala @@ -31,10 +31,13 @@ import com.daml.platform.services.time.TimeProviderType import com.daml.ports.Port import com.daml.bazeltools.BazelRunfiles import com.daml.timer.RetryStrategy +import org.scalatest.Assertions._ + import scala.concurrent._ import scala.concurrent.duration._ import scala.sys.process.Process -import java.net.{Socket, ServerSocket, InetAddress} +import java.net.{InetAddress, ServerSocket, Socket} + import eu.rekawek.toxiproxy._ object TriggerServiceFixture { @@ -95,6 +98,15 @@ object TriggerServiceFixture { client <- LedgerClient.singleHost(host.getHostName(), ledgerPort, clientConfig(applicationId)) } yield client + val triggerDao: Option[TriggerDao] = + jdbcConfig.map(c => { + val dao = TriggerDao(c) + ServiceMain.initDatabase(dao) match { + case Left(err) => fail("Failed to initialize database: " ++ err.toString) + case Right(()) => dao + } + }) + // Configure the service with the ledger's *proxy* port. val serviceF: Future[(ServerBinding, TypedActorSystem[Server.Message])] = for { (_, _, ledgerProxyPort, _) <- ledgerF @@ -118,7 +130,7 @@ object TriggerServiceFixture { // For adding toxics. val ledgerProxyF: Future[Proxy] = for { (_, _, _, ledgerProxy) <- ledgerF - } yield ledgerProxy // Not used yet. + } yield ledgerProxy val fa: Future[A] = for { client <- clientF @@ -132,6 +144,10 @@ object TriggerServiceFixture { serviceF.foreach({ case (_, system) => system ! Server.Stop }) ledgerF.foreach(_._1.close()) toxiProxyProc.destroy() + triggerDao.map(dao => + ServiceMain.destroyDatabase(dao).getOrElse { err: String => + fail("Failed to remove database objects: " ++ err.toString) + }) } fa