Skip to content

Commit

Permalink
Support multi-participant DAML script (digital-asset#3605)
Browse files Browse the repository at this point in the history
* Support multi-participant DAML script

fixes digital-asset#3555

CHANGELOG_BEGIN

- [DAML Script - Experimental] DAML script can now run be used in distributed topologies.

CHANGELOG_END

* Fix ports in multiparticipants tests
  • Loading branch information
cocreature authored Nov 25, 2019
1 parent 26b569f commit 2705116
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 137 deletions.
9 changes: 8 additions & 1 deletion daml-script/daml/Daml/Script.daml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Daml.Script
, submitMustFail
, query
, allocateParty
, allocatePartyOn
, Commands
, createCmd
, exerciseCmd
Expand Down Expand Up @@ -84,13 +85,19 @@ query p = Script $ Free $ Query (QueryACS p (templateTypeRep @t) (pure . map (\(

data AllocateParty a = AllocateParty
{ displayName : Text
, participant : Optional Text
, continue : Party -> a
} deriving Functor

-- | Allocate a party with the given display name
-- using the party management service.
allocateParty : Text -> Script Party
allocateParty displayName = Script $ Free (AllocParty $ AllocateParty displayName pure)
allocateParty displayName = Script $ Free (AllocParty $ AllocateParty displayName None pure)

-- | Allocate a party with the given display name
-- using the party management service.
allocatePartyOn : Text -> Text -> Script Party
allocatePartyOn displayName participant = Script $ Free (AllocParty $ AllocateParty displayName (Some participant) pure)

data SubmitFailure = SubmitFailure
{ status : Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.digitalasset.daml.lf.speedy.SValue._
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.daml.lf.value.json.ApiCodecCompressed
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.refinements.ApiTypes.ApplicationId
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
Expand All @@ -41,6 +42,9 @@ import com.digitalasset.ledger.api.v1.transaction_filter.{
InclusiveFilters
}
import com.digitalasset.ledger.client.LedgerClient
import com.digitalasset.ledger.client.LedgerClient
import com.digitalasset.ledger.client.configuration.LedgerClientConfiguration

import com.digitalasset.ledger.client.services.commands.CommandUpdater

object LfValueCodec extends ApiCodecCompressed[AbsoluteContractId](false, false) {
Expand All @@ -53,6 +57,82 @@ object LfValueCodec extends ApiCodecCompressed[AbsoluteContractId](false, false)
}
}

case class Participant(participant: String)
case class Party(party: String)
case class ApiParameters(host: String, port: Int)
case class Participants[T](
default_participant: Option[T],
participants: Map[Participant, T],
party_participants: Map[Party, Participant],
) {
def getPartyParticipant(party: Party): Either[String, T] =
party_participants.get(party) match {
case None =>
default_participant.fold[Either[String, T]](
Left(s"No participant for party $party and no default participant"))(Right(_))
case Some(participant) => getParticipant(Some(participant))
}
def getParticipant(participantOpt: Option[Participant]): Either[String, T] =
participantOpt match {
case None =>
default_participant.fold[Either[String, T]](Left(s"No default participant"))(Right(_))
case Some(participant) =>
participants.get(participant) match {
case None =>
default_participant.fold[Either[String, T]](
Left(s"No participant $participant and no default participant"))(Right(_))
case Some(t) => Right(t)
}
}
}

object ParticipantsJsonProtocol extends DefaultJsonProtocol {
implicit object ParticipantFormat extends JsonFormat[Participant] {
def read(value: JsValue) = value match {
case JsString(s) => Participant(s)
case _ => deserializationError("Expected Participant string")
}
def write(p: Participant) = JsString(p.participant)
}
implicit object PartyFormat extends JsonFormat[Party] {
def read(value: JsValue) = value match {
case JsString(s) => Party(s)
case _ => deserializationError("Expected Party string")
}
def write(p: Party) = JsString(p.party)
}
implicit val apiParametersFormat = jsonFormat2(ApiParameters)
implicit val participantsFormat = jsonFormat3(Participants[ApiParameters])
}

object Runner {
private def connectApiParameters(params: ApiParameters, clientConfig: LedgerClientConfiguration)(
implicit ec: ExecutionContext,
seq: ExecutionSequencerFactory): Future[LedgerClient] = {
LedgerClient.singleHost(params.host, params.port, clientConfig)
}
// We might want to have one config per participant at some point but for now this should be sufficient.
def connect(
participantParams: Participants[ApiParameters],
clientConfig: LedgerClientConfiguration)(
implicit ec: ExecutionContext,
seq: ExecutionSequencerFactory): Future[Participants[LedgerClient]] = {
for {
// The standard library is incredibly weird. Option is not Traversable so we have to convert to a list and back.
// Map is but it doesn’t return a Map so we have to call toMap afterwards.
defaultClient <- Future
.traverse(participantParams.default_participant.toList)(x =>
connectApiParameters(x, clientConfig))
.map(_.headOption)
participantClients <- Future
.traverse(participantParams.participants: Map[Participant, ApiParameters])({
case (k, v) => connectApiParameters(v, clientConfig).map((k, _))
})
.map(_.toMap)
} yield Participants(defaultClient, participantClients, participantParams.party_participants)
}
}

class Runner(
dar: Dar[(PackageId, Package)],
applicationId: ApplicationId,
Expand Down Expand Up @@ -142,9 +222,13 @@ class Runner(
SubmitAndWaitRequest(Some(commandUpdater.applyOverrides(commands)))
}

def run(client: LedgerClient, scriptId: Identifier, inputValue: Option[JsValue])(
def run(
initialClients: Participants[LedgerClient],
scriptId: Identifier,
inputValue: Option[JsValue])(
implicit ec: ExecutionContext,
mat: ActorMaterializer): Future[SValue] = {
var clients = initialClients
val scriptTy = darMap
.get(scriptId.packageId)
.flatMap(_.lookupIdentifier(scriptId.qualifiedName).toOption) match {
Expand Down Expand Up @@ -236,8 +320,10 @@ class Runner(
val requestOrErr = for {
party <- Converter.toParty(vals.get(0))
commands <- Converter.toCommands(compiledPackages, freeAp)
} yield toSubmitRequest(client.ledgerId, party, commands)
val request = requestOrErr.fold(s => throw new ConverterException(s), identity)
client <- clients.getPartyParticipant(Party(party.value))
} yield (client, toSubmitRequest(client.ledgerId, party, commands))
val (client, request) =
requestOrErr.fold(s => throw new ConverterException(s), identity)
val f =
client.commandServiceClient
.submitAndWaitForTransactionTree(request)
Expand Down Expand Up @@ -281,10 +367,14 @@ class Runner(
val filterOrErr = for {
party <- Converter.toParty(vals.get(0))
tplId <- Converter.typeRepToIdentifier(vals.get(1))
client <- clients.getPartyParticipant(Party(party.value))
} yield
TransactionFilter(
List((party.value, Filters(Some(InclusiveFilters(Seq(tplId)))))).toMap)
val filter = filterOrErr.fold(s => throw new ConverterException(s), identity)
(
client,
TransactionFilter(
List((party.value, Filters(Some(InclusiveFilters(Seq(tplId)))))).toMap))
val (client, filter) =
filterOrErr.fold(s => throw new ConverterException(s), identity)
val acsResponses = client.activeContractSetClient
.getActiveContracts(filter, verbose = true)
.runWith(Sink.seq)
Expand All @@ -304,16 +394,34 @@ class Runner(
}
case SVariant(_, "AllocParty", v) => {
v match {
case SRecord(_, _, vals) if vals.size == 2 => {
case SRecord(_, _, vals) if vals.size == 3 => {
val displayName = vals.get(0) match {
case SText(value) => value
case v => throw new ConverterException(s"Expected SText but got $v")
}
val continue = vals.get(1)
val participantName = vals.get(1) match {
case SOptional(Some(SText(t))) => Some(Participant(t))
case SOptional(None) => None
case v => throw new ConverterException(s"Expected SOptional(SText) but got $v")
}
val client = clients.getParticipant(participantName) match {
case Left(err) => throw new RuntimeException(err)
case Right(client) => client
}
val continue = vals.get(2)
val f =
client.partyManagementClient.allocateParty(None, Some(displayName))
f.flatMap(allocRes => {
val party = allocRes.party
participantName match {
case None => {
// If no participant is specified, we use default_participant so we don’t need to change anything.
}
case Some(participant) =>
clients =
clients.copy(party_participants = clients.party_participants + (Party(
party) -> participant))
}
machine.ctrl =
Speedy.CtrlExpr(SEApp(SEValue(continue), Array(SEValue(SParty(party)))))
go()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import com.digitalasset.platform.services.time.TimeProviderType
case class RunnerConfig(
darPath: File,
scriptIdentifier: String,
ledgerHost: String,
ledgerPort: Int,
ledgerHost: Option[String],
ledgerPort: Option[Int],
participantConfig: Option[File],
timeProviderType: TimeProviderType,
commandTtl: Duration,
inputFile: Option[File],
Expand All @@ -33,15 +34,20 @@ object RunnerConfig {
.text("Identifier of the script that should be run in the format Module.Name:Entity.Name")

opt[String]("ledger-host")
.required()
.action((t, c) => c.copy(ledgerHost = t))
.optional()
.action((t, c) => c.copy(ledgerHost = Some(t)))
.text("Ledger hostname")

opt[Int]("ledger-port")
.required()
.action((t, c) => c.copy(ledgerPort = t))
.optional()
.action((t, c) => c.copy(ledgerPort = Some(t)))
.text("Ledger port")

opt[File]("participant-config")
.optional()
.action((t, c) => c.copy(participantConfig = Some(t)))
.text("File containing the participant configuration in JSON format")

opt[Unit]('w', "wall-clock-time")
.action { (t, c) =>
c.copy(timeProviderType = TimeProviderType.WallClock)
Expand All @@ -59,15 +65,28 @@ object RunnerConfig {
c.copy(inputFile = Some(t))
}
.text("Path to a file containing the input value for the script in JSON format.")

checkConfig(c => {
if (c.ledgerHost.isDefined != c.ledgerPort.isDefined) {
failure("Must specify both --ledger-host and --ledger-port")
} else if (c.ledgerHost.isDefined && c.participantConfig.isDefined) {
failure("Cannot specify both --ledger-host and --participant-config")
} else if (c.ledgerHost.isEmpty && c.participantConfig.isEmpty) {
failure("Must specify either --ledger-host or --participant-config")
} else {
success
}
})
}
def parse(args: Array[String]): Option[RunnerConfig] =
parser.parse(
args,
RunnerConfig(
darPath = null,
scriptIdentifier = null,
ledgerHost = "",
ledgerPort = 0,
ledgerHost = None,
ledgerPort = None,
participantConfig = None,
timeProviderType = TimeProviderType.Static,
commandTtl = Duration.ofSeconds(30L),
inputFile = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.digitalasset.daml.lf.language.Ast.Package
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.ledger.api.refinements.ApiTypes.ApplicationId
import com.digitalasset.ledger.client.LedgerClient
import com.digitalasset.ledger.client.configuration.{
CommandClientConfiguration,
LedgerClientConfiguration,
Expand Down Expand Up @@ -80,9 +79,29 @@ object RunnerMain {
})

val runner = new Runner(dar, applicationId, commandUpdater)
val participantParams = config.participantConfig match {
case Some(file) => {
val source = Source.fromFile(file)
val fileContent = try {
source.mkString
} finally {
source.close
}
val jsVal = fileContent.parseJson
import ParticipantsJsonProtocol._
jsVal.convertTo[Participants[ApiParameters]]
}
case None =>
Participants(
default_participant =
Some(ApiParameters(config.ledgerHost.get, config.ledgerPort.get)),
participants = Map.empty,
party_participants = Map.empty)
}
println(participantParams)
val flow: Future[Unit] = for {
client <- LedgerClient.singleHost(config.ledgerHost, config.ledgerPort, clientConfig)
_ <- runner.run(client, scriptId, inputValue)
clients <- Runner.connect(participantParams, clientConfig)
_ <- runner.run(clients, scriptId, inputValue)
} yield ()

flow.onComplete(_ => system.terminate())
Expand Down
Loading

0 comments on commit 2705116

Please sign in to comment.