Skip to content

Commit

Permalink
kvutils: Use resources in KeyValueParticipantState, not AutoClo… (dig…
Browse files Browse the repository at this point in the history
…ital-asset#4177)

* kvutils: Use resources in KeyValueParticipantState, not AutoCloseable.

CHANGELOG_BEGIN
- [kvutils] The simplified API now uses ``com.digitalasset.resources``
  to manage acquiring and releasing resources instead of ``Closeable``.
CHANGELOG_END

* ledger-on-posix-filesystem: Remove dead code due to a refactoring fail.
  • Loading branch information
SamirTalwar authored Jan 23, 2020
1 parent a3de2fb commit e13f9a7
Show file tree
Hide file tree
Showing 31 changed files with 288 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,21 @@ package com.daml.ledger.api.server.damlonx.reference.v2
import java.time.Clock

import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.resources.ResourceOwner

class InMemoryKVParticipantStateIT
extends ParticipantStateIntegrationSpecBase("In-memory participant state for Reference v2") {

override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString): ReadService with WriteService with AutoCloseable =
new InMemoryKVParticipantState(participantId, ledgerId)
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] =
ResourceOwner.forCloseable(() => new InMemoryKVParticipantState(participantId, ledgerId)).vary

override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())

override protected def afterEach(): Unit = {
ps.asInstanceOf[InMemoryKVParticipantState].close()
super.afterEach()
}
}
2 changes: 2 additions & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/resources",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand All @@ -49,6 +50,7 @@ da_scala_test(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_scalactic_scalactic_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.time.Clock

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
Expand All @@ -25,6 +26,7 @@ import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.OneAfterAnother
import com.digitalasset.resources.ResourceOwner
import com.google.protobuf.ByteString

