Skip to content

Commit

Permalink
[JSON-API] Port of http-perf-test to sandbox(aka next) (#11543)
Browse files Browse the repository at this point in the history
* Changes to migrate http-perf-test back to sandbox with more parallelization for single user scenarios.
Increased parallelization is due to the architectural changes in sandbox where it uses
a tick every 100 millis to trigger stuff/data to be available on the read side

CHANGELOG_BEGIN
CHANGELOG_END

* Parallelization fixes for scenarios ExerciseCommand and SyncQueryNewAcs scenarios

* refactor sequential scenario run, make query part of SyncQueryVariableAcs run with single user
  • Loading branch information
akshayshirahatti-da authored Nov 9, 2021
1 parent 56a6db8 commit 8d2b1b9
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ class AsyncQueryConstantAcs
pause(1.second)
}
.exec(
1.to(numberOfRuns).map(runId => exec(query(runId)))
1.to(numberOfRuns / defaultNumUsers).map(runId => exec(query(runId)))
)

setUp(
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(1)),
asyncQueryScenario.inject(atOnceUsers(1)),
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(defaultNumUsers)).andThen {
asyncQueryScenario.inject(atOnceUsers(defaultNumUsers))
}
).protocols(httpProtocol)

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ class CreateAndExerciseCommand extends Simulation with SimulationConfig {
}
}"""

private val numberOfRuns = 2000
private val request = http("CreateAndExerciseCommand")
.post("/v1/create-and-exercise")
.body(StringBody(jsonCommand))

private val scn = scenario("CreateAndExerciseCommandScenario")
.repeat(2000)(exec(request.silent)) // server warmup
.repeat(2000)(exec(request))
.repeat(numberOfRuns / defaultNumUsers)(exec(request.silent)) // server warmup
.repeat(numberOfRuns / defaultNumUsers)(exec(request))

setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ class CreateCommand extends Simulation with SimulationConfig {
}
}"""

private val numberOfRuns = 1000
private val request = http("CreateCommand")
.post("/v1/create")
.body(StringBody(jsonCommand))

private val scn = scenario("CreateCommandScenario")
.repeat(1000)(exec(request))
.repeat(numberOfRuns / defaultNumUsers)(exec(request))

setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ExerciseCommand extends Simulation with SimulationConfig {
}
}"""

private val numberOfRuns = 2000
private val createRequest = http("CreateCommand")
.post("/v1/create")
.body(StringBody(createCommand))
Expand All @@ -37,18 +38,24 @@ class ExerciseCommand extends Simulation with SimulationConfig {
.post("/v1/exercise")
.body(StringBody(exerciseCommand))

private val scn = scenario("ExerciseCommandScenario")
.repeat(2000)(exec(createRequest.silent)) // populate the ACS
.exec(
// retrieve all contractIds
http("GetACS")
.get("/v1/query")
.check(status.is(200), jsonPath("$.result[*].contractId").findAll.saveAs("contractIds"))
.silent
private val scn = scenario("ExerciseCommandScenario-Create")
.repeat(numberOfRuns / defaultNumUsers)(exec(createRequest.silent)) // populate the ACS

private val queryScn = scenario("ExerciseCommandScenario-QueryAndExercise")
.repeat(1)(
exec(
// retrieve all contractIds
http("GetACS")
.get("/v1/query")
.check(status.is(200), jsonPath("$.result[*].contractId").findAll.saveAs("contractIds"))
.silent
)
)
.foreach("${contractIds}", "contractId")(exec(exerciseRequest))

setUp(
scn.inject(atOnceUsers(1))
scn
.inject(atOnceUsers(defaultNumUsers))
.andThen(queryScn.inject(atOnceUsers(1)))
).protocols(httpProtocol)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.gatling.http.check.HttpCheck
import io.gatling.http.request.builder.HttpRequestBuilder

private[scenario] trait HasCreateRequest {
this: HasRandomAmount =>
this: HasRandomAmount with SimulationConfig =>

private lazy val acsQueue: BlockingQueue[String] = new LinkedBlockingQueue[String]()

Expand Down Expand Up @@ -40,7 +40,7 @@ private[scenario] trait HasCreateRequest {

def fillAcsScenario(size: Int, silent: Boolean): ScenarioBuilder =
scenario(s"FillAcsScenario, size: $size")
.doWhile(_ => this.acsSize() < size) {
.repeat(size / defaultNumUsers) {
feed(Iterator.continually(Map("amount" -> randomAmount())))
.group("FillAcsGroup") {
val create =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ private[scenario] trait SimulationConfig {
.authorizationHeader(s"Bearer $jwt")
.contentTypeHeader("application/json")

protected[this] val defaultNumUsers = 10
private lazy val hostAndPort: String = System.getProperty(HostAndPortKey, "localhost:7575")

protected[this] lazy val jwt: String = System.getProperty(JwtKey, aliceJwt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ class SyncQueryConstantAcs extends Simulation with SimulationConfig with HasRand
}"""))

