JSON API: add PruningTest and make it pass (digital-asset#17167)
If the ledger has been pruned more recently than the last cached copy was taken, then attempting to fetch the changes since that last offset will fail, rendering the relevant template(s) unqueryable. This PR detects that condition, clears the cache for the relevant template, and queries again, which repopulates the cache with a fresh copy of the ACS for that template and serves the query from that.
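
In outline, the recovery looks something like the following — a minimal, self-contained sketch only; `QueryCache`, `PrunedDataAccessed`, and the method names are illustrative stand-ins, not the JSON API's actual internals:

import scala.concurrent.{ExecutionContext, Future}

// Stand-in for the error the participant raises when a query resumes from an
// offset that has already been pruned (PARTICIPANT_PRUNED_DATA_ACCESSED).
final case class PrunedDataAccessed(msg: String) extends RuntimeException(msg)

// Stand-in for the per-template query-store cache.
trait QueryCache[K, V] {
  def fetchDeltaSinceLastOffset(key: K): Future[V] // fails if the stored offset was pruned
  def clear(key: K): Future[Unit] // drop cached contracts and the stored offset
  def fetchFullAcs(key: K): Future[V] // re-seed the cache from a fresh ACS snapshot
}

def queryWithPruneRecovery[K, V](cache: QueryCache[K, V], templateId: K)(implicit
    ec: ExecutionContext
): Future[V] =
  cache.fetchDeltaSinceLastOffset(templateId).recoverWith {
    case _: PrunedDataAccessed =>
      // The cached offset predates the prune point: drop the stale cache and
      // rebuild it from the current ACS, then serve the query from that.
      cache.clear(templateId).flatMap(_ => cache.fetchFullAcs(templateId))
  }

Only the first query after a prune pays for the full ACS fetch; subsequent queries go back to serving deltas from the refreshed cache.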

I also made some usability tweaks around running the canton-ee tests, to improve the dev experience for failures I came across while trying to run them. Specifically:
* Use `--config=canton-ee` to opt into the tests which require canton-ee
* When downloading Canton EE from Artifactory, provide better diagnostics if the required auth isn't set up.
raphael-speyer-da authored Jul 26, 2023
1 parent 3e846d9 commit 47d21fe
Showing 14 changed files with 182 additions and 94 deletions.
4 changes: 4 additions & 0 deletions .bazelrc
@@ -139,3 +139,7 @@ build -c opt
try-import %workspace%/.bazelrc.local
build --build_tag_filters=-canton-ee
test --test_tag_filters=-canton-ee

# Set flag --config=canton-ee to run tests which depend on Canton Enterprise Edition
build:canton-ee --build_tag_filters=canton-ee
test:canton-ee --test_tag_filters=canton-ee
8 changes: 5 additions & 3 deletions bazel_tools/build_environment.bzl
@@ -24,21 +24,23 @@ npm_version = "{NPM_VERSION}"
mvn_version = "{MVN_VERSION}"
ghc_version = "{GHC_VERSION}"
sdk_version = "{SDK_VERSION}"
artif_auth = "{artif_auth}"
artif_user = "{artif_user}"
artif_pass = "{artif_pass}"
""".format(
SDK_VERSION = semver,
NPM_VERSION = semver,
MVN_VERSION = semver,
GHC_VERSION = ghc,
artif_auth = ctx.os.environ.get("ARTIFACTORY_AUTH", default = ""),
artif_user = ctx.os.environ.get("ARTIFACTORY_USERNAME", default = ""),
artif_pass = ctx.os.environ.get("ARTIFACTORY_PASSWORD", default = ""),
),
executable = False,
)

build_environment = repository_rule(
# Tell Bazel that this rule will produce different results if any of the
# env vars in the list has changed.
environ = ["DAML_SDK_RELEASE_VERSION", "ARTIFACTORY_AUTH"],
environ = ["DAML_SDK_RELEASE_VERSION", "ARTIFACTORY_USERNAME", "ARTIFACTORY_PASSWORD"],
implementation = _impl,
attrs = {},
)
2 changes: 0 additions & 2 deletions build.ps1
@@ -36,8 +36,6 @@ if (Test-Path -Path $env:appdata\stack\pantry\hackage\hackage-security-lock) {
Remove-Item -ErrorAction Continue -Force -Recurse -Path $env:appdata\stack
}

$env:ARTIFACTORY_AUTH = [Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("$env:ARTIFACTORY_USERNAME" + ":" + "$env:ARTIFACTORY_PASSWORD"))

function bazel() {
Write-Output ">> bazel $args"
$global:lastexitcode = 0
7 changes: 2 additions & 5 deletions build.sh
@@ -39,10 +39,6 @@ else
bazel=bazel
fi

if [ -n "${ARTIFACTORY_USERNAME:-}" ] && [ -n "${ARTIFACTORY_PASSWORD:-}" ]; then
export ARTIFACTORY_AUTH=$(echo -n "$ARTIFACTORY_USERNAME:$ARTIFACTORY_PASSWORD" | base64 -w0)
fi

# Bazel test only builds targets that are dependencies of a test suite so do a full build first.
$bazel build //... \
--build_tag_filters "${tag_filter:1}" \
@@ -94,7 +90,8 @@ $bazel test //... \
--experimental_profile_include_target_label \
--build_event_json_file test-events.json \
--build_event_publish_all_actions \
--experimental_execution_log_file "$ARTIFACT_DIRS/logs/test_execution${execution_log_postfix}.log"
--experimental_execution_log_file "$ARTIFACT_DIRS/logs/test_execution${execution_log_postfix}.log" \
--config=canton-ee

# Make sure that Bazel query works.
$bazel query 'deps(//...)' >/dev/null
10 changes: 6 additions & 4 deletions canton/BUILD.bazel
@@ -7,7 +7,7 @@ load(
"lf_scalacopts_stricter",
)
load("//:canton_dep.bzl", "canton")
load("@build_environment//:configuration.bzl", "artif_auth")
load("@build_environment//:configuration.bzl", "artif_pass", "artif_user")
load("@os_info//:os_info.bzl", "is_windows")

java_binary(
@@ -74,13 +74,14 @@ url=https://digitalasset.jfrog.io/artifactory/canton-enterprise/canton-enterpris
curl=$(location {curl})
tmp=$$(mktemp)
auth=$$(echo -n "{artif_user}:{artif_pass}" | base64 -w0)
$$curl --location \
--fail \
--insecure \
--silent \
-H "Authorization: Basic {auth}" \
-H "Authorization: Basic $$auth" \
$$url \
> $$tmp
> $$tmp || echo >&2 " Failed to download from $$url \n Have you set your ARTIFACTORY_USERNAME and ARTIFACTORY_PASSWORD env vars correctly?"
computed_sha=$$(sha256sum $$tmp | awk '{{print $$1}}')
if [ "$$CANTON_ENTERPRISE_SHA" != "$$computed_sha" ]; then
@@ -95,7 +96,8 @@ tar xzf $$tmp
cp canton-*/lib/*.jar $@
""".format(
auth = artif_auth,
artif_user = artif_user,
artif_pass = artif_pass,
curl = "@curl_dev_env//:bin/curl" if not is_windows else "@curl_dev_env//:bin/curl.exe",
local = "true" if canton["local"] else "",
src = ":lib/canton-ee.jar" if canton["local"] else "@canton//:jar", # not used in else case but still must resolve
10 changes: 3 additions & 7 deletions canton/README.md
@@ -61,16 +61,12 @@ open-source repository so we cannot assume every contributor will have a Canton
EE license key.

Tests that require Canton EE **must** be tagged with `"canton-ee"`, which is
disabled by default through `.bazelrc`. To run those tests locally, either
explicitly target them or add `--build_tag_filters=` or `--test_tag_filters=`
as appropriate (yes, these are the full options: by setting the "running"
filters to empty for the current run, you overwrite the `-canton-ee` set in
`.bazelrc` which excludes the Canton EE tests, thereby removing the exclusion
and including the tests).
disabled by default through `.bazelrc`. To run those tests locally, add
`--config=canton-ee`, thereby removing the exclusion and including the tests.

Those tests are run on CI.

If you're using a local build of canton (setting `local` to `True` per above)
_and_ you are explicitly overwriting the `*_tag_filters` to run the Canton EE
_and_ you are explicitly using `--config=canton-ee` to run the Canton EE
tests, they will be run using your provided `canton-ee.jar` (which therefore
needs to be an EE jar at that point).
1 change: 0 additions & 1 deletion ci/build.yml
@@ -212,7 +212,6 @@ jobs:
# Actually run some tests
# Note: Oracle tests all run sequentially because they all access the same Oracle instance,
# and we sometimes observe transient connection issues when running tests in parallel.
export ARTIFACTORY_AUTH=$(echo -n "$ARTIFACTORY_USERNAME:$ARTIFACTORY_PASSWORD" | base64 -w0)
bazel test \
--config=oracle \
--test_strategy=exclusive \
4 changes: 4 additions & 0 deletions compatibility/.bazelrc
@@ -139,3 +139,7 @@ build -c opt
try-import %workspace%/.bazelrc.local
build --build_tag_filters=-canton-ee
test --test_tag_filters=-canton-ee

# Set flag --config=canton-ee to run tests which depend on Canton Enterprise Edition
build:canton-ee --build_tag_filters=canton-ee
test:canton-ee --test_tag_filters=canton-ee
@@ -328,6 +328,13 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler): ConnectionIO[Int]

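// Clears both the cached contracts and the stored per-template ledger offset,
// so the next query for this template re-fetches a full ACS snapshot instead
// of trying to resume from a pruned offset.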
final def deleteTemplate(tpid: SurrogateTpId)(implicit log: LogHandler): ConnectionIO[Unit] = {
for {
_ <- sql"DELETE FROM $contractTableName WHERE tpid = $tpid".update.run
_ <- sql"DELETE FROM $ledgerOffsetTableName WHERE tpid = $tpid".update.run
} yield {}
}

// ContractTypeId -> CId[String]
final def deleteContracts(
cids: Map[SurrogateTpId, Set[String]]
1 change: 1 addition & 0 deletions ledger-service/http-json/BUILD.bazel
@@ -524,6 +524,7 @@ alias(
"//libs-scala/scala-utils",
"//libs-scala/test-evidence/scalatest:test-evidence-scalatest",
"//libs-scala/test-evidence/tag:test-evidence-tag",
"//libs-scala/timer-utils",
"//observability/metrics",
"//observability/metrics:metrics-test-lib",
"//runtime-components/non-repudiation",
@@ -0,0 +1,53 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.http

final class PruningTest
extends AbstractHttpServiceIntegrationTestFunsCustomToken
with HttpServicePostgresInt {

override def staticContentConfig: Option[StaticContentConfig] = None
override def useTls = HttpServiceTestFixture.UseTls.Tls
override def wsConfig: Option[WebsocketConfig] = None

import com.daml.ledger.api.v1.admin.{participant_pruning_service => PruneGrpc}
"fail querying after pruned offset" in withHttpService { fixture =>
import scala.concurrent.duration._
import com.daml.timer.RetryStrategy
for {
(alice, aliceHeaders) <- fixture.getUniquePartyAndAuthHeaders("Alice")
query = jsObject(s"""{"templateIds": ["Iou:Iou"]}""")

// do query to populate cache
_ <- searchExpectOk(List(), query, fixture, aliceHeaders)

// perform more actions on the ledger
(offsetBeforeArchive, offsetAfterArchive) <- offsetBeforeAfterArchival(
alice,
fixture,
aliceHeaders,
)

// prune, at an offset that is later than the last cache refresh.
_ <- RetryStrategy.constant(20, 1.second) { case (_, _) =>
for {
// Add artificial ledger activity to advance the safe prune offset. Repeated on failure.
_ <- postCreateCommand(iouCreateCommand(alice), fixture, aliceHeaders)
pruned <- PruneGrpc.ParticipantPruningServiceGrpc
.stub(fixture.client.channel)
.prune(
PruneGrpc.PruneRequest(
pruneUpTo = domain.Offset unwrap offsetAfterArchive,
pruneAllDivulgedContracts = true,
)
)
} yield pruned should ===(PruneGrpc.PruneResponse())
}

// now query again to ensure it handles PARTICIPANT_PRUNED_DATA_ACCESSED gracefully
_ <- searchExpectOk(List(), query, fixture, aliceHeaders)
} yield succeed
}

}
@@ -917,4 +917,74 @@ trait AbstractHttpServiceIntegrationTestFuns
.sample
.getOrElse(sys.error(s"can't generate ${n}b string"))
}

import AbstractHttpServiceIntegrationTestFuns.{UriFixture, EncoderFixture}

protected def offsetBeforeAfterArchival(
party: domain.Party,
fixture: UriFixture with EncoderFixture,
headers: List[HttpHeader],
): Future[(domain.Offset, domain.Offset)] = {
import WebsocketTestFixture._
import fixture.uri
type In = JsValue // JsValue might not be the most convenient choice
val syntax = Consume.syntax[In]
import syntax._

def offsetAfterCreate(): Consume.FCC[In, (domain.ContractId, domain.Offset)] = for {
// make a contract
create <- liftF(
postCreateCommand(
iouCreateCommand(party),
fixture,
headers,
)
)
cid = resultContractId(create)
// wait for the creation's offset
offsetAfter <- readUntil[In] {
case ContractDelta(creates, _, off @ Some(_)) =>
if (creates.exists(_._1 == cid)) off else None
case _ => None
}
} yield (cid, offsetAfter)

import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.Keep

def readMidwayOffset(kill: UniqueKillSwitch) = for {
// wait for the ACS
_ <- readUntil[In] {
case ContractDelta(_, _, offset) => offset
case _ => None
}
// make a contract and fetch the offset after it
(cid, betweenOffset) <- offsetAfterCreate()
// archive it
archive <- liftF(postArchiveCommand(TpId.Iou.Iou, cid, fixture, headers))
_ = archive._1 shouldBe (StatusCodes.OK)
// wait for the archival offset
afterOffset <- readUntil[In] {
case ContractDelta(_, archived, offset) =>
if (archived.exists(_.contractId == cid)) offset else None
case _ => None
}
// if you try to prune afterOffset, pruning fails with
// OFFSET_OUT_OF_RANGE(9,db14ee96): prune_up_to needs to be before ledger end 0000000000000007
// create another dummy contract and ignore it
_ <- offsetAfterCreate()
_ = kill.shutdown()
} yield (betweenOffset, afterOffset)

val query = """[{"templateIds": ["Iou:Iou"]}]"""
for {
jwt <- jwtForParties(uri)(List(party), List(), "participant0")
(kill, source) =
singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
offsets <- source.via(parseResp).runWith(Consume.interpret(readMidwayOffset(kill)))
} yield offsets
}

}
@@ -1197,71 +1197,6 @@ abstract class AbstractWebsocketServiceIntegrationTest(val integration: String)
}
}

import AbstractHttpServiceIntegrationTestFuns.{UriFixture, EncoderFixture}

private[this] def offsetBeforeAfterArchival(
party: domain.Party,
fixture: UriFixture with EncoderFixture,
headers: List[HttpHeader],
): Future[(domain.Offset, domain.Offset)] = {
import fixture.uri
type In = JsValue // JsValue might not be the most convenient choice
val syntax = Consume.syntax[In]
import syntax._

def offsetAfterCreate(): Consume.FCC[In, (domain.ContractId, domain.Offset)] = for {
// make a contract
create <- liftF(
postCreateCommand(
iouCreateCommand(party),
fixture,
headers,
)
)
cid = resultContractId(create)
// wait for the creation's offset
offsetAfter <- readUntil[In] {
case ContractDelta(creates, _, off @ Some(_)) =>
if (creates.exists(_._1 == cid)) off else None
case _ => None
}
} yield (cid, offsetAfter)

def readMidwayOffset(kill: UniqueKillSwitch) = for {
// wait for the ACS
_ <- readUntil[In] {
case ContractDelta(_, _, offset) => offset
case _ => None
}
// make a contract and fetch the offset after it
(cid, betweenOffset) <- offsetAfterCreate()
// archive it
archive <- liftF(postArchiveCommand(TpId.Iou.Iou, cid, fixture, headers))
_ = archive._1 should ===(StatusCodes.OK)
// wait for the archival offset
afterOffset <- readUntil[In] {
case ContractDelta(_, archived, offset) =>
if (archived.exists(_.contractId == cid)) offset else None
case _ => None
}
// if you try to prune afterOffset, pruning fails with
// OFFSET_OUT_OF_RANGE(9,db14ee96): prune_up_to needs to be before ledger end 0000000000000007
// create another dummy contract and ignore it
_ <- offsetAfterCreate()
_ = kill.shutdown()
} yield (betweenOffset, afterOffset)

val query = """[{"templateIds": ["Iou:Iou"]}]"""
for {
jwt <- jwtForParties(uri)(List(party), List(), "participant0")
(kill, source) =
singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
offsets <- source.via(parseResp).runWith(Consume.interpret(readMidwayOffset(kill)))
} yield offsets
}

import AbstractHttpServiceIntegrationTestFuns.UriFixture

"query on a bunch of random splits should yield consistent results" in withHttpService {