Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger service: Write packages to database if we have one #6424

Merged
merged 7 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Ignore repeated package uploads
  • Loading branch information
rohanjr committed Jun 19, 2020
commit b5f0efc239e3709e1f1bc0854459180e06237251
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,33 @@ class Server(triggerDao: RunningTriggerDao) {
// after the service shuts down or crashes.
val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()

// Add a dar to compiledPackages (in memory) and to persistent storage if using it.
// TODO(RJR): Figure out what to do with packages that already exist in `compiledPackages`.
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
private def addPackagesInMemory(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Unit = {
// Decode the dar for the in-memory store.
val dar = encodedDar.map((Decode.readArchivePayload _).tupled)
val darMap = dar.all.toMap

darMap foreach {
case (pkgId, pkg) =>
// If packages are not in topological order, we will get back
// ResultNeedPackage. The way the code is structured here we
// will still call addPackage even if we already fed the
// package via the callback but this is harmless and not
// expensive.
@scala.annotation.tailrec
def go(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(pkgId, resume) =>
go(resume(darMap.get(pkgId)))
case _ => throw new RuntimeException(s"Unexpected engine result $r")
}
// `addPackage` returns a ResultNeedPackage if a dependency is not yet uploaded.
// So we need to use the entire `darMap` to complete each call to `addPackage`.
// This will result in repeated calls to `addPackage` for the same package, but
// this is harmless and not expensive.
@scala.annotation.tailrec
def complete(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(dep, resume) =>
complete(resume(darMap.get(dep)))
case _ =>
throw new RuntimeException(s"Unexpected engine result $r from attempt to add package.")
}

go(compiledPackages.addPackage(pkgId, pkg))
darMap foreach {
case (pkgId, pkg) => complete(compiledPackages.addPackage(pkgId, pkg))
}
}

// TODO(RJR): Only attempt to write packages that aren't already uploaded.
// Otherwise the transaction will fail due to a primary key conflict.
// Add a dar to compiledPackages (in memory) and to persistent storage if using it.
// Uploads of packages that already exist are considered harmless and are ignored.
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
addPackagesInMemory(encodedDar)
triggerDao.persistPackages(encodedDar)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import doobie.{LogHandler, Transactor, _}

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
//import doobie.free.connection._
//import scalaz._, Scalaz._
//import scalaz.std.list._
//import scalaz.syntax.traverse._

object Connection {

Expand Down Expand Up @@ -104,9 +100,13 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
select.query[UUID].to[Vector]
}

private def insertDalf(packageId: PackageId, pkg: DamlLf.ArchivePayload): ConnectionIO[Unit] = {
// Insert a package to the `dalfs` table. Do nothing if the package already exists.
// We specify this in the `insert` since `packageId` is the primary key on the table.
private def insertPackage(
packageId: PackageId,
pkg: DamlLf.ArchivePayload): ConnectionIO[Unit] = {
val insert: Fragment = sql"""
insert into dalfs values (${packageId.toString}, ${pkg.toByteArray})
insert into dalfs values (${packageId.toString}, ${pkg.toByteArray}) on conflict do nothing
"""
insert.update.run.void
}
Expand Down Expand Up @@ -139,12 +139,11 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
run(selectRunningTriggers(credentials.token)).map(_.sorted)
}

// Write packages to the `dalfs` table so we can recover state after a shutdown.
override def persistPackages(
dar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
import cats.implicits._
val insertAll = dar.all.traverse_((insertDalf _).tupled)
// Just insert main package for draft until above is fixed.
// val insertMain = (insertDalf _).tupled(dar.main)
val insertAll = dar.all.traverse_((insertPackage _).tupled)
run(insertAll)
}

Expand Down