private val scn = scenario("SyncQueryScenario")
.repeat(5000) {
.repeat(5000 / defaultNumUsers) {
// populate the ACS
feed(Iterator.continually(Map("amount" -> randomAmount())))
.exec(createRequest.silent)
}
.repeat(500) {
.repeat(500 / defaultNumUsers) {
// run queries
feed(Iterator.continually(Map("amount" -> randomAmount())))
.exec(queryRequest)
}

setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ class SyncQueryMegaAcs extends Simulation with SimulationConfig with HasRandomAm
scenario(s"SyncQueryMegaScenario $scnName")
.exec(createRequest.silent)
// populate the ACS
.repeat(10, "amount") {
.repeat(10 / defaultNumUsers, "amount") {
feed(Iterator continually env)
.exec(createManyRequest.silent)
}
// run queries
.repeat(500) {
.repeat(500 / defaultNumUsers) {
// unless we request under Alice, we don't get negatives in the DB
def m(amount: Int, reqJwt: String, templateId: String): Record[Any] =
Map("amount" -> amount, "reqJwt" -> reqJwt, "templateId" -> templateId)
Expand All @@ -116,7 +116,7 @@ class SyncQueryMegaAcs extends Simulation with SimulationConfig with HasRandomAm
private val scn = scns.head

setUp(
scn.inject(atOnceUsers(1))
scn.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package com.daml.http.perf.scenario

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import io.gatling.http.check.HttpCheck

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class SyncQueryNewAcs
Expand All @@ -21,24 +24,29 @@ class SyncQueryNewAcs

private val syncQueryNewAcs =
scenario(s"SyncQueryNewAcs, numberOfRuns: $numberOfRuns, ACS size: $wantedAcsSize")
.repeat(numberOfRuns) {
.repeat(numberOfRuns / defaultNumUsers) {
val acsQueue: BlockingQueue[String] = new LinkedBlockingQueue[String]()
val captureContractId: HttpCheck =
jsonPath("$.result.contractId").transform(x => acsQueue.put(x))
group("Populate ACS") {
doWhile(_ => acsSize() < wantedAcsSize) {
doWhile(_ => acsQueue.size() < wantedAcsSize) {
feed(Iterator.continually(Map("amount" -> String.valueOf(randomAmount()))))
.exec(randomAmountCreateRequest.check(status.is(200), captureContractId).silent)
.exec {
randomAmountCreateRequest.check(status.is(200), captureContractId).silent
}
}
}.group("Run Query") {
feed(Iterator.continually(Map("amount" -> String.valueOf(randomAmount()))))
.exec(randomAmountQueryRequest.notSilent)
}.group("Archive ACS") {
doWhile(_ => acsSize() > 0) {
feed(Iterator.continually(Map("archiveContractId" -> removeNextContractIdFromAcs())))
doWhile(_ => acsQueue.size() > 0) {
feed(Iterator.continually(Map("archiveContractId" -> acsQueue.poll())))
.exec(archiveRequest.silent)
}
}
}

setUp(
syncQueryNewAcs.inject(atOnceUsers(1))
syncQueryNewAcs.inject(atOnceUsers(defaultNumUsers))
).protocols(httpProtocol)
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class SyncQueryVariableAcs
}

setUp(
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(1)),
syncQueryScenario.inject(atOnceUsers(1)),
fillAcsScenario(wantedAcsSize, silent = true).inject(atOnceUsers(defaultNumUsers)).andThen {
syncQueryScenario.inject(atOnceUsers(1))
}
).protocols(httpProtocol)
}
2 changes: 1 addition & 1 deletion ledger-service/http-json-testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ hj_scalacopts = lf_scalacopts + [
"//ledger/participant-integration-api",
"//ledger/sandbox",
"//ledger/sandbox:sandbox-scala-tests-lib",
"//ledger/sandbox-classic",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/db-utils",
"//libs-scala/ports",
"//libs-scala/resources",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ import com.daml.ledger.client.configuration.{
LedgerIdRequirement,
}
import com.daml.ledger.client.withoutledgerid.{LedgerClient => DamlLedgerClient}
import com.daml.ledger.resources.ResourceContext
import com.daml.logging.LoggingContextOf
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.SeedService.Seeding
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.sandbox
import com.daml.platform.sandbox.SandboxServer
import com.daml.platform.sandbox.SandboxBackend
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.sandboxnext.Runner
import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
import com.typesafe.scalalogging.LazyLogging
Expand Down Expand Up @@ -150,19 +151,35 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
useTls: UseTls = UseTls.NoTls,
authService: Option[AuthService] = None,
)(testFn: (Port, DamlLedgerClient, LedgerId) => Future[A])(implicit
mat: Materializer,
aesf: ExecutionSequencerFactory,
ec: ExecutionContext,
): Future[A] = {

val ledgerId = LedgerId(testName)
val applicationId = ApplicationId(testName)
implicit val resourceContext: ResourceContext = ResourceContext(ec)

val ledgerF = for {
urlResource <- Future(
SandboxBackend.H2Database.owner
.map(info => Some(info.jdbcUrl))
.acquire()
)
jdbcUrl <- urlResource.asFuture
ledger <- Future(
new SandboxServer(ledgerConfig(Port.Dynamic, dars, ledgerId, authService, useTls), mat)
new Runner(
ledgerConfig(
Port.Dynamic,
dars,
ledgerId,
useTls = useTls,
authService = authService,
jdbcUrl = jdbcUrl,
)
)
.acquire()
)
port <- ledger.portF
port <- ledger.asFuture
} yield (ledger, port)

val clientF: Future[DamlLedgerClient] = for {
Expand All @@ -179,11 +196,12 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
a <- testFn(ledgerPort, client, ledgerId)
} yield a

fa.onComplete { _ =>
ledgerF.foreach(_._1.close())
fa.transformWith { ta =>
ledgerF
.flatMap(_._1.release())
.fallbackTo(Future.unit)
.transform(_ => ta)
}

fa
}

private def ledgerConfig(
Expand All @@ -192,10 +210,12 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
ledgerId: LedgerId,
authService: Option[AuthService],
useTls: UseTls,
jdbcUrl: Option[String],
): SandboxConfig =
sandbox.DefaultConfig.copy(
SandboxConfig.defaultConfig.copy(
port = ledgerPort,
damlPackages = dars,
jdbcUrl = jdbcUrl,
timeProviderType = Some(TimeProviderType.WallClock),
tlsConfig = if (useTls) Some(serverTlsConfig) else None,
ledgerIdMode = LedgerIdMode.Static(ledgerId),
Expand Down

0 comments on commit 8d2b1b9

Please sign in to comment.