Skip to content

Commit

Permalink
Added submission validator (digital-asset#4437)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
miklos-da authored Feb 10, 2020
1 parent e02e5d6 commit c718397
Show file tree
Hide file tree
Showing 8 changed files with 616 additions and 45 deletions.
1 change: 0 additions & 1 deletion ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ da_scala_library(
],
deps = [
"//daml-lf/data",
"//daml-lf/engine",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/participant-state",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,30 @@
package com.daml.ledger.on.memory

import java.time.Clock
import java.util.concurrent.Semaphore

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{
Envelope,
KeyValueCommitting,
SequentialLogEntryId,
}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, SequentialLogEntryId}
import com.daml.ledger.participant.state.v1._
import com.daml.ledger.validator.ValidationResult.{
MissingInputState,
SubmissionValidated,
ValidationError
}
import com.daml.ledger.validator._
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.OneAfterAnother
import com.digitalasset.resources.ResourceOwner
import com.google.protobuf.ByteString

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.{breakOut, mutable}
import scala.concurrent.{ExecutionContext, Future}

private[memory] class LogEntry(val entryId: DamlLogEntryId, val payload: Array[Byte])
Expand All @@ -42,7 +39,7 @@ private[memory] object LogEntry {

private[memory] class InMemoryState(
val log: mutable.Buffer[LogEntry] = ArrayBuffer[LogEntry](),
val state: mutable.Map[ByteString, DamlStateValue] = mutable.Map.empty,
val state: mutable.Map[ByteString, Array[Byte]] = mutable.Map.empty,
)

final class InMemoryLedgerReaderWriter(
Expand All @@ -53,39 +50,62 @@ final class InMemoryLedgerReaderWriter(
extends LedgerWriter
with LedgerReader {

private val engine = Engine()

private val currentState = new InMemoryState()

override def commit(correlationId: String, envelope: Array[Byte]): Future[SubmissionResult] =
Future {
val submission = Envelope
.openSubmission(envelope)
.getOrElse(throw new IllegalArgumentException("Not a valid submission in envelope"))
currentState.synchronized {
val stateInputs: Map[DamlStateKey, Option[DamlStateValue]] =
submission.getInputDamlStateList.asScala
.map(key => key -> currentState.state.get(key.toByteString))(breakOut)
val entryId = sequentialLogEntryId.next()
val (logEntry, damlStateUpdates) =
KeyValueCommitting.processSubmission(
engine,
entryId,
currentRecordTime(),
LedgerReader.DefaultConfiguration,
submission,
participantId,
stateInputs,
)
val stateUpdates = damlStateUpdates.map {
case (damlStateKey, value) => damlStateKey.toByteString -> value
private val lockCurrentState = new Semaphore(1, true)

private val validator = SubmissionValidator.create(
new InMemoryLedgerStateAccess(participantId),
() => sequentialLogEntryId.next())

private class InMemoryLedgerStateAccess(theParticipantId: ParticipantId)
extends LedgerStateAccess {
override def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T] =
Future
.successful(lockCurrentState.acquire())
.flatMap(_ => body(InMemoryLedgerStateOperations))
.andThen {
case _ =>
lockCurrentState.release()
}
currentState.log += LogEntry(entryId, Envelope.enclose(logEntry).toByteArray)
currentState.state ++= stateUpdates
dispatcher.signalNewHead(currentState.log.size)

override def participantId: String = theParticipantId
}

private object InMemoryLedgerStateOperations extends BatchingLedgerStateOperations {
override def readState(keys: Seq[Key]): Future[Seq[Option[Value]]] =
Future.successful {
keys.map(keyBytes => currentState.state.get(ByteString.copyFrom(keyBytes)))
}
SubmissionResult.Acknowledged
}

override def writeState(keyValuePairs: Seq[(Key, Value)]): Future[Unit] =
Future.successful {
currentState.state ++= keyValuePairs.map {
case (keyBytes, valueBytes) => ByteString.copyFrom(keyBytes) -> valueBytes
}
}

override def appendToLog(key: Key, value: Value): Future[Unit] =
Future.successful {
val damlLogEntryId = KeyValueCommitting.unpackDamlLogEntryId(key)
val logEntry = LogEntry(damlLogEntryId, value)
val newHead = currentState.log.synchronized {
currentState.log += logEntry
currentState.log.size
}
dispatcher.signalNewHead(newHead)
}
}

override def commit(correlationId: String, envelope: Array[Byte]): Future[SubmissionResult] = {
validator
.validateAndCommit(envelope, correlationId, currentRecordTime())
.map {
case SubmissionValidated => SubmissionResult.Acknowledged
case MissingInputState(_) => SubmissionResult.InternalError("Missing input state")
case ValidationError(reason) => SubmissionResult.InternalError(reason)
}
}

override def events(offset: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher
Expand All @@ -108,7 +128,9 @@ final class InMemoryLedgerReaderWriter(
Timestamp.assertFromInstant(Clock.systemUTC().instant())

private def retrieveLogEntry(index: Int): LedgerRecord = {
val logEntry = currentState.log(index)
val logEntry = currentState.log.synchronized {
currentState.log(index)
}
LedgerRecord(Offset(Array(index.toLong)), logEntry.entryId, logEntry.payload)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ object Envelope {
case msg => Left(s"Expected state value, got ${msg.getClass}")
}

def openStateValue(envelopeBytes: Array[Byte]): Either[String, Proto.DamlStateValue] =
open(envelopeBytes).flatMap {
case StateValueMessage(entry) => Right(entry)
case msg => Left(s"Expected state value, got ${msg.getClass}")
}

private def compress(payload: ByteString): ByteString = {
val out = ByteString.newOutput
val gzipOut = new GZIPOutputStream(out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ object KeyValueCommitting {
DamlStateKey.parseFrom(bytes)

def packDamlStateValue(value: DamlStateValue): ByteString = value.toByteString
def unpackDamlStateValue(bytes: Array[Byte]): DamlStateValue =
DamlStateValue.parseFrom(bytes)
def unpackDamlStateValue(bytes: ByteString): DamlStateValue =
DamlStateValue.parseFrom(bytes)

Expand All @@ -39,6 +41,8 @@ object KeyValueCommitting {
DamlLogEntry.parseFrom(bytes)

def packDamlLogEntryId(entry: DamlLogEntryId): ByteString = entry.toByteString
def unpackDamlLogEntryId(bytes: Array[Byte]): DamlLogEntryId =
DamlLogEntryId.parseFrom(bytes)
def unpackDamlLogEntryId(bytes: ByteString): DamlLogEntryId =
DamlLogEntryId.parseFrom(bytes)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.validator

import scala.concurrent.{ExecutionContext, Future}

trait LedgerStateAccess {

/**
* Performs read and write operations on the backing store in a single atomic transaction.
* @param body operations to perform
* @tparam T type of result returned after execution
*/
def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T]

/**
* @return ID of the participant accessing the backing store.
*/
def participantId: String
}

trait LedgerStateOperations {
type Key = Array[Byte]
type Value = Array[Byte]

/**
* Reads value of a single key from the backing store.
* @param key key to look up data for
* @return value corresponding to requested key or None in case it does not exist
*/
def readState(key: Key): Future[Option[Value]]

/**
* Reads values of a set of keys from the backing store.
* @param keys list of keys to look up data for
* @return values corresponding to the requested keys, in the same order as requested
*/
def readState(keys: Seq[Key]): Future[Seq[Option[Value]]]

/**
* Writes a single key-value pair to the backing store. In case the key already exists its value is overwritten.
*/
def writeState(key: Key, value: Value): Future[Unit]

/**
* Writes a list of key-value pairs to the backing store. In case a key already exists its value is overwritten.
*/
def writeState(keyValuePairs: Seq[(Key, Value)]): Future[Unit]

/**
* Writes a single log entry to the backing store. The implementation may return Future.failed in case the key
* (i.e., the log entry ID) already exists.
*/
def appendToLog(key: Key, value: Value): Future[Unit]
}

/**
* Implements non-batching read and write operations on the backing store based on batched implementations.
*/
abstract class BatchingLedgerStateOperations(implicit executionContext: ExecutionContext)
extends LedgerStateOperations {
override def readState(key: Key): Future[Option[Value]] =
readState(Seq(key)).map(_.head)

override def writeState(key: Key, value: Value): Future[Unit] =
writeState(Seq((key, value)))
}

/**
* Implements batching read and write operations on the backing store based on non-batched implementations.
*/
abstract class NonBatchingLedgerStateOperations(implicit executionContext: ExecutionContext)
extends LedgerStateOperations {
override def readState(keys: Seq[Key]): Future[Seq[Option[Value]]] =
Future.sequence(keys.map(readState))

override def writeState(keyValuePairs: Seq[(Key, Value)]): Future[Unit] =
Future
.sequence(keyValuePairs.map {
case (key, value) => writeState(key, value)
})
.map(_ => ())
}
Loading

0 comments on commit c718397

Please sign in to comment.