import scala.collection.JavaConverters._
Expand All @@ -46,7 +48,8 @@ private[memory] class InMemoryState(

final class InMemoryLedgerReaderWriter(
ledgerId: LedgerId,
val participantId: ParticipantId,
override val participantId: ParticipantId,
dispatcher: Dispatcher[Index],
)(implicit executionContext: ExecutionContext)
extends LedgerWriter
with LedgerReader {
Expand All @@ -55,8 +58,6 @@ final class InMemoryLedgerReaderWriter(

private val currentState = new InMemoryState()

private val StartOffset: Int = 0

override def commit(correlationId: String, envelope: Array[Byte]): Future[SubmissionResult] =
Future {
val submission = Envelope
Expand Down Expand Up @@ -105,7 +106,7 @@ final class InMemoryLedgerReaderWriter(
.startingAt(
offset
.map(_.components.head.toInt)
.getOrElse(StartOffset),
.getOrElse(StartIndex),
OneAfterAnother[Int, List[LedgerRecord]](
(index: Int, _) => index + 1,
(index: Int) => Future.successful(List(retrieveLogEntry(index)))
Expand All @@ -117,17 +118,6 @@ final class InMemoryLedgerReaderWriter(

override def currentHealth(): HealthStatus = Healthy

override def close(): Unit = {
dispatcher.close()
}

private val dispatcher: Dispatcher[Int] =
Dispatcher("in-memory-key-value-participant-state", zeroIndex = 0, headAtInitialization = 0)

private val NamespaceLogEntries = "L"

private val sequentialLogEntryId = new SequentialLogEntryId(NamespaceLogEntries)

private def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())

Expand All @@ -136,3 +126,27 @@ final class InMemoryLedgerReaderWriter(
LedgerRecord(Offset(Array(index.toLong)), logEntry.entryId, logEntry.payload)
}
}

object InMemoryLedgerReaderWriter {
type Index = Int

private val StartIndex: Index = 0

private val NamespaceLogEntries = "L"

private val sequentialLogEntryId = new SequentialLogEntryId(NamespaceLogEntries)

def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
)(implicit executionContext: ExecutionContext): ResourceOwner[InMemoryLedgerReaderWriter] =
for {
dispatcher <- ResourceOwner.forCloseable(
() =>
Dispatcher(
"in-memory-key-value-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
))
} yield new InMemoryLedgerReaderWriter(ledgerId, participantId, dispatcher)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
object Main extends App {
Runner(
"In-Memory Ledger",
(ledgerId, participantId) => new InMemoryLedgerReaderWriter(ledgerId, participantId),
(ledgerId, participantId) => InMemoryLedgerReaderWriter.owner(ledgerId, participantId),
).run(args)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@ package com.daml.ledger.on.memory
import java.time.Clock

import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
import com.digitalasset.resources.ResourceOwner

class InMemoryLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase(
"In-memory participant state via simplified API implementation") {

override def participantStateFactory(
participantId: ParticipantId,
ledgerId: LedgerString): ReadService with WriteService with AutoCloseable = {
val readerWriter = new InMemoryLedgerReaderWriter(ledgerId, participantId)
new KeyValueParticipantState(readerWriter, readerWriter)
}
ledgerId: LedgerString,
): ResourceOwner[ParticipantState] =
InMemoryLedgerReaderWriter
.owner(ledgerId, participantId)
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))

override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
Expand Down
4 changes: 4 additions & 0 deletions ledger/ledger-on-posix-filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/resources",
"//libs-scala/timer-utils",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_google_protobuf_protobuf_java",
Expand All @@ -55,6 +56,8 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
],
)

Expand All @@ -76,6 +79,7 @@ da_scala_test(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/resources",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.OneAfterAnother
import com.digitalasset.resources.ResourceOwner
import com.google.protobuf.ByteString

import scala.collection.JavaConverters._
Expand All @@ -33,10 +34,10 @@ class FileSystemLedgerReaderWriter private (
ledgerId: LedgerId,
override val participantId: ParticipantId,
root: Path,
dispatcher: Dispatcher[Index],
)(implicit executionContext: ExecutionContext)
extends LedgerReader
with LedgerWriter
with AutoCloseable {
with LedgerWriter {

// used as the ledger lock; when committing, only one commit owns the lock at a time
private val lockPath = root.resolve("lock")
Expand All @@ -56,27 +57,16 @@ class FileSystemLedgerReaderWriter private (

private val engine = Engine()

private val dispatcher: Dispatcher[Index] =
Dispatcher(
"posix-filesystem-participant-state",
zeroIndex = StartOffset,
headAtInitialization = StartOffset,
)

override def currentHealth(): HealthStatus = Healthy

override def close(): Unit = {
dispatcher.close()
}

override def retrieveLedgerId(): LedgerId = ledgerId

override def events(offset: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher
.startingAt(
offset
.map(_.components.head.toInt)
.getOrElse(StartOffset),
.getOrElse(StartIndex),
OneAfterAnother[Index, immutable.Seq[LedgerRecord]](
(index: Index, _) => index + 1,
(index: Index) => Future.successful(immutable.Seq(retrieveLogEntry(index))),
Expand Down Expand Up @@ -150,7 +140,7 @@ class FileSystemLedgerReaderWriter private (
Files.readAllLines(logHeadPath).get(0).toInt
} catch {
case _: NoSuchFileException =>
StartOffset
StartIndex
}
}

Expand Down Expand Up @@ -180,7 +170,7 @@ class FileSystemLedgerReaderWriter private (
}
}

private def createDirectories(): Future[Unit] = Future {
private def createDirectories(): Unit = {
Files.createDirectories(root)
Files.createDirectories(logDirectory)
Files.createDirectories(logEntriesDirectory)
Expand All @@ -193,14 +183,24 @@ class FileSystemLedgerReaderWriter private (
object FileSystemLedgerReaderWriter {
type Index = Int

private val StartOffset: Index = 0
private val StartIndex: Index = 0

def apply(
def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
root: Path,
)(implicit executionContext: ExecutionContext): Future[FileSystemLedgerReaderWriter] = {
val ledger = new FileSystemLedgerReaderWriter(ledgerId, participantId, root)
ledger.createDirectories().map(_ => ledger)
}
)(implicit executionContext: ExecutionContext): ResourceOwner[FileSystemLedgerReaderWriter] =
for {
dispatcher <- ResourceOwner.forCloseable(
() =>
Dispatcher(
"posix-filesystem-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
))
} yield {
val participant = new FileSystemLedgerReaderWriter(ledgerId, participantId, root, dispatcher)
participant.createDirectories()
participant
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import java.io.File
import java.nio.file.Path

import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.{Config, KeyValueLedger, LedgerFactory, Runner}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import scopt.OptionParser

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

object Main extends App {
Runner("File System Ledger", FileSystemLedgerFactory).run(args)

case class ExtraConfig(root: Option[Path])

object FileSystemLedgerFactory extends LedgerFactory[ExtraConfig] {
object FileSystemLedgerFactory extends LedgerFactory[FileSystemLedgerReaderWriter, ExtraConfig] {
override val defaultExtraConfig: ExtraConfig = ExtraConfig(
root = None,
)
Expand All @@ -34,19 +33,18 @@ object Main extends App {
()
}

override def apply(ledgerId: LedgerId, participantId: ParticipantId, config: ExtraConfig)(
implicit materializer: Materializer,
): KeyValueLedger = {
override def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
config: ExtraConfig,
)(implicit materializer: Materializer): ResourceOwner[FileSystemLedgerReaderWriter] = {
val root = config.root.getOrElse {
throw new IllegalStateException("No root directory provided.")
}
Await.result(
FileSystemLedgerReaderWriter(
ledgerId = ledgerId,
participantId = participantId,
root = root,
),
10.seconds,
FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,33 @@

package com.daml.ledger.on.filesystem.posix

import java.nio.file.Files
import java.nio.file.{Files, Path}

import com.daml.ledger.on.filesystem.posix.DeleteFiles.deleteFiles
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.resources.Resource

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

object MainWithEphemeralDirectory extends App {
val root = Files.createTempDirectory("ledger-on-posix-filesystem-ephemeral")
implicit val executionContext: ExecutionContext = DirectExecutionContext

try {
Runner(
val root = Files.createTempDirectory("ledger-on-posix-filesystem-ephemeral-")

for {
root <- Resource[Path](
Future.successful(root),
directory => Future.successful(deleteFiles(directory)),
)
_ <- Runner(
"Ephemeral File System Ledger",
(ledgerId, participantId) =>
Await.result(
FileSystemLedgerReaderWriter(
ledgerId = ledgerId,
participantId = participantId,
root = root,
),
10.seconds)
FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
)
).run(args)
} finally {
deleteFiles(root)
}
} yield ()
}
Loading

0 comments on commit e13f9a7

Please sign in to comment.