Skip to content

Commit

Permalink
Ignore repeated package uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
rohanjr committed Jun 19, 2020
1 parent eed0754 commit 44e53f9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,33 @@ class Server(triggerDao: RunningTriggerDao) {
// after the service shuts down or crashes.
val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()

// Add a dar to compiledPackages (in memory) and to persistent storage if using it.
// TODO(RJR): Figure out what to do with packages that already exist in `compiledPackages`.
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
private def addPackagesInMemory(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Unit = {
// Decode the dar for the in-memory store.
val dar = encodedDar.map((Decode.readArchivePayload _).tupled)
val darMap = dar.all.toMap

darMap foreach {
case (pkgId, pkg) =>
// If packages are not in topological order, we will get back
// ResultNeedPackage. The way the code is structured here we
// will still call addPackage even if we already fed the
// package via the callback but this is harmless and not
// expensive.
@scala.annotation.tailrec
def go(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(pkgId, resume) =>
go(resume(darMap.get(pkgId)))
case _ => throw new RuntimeException(s"Unexpected engine result $r")
}
// `addPackage` returns a ResultNeedPackage if a dependency is not yet uploaded.
// So we need to use the entire `darMap` to complete each call to `addPackage`.
// This will result in repeated calls to `addPackage` for the same package, but
// this is harmless and not too slow.
@scala.annotation.tailrec
def complete(r: Result[Unit]): Unit = r match {
case ResultDone(()) => ()
case ResultNeedPackage(dep, resume) =>
complete(resume(darMap.get(dep)))
case _ =>
throw new RuntimeException(s"Unexpected engine result $r from attempt to add package.")
}

go(compiledPackages.addPackage(pkgId, pkg))
darMap foreach {
case (pkgId, pkg) => complete(compiledPackages.addPackage(pkgId, pkg))
}
}

// TODO(RJR): Only attempt to write packages that aren't already uploaded.
// Otherwise the transaction will fail due to a primary key conflict.
// Add a dar to compiledPackages (in memory) and to persistent storage if using it.
// Uploads of packages that already exist are considered harmless and are ignored.
private def addDar(encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
addPackagesInMemory(encodedDar)
triggerDao.persistPackages(encodedDar)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import doobie.{LogHandler, Transactor, _}

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
//import doobie.free.connection._
//import scalaz._, Scalaz._
//import scalaz.std.list._
//import scalaz.syntax.traverse._

object Connection {

Expand Down Expand Up @@ -104,9 +100,13 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
select.query[UUID].to[Vector]
}

private def insertDalf(packageId: PackageId, pkg: DamlLf.ArchivePayload): ConnectionIO[Unit] = {
// Insert a package to the `dalfs` table. Do nothing if the package already exists.
// We specify this in the `insert` since `packageId` is the primary key on the table.
private def insertPackage(
packageId: PackageId,
pkg: DamlLf.ArchivePayload): ConnectionIO[Unit] = {
val insert: Fragment = sql"""
insert into dalfs values (${packageId.toString}, ${pkg.toByteArray})
insert into dalfs values (${packageId.toString}, ${pkg.toByteArray}) on conflict do nothing
"""
insert.update.run.void
}
Expand Down Expand Up @@ -139,12 +139,11 @@ class DbTriggerDao(xa: Connection.T) extends RunningTriggerDao {
run(selectRunningTriggers(credentials.token)).map(_.sorted)
}

// Write packages to the `dalfs` table so we can recover state after a shutdown.
override def persistPackages(
dar: Dar[(PackageId, DamlLf.ArchivePayload)]): Either[String, Unit] = {
import cats.implicits._
val insertAll = dar.all.traverse_((insertDalf _).tupled)
// Just insert main package for draft until above is fixed.
// val insertMain = (insertDalf _).tupled(dar.main)
val insertAll = dar.all.traverse_((insertPackage _).tupled)
run(insertAll)
}

Expand Down

0 comments on commit 44e53f9

Please sign in to comment.