Skip to content

Commit

Permalink
Sandbox Sql fixture (digital-asset#247)
Browse files Browse the repository at this point in the history
* SandboxSQL fixture + fixing various issues

WIP - playing around with Resources to support Sandbox Fixture with Postgres

Sandbox with Postgres fixture works - LedgerIdentityServiceGivenIT PASSES

adding guards for empty collections in PostgresLedgerDao

CommandCompletionServiceITi passed for both fixtures

fixing EventID formatting problem

another formatting fix

changed wrong autoCommit behaviour

CommandStaticTimeIT passed on both ledgers

rolling back transaction on duplicates

dealing with duplicate commands in postgres

fixing mismapped record_time

adding extra logging for fatal errors

disabling sql support as it's not finished yet

cleanup

removed unnecessary test classes

cleanup

formatting

* ADT was missing Product with Serializable

* fixed the compilation error
  • Loading branch information
gabor-aranyossy authored Apr 10, 2019
1 parent 233b814 commit 094b4b6
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

package com.digitalasset.ledger.api.testing.utils

final case class ProjectedResource[Source, +Target](
final case class MappedResource[Source, +Target](
underlying: Resource[Source],
transform: Source => Target)
extends Resource[Target] {

override def value: Target = transform(underlying.value)

override def setup(): Unit = underlying.setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ trait Resource[+Value] extends AutoCloseable {
*/
def setup(): Unit

def map[Target](transform: Value => Target): Resource[Target] = ProjectedResource(this, transform)
def map[Target](f: Value => Target): Resource[Target] = MappedResource(this, f)

def derive[Target: ClassTag](
transform: Value => Target,
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-integration-tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
'//ledger/ledger-api-common:ledger-api-common',
'//ledger/ledger-api-client:ledger-api-client',
'//ledger/sandbox:sandbox',
'//ledger/sandbox:sandbox-scala-tests-lib',
'//3rdparty/jvm/com/google/guava:guava',
'//3rdparty/jvm/io/grpc:grpc_services',
'//3rdparty/jvm/com/typesafe/scala_logging:scala_logging',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import org.scalatest.{AsyncWordSpec, Matchers, OptionValues}
@SuppressWarnings(Array("org.wartremover.warts.Any"))
class PackageServiceIT
extends AsyncWordSpec
with AkkaBeforeAndAfterAll
with MultiLedgerFixture
with AkkaBeforeAndAfterAll
with SuiteResourceManagementAroundAll
with AsyncTimeLimitedTests
with Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import scala.concurrent.duration.{FiniteDuration, _}
object PlatformApplications {

/**
* Meant to be a simple common denominator between sandbox and LS
* configuration. The constructor is private to avoid using
* Meant to be a simple common denominator for fixture configuration. The constructor is private to avoid using
* exceptions for validation.
*
* In the companion object add more smart constructors with
Expand Down Expand Up @@ -95,7 +94,7 @@ object PlatformApplications {
def default: Config = defaultWithLedgerId(Some(defaultLedgerId))
}

def sandboxApplication(config: Config) = {
def sandboxApplication(config: Config, jdbcUrl: Option[String]) = {
val selectedPort = 0

val sandboxCommandConfig = SandboxConfig.defaultCommandConfig
Expand All @@ -112,7 +111,7 @@ object PlatformApplications {
tlsConfig = None,
ledgerIdMode =
config.ledgerId.fold[LedgerIdMode](LedgerIdMode.Random)(LedgerIdMode.HardCoded),
jdbcUrl = None
jdbcUrl = jdbcUrl
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ package com.digitalasset.platform.apitesting
sealed abstract class LedgerBackend extends Product with Serializable

object LedgerBackend {
case object Sandbox extends LedgerBackend
case object SandboxInMemory extends LedgerBackend
case object SandboxSql extends LedgerBackend

val allBackends: Set[LedgerBackend] = Set(Sandbox)
val allBackends: Set[LedgerBackend] = Set(SandboxInMemory, SandboxSql)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.PlatformApplications
import com.digitalasset.platform.apitesting.LedgerFactories.SandboxStore.InMemory
import com.digitalasset.platform.damllf.PackageParser
import com.digitalasset.platform.sandbox.SandboxApplication.SandboxServer
import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource}

import scala.util.control.NonFatal

Expand All @@ -37,18 +40,61 @@ object LedgerFactories {
private def getPackageIdOrThrow(path: Path): Ref.PackageId =
getPackageId(path).fold(t => throw t, identity)

def createSandboxResource(config: PlatformApplications.Config)(
sealed abstract class SandboxStore extends Product with Serializable

object SandboxStore {

case object InMemory extends SandboxStore

case object Postgres extends SandboxStore

}

def createSandboxResource(config: PlatformApplications.Config, store: SandboxStore = InMemory)(
implicit esf: ExecutionSequencerFactory): Resource[LedgerContext.SingleChannelContext] = {
val packageIds = config.darFiles.map(getPackageIdOrThrow)
SandboxServerResource(PlatformApplications.sandboxApplication(config)).map {
case PlatformChannels(channel) =>
LedgerContext.SingleChannelContext(channel, config.ledgerId, packageIds)

def createResource(server: SandboxServer) =
SandboxServerResource(server).map {
case PlatformChannels(channel) =>
LedgerContext.SingleChannelContext(channel, config.ledgerId, packageIds)
}

store match {
case SandboxStore.InMemory =>
createResource(PlatformApplications.sandboxApplication(config, None))
case SandboxStore.Postgres =>
new Resource[LedgerContext.SingleChannelContext] {
@volatile
private var postgres: Resource[PostgresFixture] = null

@volatile
private var sandbox: Resource[LedgerContext.SingleChannelContext] = null

override def value(): LedgerContext.SingleChannelContext = sandbox.value

override def setup(): Unit = {
postgres = PostgresResource()
postgres.setup()
sandbox = createResource(
PlatformApplications.sandboxApplication(config, Some(postgres.value.jdbcUrl)))
sandbox.setup()
}

override def close(): Unit = {
sandbox.close()
postgres.close()

sandbox = null
postgres = null
}
}
}

}

def createRemoteServerResource(config: PlatformApplications.Config, host: String, port: Int)(
implicit esf: ExecutionSequencerFactory): Resource[LedgerContext.SingleChannelContext] = {

val packageIds = config.darFiles.map(getPackageIdOrThrow)
RemoteServerResource(host, port).map {
case PlatformChannels(channel) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.digitalasset.platform.apitesting

import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.PlatformApplications
import com.digitalasset.platform.apitesting.LedgerFactories.SandboxStore
import com.digitalasset.platform.esf.TestExecutionSequencerFactory
import org.scalatest.AsyncTestSuite

Expand All @@ -22,14 +23,17 @@ trait MultiLedgerFixture
protected def basePort = 6865

/** Overriding this provides an easy way to narrow down testing to a single implementation. */
override protected def fixtureIdsEnabled: Set[LedgerBackend] = LedgerBackend.allBackends
override protected def fixtureIdsEnabled: Set[LedgerBackend] =
Set(LedgerBackend.SandboxInMemory) //TODO: enable the SQL one as well once we have all ITs working with it

override protected def constructResource(
index: Int,
fixtureId: LedgerBackend): Resource[LedgerContext] = {
fixtureId match {
case LedgerBackend.Sandbox =>
LedgerFactories.createSandboxResource(config)
case LedgerBackend.SandboxInMemory =>
LedgerFactories.createSandboxResource(config, SandboxStore.InMemory)
case LedgerBackend.SandboxSql =>
LedgerFactories.createSandboxResource(config, SandboxStore.Postgres)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ object SandboxApplication {
case None => ("in-memory", Ledger.inMemory(ledgerId, timeProvider, acs, records))
case Some(jdbcUrl) =>
sys.error("Postgres persistence is not supported yet.") //TODO: remove this when we do
// val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records)
// val ledger = Try(Await.result(ledgerF, asyncTolerance))
// .getOrElse(sys.error("Could not start PostgreSQL persistence layer"))
// (s"sql", ledger)
// val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records)
// val ledger = Try(Await.result(ledgerF, asyncTolerance))
// .getOrElse(sys.error("Could not start PostgreSQL persistence layer"))
// (s"sql", ledger)
}

val ledgerBackend = new SandboxLedgerBackend(ledger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.sandbox.config.LedgerIdGenerator
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.PersistenceResponse.{Duplicate, Ok}
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
Contract,
LedgerDao,
Expand Down Expand Up @@ -84,20 +85,23 @@ private class SqlLedger(
createPersistenceQueue()

private def createPersistenceQueue(): SourceQueueWithComplete[Long => LedgerEntry] = {
val offsetGenerator: Source[Long, NotUsed] =
Source.fromIterator(() => Iterator.iterate(headAtInitialization)(l => nextOffset(l)))

val persistenceQueue = Source.queue[Long => LedgerEntry](128, OverflowStrategy.backpressure)
implicit val ec: ExecutionContext = DirectExecutionContext
persistenceQueue
.zipWith(offsetGenerator)((f, offset) => offset -> f(offset))
.mapAsync(1) {
case (offset, ledgerEntry) => //strictly one after another!
val newLedgerEnd = nextOffset(offset)
for {
_ <- ledgerDao.storeLedgerEntry(offset, newLedgerEnd, ledgerEntry)
_ = dispatcher.signalNewHead(newLedgerEnd) //signalling downstream subscriptions
_ = headRef = newLedgerEnd //updating the headRef
} yield ()
.mapAsync(1) { ledgerEntryGen => //strictly one after another!
val offset = headRef // we can only do this because there is not parallelism here!
val ledgerEntry = ledgerEntryGen(offset)
val newLedgerEnd = nextOffset(offset)
ledgerDao
.storeLedgerEntry(offset, newLedgerEnd, ledgerEntry)
.map {
case Ok =>
headRef = newLedgerEnd //updating the headRef
dispatcher.signalNewHead(newLedgerEnd) //signalling downstream subscriptions
case Duplicate =>
() //we are staying with offset we had
}(DirectExecutionContext)
}
.toMat(Sink.ignore)(Keep.left[SourceQueueWithComplete[Long => LedgerEntry], Future[Done]])
.run()
Expand Down Expand Up @@ -159,14 +163,16 @@ private class SqlLedger(
val transactionId = offset.toString
val toAbsCoid: ContractId => AbsoluteContractId =
SandboxEventIdFormatter.makeAbsCoid(transactionId)

val mappedTx = tx.transaction
.mapContractIdAndValue(toAbsCoid, _.mapContractId(toAbsCoid))
.mapNodeId(_.index.toString)
.mapNodeId(SandboxEventIdFormatter.fromTransactionId(transactionId, _))

val mappedDisclosure = tx.blindingInfo.explicitDisclosure
.map {
case (nodeId, party) =>
nodeId.index.toString -> party.map(_.underlyingString)
SandboxEventIdFormatter.fromTransactionId(transactionId, nodeId) -> party.map(
_.underlyingString)
}

LedgerEntry.Transaction(
Expand All @@ -176,7 +182,7 @@ private class SqlLedger(
tx.submitter,
tx.workflowId,
tx.ledgerEffectiveTime,
tx.maximumRecordTime,
Instant.now(),
mappedTx,
mappedDisclosure
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ object Contract {

case class LedgerSnapshot(offset: Long, acs: Source[Contract, NotUsed])

sealed abstract class PersistenceResponse extends Product with Serializable

object PersistenceResponse {

case object Ok extends PersistenceResponse

case object Duplicate extends PersistenceResponse

}

trait LedgerDao {

/** Looks up the ledger id */
Expand All @@ -59,9 +69,10 @@ trait LedgerDao {
* @param offset the offset to look at
* @return the LedgerEntry found, or throws an exception
*/
def lookupLedgerEntryAssert(offset: Long): Future[LedgerEntry] =
def lookupLedgerEntryAssert(offset: Long): Future[LedgerEntry] = {
lookupLedgerEntry(offset).map(
_.getOrElse(sys.error(s"ledger entry not found for offset: $offset")))(DirectExecutionContext)
}

/**
* Returns a snapshot of the ledger.
Expand Down Expand Up @@ -89,10 +100,14 @@ trait LedgerDao {
* Stores a ledger entry. The ledger end gets updated as well in the same transaction.
* WARNING: this code cannot be run concurrently on subsequent entry persistence operations!
*
* @param offset the offset to store the ledger entry
* @param offset the offset to store the ledger entry
* @param newLedgerEnd the new ledger end, valid after this operation finishes
* @param ledgerEntry the LedgerEntry to be stored
* @param ledgerEntry the LedgerEntry to be stored
* @return Ok when the operation was successful otherwise a Duplicate
*/
def storeLedgerEntry(offset: Long, newLedgerEnd: Long, ledgerEntry: LedgerEntry): Future[Unit]
def storeLedgerEntry(
offset: Long,
newLedgerEnd: Long,
ledgerEntry: LedgerEntry): Future[PersistenceResponse]

}
Loading

0 comments on commit 094b4b6

Please sign in to comment.