Skip to content

Commit

Permalink
Use separate thread to preload dars in KVBC committer (digital-asset#…
Browse files Browse the repository at this point in the history
…2924)

* use separate thread to preload dars in KVBC committer

* fix formatting
  • Loading branch information
mziolekda authored Sep 17, 2019
1 parent 277bfcb commit 7c4cc96
Showing 1 changed file with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.participant.state.kvutils.committing

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
Expand All @@ -16,13 +17,15 @@ import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.breakOut
import scala.concurrent.{Future, ExecutionContext}

private[kvutils] case class ProcessPackageUpload(
engine: Engine,
entryId: DamlLogEntryId,
recordTime: Timestamp,
packageUploadEntry: DamlPackageUploadEntry,
inputState: Map[DamlStateKey, Option[DamlStateValue]]) {
import ProcessPackageUpload._

private val submissionId = packageUploadEntry.getSubmissionId
private val logger =
Expand All @@ -31,6 +34,38 @@ private[kvutils] case class ProcessPackageUpload(

private val archives = packageUploadEntry.getArchivesList.asScala

private def preload(): Future[Unit] =
Future {
logger.trace("Preloading engine...")
val loadedPackages = engine.compiledPackages().packageIds
val t0 = System.nanoTime()
val packages = Map(
archives
.filterNot(
a =>
Ref.PackageId
.fromString(a.getHash)
.fold(_ => false, loadedPackages.contains))
.map { archive =>
Decode.readArchiveAndVersion(archive)._1
}: _*)
val t1 = System.nanoTime()
logger.trace(s"Decoding of ${packages.size} archives completed in ${TimeUnit.NANOSECONDS
.toMillis(t1 - t0)}ms")
packages.headOption.foreach {
case (pkgId, pkg) =>
engine
.preloadPackage(pkgId, pkg)
.consume(
_ => sys.error("Unexpected request to PCS in preloadPackage"),
pkgId => packages.get(pkgId),
_ => sys.error("Unexpected request to keys in preloadPackage")
)
}
val t2 = System.nanoTime()
logger.trace(s"Preload completed in ${TimeUnit.NANOSECONDS.toMillis(t2 - t0)}ms")
}(serialContext)

def run: (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) = {
// TODO: Add more comprehensive validity test, in particular, take the transitive closure
// of all packages being uploaded and see if they compile
Expand All @@ -48,36 +83,6 @@ private[kvutils] case class ProcessPackageUpload(
DamlPackageUploadRejectionEntry.InvalidPackage.newBuilder
.setDetails(error)))
case (_, _) =>
// Preload the engine.
logger.trace("Preloading engine...")
val t0 = System.nanoTime()
val loadedPackages = engine.compiledPackages().packageIds
val packages = Map(
archives
.filterNot(
a =>
Ref.PackageId
.fromString(a.getHash)
.fold(_ => false, loadedPackages.contains))
.map { archive =>
Decode.readArchiveAndVersion(archive)._1
}: _*)
val t1 = System.nanoTime()
logger.trace(s"Decoding of ${packages.size} archives completed in ${TimeUnit.NANOSECONDS
.toMillis(t1 - t0)}ms")
packages.headOption.foreach {
case (pkgId, pkg) =>
engine
.preloadPackage(pkgId, pkg)
.consume(
_ => sys.error("Unexpected request to PCS in preloadPackage"),
pkgId => packages.get(pkgId),
_ => sys.error("Unexpected request to keys in preloadPackage")
)
}
val t2 = System.nanoTime()
logger.trace(s"Preload completed in ${TimeUnit.NANOSECONDS.toMillis(t2 - t0)}ms")

// Filter out archives that already exists.
val filteredArchives = archives
.filter { archive =>
Expand All @@ -88,6 +93,7 @@ private[kvutils] case class ProcessPackageUpload(
.getOrElse(stateKey, throw Err.MissingInputState(stateKey))
.isEmpty
}
preload()
logger.trace(s"Packages committed: ${filteredArchives.map(_.getHash).mkString(", ")}")
(
DamlLogEntry.newBuilder
Expand Down Expand Up @@ -133,3 +139,7 @@ private[kvutils] case class ProcessPackageUpload(
}

}

object ProcessPackageUpload {
val serialContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
}

0 comments on commit 7c4cc96

Please sign in to comment.