Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger service: Write packages to database if we have one #6424

Merged
merged 7 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,43 @@ final case class RunningTrigger(
runner: ActorRef[TriggerRunner.Message]
)

class Server {
class Server(triggerDao: RunningTriggerDao) {

private var triggerLog: Map[UUID, Vector[(LocalDateTime, String)]] = Map.empty;
private var triggerLog: Map[UUID, Vector[(LocalDateTime, String)]] = Map.empty

// We keep the compiled packages in memory as it is required to construct a trigger Runner.
// When running with a persistent store we also write packages to it so we can recover our state
// after the service shuts down or crashes.
val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()

// Add dar to compiledPackages and return the main package id
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Unit = {
val dar = encodedDar.map {
case (pkgId, pkgArchive) => Decode.readArchivePayload(pkgId, pkgArchive)
}
private def addPackagesInMemory(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Unit = {
// Decode the dar for the in-memory store.
val dar = encodedDar.map((Decode.readArchivePayload _).tupled)
val darMap = dar.all.toMap
darMap.foreach {
case (pkgId, pkg) =>
// If packages are not in topological order, we will get back
// ResultNeedPackage. The way the code is structured here we
// will still call addPackage even if we already fed the
// package via the callback but this is harmless and not
// expensive.
@scala.annotation.tailrec
def go(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(pkgId, resume) =>
go(resume(darMap.get(pkgId)))
case _ => throw new RuntimeException(s"Unexpected engine result $r")
}

go(compiledPackages.addPackage(pkgId, pkg))
// `addPackage` returns a ResultNeedPackage if a dependency is not yet uploaded.
// So we need to use the entire `darMap` to complete each call to `addPackage`.
// This will result in repeated calls to `addPackage` for the same package, but
// this is harmless and not expensive.
@scala.annotation.tailrec
def complete(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(dep, resume) =>
complete(resume(darMap.get(dep)))
case _ =>
throw new RuntimeException(s"Unexpected engine result $r from attempt to add package.")
}

darMap foreach {
case (pkgId, pkg) => complete(compiledPackages.addPackage(pkgId, pkg))
}
}

// Add a dar to compiledPackages (in memory) and to persistent storage if using it.
// Uploads of packages that already exist are considered harmless and are ignored.
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
// In-memory first: compiled packages are required to construct a trigger Runner
// (see the comment on `compiledPackages` above).
addPackagesInMemory(encodedDar)
// Then mirror the dar to the DAO; the in-memory DAO makes this a no-op, the
// database DAO writes the packages so state can be recovered after a restart.
// Any persistence failure is surfaced to the caller as a Left(errorMessage).
triggerDao.persistPackages(encodedDar)
}

private def logTriggerStatus(triggerInstance: UUID, msg: String): Unit = {
Expand Down Expand Up @@ -163,8 +171,16 @@ object Server {
}
}

val server = new Server
initialDar.foreach(server.addDar)
val server = new Server(triggerDao)

initialDar foreach { dar =>
server.addDar(dar) match {
case Left(err) =>
ctx.log.error("Failed to upload provided DAR.\n" ++ err)
sys.exit(1)
case Right(()) =>
}
}

// http doesn't know about akka typed so provide untyped system
implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
Expand Down Expand Up @@ -274,9 +290,14 @@ object Server {
complete(errorResponse(StatusCodes.UnprocessableEntity, err.toString))
case Success(dar) =>
try {
server.addDar(dar)
val mainPackageId = JsObject(("mainPackageId", dar.main._1.name.toJson))
complete(successResponse(mainPackageId))
server.addDar(dar) match {
case Left(err) =>
complete(errorResponse(StatusCodes.InternalServerError, err))
case Right(()) =>
val mainPackageId =
JsObject(("mainPackageId", dar.main._1.name.toJson))
complete(successResponse(mainPackageId))
}
} catch {
case err: ParseError =>
complete(errorResponse(StatusCodes.UnprocessableEntity, err.toString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import java.util.UUID
import cats.effect.{ContextShift, IO}
import cats.syntax.apply._
import cats.syntax.functor._
import com.daml.daml_lf_dev.DamlLf
import com.daml.lf.archive.Dar
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.engine.trigger.{EncryptedToken, JdbcConfig, RunningTrigger, UserCredentials}
import doobie.free.connection.ConnectionIO
import doobie.implicits._
Expand Down Expand Up @@ -97,6 +100,17 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
select.query[UUID].to[Vector]
}

// Insert a package to the `dalfs` table. Do nothing if the package already exists.
// We specify this in the `insert` since `packageId` is the primary key on the table.
private def insertPackage(
    packageId: PackageId,
    pkg: DamlLf.ArchivePayload): ConnectionIO[Unit] = {
  // `on conflict do nothing` relies on `packageId` being the table's primary
  // key, so re-inserting an already stored package is a silent no-op.
  val upsert: Fragment = sql"""
      insert into dalfs values (${packageId.toString}, ${pkg.toByteArray}) on conflict do nothing
    """
  upsert.update.run.void
}

// Drop all tables and other objects associated with the database.
// Only used between tests for now.
private def dropTables: ConnectionIO[Unit] = {
Expand Down Expand Up @@ -125,6 +139,14 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
run(selectRunningTriggers(credentials.token)).map(_.sorted)
}

// Write packages to the `dalfs` table so we can recover state after a shutdown.
override def persistPackages(
    dar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
  import cats.implicits._
  // Fold one insert per archive in the dar into a single ConnectionIO and
  // execute it, so the packages land in the `dalfs` table for recovery.
  run(dar.all.traverse_ { case (pkgId, payload) => insertPackage(pkgId, payload) })
}

def initialize: Either[String, Unit] =
run(createTables(logHandler), "Failed to initialize database.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package com.daml.lf.engine.trigger.dao

import java.util.UUID

import com.daml.daml_lf_dev.DamlLf
import com.daml.lf.archive.Dar
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.engine.trigger.{RunningTrigger, UserCredentials}

class InMemoryTriggerDao extends RunningTriggerDao {
Expand All @@ -30,6 +33,10 @@ class InMemoryTriggerDao extends RunningTriggerDao {
override def listRunningTriggers(credentials: UserCredentials): Either[String, Vector[UUID]] = {
Right(triggersByParty.getOrElse(credentials, Set()).toVector.sorted)
}

// This is only possible when running with persistence. For in-memory mode we do nothing.
override def persistPackages(dar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
  // Nothing to do: an in-memory store cannot survive a restart, so there is
  // no point writing packages anywhere. Always succeeds.
  Right(())
}
}

object InMemoryTriggerDao {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ package com.daml.lf.engine.trigger.dao

import java.util.UUID

import com.daml.daml_lf_dev.DamlLf
import com.daml.lf.archive.Dar
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.engine.trigger.{RunningTrigger, UserCredentials}

// Persistence interface for the trigger service. Implementations track which
// triggers are running per user and, where the backend supports it, store
// uploaded packages so service state can be recovered after a shutdown.
// All operations report failure as Left(errorMessage).
trait RunningTriggerDao {
// Record a newly started trigger.
def addRunningTrigger(t: RunningTrigger): Either[String, Unit]
// Remove a trigger by instance id; the Boolean presumably indicates whether
// an entry was actually removed — TODO confirm against implementations.
def removeRunningTrigger(triggerInstance: UUID): Either[String, Boolean]
// List the trigger instance ids owned by the given credentials.
def listRunningTriggers(credentials: UserCredentials): Either[String, Vector[UUID]]
// Store the dar's packages for recovery; a no-op for in-memory implementations.
def persistPackages(dar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ abstract class AbstractTriggerServiceTest extends AsyncFlatSpec with Eventually
Future(succeed)
}

it should "allow repeated uploads of the same packages" in
  withTriggerService(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
    // Uploading a dar that is already known (including the one the service was
    // initialized with) must succeed rather than error.
    for {
      firstResp <- uploadDar(uri, darPath) // same dar as in initialization
      _ <- parseResult(firstResp)
      secondResp <- uploadDar(uri, darPath) // and the identical dar once more
      _ <- parseResult(secondResp)
    } yield succeed
  }

it should "fail to start non-existent trigger" in withTriggerService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val expectedError = StatusCodes.UnprocessableEntity
Expand Down