Trigger service: streamline running tests with and without the database (digital-asset#6205)

This adds a function withTriggerServiceAndDb that runs a test twice, once with and once without a database, and succeeds only if both runs succeed. This will be useful for reusing test logic across both backends and for making sure their behaviour is consistent. I have used this function where possible, but it won't work for every test until stop is implemented on the DB side.
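
Condensed from the ServiceTest diff below, the helper is roughly shaped like this (testId, darPath and jdbcConfig are existing members of the test suite):

```scala
def withTriggerServiceAndDb[A](dar: Option[Dar[(PackageId, Package)]])
    : ((Uri, LedgerClient, Proxy) => Future[A]) => Future[Assertion] =
  testFn =>
    for {
      // First run: in-memory backend (no JDBC config).
      _ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, None)(testFn)
      // Second run: Postgres backend; the overall test passes only if both runs do.
      _ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, Some(jdbcConfig))(testFn)
    } yield succeed
```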

At the moment this new function squashes two tests into one, which makes it hard to tell whether a failure happened with or without the database. In a future PR I will investigate using an abstract class to run the tests separately (hopefully with altered descriptions).

This feature required a few changes in the process, mainly:
- Use PostgresAroundAll to connect to the database before all tests run and disconnect from it afterwards
- Add a destroy method to the TriggerDao to reset the database between tests
- Use the TriggerDao in the withTriggerService functions to initialize / clean up the database at the start / end of each test
- Sort the trigger instances returned by the list endpoint using Scala's sort instead of relying on Postgres' ordering of UUIDs. This also means the tests need to use UUIDs for trigger instances and sort nonempty vectors in expected results (a sketch of the ordering difference follows this list).
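
To illustrate that last point, here is a small standalone sketch (the example UUIDs are arbitrary, not taken from the tests) of why Scala's sort and Postgres's ordering can disagree: Java's UUID.compareTo compares the two 64-bit halves as signed longs, while Postgres compares uuid values bytewise as unsigned.

```scala
import java.util.UUID

// An instance whose top bit is set has a negative mostSignificantBits in Java,
// so Java/Scala order it *before* an instance starting with 0x00 ...
val a = UUID.fromString("00000000-0000-0000-0000-000000000001")
val b = UUID.fromString("80000000-0000-0000-0000-000000000001")

assert(b.compareTo(a) < 0)                    // Java: b sorts before a
assert(Vector(a, b).sorted == Vector(b, a))   // Scala's .sorted agrees with Java
// ... whereas Postgres's bytewise comparison puts a before b, which is why the
// service now sorts the instances in Scala after reading them from the database.
```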
rohanjr authored Jun 4, 2020
1 parent f1822f6 commit 09254a0
Showing 4 changed files with 109 additions and 79 deletions.
@@ -67,7 +67,7 @@ final case class RunningTrigger(
runner: ActorRef[TriggerRunner.Message]
)

class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConfig]) {
class Server(dar: Option[Dar[(PackageId, Package)]], triggerDao: Option[TriggerDao]) {

private var triggers: Map[UUID, RunningTrigger] = Map.empty;
private var triggersByToken: Map[Jwt, Set[UUID]] = Map.empty;
@@ -76,12 +76,6 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf
val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()
dar.foreach(addDar)

// Note(RJR): It feels wrong to use a different execution context from that of the actor
// system but I haven't seen any misbehaviour due to this yet. We can revisit this if there's
// an issue. (Threading through the actor system execution context was a pain which is why
// I went with the global option instead.)
val triggerDao: Option[TriggerDao] = jdbcConfig.map(TriggerDao(_)(ExecutionContext.global))

private def addDar(dar: Dar[(PackageId, Package)]): Unit = {
val darMap = dar.all.toMap
darMap.foreach {
@@ -128,7 +122,7 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf
}

private def listRunningTriggers(jwt: Jwt): Either[String, Vector[UUID]] = {
triggerDao match {
val triggerInstances = triggerDao match {
case None =>
Right(triggersByToken.getOrElse(jwt, Set()).toVector)
case Some(dao) =>
@@ -138,6 +132,9 @@ class Server(dar: Option[Dar[(PackageId, Package)]], jdbcConfig: Option[JdbcConf
case Success(triggerInstances) => Right(triggerInstances)
}
}
// Note(RJR): We sort UUIDs here using Java's comparison of UUIDs.
// We do not rely on the Postgres ordering which is different.
triggerInstances.map(_.sorted)
}

private def logTriggerStatus(t: RunningTrigger, msg: String): Unit = {
@@ -189,7 +186,8 @@ object Server {
dar: Option[Dar[(PackageId, Package)]],
jdbcConfig: Option[JdbcConfig],
): Behavior[Message] = Behaviors.setup { ctx =>
val server = new Server(dar, jdbcConfig)
val triggerDao = jdbcConfig.map(TriggerDao(_)(ctx.system.executionContext))
val server = new Server(dar, triggerDao)

// http doesn't know about akka typed so provide untyped system
implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
@@ -522,9 +520,16 @@ object ServiceMain {
bindingFuture.map(server => (server, system))
}

def initDatabase(c: JdbcConfig)(implicit ec: ExecutionContext): Either[String, Unit] = {
val triggerDao = TriggerDao(c)(ec)
val transaction = triggerDao.transact(TriggerDao.initialize(triggerDao.logHandler))
def initDatabase(dao: TriggerDao): Either[String, Unit] = {
val transaction = dao.transact(TriggerDao.initialize(dao.logHandler))
Try(transaction.unsafeRunSync()) match {
case Failure(err) => Left(err.toString)
case Success(()) => Right(())
}
}

def destroyDatabase(dao: TriggerDao): Either[String, Unit] = {
val transaction = dao.transact(TriggerDao.destroy)
Try(transaction.unsafeRunSync()) match {
case Failure(err) => Left(err.toString)
case Success(()) => Right(())
@@ -576,7 +581,8 @@ object ServiceMain {
system.log.error("No JDBC configuration for database initialization.")
sys.exit(1)
case (true, Some(jdbcConfig)) =>
initDatabase(jdbcConfig) match {
val dao = TriggerDao(jdbcConfig)(ec)
initDatabase(dao) match {
case Left(err) =>
system.log.error(err)
sys.exit(1)
@@ -72,6 +72,15 @@ object TriggerDao {
*> createPartyIndex.update.run).void
}

// Drop all tables and other objects associated with the database.
// Only used between tests for now.
def destroy: ConnectionIO[Unit] = {
val dropTriggerTable: Fragment = sql"drop table running_triggers"
val dropDalfTable: Fragment = sql"drop table dalfs"
(dropTriggerTable.update.run
*> dropDalfTable.update.run).void
}

def addRunningTrigger(t: RunningTrigger): ConnectionIO[Unit] = {
val partyToken = t.jwt.value
val fullTriggerName = t.triggerName.toString
@@ -36,10 +36,10 @@ import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, Tra
import com.daml.ledger.client.LedgerClient
import com.daml.jwt.JwtSigner
import com.daml.jwt.domain.{DecodedJwt, Jwt}
import com.daml.testing.postgresql.PostgresAroundSuite
import com.daml.testing.postgresql.PostgresAroundAll
import eu.rekawek.toxiproxy._

class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with PostgresAroundSuite {
class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with PostgresAroundAll {

override implicit def patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(15, Seconds)), interval = scaled(Span(1, Seconds)))
@@ -52,6 +52,9 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
}
val testPkgId = dar.main._1

// Lazy because the postgresDatabase is only available once the tests start
private lazy val jdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password")

def submitCmd(client: LedgerClient, party: String, cmd: Command) = {
val req = SubmitAndWaitRequest(
Some(
@@ -80,26 +83,33 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
.fold(e => fail(s"cannot sign a JWT: ${e.shows}"), identity)
}

protected def headersWithAuth(party: String) = authorizationHeader(jwt(party))

protected def authorizationHeader(token: Jwt): List[Authorization] =
protected def headersWithAuth(party: String): List[Authorization] = {
val token = jwt(party)
List(Authorization(OAuth2BearerToken(token.value)))
}

def withHttpService[A](
triggerDar: Option[Dar[(PackageId, Package)]],
jdbcConfig: Option[JdbcConfig] = None)
def withHttpService[A](triggerDar: Option[Dar[(PackageId, Package)]])
: ((Uri, LedgerClient, Proxy) => Future[A]) => Future[A] =
TriggerServiceFixture
.withTriggerService[A](testId, List(darPath), triggerDar, jdbcConfig)
.withTriggerService[A](testId, List(darPath), triggerDar, None)

def withTriggerServiceAndDb[A](dar: Option[Dar[(PackageId, Package)]])
: ((Uri, LedgerClient, Proxy) => Future[A]) => Future[Assertion] =
testFn =>
for {
_ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, None)(testFn)
_ <- TriggerServiceFixture.withTriggerService(testId, List(darPath), dar, Some(jdbcConfig))(
testFn)
} yield succeed

def startTrigger(uri: Uri, id: String, party: String): Future[HttpResponse] = {
def startTrigger(uri: Uri, triggerName: String, party: String): Future[HttpResponse] = {
val req = HttpRequest(
method = HttpMethods.POST,
uri = uri.withPath(Uri.Path("/v1/start")),
headers = headersWithAuth(party),
entity = HttpEntity(
ContentTypes.`application/json`,
s"""{"triggerName": "$id"}"""
s"""{"triggerName": "$triggerName"}"""
)
)
Http().singleRequest(req)
@@ -114,18 +124,20 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
Http().singleRequest(req)
}

def triggerStatus(uri: Uri, id: String): Future[HttpResponse] = {
def triggerStatus(uri: Uri, triggerInstance: UUID): Future[HttpResponse] = {
val id = triggerInstance.toString
val req = HttpRequest(
method = HttpMethods.GET,
uri = uri.withPath(Uri.Path(s"/v1/status/$id")),
)
Http().singleRequest(req)
}

def stopTrigger(uri: Uri, id: String, party: String): Future[HttpResponse] = {
def stopTrigger(uri: Uri, triggerInstance: UUID, party: String): Future[HttpResponse] = {
val id = triggerInstance.toString
val req = HttpRequest(
method = HttpMethods.DELETE,
headers = headersWithAuth(s"${party}"),
headers = headersWithAuth(party),
uri = uri.withPath(Uri.Path(s"/v1/stop/$id")),
)
Http().singleRequest(req)
@@ -160,19 +172,19 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
} yield result
}

def parseTriggerId(resp: HttpResponse): Future[String] = {
def parseTriggerId(resp: HttpResponse): Future[UUID] = {
for {
JsObject(fields) <- parseResult(resp)
Some(JsString(triggerId)) = fields.get("triggerId")
} yield triggerId
} yield UUID.fromString(triggerId)
}

def parseTriggerIds(resp: HttpResponse): Future[Vector[String]] = {
def parseTriggerIds(resp: HttpResponse): Future[Vector[UUID]] = {
for {
JsObject(fields) <- parseResult(resp)
Some(JsArray(ids)) = fields.get("triggerIds")
triggerIds = ids map {
case JsString(id) => id
case JsString(id) => UUID.fromString(id)
case _ => fail("""Non-string element of "triggerIds" field""")
}
} yield triggerIds
@@ -181,7 +193,7 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
def assertTriggerIds(
uri: Uri,
party: String,
pred: (Vector[String]) => Boolean): Future[Assertion] = {
pred: Vector[UUID] => Boolean): Future[Assertion] = {
eventually {
val actualTriggerIds = Await.result(for {
resp <- listTriggers(uri, party)
@@ -204,57 +216,39 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg

def assertTriggerStatus(
uri: Uri,
id: String,
triggerInstance: UUID,
pred: (Vector[String]) => Boolean): Future[Assertion] = {
eventually {
val actualTriggerStatus = Await.result(for {
resp <- triggerStatus(uri, id)
resp <- triggerStatus(uri, triggerInstance)
result <- parseTriggerStatus(resp)
} yield result, Duration.Inf)
assert(pred(actualTriggerStatus))
}
}

it should "initialize database" in {
connectToPostgresqlServer()
createNewDatabase()
val testJdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password")
assert(ServiceMain.initDatabase(testJdbcConfig).isRight)
dropDatabase()
disconnectFromPostgresqlServer()
succeed
}
it should "start up and shut down server" in
withTriggerServiceAndDb(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
Future(succeed)
}

it should "add running triggers to the database" in {
connectToPostgresqlServer()
createNewDatabase()
val testJdbcConfig = JdbcConfig(postgresDatabase.url, "operator", "password")
assert(ServiceMain.initDatabase(testJdbcConfig).isRight)
Await.result(
withHttpService(Some(dar), Some(testJdbcConfig)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
// Initially no triggers started for Alice
_ <- assertTriggerIds(uri, "Alice", _ == Vector())
// Start a trigger for Alice and check it appears in list.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
trigger1 <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1))
// Do the same for a second trigger.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
trigger2 <- parseTriggerId(resp)
expected = Vector(trigger1, trigger2).sorted
_ <- assertTriggerIds(uri, "Alice", _ == expected)
} yield succeed
},
Duration.Inf
)
dropDatabase()
disconnectFromPostgresqlServer()
succeed
}
it should "add running triggers" in
withTriggerServiceAndDb(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
// Initially no triggers started for Alice
_ <- assertTriggerIds(uri, "Alice", _ == Vector())
// Start a trigger for Alice and check it appears in list.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
trigger1 <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1))
// Do the same for a second trigger.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
trigger2 <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, "Alice", _ == Vector(trigger1, trigger2).sorted)
} yield succeed
}

it should "fail to start non-existent trigger" in withHttpService(Some(dar)) {
it should "fail to start non-existent trigger" in withTriggerServiceAndDb(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val expectedError = StatusCodes.UnprocessableEntity
for {
@@ -302,12 +296,12 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
// Start another trigger for Bob.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Bob")
bobTrigger2 <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2))
_ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2).sorted)
// Stop Alice's trigger.
resp <- stopTrigger(uri, aliceTrigger, "Alice")
_ <- assert(resp.status.isSuccess)
_ <- assertTriggerIds(uri, "Alice", _.isEmpty)
_ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2))
_ <- assertTriggerIds(uri, "Bob", _ == Vector(bobTrigger1, bobTrigger2).sorted)
// Stop Bob's triggers.
resp <- stopTrigger(uri, bobTrigger1, "Bob")
_ <- assert(resp.status.isSuccess)
@@ -357,7 +351,7 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
} yield succeed
}

it should "fail to start a trigger if a ledger client can't be obtained" in withHttpService(
it should "fail to start a trigger if a ledger client can't be obtained" in withTriggerServiceAndDb(
Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Disable the proxy. This means that the service won't be able to
// get a ledger connection.
@@ -519,15 +513,20 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg

it should "stopping a trigger that can't parse as a UUID gives a 404 response" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid: String = "No More Mr Nice Guy"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- stopTrigger(uri, "No More Mr Nice Guy", "Alice")
resp <- Http().singleRequest(req)
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
} yield succeed
}

it should "stopping an unknown trigger gives an error response" in withHttpService(None) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff"
val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
for {
resp <- stopTrigger(uri, uuid, "Alice")
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
@@ -31,10 +31,13 @@ import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
import com.daml.bazeltools.BazelRunfiles
import com.daml.timer.RetryStrategy
import org.scalatest.Assertions._

import scala.concurrent._
import scala.concurrent.duration._
import scala.sys.process.Process
import java.net.{Socket, ServerSocket, InetAddress}
import java.net.{InetAddress, ServerSocket, Socket}

import eu.rekawek.toxiproxy._

object TriggerServiceFixture {
@@ -95,6 +98,15 @@ object TriggerServiceFixture {
client <- LedgerClient.singleHost(host.getHostName(), ledgerPort, clientConfig(applicationId))
} yield client

val triggerDao: Option[TriggerDao] =
jdbcConfig.map(c => {
val dao = TriggerDao(c)
ServiceMain.initDatabase(dao) match {
case Left(err) => fail("Failed to initialize database: " ++ err.toString)
case Right(()) => dao
}
})

// Configure the service with the ledger's *proxy* port.
val serviceF: Future[(ServerBinding, TypedActorSystem[Server.Message])] = for {
(_, _, ledgerProxyPort, _) <- ledgerF
@@ -118,7 +130,7 @@ object TriggerServiceFixture {
// For adding toxics.
val ledgerProxyF: Future[Proxy] = for {
(_, _, _, ledgerProxy) <- ledgerF
} yield ledgerProxy // Not used yet.
} yield ledgerProxy

val fa: Future[A] = for {
client <- clientF
@@ -132,6 +144,10 @@ object TriggerServiceFixture {
serviceF.foreach({ case (_, system) => system ! Server.Stop })
ledgerF.foreach(_._1.close())
toxiProxyProc.destroy()
triggerDao.foreach(dao =>
  // Surface a cleanup failure instead of silently discarding it.
  ServiceMain.destroyDatabase(dao) match {
    case Left(err) => fail("Failed to remove database objects: " ++ err)
    case Right(()) => ()
  })
}

fa
