From 4fe6e53b9dc4b9435aef483b92eee4b0696f46dd Mon Sep 17 00:00:00 2001 From: pbatko-da Date: Tue, 18 Jan 2022 15:59:22 +0100 Subject: [PATCH] [User management] Persistence with caching (#12344) Adding - `PersistentUserManagementStore` and `CachedUserManagementStore`, - `UserManagementStorageBackendTemplate` and sql migrations, - CLI flags: `--user-management-max-cache-size` and `--user-management-cache-expiry`; And wiring `PersistentUserManagementStore` where before we had `InMemoryUserManagementStore`. --- .../main/scala/com/daml/caching/Cache.scala | 6 + .../com/daml/caching/CaffeineCache.scala | 34 +- .../scala/com/daml/caching/MappedCache.scala | 3 + .../main/scala/com/daml/caching/NoCache.scala | 2 + .../caching/MapBackedCacheForTesting.scala | 2 + .../scala/com/daml/error/ErrorCategory.scala | 2 +- .../api/validation/ErrorFactoriesSpec.scala | 2 +- .../testtool/infrastructure/Assertions.scala | 19 + .../participant/ParticipantTestContext.scala | 20 +- .../suites/UserManagementServiceIT.scala | 345 ++++++++++++++---- .../main/scala/com/daml/metrics/Metrics.scala | 14 + .../participant-integration-api/BUILD.bazel | 1 + .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 24 ++ .../V8__add_user_managment.sha256 | 1 + .../V8__add_user_managment.sql | 24 ++ .../V118__add_user_managment.sha256 | 1 + .../V118__add_user_managment.sql | 29 ++ .../store/backend/StorageBackend.scala | 35 +- .../store/backend/StorageBackendFactory.scala | 1 + .../common/CommonStorageBackendFactory.scala | 5 + ...UserManagementStorageBackendTemplate.scala | 204 +++++++++++ .../backend/h2/H2ResetStorageBackend.scala | 4 + .../oracle/OracleResetStorageBackend.scala | 4 + .../PostgresResetStorageBackend.scala | 4 + .../CachedUserManagementStore.scala | 98 +++++ .../PersistentUserManagementStore.scala | 200 ++++++++++ .../backend/StorageBackendProvider.scala | 7 +- .../store/backend/StorageBackendSuite.scala | 3 +- .../StorageBackendTestsUserManagement.scala | 178 +++++++++ .../CachedUserManagementStoreSpec.scala | 139 +++++++ .../InMemoryUserManagementStore.scala | 21 +- .../state/index/v2/UserManagementStore.scala | 27 +- .../state/kvutils/app/Config.scala | 29 ++ .../state/kvutils/app/Runner.scala | 11 +- .../state/kvutils/app/ConfigSpec.scala | 39 ++ .../platform/sandbox/SandboxServer.scala | 13 +- .../platform/sandbox/cli/CommonCliBase.scala | 23 ++ .../sandbox/config/SandboxConfig.scala | 11 +- .../sandbox/cli/CommonCliSpecBase.scala | 40 ++ .../ledger/sandbox/SandboxOnXRunner.scala | 66 ++-- .../scala/platform/sandboxnext/Runner.scala | 11 +- .../platform/testing/LogCollector.scala | 9 +- 43 files changed, 1551 insertions(+), 162 deletions(-) create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sql create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sql create mode 100644 ledger/participant-integration-api/src/main/scala/platform/store/backend/common/UserManagementStorageBackendTemplate.scala create mode 100644 ledger/participant-integration-api/src/main/scala/platform/usermanagement/CachedUserManagementStore.scala create mode 100644 ledger/participant-integration-api/src/main/scala/platform/usermanagement/PersistentUserManagementStore.scala create mode 100644 ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsUserManagement.scala create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/usermanagement/CachedUserManagementStoreSpec.scala diff --git a/ledger/caching/src/main/scala/com/daml/caching/Cache.scala b/ledger/caching/src/main/scala/com/daml/caching/Cache.scala index eb42f18091a4..c0e546242fb8 100644 --- a/ledger/caching/src/main/scala/com/daml/caching/Cache.scala +++ b/ledger/caching/src/main/scala/com/daml/caching/Cache.scala @@ -22,6 +22,12 @@ abstract class Cache[Key, Value] { */ def getIfPresent(key: Key): Option[Value] + /** Discard any cached value for the key. + * + * The behavior of this operation is undefined for an entry that is being loaded and is otherwise not present. + */ + def invalidate(key: Key): Unit + /** Transform values when reading from or writing to the cache. * * Optionally allows the mapping to discard values by returning [[None]] when transforming before diff --git a/ledger/caching/src/main/scala/com/daml/caching/CaffeineCache.scala b/ledger/caching/src/main/scala/com/daml/caching/CaffeineCache.scala index a916294c7e56..9344672d521c 100644 --- a/ledger/caching/src/main/scala/com/daml/caching/CaffeineCache.scala +++ b/ledger/caching/src/main/scala/com/daml/caching/CaffeineCache.scala @@ -6,7 +6,9 @@ package com.daml.caching import com.daml.metrics.CacheMetrics import com.github.benmanes.caffeine.{cache => caffeine} +import scala.compat.java8.FutureConverters import scala.compat.java8.OptionConverters._ +import scala.concurrent.Future object CaffeineCache { @@ -31,16 +33,27 @@ object CaffeineCache { override def getOrAcquire(key: Key, acquire: Key => Value): Value = cache.get(key, key => acquire(key)) + + override def invalidate(key: Key): Unit = cache.invalidate(key) + } + + final class AsyncLoadingCacheScala[Key <: AnyRef, Value <: AnyRef]( + cache: caffeine.AsyncLoadingCache[Key, Value], + cacheMetrics: CacheMetrics, + ) { + installMetrics(cacheMetrics, cache.synchronous()) + + def get(key: Key): Future[Value] = FutureConverters.toScala(cache.get(key)) + + def invalidate(key: Key): Unit = cache.synchronous().invalidate(key) + } private final class InstrumentedCaffeineCache[Key <: AnyRef, Value <: AnyRef]( cache: caffeine.Cache[Key, Value], metrics: CacheMetrics, ) extends ConcurrentCache[Key, Value] { - metrics.registerSizeGauge(() => cache.estimatedSize()) - metrics.registerWeightGauge(() => - cache.policy().eviction().asScala.flatMap(_.weightedSize.asScala).getOrElse(0) - ) + installMetrics(metrics, cache) private val delegate = new SimpleCaffeineCache(cache) @@ -52,6 +65,19 @@ object CaffeineCache { override def getOrAcquire(key: Key, acquire: Key => Value): Value = delegate.getOrAcquire(key, acquire) + + override def invalidate(key: Key): Unit = + delegate.invalidate(key) + } + + private def installMetrics[Key <: AnyRef, Value <: AnyRef]( + metrics: CacheMetrics, + cache: caffeine.Cache[Key, Value], + ): Unit = { + metrics.registerSizeGauge(() => cache.estimatedSize()) + metrics.registerWeightGauge(() => + cache.policy().eviction().asScala.flatMap(_.weightedSize.asScala).getOrElse(0) + ) } } diff --git a/ledger/caching/src/main/scala/com/daml/caching/MappedCache.scala b/ledger/caching/src/main/scala/com/daml/caching/MappedCache.scala index 60087069b0c4..cea82f15b412 100644 --- a/ledger/caching/src/main/scala/com/daml/caching/MappedCache.scala +++ b/ledger/caching/src/main/scala/com/daml/caching/MappedCache.scala @@ -15,4 +15,7 @@ private[caching] final class MappedCache[Key, Value, NewValue]( override def getIfPresent(key: Key): Option[NewValue] = delegate.getIfPresent(key).map(mapAfterReading) + + override def invalidate(key: Key): Unit = + delegate.invalidate(key) } diff --git a/ledger/caching/src/main/scala/com/daml/caching/NoCache.scala b/ledger/caching/src/main/scala/com/daml/caching/NoCache.scala index 7eaa3415ba3c..84dea6d7e3e0 100644 --- a/ledger/caching/src/main/scala/com/daml/caching/NoCache.scala +++ b/ledger/caching/src/main/scala/com/daml/caching/NoCache.scala @@ -9,4 +9,6 @@ final class NoCache[Key, Value] private[caching] extends ConcurrentCache[Key, Va override def getIfPresent(key: Key): Option[Value] = None override def getOrAcquire(key: Key, acquire: Key => Value): Value = acquire(key) + + override def invalidate(key: Key): Unit = () } diff --git a/ledger/caching/src/test/lib/scala/com/daml/caching/MapBackedCacheForTesting.scala b/ledger/caching/src/test/lib/scala/com/daml/caching/MapBackedCacheForTesting.scala index 704ae617c65b..c2b88536557f 100644 --- a/ledger/caching/src/test/lib/scala/com/daml/caching/MapBackedCacheForTesting.scala +++ b/ledger/caching/src/test/lib/scala/com/daml/caching/MapBackedCacheForTesting.scala @@ -17,4 +17,6 @@ final class MapBackedCacheForTesting[Key, Value](store: ConcurrentMap[Key, Value override def getOrAcquire(key: Key, acquire: Key => Value): Value = store.computeIfAbsent(key, acquire(_)) + + override def invalidate(key: Key): Unit = store.remove(key): Unit } diff --git a/ledger/error/src/main/scala/com/daml/error/ErrorCategory.scala b/ledger/error/src/main/scala/com/daml/error/ErrorCategory.scala index 26eec0277b27..0d6b753134fd 100644 --- a/ledger/error/src/main/scala/com/daml/error/ErrorCategory.scala +++ b/ledger/error/src/main/scala/com/daml/error/ErrorCategory.scala @@ -231,7 +231,7 @@ object ErrorCategory { */ @Description( """The mutable state of the system does not satisfy the preconditions required to execute the request. - |We consider the whole Daml ledger including ledger config, parties, packages, and command + |We consider the whole Daml ledger including ledger config, parties, packages, users and command |deduplication to be mutable system state. Thus all Daml interpretation errors are reported |as this error or one of its specializations.""" ) diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index a491de8865d2..c27d5d3c6231 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -952,7 +952,7 @@ class ErrorFactoriesSpec expectedDetails: Seq[ErrorDetails.ErrorDetail], expectedLogEntry: ExpectedLogEntry, ): Unit = { - assertError[this.type, this.type]( + val _ = assertError[this.type, this.type]( actual = statusRuntimeException, expectedCode, expectedMessage, diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala index ad79d4ff3379..fafa3896246e 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala @@ -47,6 +47,25 @@ object Assertions { } } + def assertEquals[T](actual: T, expected: T): Unit = { + try { + MUnit.assertEquals(actual, expected) + } catch { + case e: ComparisonFailException => + throw AssertionErrorWithPreformattedMessage( + e.message, + s"two objects are supposed to be equal but they are not", + ) + } + } + + def assertSameElements[T](actual: Iterable[T], expected: Iterable[T]): Unit = { + assert( + actual.toSet == expected.toSet, + s"Actual |${actual.mkString(", ")}| should have the same elements as (expected): |${expected.mkString(", ")}|", + ) + } + /** Asserts GRPC error codes depending on the self-service error codes feature in the Ledger API. */ def assertGrpcError( participant: ParticipantTestContext, diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala index 2ee049f25f91..5d4d1aad20d0 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala @@ -133,14 +133,20 @@ private[testtool] final class ParticipantTestContext private[participant] ( private[this] val identifierPrefix = s"$applicationId-$endpointId-$identifierSuffix" - private[this] def nextId(idType: String): () => String = - Identification.indexSuffix(s"$identifierPrefix-$idType") - - private[this] val nextPartyHintId: () => String = nextId("party") - private[this] val nextCommandId: () => String = nextId("command") - private[this] val nextSubmissionId: () => String = nextId("submission") + private[this] def nextIdGenerator(name: String, lowerCase: Boolean = false): () => String = { + val f = Identification.indexSuffix(s"$identifierPrefix-$name") + if (lowerCase) + () => f().toLowerCase + else + f + } + + private[this] val nextPartyHintId: () => String = nextIdGenerator("party") + private[this] val nextCommandId: () => String = nextIdGenerator("command") + private[this] val nextSubmissionId: () => String = nextIdGenerator("submission") private[this] val workflowId: String = s"$applicationId-$identifierSuffix" - val nextKeyId: () => String = nextId("key") + val nextKeyId: () => String = nextIdGenerator("key") + val nextUserId: () => String = nextIdGenerator("user", lowerCase = true) override def toString: String = s"participant $endpointId" diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/UserManagementServiceIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/UserManagementServiceIT.scala index 0982e3c5d479..9b1df567d6f6 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/UserManagementServiceIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/UserManagementServiceIT.scala @@ -10,33 +10,28 @@ import com.daml.error.definitions.LedgerApiErrors import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions._ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite +import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.ledger.api.v1.admin.user_management_service.{ CreateUserRequest, DeleteUserRequest, + DeleteUserResponse, GetUserRequest, GrantUserRightsRequest, ListUserRightsRequest, + ListUserRightsResponse, + ListUsersRequest, RevokeUserRightsRequest, + RevokeUserRightsResponse, User, Right => Permission, } import com.daml.ledger.api.v1.admin.{user_management_service => proto} import io.grpc.Status -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} final class UserManagementServiceIT extends LedgerTestSuite { - // TODO (i12059): complete testing - // create user - // - invalid user-name - // - invalid rights - - // get user - // delete user - // list users - // grant user rights - // revoke user rights - // list user rights + test( "UserManagementCreateUserInvalidArguments", "Test argument validation for UserManagement#CreateUser", @@ -50,13 +45,20 @@ final class UserManagementServiceIT extends LedgerTestSuite { problem: String, user: User, rights: Seq[proto.Right], - errorCode: ErrorCode, - ): Future[Unit] = + expectedErrorCode: ErrorCode, + ): Future[Unit] = { for { - error <- ledger.userManagement + throwable <- ledger.userManagement .createUser(CreateUserRequest(Some(user), rights)) - .mustFail(problem) - } yield assertGrpcError(ledger, error, Status.Code.INVALID_ARGUMENT, errorCode, None) + .mustFail(context = problem) + } yield assertGrpcError( + participant = ledger, + t = throwable, + expectedCode = Status.Code.INVALID_ARGUMENT, + selfServiceErrorCode = expectedErrorCode, + exceptionMessageSubstring = None, + ) + } for { _ <- createAndCheck( @@ -94,12 +96,12 @@ final class UserManagementServiceIT extends LedgerTestSuite { enabled = _.userManagement, disabledReason = "requires user management feature", )(implicit ec => { case Participants(Participant(ledger)) => - def getAndCheck(problem: String, userId: String, errorCode: ErrorCode): Future[Unit] = + def getAndCheck(problem: String, userId: String, expectedErrorCode: ErrorCode): Future[Unit] = for { error <- ledger.userManagement .getUser(GetUserRequest(userId)) .mustFail(problem) - } yield assertGrpcError(ledger, error, Status.Code.INVALID_ARGUMENT, errorCode, None) + } yield assertGrpcError(ledger, error, Status.Code.INVALID_ARGUMENT, expectedErrorCode, None) for { _ <- getAndCheck("empty user-id", "", LedgerApiErrors.RequestValidation.InvalidArgument) @@ -107,73 +109,268 @@ final class UserManagementServiceIT extends LedgerTestSuite { } yield () }) - test( - "TestAllUserManagementRpcs", - "Exercise every rpc once with success and once with a failure", - allocate(NoParties), - enabled = _.userManagement, - disabledReason = "requires user management feature", - )(implicit ec => { case Participants(Participant(ledger)) => + private val adminPermission = + Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin())) + private val actAsPermission1 = + Permission(Permission.Kind.CanActAs(Permission.CanActAs("acting-party-1"))) + private val readAsPermission1 = + Permission(Permission.Kind.CanReadAs(Permission.CanReadAs("reading-party-1"))) + private val userRightsBatch = List( + actAsPermission1, + Permission(Permission.Kind.CanActAs(Permission.CanActAs("acting-party-2"))), + readAsPermission1, + Permission(Permission.Kind.CanReadAs(Permission.CanReadAs("reading-party-2"))), + ) + private val AdminUserId = "participant_admin" + + userManagementTest( + "TestAdminExists", + "Ensure admin user exists", + )(implicit ec => { implicit ledger => + for { + get1 <- ledger.userManagement.getUser(GetUserRequest(AdminUserId)) + rights1 <- ledger.userManagement.listUserRights(ListUserRightsRequest(AdminUserId)) + } yield { + assertEquals(get1, User(AdminUserId, "")) + assertEquals(rights1, ListUserRightsResponse(Seq(adminPermission))) + } + }) + + userManagementTest( + "TestCreateUser", + "Exercise CreateUser rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + val user2 = User(userId2, "") for { - // TODO: actually exercise all RPCs - createResult <- ledger.userManagement.createUser(CreateUserRequest(Some(User("a", "b")), Nil)) - createAgainError <- ledger.userManagement - .createUser(CreateUserRequest(Some(User("a", "b")), Nil)) + res1 <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), Nil) + ) + res2 <- ledger.userManagement + .createUser(CreateUserRequest(Some(user1), Nil)) .mustFail("allocating a duplicate user") + res3 <- ledger.userManagement.createUser(CreateUserRequest(Some(user2), Nil)) + res4 <- ledger.userManagement.deleteUser(DeleteUserRequest(userId2)) + } yield { + assertEquals(res1, user1) + assertUserAlreadyExists(res2) + assertEquals(res3, user2) + assertEquals(res4, DeleteUserResponse()) + } + }) - getUserResult <- ledger.userManagement.getUser(GetUserRequest("a")) - getUserError <- ledger.userManagement - .getUser(GetUserRequest("b")) + userManagementTest( + "TestGetUser", + "Exercise GetUser rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + for { + _ <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), Nil) + ) + res1 <- ledger.userManagement.getUser(GetUserRequest(userId1)) + res2 <- ledger.userManagement + .getUser(GetUserRequest(userId2)) .mustFail("retrieving non-existent user") + } yield { + assertUserNotFound(res2) + assert(res1 == user1) + } + }) - grantResult <- ledger.userManagement.grantUserRights( - GrantUserRightsRequest( - "a", - List(Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin()))), - ) + userManagementTest( + "TestDeleteUser", + "Exercise DeleteUser rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + for { + _ <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), Nil) ) - listRightsResult <- ledger.userManagement.listUserRights(ListUserRightsRequest("a")) - revokeResult <- ledger.userManagement.revokeUserRights( - RevokeUserRightsRequest( - "a", - List(Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin()))), - ) + res1 <- ledger.userManagement.deleteUser(DeleteUserRequest(userId1)) + res2 <- ledger.userManagement + .deleteUser(DeleteUserRequest(userId2)) + .mustFail("deleting non-existent user") + } yield { + assertEquals(res1, DeleteUserResponse()) + assertUserNotFound(res2) + } + }) + + userManagementTest( + "TestListUsers", + "Exercise ListUsers rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + val user2 = User(userId2, "party4") + for { + _ <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), Nil) ) - _ <- ledger.userManagement.deleteUser(DeleteUserRequest("a")) + res1 <- ledger.userManagement.listUsers(ListUsersRequest()) + res2 <- ledger.userManagement.createUser( + CreateUserRequest(Some(user2), Nil) + ) + res3 <- ledger.userManagement.listUsers(ListUsersRequest()) + res4 <- ledger.userManagement.deleteUser(DeleteUserRequest(userId2)) + res5 <- ledger.userManagement.listUsers(ListUsersRequest()) } yield { - assertGrpcError( - ledger, - createAgainError, - Status.Code.ALREADY_EXISTS, - LedgerApiErrors.AdminServices.UserAlreadyExists, - None, - ) - assertGrpcError( - ledger, - getUserError, - Status.Code.NOT_FOUND, - LedgerApiErrors.AdminServices.UserNotFound, - None, - ) - assert(createResult == User("a", "b")) - assert(getUserResult == User("a", "b")) - assert( - grantResult.newlyGrantedRights == List( - Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin())) - ) + def filterUsers(users: Iterable[User]) = users.filter(u => u.id == userId1 || u.id == userId2) + assertSameElements(filterUsers(res1.users), Seq(user1)) + assertEquals(res2, user2) + assertSameElements( + filterUsers(res3.users), + Set(user1, user2), ) - assert( - revokeResult.newlyRevokedRights == List( - Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin())) - ) + assertEquals(res4, DeleteUserResponse()) + assertSameElements(filterUsers(res5.users), Seq(user1)) + } + }) + + userManagementTest( + "TestGrantUserRights", + "Exercise GrantUserRights rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + for { + _ <- ledger.userManagement.createUser(CreateUserRequest(Some(user1), Nil)) + res1 <- ledger.userManagement.grantUserRights( + GrantUserRightsRequest(userId1, List(adminPermission)) + ) + res2 <- ledger.userManagement + .grantUserRights(GrantUserRightsRequest(userId2, List(adminPermission))) + .mustFail("granting right to a non-existent user") + res3 <- ledger.userManagement.grantUserRights( + GrantUserRightsRequest(userId1, List(adminPermission)) + ) + res4 <- ledger.userManagement.grantUserRights( + GrantUserRightsRequest(userId1, userRightsBatch) + ) + } yield { + assertSameElements(res1.newlyGrantedRights, List(adminPermission)) + assertUserNotFound(res2) + assertSameElements(res3.newlyGrantedRights, List.empty) + assertSameElements(res4.newlyGrantedRights.toSet, userRightsBatch.toSet) + } + }) + + userManagementTest( + "TestRevokeUserRights", + "Exercise RevokeUserRights rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val userId2 = ledger.nextUserId() + val user1 = User(userId1, "party1") + for { + _ <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), List(adminPermission) ++ userRightsBatch) + ) + res1 <- ledger.userManagement.revokeUserRights( + RevokeUserRightsRequest(userId1, List(adminPermission)) + ) + res2 <- ledger.userManagement + .revokeUserRights(RevokeUserRightsRequest(userId2, List(adminPermission))) + .mustFail("revoking right from a non-existent user") + res3 <- ledger.userManagement.revokeUserRights( + RevokeUserRightsRequest(userId1, List(adminPermission)) ) - assert( - listRightsResult.rights.toSet == Set( - Permission(Permission.Kind.ParticipantAdmin(Permission.ParticipantAdmin())) -// Permission(Permission.Kind.CanActAs(Permission.CanActAs("acting-party"))), -// Permission(Permission.Kind.CanReadAs(Permission.CanReadAs("reader-party"))), + res4 <- ledger.userManagement.revokeUserRights( + RevokeUserRightsRequest(userId1, userRightsBatch) + ) + } yield { + assertEquals(res1, RevokeUserRightsResponse(List(adminPermission))) + assertUserNotFound(res2) + assertSameElements(res3.newlyRevokedRights, List.empty) + assertSameElements(res4.newlyRevokedRights.toSet, userRightsBatch.toSet) + } + }) + + userManagementTest( + "TestListUserRights", + "Exercise ListUserRights rpc", + )(implicit ec => { implicit ledger => + val userId1 = ledger.nextUserId() + val user1 = User(userId1, "party4") + for { + res1 <- ledger.userManagement.createUser( + CreateUserRequest(Some(user1), Nil) + ) + res2 <- ledger.userManagement.listUserRights(ListUserRightsRequest(userId1)) + res3 <- ledger.userManagement.grantUserRights( + GrantUserRightsRequest( + userId1, + List(adminPermission, actAsPermission1, readAsPermission1), ) ) + res4 <- ledger.userManagement.listUserRights(ListUserRightsRequest(userId1)) + res5 <- ledger.userManagement.revokeUserRights( + RevokeUserRightsRequest(userId1, List(adminPermission)) + ) + res6 <- ledger.userManagement + .listUserRights(ListUserRightsRequest(userId1)) + } yield { + assertEquals(res1, user1) + assertEquals(res2, ListUserRightsResponse(Seq.empty)) + assertSameElements( + res3.newlyGrantedRights.toSet, + Set(adminPermission, actAsPermission1, readAsPermission1), + ) + assertSameElements( + res4.rights.toSet, + Set(adminPermission, actAsPermission1, readAsPermission1), + ) + assertSameElements(res5.newlyRevokedRights, Seq(adminPermission)) + assertSameElements(res6.rights.toSet, Set(actAsPermission1, readAsPermission1)) } }) + + private def userManagementTest( + shortIdentifier: String, + description: String, + )( + body: ExecutionContext => ParticipantTestContext => Future[Unit] + ): Unit = { + test( + shortIdentifier = shortIdentifier, + description = description, + allocate(NoParties), + enabled = _.userManagement, + disabledReason = "requires user management feature", + )(implicit ec => { case Participants(Participant(ledger)) => + body(ec)(ledger) + }) + } + + private def assertUserNotFound(t: Throwable)(implicit ledger: ParticipantTestContext): Unit = { + assertGrpcError( + participant = ledger, + t = t, + expectedCode = Status.Code.NOT_FOUND, + selfServiceErrorCode = LedgerApiErrors.AdminServices.UserNotFound, + exceptionMessageSubstring = None, + ) + } + + private def assertUserAlreadyExists( + t: Throwable + )(implicit ledger: ParticipantTestContext): Unit = { + assertGrpcError( + participant = ledger, + t = t, + expectedCode = Status.Code.ALREADY_EXISTS, + selfServiceErrorCode = LedgerApiErrors.AdminServices.UserAlreadyExists, + exceptionMessageSubstring = None, + ) + } + } diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index b853ddee601f..bb98203b9b4f 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -336,6 +336,20 @@ final class Metrics(val registry: MetricRegistry) { } } + object userManagement { + private val Prefix = daml.Prefix :+ "user_management" + + val cache = new CacheMetrics(registry, Prefix :+ "cache") + + private def createDbMetrics(name: String): DatabaseMetrics = + new DatabaseMetrics(registry, Prefix, name) + val getUserInfo: DatabaseMetrics = createDbMetrics("get_user_info") + val createUser: DatabaseMetrics = createDbMetrics("create_user") + val deleteUser: DatabaseMetrics = createDbMetrics("delete_user") + val grantRights: DatabaseMetrics = createDbMetrics("grant_rights") + val revokeRights: DatabaseMetrics = createDbMetrics("revoke_rights") + val listUsers: DatabaseMetrics = createDbMetrics("list_users") + } object index { private val Prefix = daml.Prefix :+ "index" diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index 67b4671172be..3df5ea82cb72 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -84,6 +84,7 @@ compile_deps = [ "@maven//:org_postgresql_postgresql", "@maven//:com_oracle_database_jdbc_ojdbc8", "@maven//:com_google_api_grpc_proto_google_common_protos", + "@maven//:com_github_ben_manes_caffeine_caffeine", ] scala_compile_deps = [ diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index aebb8c89457d..68f23382d546 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -8f22abf50be616e45c6a8cb4069ebbac2f90c048337c73ede365869dec006fce +8bf87f5a750140b287b3ecf1b38bf87581d563bcfd3965481c67a460347a8670 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 114198f57c54..ce28ad117bbb 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -36,6 +36,30 @@ CREATE TABLE configuration_entries ( CREATE INDEX idx_configuration_submission ON configuration_entries (submission_id); +--------------------------------------------------------------------------------------------------- +-- User management tables +--------------------------------------------------------------------------------------------------- +CREATE TABLE participant_users ( + internal_id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + user_id VARCHAR(256) NOT NULL UNIQUE, + primary_party VARCHAR(512) NOT NULL, + created_at BIGINT NOT NULL DEFAULT CAST((1000 * 1000 * EXTRACT(epoch FROM CURRENT_TIMESTAMP)) AS BIGINT) +); + +CREATE TABLE participant_user_rights ( + user_internal_id INTEGER NOT NULL REFERENCES participant_users (internal_id) ON DELETE CASCADE, + user_right INTEGER NOT NULL, + for_party VARCHAR(512) NOT NULL, + granted_at BIGINT NOT NULL DEFAULT CAST((1000 * 1000 * EXTRACT(epoch FROM CURRENT_TIMESTAMP)) AS BIGINT), + UNIQUE (user_internal_id, user_right, for_party) +); + +INSERT INTO participant_users(user_id, primary_party) VALUES ('participant_admin', '!'); +INSERT INTO participant_user_rights(user_internal_id, user_right, for_party) + SELECT internal_id, 1, '!' + FROM participant_users + WHERE user_id = 'participant_admin'; + --------------------------------------------------------------------------------------------------- -- Packages table --------------------------------------------------------------------------------------------------- diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sha256 new file mode 100644 index 000000000000..98ad311fe9e9 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sha256 @@ -0,0 +1 @@ +0279d98062ef0732505e1f82d26a30f49d4b68a5809cea4a2b78e6acf16d558b diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sql new file mode 100644 index 000000000000..e7e297b27f87 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V8__add_user_managment.sql @@ -0,0 +1,24 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + + +CREATE TABLE participant_users ( + internal_id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + user_id VARCHAR2(256) NOT NULL UNIQUE, + primary_party VARCHAR2(512) NOT NULL, + created_at NUMBER DEFAULT 0 NOT NULL +); + +CREATE TABLE participant_user_rights ( + user_internal_id NUMBER NOT NULL REFERENCES participant_users (internal_id) ON DELETE CASCADE, + user_right NUMBER NOT NULL, + for_party VARCHAR2(512) NOT NULL, + granted_at NUMBER DEFAULT 0 NOT NULL, + UNIQUE (user_internal_id, user_right, for_party) +); + +INSERT INTO participant_users(user_id, primary_party) VALUES ('participant_admin', '!'); +INSERT INTO participant_user_rights(user_internal_id, user_right, for_party) + SELECT internal_id, 1, '!' + FROM participant_users + WHERE user_id = 'participant_admin'; diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sha256 new file mode 100644 index 000000000000..4f63836f55a7 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sha256 @@ -0,0 +1 @@ +67809c0acfa7b3c9a3fced287dc64dde34ee8cccfbe741201e162ca2f94f56e6 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sql new file mode 100644 index 000000000000..9604a2032eea --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V118__add_user_managment.sql @@ -0,0 +1,29 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + + + +CREATE TABLE participant_users ( + internal_id SERIAL PRIMARY KEY, + user_id VARCHAR(256) NOT NULL UNIQUE, + primary_party VARCHAR(512) NOT NULL, + created_at BIGINT NOT NULL DEFAULT (1000 * 1000 * EXTRACT(epoch FROM CURRENT_TIMESTAMP))::bigint +); +COMMENT ON COLUMN participant_users.created_at IS + 'Up to and including Postgres v13 EXTRACT function returns double type and thus this column is susceptible to microseconds rounding errors from around year 2250. In contrast, in Postgres v14 EXTRACT returns numeric type which effectively has arbitrary precision for this use case'; + +CREATE TABLE participant_user_rights ( + user_internal_id INTEGER NOT NULL REFERENCES participant_users (internal_id) ON DELETE CASCADE, + user_right INTEGER NOT NULL, + for_party VARCHAR(512) NOT NULL, + granted_at BIGINT NOT NULL DEFAULT (1000 * 1000 * EXTRACT(epoch FROM CURRENT_TIMESTAMP))::bigint, + UNIQUE (user_internal_id, user_right, for_party) +); +COMMENT ON COLUMN participant_user_rights.granted_at IS + 'Up to and including Postgres v13 EXTRACT function returns double type and thus this column is susceptible to microseconds rounding errors from around year 2250. In contrast, in Postgres v14 EXTRACT returns numeric type which effectively has arbitrary precision for this use case'; + +INSERT INTO participant_users(user_id, primary_party) VALUES ('participant_admin', '!'); +INSERT INTO participant_user_rights(user_internal_id, user_right, for_party) + SELECT internal_id, 1, '!' + FROM participant_users + WHERE user_id = 'participant_admin'; \ No newline at end of file diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 147db70f9da7..a7c034ef503e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -4,13 +4,15 @@ package com.daml.platform.store.backend import java.sql.Connection +import javax.sql.DataSource -import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} +import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails, User, UserRight} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.UserId import com.daml.lf.data.Time.Timestamp import com.daml.lf.ledger.EventId import com.daml.logging.LoggingContext @@ -23,7 +25,6 @@ import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState import com.daml.platform.store.interning.StringInterning import com.daml.scalautil.NeverEqualsOverride -import javax.sql.DataSource import scala.annotation.unused import scala.util.Try @@ -413,3 +414,33 @@ trait StringInterningStorageBackend { connection: Connection ): Iterable[(Int, String)] } + +trait UserManagementStorageBackend { + + def createUser(user: User)(connection: Connection): Int + + def deleteUser(id: UserId)(connection: Connection): Boolean + + def getUser(id: UserId)(connection: Connection): Option[UserManagementStorageBackend.DbUser] + + def getUsers()(connection: Connection): Vector[User] + + /** @return true if the right didn't exist and we have just added it. + */ + def addUserRight(internalId: Int, right: UserRight)( + connection: Connection + ): Boolean + + /** @return true if the right existed and we have just deleted it. + */ + def deleteUserRight(internalId: Int, right: UserRight)(connection: Connection): Boolean + + def userRightExists(internalId: Int, right: UserRight)(connection: Connection): Boolean + + def getUserRights(internalId: Int)(connection: Connection): Set[UserRight] + +} + +object UserManagementStorageBackend { + case class DbUser(internalId: Int, domainUser: User) +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala index 849a686c50a3..2b3e624c05ff 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala @@ -31,6 +31,7 @@ trait StorageBackendFactory { def createIntegrityStorageBackend: IntegrityStorageBackend def createResetStorageBackend: ResetStorageBackend def createStringInterningStorageBackend: StringInterningStorageBackend + def createUserManagementStorageBackend: UserManagementStorageBackend final def readStorageBackend( ledgerEndCache: LedgerEndCache, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala index 80a000362a73..b5535bc5f195 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala @@ -10,6 +10,7 @@ import com.daml.platform.store.backend.{ ParameterStorageBackend, StorageBackendFactory, StringInterningStorageBackend, + UserManagementStorageBackend, } import com.daml.platform.store.cache.LedgerEndCache @@ -31,4 +32,8 @@ trait CommonStorageBackendFactory extends StorageBackendFactory { override val createStringInterningStorageBackend: StringInterningStorageBackend = StringInterningStorageBackendTemplate + + override val createUserManagementStorageBackend: UserManagementStorageBackend = + UserManagementStorageBackendTemplate + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/UserManagementStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/UserManagementStorageBackendTemplate.scala new file mode 100644 index 000000000000..2eeebcca245e --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/UserManagementStorageBackendTemplate.scala @@ -0,0 +1,204 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.backend.common + +import java.sql.Connection + +import anorm.SqlParser.{int, str} +import anorm.{RowParser, SqlParser, SqlStringInterpolation, ~} +import com.daml.ledger.api.domain +import com.daml.ledger.api.domain.UserRight +import com.daml.ledger.api.domain.UserRight.{CanActAs, CanReadAs, ParticipantAdmin} +import com.daml.ledger.api.v1.admin.user_management_service.Right +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.UserId +import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf +import com.daml.platform.store.backend.UserManagementStorageBackend + +import scala.util.Try + +object UserManagementStorageBackendTemplate extends UserManagementStorageBackend { + + /** Marker value for absence of party in a db table (`primary_party` and `for_party` columns). + * + * Oracle doesn't distinguish between empty strings and NULLs. + * When we use NULL/empty string in Oracle then in Postgres we have two choices: + * a) Choosing NULL: we would have to use partial index to ensure multi-column uniqueness constrains (`UNIQUE (user_internal_id, user_right, for_party)` index). + * b) Choosing empty string: we would end up with DB aware backends having ot use `... is NULL` in Oracle vs. `... = ''` in Postgres + */ + private val AbsenceOfPartyMarker = "!" + + private val ParticipantUserParser: RowParser[(Int, String, String)] = + int("internal_id") ~ str("user_id") ~ str("primary_party") map { + case internalId ~ userId ~ primaryParty => + (internalId, userId, primaryParty) + } + + private val ParticipantUserParser2: RowParser[(String, String)] = + str("user_id") ~ str("primary_party") map { case userId ~ primaryParty => + (userId, primaryParty) + } + + private val UserRightParser: RowParser[(Int, String)] = + int("user_right") ~ str("for_party") map { case user_right ~ for_party => + (user_right, for_party) + } + + private val IntParser0: RowParser[Int] = + int("dummy") map { i => i } + + override def createUser(user: domain.User)( + connection: Connection + ): Int = { + val internalId: Try[Int] = + SQL""" + INSERT INTO participant_users (user_id, primary_party) + VALUES (${user.id: String}, ${partyToDbString(user.primaryParty)}) + """.executeInsert1("internal_id")(SqlParser.scalar[Int].single)(connection) + internalId.get + } + + override def getUser( + id: UserId + )(connection: Connection): Option[UserManagementStorageBackend.DbUser] = { + SQL""" + SELECT internal_id, user_id, primary_party, created_at + FROM participant_users + WHERE user_id = ${id: String} + """ + .as(ParticipantUserParser.singleOpt)(connection) + .map { case (internalId, userId, primaryPartyRaw) => + UserManagementStorageBackend.DbUser( + internalId = internalId, + domainUser = domain.User( + id = Ref.UserId.assertFromString(userId), + primaryParty = dbStringToPartyString(primaryPartyRaw), + ), + ) + } + } + + override def getUsers()(connection: Connection): Vector[domain.User] = { + def domainUser(userId: String, primaryParty: Option[String]): domain.User = { + domain.User( + Ref.UserId.assertFromString(userId), + primaryParty.map(Ref.Party.assertFromString), + ) + } + SQL"""SELECT internal_id, user_id, primary_party + FROM participant_users""" + .asVectorOf(ParticipantUserParser2)(connection) + .map { case (userId, primaryPartyRaw) => + domainUser(userId, dbStringToPartyString(primaryPartyRaw)) + } + } + + override def deleteUser(userId: Ref.UserId)(connection: Connection): Boolean = { + val updatedRowsCount = + SQL""" + DELETE FROM participant_users WHERE user_id = ${userId: String} + """.executeUpdate()(connection) + updatedRowsCount == 1 + } + + override def userRightExists(internalId: Int, right: UserRight)( + connection: Connection + ): Boolean = { + val (userRight: Int, forParty: String) = fromUserRight(right) + + import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation + val res: Seq[_] = + SQL""" + SELECT 1 AS dummy + FROM participant_user_rights ur + WHERE ur.user_internal_id = ${internalId} + AND + ur.user_right = ${userRight} + AND + ur.for_party = $forParty""".asVectorOf(IntParser0)(connection) + assert(res.length <= 1) + res.length == 1 + } + + override def addUserRight(internalId: Int, right: UserRight)( + connection: Connection + ): Boolean = { + val (userRight: Int, forParty: String) = fromUserRight(right) + val rowsUpdated: Int = + SQL""" + INSERT INTO participant_user_rights (user_internal_id, user_right, for_party) + VALUES ( + ${internalId}, + ${userRight}, + ${forParty} + ) + """.executeUpdate()(connection) + rowsUpdated == 1 + } + + override def getUserRights(internalId: Int)(connection: Connection): Set[domain.UserRight] = { + val rec: Seq[(Int, String)] = + SQL""" + SELECT ur.user_right, ur.for_party + FROM participant_user_rights ur + WHERE ur.user_internal_id = ${internalId} + """.asVectorOf(UserRightParser)(connection) + rec.map { case (userRight, forPartyRaw) => + makeUserRight( + value = userRight, + partyRaw = forPartyRaw, + ) + }.toSet + } + + override def deleteUserRight(internalId: Int, right: domain.UserRight)( + connection: Connection + ): Boolean = { + val (userRight: Int, forParty: String) = fromUserRight(right) + val updatedRowCount: Int = + SQL""" + DELETE FROM participant_user_rights ur + WHERE + ur.user_internal_id = ${internalId} + AND + ur.user_right = ${userRight} + AND + ur.for_party = ${forParty} + """.executeUpdate()(connection) + updatedRowCount == 1 + } + + private def makeUserRight(value: Int, partyRaw: String): UserRight = { + val partyO = dbStringToPartyString(partyRaw) + (value, partyO) match { + case (Right.PARTICIPANT_ADMIN_FIELD_NUMBER, None) => ParticipantAdmin + case (Right.CAN_ACT_AS_FIELD_NUMBER, Some(party)) => CanActAs(party) + case (Right.CAN_READ_AS_FIELD_NUMBER, Some(party)) => CanReadAs(party) + case _ => + throw new RuntimeException(s"Could not convert ${(value, partyO)} to a user right!") + } + } + + private def fromUserRight(right: UserRight): (Int, String) = { + right match { + case ParticipantAdmin => (Right.PARTICIPANT_ADMIN_FIELD_NUMBER, AbsenceOfPartyMarker) + case CanActAs(party) => (Right.CAN_ACT_AS_FIELD_NUMBER, party: String) + case CanReadAs(party) => (Right.CAN_READ_AS_FIELD_NUMBER, party: String) + case _ => + throw new RuntimeException(s"Could not recognize user right: $right!") + } + } + + private def dbStringToPartyString(raw: String): Option[Ref.Party] = { + if (raw == AbsenceOfPartyMarker) + None + else + Some(Ref.Party.assertFromString(raw)) + } + + private def partyToDbString(party: Option[Ref.Party]): String = { + (party: Option[String]).getOrElse(AbsenceOfPartyMarker) + } + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala index 2a3c623b15f9..47823f95768e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2ResetStorageBackend.scala @@ -24,6 +24,8 @@ object H2ResetStorageBackend extends ResetStorageBackend { |truncate table party_entries; |truncate table string_interning; |truncate table participant_events_create_filter; + |truncate table participant_users; + |truncate table participant_user_rights; |set referential_integrity true;""".stripMargin) .execute()(connection) () @@ -44,6 +46,8 @@ object H2ResetStorageBackend extends ResetStorageBackend { |truncate table party_entries; |truncate table string_interning; |truncate table participant_events_create_filter; + |truncate table participant_users; + |truncate table participant_user_rights; |set referential_integrity true;""".stripMargin) .execute()(connection) () diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala index e64978805a0c..0a1ce5707332 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleResetStorageBackend.scala @@ -23,6 +23,8 @@ object OracleResetStorageBackend extends ResetStorageBackend { "truncate table party_entries cascade", "truncate table string_interning cascade", "truncate table participant_events_create_filter cascade", + "truncate table participant_users cascade", + "truncate table participant_user_rights cascade", ).map(SQL(_)).foreach(_.execute()(connection)) override def resetAll(connection: Connection): Unit = @@ -40,5 +42,7 @@ object OracleResetStorageBackend extends ResetStorageBackend { "truncate table party_entries cascade", "truncate table string_interning cascade", "truncate table participant_events_create_filter cascade", + "truncate table participant_users cascade", + "truncate table participant_user_rights cascade", ).map(SQL(_)).foreach(_.execute()(connection)) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala index 2cf817de97d6..fb29be28f1dc 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresResetStorageBackend.scala @@ -22,6 +22,8 @@ object PostgresResetStorageBackend extends ResetStorageBackend { |truncate table party_entries cascade; |truncate table string_interning cascade; |truncate table participant_events_create_filter cascade; + |truncate table participant_users cascade; + |truncate table participant_user_rights cascade; |""".stripMargin) .execute()(connection) () @@ -41,6 +43,8 @@ object PostgresResetStorageBackend extends ResetStorageBackend { |truncate table party_entries cascade; |truncate table string_interning cascade; |truncate table participant_events_create_filter cascade; + |truncate table participant_users cascade; + |truncate table participant_user_rights cascade; |""".stripMargin) .execute()(connection) () diff --git a/ledger/participant-integration-api/src/main/scala/platform/usermanagement/CachedUserManagementStore.scala b/ledger/participant-integration-api/src/main/scala/platform/usermanagement/CachedUserManagementStore.scala new file mode 100644 index 000000000000..26209fae78a4 --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/usermanagement/CachedUserManagementStore.scala @@ -0,0 +1,98 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.usermanagement + +import java.time.Duration +import java.util.concurrent.{CompletableFuture, Executor} + +import com.daml.caching.CaffeineCache +import com.daml.ledger.api.domain +import com.daml.ledger.api.domain.User +import com.daml.ledger.participant.state.index.v2.UserManagementStore +import com.daml.ledger.participant.state.index.v2.UserManagementStore.{Result, UserInfo, Users} +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.UserId +import com.daml.metrics.Metrics +import com.github.benmanes.caffeine.cache.AsyncCacheLoader +import com.github.benmanes.caffeine.{cache => caffeine} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +class CachedUserManagementStore( + delegate: UserManagementStore, + expiryAfterWriteInSeconds: Int, + maximumCacheSize: Int, + metrics: Metrics, +)(implicit val executionContext: ExecutionContext) + extends UserManagementStore { + + private val cache: CaffeineCache.AsyncLoadingCacheScala[Ref.UserId, Result[UserInfo]] = + new CaffeineCache.AsyncLoadingCacheScala( + caffeine.Caffeine + .newBuilder() + .expireAfterWrite(Duration.ofSeconds(expiryAfterWriteInSeconds.toLong)) + .maximumSize(maximumCacheSize.toLong) + .buildAsync( + new AsyncCacheLoader[UserId, Result[UserInfo]] { + override def asyncLoad( + key: UserId, + executor: Executor, + ): CompletableFuture[Result[UserInfo]] = { + val cf = new CompletableFuture[Result[UserInfo]] + delegate.getUserInfo(key).onComplete { + case Success(value) => cf.complete(value) + case Failure(e) => cf.completeExceptionally(e) + } + cf + } + } + ), + metrics.daml.userManagement.cache, + ) + + override def getUserInfo(id: UserId): Future[Result[UserManagementStore.UserInfo]] = { + cache.get(id) + } + + override def createUser(user: User, rights: Set[domain.UserRight]): Future[Result[Unit]] = + delegate.createUser(user, rights) + + override def deleteUser(id: UserId): Future[Result[Unit]] = { + delegate + .deleteUser(id) + .map(tapInvalidateOnSuccess(id)) + } + + override def grantRights( + id: UserId, + rights: Set[domain.UserRight], + ): Future[Result[Set[domain.UserRight]]] = { + delegate + .grantRights(id, rights) + .map(tapInvalidateOnSuccess(id)) + } + + override def revokeRights( + id: UserId, + rights: Set[domain.UserRight], + ): Future[Result[Set[domain.UserRight]]] = { + delegate + .revokeRights(id, rights) + .map(tapInvalidateOnSuccess(id)) + } + + override def listUsers(): Future[Result[Users]] = { + delegate.listUsers() + } + + private def tapInvalidateOnSuccess[T](id: UserId)(r: Result[T]): Result[T] = { + r match { + case Right(_) => cache.invalidate(id) + case Left(_) => + } + r + } + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/usermanagement/PersistentUserManagementStore.scala b/ledger/participant-integration-api/src/main/scala/platform/usermanagement/PersistentUserManagementStore.scala new file mode 100644 index 000000000000..92c22bae42bb --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/usermanagement/PersistentUserManagementStore.scala @@ -0,0 +1,200 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.usermanagement + +import java.sql.Connection + +import com.daml.ledger.api.domain +import com.daml.ledger.participant.state.index.v2.UserManagementStore +import com.daml.ledger.participant.state.index.v2.UserManagementStore.{ + Result, + UserExists, + UserInfo, + UserNotFound, + Users, +} +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.UserId +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.metrics.{DatabaseMetrics, Metrics} +import com.daml.platform.store.appendonlydao.DbDispatcher +import com.daml.platform.store.backend.UserManagementStorageBackend +import com.daml.platform.store.backend.common.UserManagementStorageBackendTemplate + +import scala.concurrent.{ExecutionContext, Future} + +object UserManagementConfig { + + val default: UserManagementConfig = UserManagementConfig( + maximumCacheSize = 100, + cacheExpiryAfterWriteInSeconds = 5, + ) +} +final case class UserManagementConfig( + maximumCacheSize: Int, + cacheExpiryAfterWriteInSeconds: Int, +) + +object PersistentUserManagementStore { + def cached( + dbDispatcher: DbDispatcher, + metrics: Metrics, + cacheExpiryAfterWriteInSeconds: Int, + maximumCacheSize: Int, + )(implicit executionContext: ExecutionContext): UserManagementStore = { + new CachedUserManagementStore( + delegate = new PersistentUserManagementStore( + dbDispatcher = dbDispatcher, + metrics = metrics, + ), + expiryAfterWriteInSeconds = cacheExpiryAfterWriteInSeconds, + maximumCacheSize = maximumCacheSize, + metrics = metrics, + ) + } +} + +class PersistentUserManagementStore( + dbDispatcher: DbDispatcher, + metrics: Metrics, +) extends UserManagementStore { + + private val backend: UserManagementStorageBackend = UserManagementStorageBackendTemplate + private val logger = ContextualizedLogger.get(getClass) + + implicit private val loggingContext: LoggingContext = LoggingContext.newLoggingContext(identity) + + override def getUserInfo(id: UserId): Future[Result[UserInfo]] = { + inTransaction(_.getUserInfo) { implicit connection => + withUser(id) { dbUser => + val rights = backend.getUserRights(internalId = dbUser.internalId)(connection) + UserInfo(dbUser.domainUser, rights) + } + } + } + + override def createUser( + user: domain.User, + rights: Set[domain.UserRight], + ): Future[Result[Unit]] = { + inTransaction(_.createUser) { implicit connection: Connection => + withoutUser(user.id) { + val internalId = backend.createUser(user)(connection) + rights.foreach(right => + backend.addUserRight(internalId = internalId, right = right)( + connection + ) + ) + () + } + }.map(tapSuccess { _ => + logger.info( + s"Created new user: ${user} with ${rights.size} rights: ${rightsDigestText(rights)}" + ) + })(scala.concurrent.ExecutionContext.parasitic) + } + + override def deleteUser(id: UserId): Future[Result[Unit]] = { + inTransaction(_.deleteUser) { implicit connection => + if (!backend.deleteUser(id = id)(connection = connection)) { + Left(UserNotFound(userId = id)) + } else { + Right(()) + } + }.map(tapSuccess { _ => + logger.info(s"Deleted user with id: ${id}") + })(scala.concurrent.ExecutionContext.parasitic) + } + + override def grantRights( + id: UserId, + rights: Set[domain.UserRight], + ): Future[Result[Set[domain.UserRight]]] = { + inTransaction(_.grantRights) { implicit connection => + withUser(id = id) { user => + val addedRights = rights.filter { right => + if (!backend.userRightExists(internalId = user.internalId, right = right)(connection)) { + backend.addUserRight( + internalId = user.internalId, + right = right, + )(connection) + } else { + false + } + } + addedRights + } + }.map(tapSuccess { grantedRights => + logger.info( + s"Granted ${grantedRights.size} user rights to user ${id}: ${rightsDigestText(grantedRights)}" + ) + })(scala.concurrent.ExecutionContext.parasitic) + } + + override def revokeRights( + id: UserId, + rights: Set[domain.UserRight], + ): Future[Result[Set[domain.UserRight]]] = { + inTransaction(_.revokeRights) { implicit connection => + withUser(id = id) { user => + val revokedRights = rights.filter { right => + if (backend.userRightExists(internalId = user.internalId, right = right)(connection)) { + backend.deleteUserRight(internalId = user.internalId, right = right)(connection) + } else { + false + } + } + revokedRights + } + }.map(tapSuccess { revokedRights => + logger.info( + s"Revoked ${revokedRights.size} user rights from user ${id}: ${rightsDigestText(revokedRights)}" + ) + })(scala.concurrent.ExecutionContext.parasitic) + + } + + override def listUsers(): Future[Result[Users]] = { + inTransaction(_.listUsers) { connection => + Right(backend.getUsers()(connection)) + } + } + + private def inTransaction[T]( + dbMetric: metrics.daml.userManagement.type => DatabaseMetrics + )(thunk: Connection => T): Future[T] = { + dbDispatcher.executeSql(dbMetric(metrics.daml.userManagement))(thunk) + } + + private def withUser[T]( + id: Ref.UserId + )(f: UserManagementStorageBackend.DbUser => T)(implicit connection: Connection): Result[T] = { + backend.getUser(id = id)(connection) match { + case Some(user) => Right(f(user)) + case None => Left(UserNotFound(userId = id)) + } + } + + private def withoutUser[T]( + id: Ref.UserId + )(t: => T)(implicit connection: Connection): Result[T] = { + backend.getUser(id = id)(connection) match { + case Some(user) => Left(UserExists(userId = user.domainUser.id)) + case None => Right(t) + } + } + + private def tapSuccess[T](f: T => Unit)(r: Result[T]): Result[T] = { + r.map { v => + f(v) + v + } + } + + private def rightsDigestText(rights: Iterable[domain.UserRight]): String = { + val closingBracket = if (rights.size > 5) ", ..." else "" + rights.take(5).mkString("", ", ", closingBracket) + } + +} diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala index a8e52c3dfa92..af98ebe59688 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala @@ -53,9 +53,8 @@ private[backend] trait StorageBackendProvider { } } -private[backend] trait StorageBackendProviderPostgres - extends StorageBackendProvider - with PostgresAroundAll { this: Suite => +trait StorageBackendProviderPostgres extends StorageBackendProvider with PostgresAroundAll { + this: Suite => override protected def jdbcUrl: String = postgresDatabase.url override protected val backend: TestBackend = TestBackend(PostgresStorageBackendFactory) } @@ -90,6 +89,7 @@ case class TestBackend( stringInterning: StringInterningStorageBackend, ledgerEndCache: MutableLedgerEndCache, stringInterningSupport: MockStringInterning, + userManagement: UserManagementStorageBackend, ) object TestBackend { @@ -114,6 +114,7 @@ object TestBackend { stringInterning = storageBackendFactory.createStringInterningStorageBackend, ledgerEndCache = ledgerEndCache, stringInterningSupport = stringInterning, + userManagement = storageBackendFactory.createUserManagementStorageBackend, ) } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala index 1b1fb57a61a9..603665ccf5e8 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSuite.scala @@ -17,6 +17,7 @@ trait StorageBackendSuite with StorageBackendTestsIntegrity with StorageBackendTestsDeduplication with StorageBackendTestsTimestamps - with StorageBackendTestsStringInterning { + with StorageBackendTestsStringInterning + with StorageBackendTestsUserManagement { this: AnyFlatSpec => } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsUserManagement.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsUserManagement.scala new file mode 100644 index 000000000000..77a394f7f4a3 --- /dev/null +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsUserManagement.scala @@ -0,0 +1,178 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.backend + +import java.sql.SQLException +import java.util.UUID + +import com.daml.ledger.api.domain.User +import com.daml.ledger.api.domain.UserRight.{CanActAs, CanReadAs, ParticipantAdmin} +import com.daml.lf.data.Ref +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{Inside, OptionValues} + +private[backend] trait StorageBackendTestsUserManagement + extends Matchers + with Inside + with StorageBackendSpec + with OptionValues { + this: AnyFlatSpec => + + behavior of "StorageBackend (user management)" + + // Representative values for each kind of user right + private val right1 = ParticipantAdmin + private val right2 = CanActAs(Ref.Party.assertFromString("party_act_as_1")) + private val right3 = CanReadAs(Ref.Party.assertFromString("party_read_as_1")) + + private def tested = backend.userManagement + + it should "use invalid party string to mark absence of party" in { + intercept[IllegalArgumentException]( + Ref.Party.assertFromString("!") + ).getMessage shouldBe "non expected character 0x21 in Daml-LF Party \"!\"" + } + + it should "create user (createUser)" in { + val user1 = newUniqueUser() + val user2 = newUniqueUser() + val internalId1 = executeSql(tested.createUser(user1)) + // Attempting to add a duplicate user + assertThrows[SQLException](executeSql(tested.createUser(user1))) + val internalId2 = executeSql(tested.createUser(user2)) + val _ = executeSql(tested.createUser(newUniqueUser(emptyPrimaryParty = true))) + internalId1 should not equal internalId2 + internalId1 should not equal internalId2 + } + + it should "handle user ops (getUser, deleteUser)" in { + val user1 = newUniqueUser() + val user2 = newUniqueUser() + val _ = executeSql(tested.createUser(user1)) + val getExisting = executeSql(tested.getUser(user1.id)) + val deleteExisting = executeSql(tested.deleteUser(user1.id)) + val deleteNonexistent = executeSql(tested.deleteUser(user2.id)) + val getDeleted = executeSql(tested.getUser(user1.id)) + val getNonexistent = executeSql(tested.getUser(user2.id)) + getExisting.value.domainUser shouldBe user1 + deleteExisting shouldBe true + deleteNonexistent shouldBe false + getDeleted shouldBe None + getNonexistent shouldBe None + } + + it should "get users (getUsers)" in { + val user1 = newUniqueUser() + val user2 = newUniqueUser() + val emptyUsers = executeSql(tested.getUsers()) + val _ = executeSql(tested.createUser(user1)) + val _ = executeSql(tested.createUser(user2)) + val allUsers = executeSql(tested.getUsers()) + emptyUsers shouldBe empty + allUsers should contain theSameElementsAs Seq(user1, user2) + } + + it should "handle adding rights to non-existent user" in { + val nonExistentUserInternalId = 123 + val allUsers = executeSql(tested.getUsers()) + val _ = executeSql(tested.userRightExists(nonExistentUserInternalId, right2)) + allUsers shouldBe empty + } + + it should "handle adding duplicate rights" in { + val user1 = newUniqueUser() + val adminRight = ParticipantAdmin + val readAsRight = CanReadAs(Ref.Party.assertFromString("party_read_as_1")) + val actAsRight = CanActAs(Ref.Party.assertFromString("party_act_as_1")) + val internalId = executeSql(tested.createUser(user = user1)) + val addOk1 = executeSql(tested.addUserRight(internalId, adminRight)) + // Attempting to add a duplicate user admin right + assertThrows[SQLException](executeSql(tested.addUserRight(internalId, adminRight))) + val addOk2 = executeSql(tested.addUserRight(internalId, readAsRight)) + // Attempting to add a duplicate user readAs right + assertThrows[SQLException](executeSql(tested.addUserRight(internalId, readAsRight))) + val addOk3 = executeSql(tested.addUserRight(internalId, actAsRight)) + // Attempting to add a duplicate user actAs right + assertThrows[SQLException](executeSql(tested.addUserRight(internalId, actAsRight))) + addOk1 shouldBe true + addOk2 shouldBe true + addOk3 shouldBe true + } + + it should "handle removing absent rights" in { + val user1 = newUniqueUser() + val internalId = executeSql(tested.createUser(user1)) + val delete1 = executeSql(tested.deleteUserRight(internalId, right1)) + val delete2 = executeSql(tested.deleteUserRight(internalId, right2)) + val delete3 = executeSql(tested.deleteUserRight(internalId, right3)) + delete1 shouldBe false + delete2 shouldBe false + delete3 shouldBe false + } + + it should "handle multiple rights (getUserRights, addUserRight, deleteUserRight)" in { + val user1 = newUniqueUser() + val internalId = executeSql(tested.createUser(user1)) + val rights1 = executeSql(tested.getUserRights(internalId)) + val addRight1 = executeSql(tested.addUserRight(internalId, right1)) + val addRight2 = executeSql(tested.addUserRight(internalId, right2)) + val addRight3 = executeSql(tested.addUserRight(internalId, right3)) + val rights2 = executeSql(tested.getUserRights(internalId)) + val deleteRight2 = executeSql(tested.deleteUserRight(internalId, right2)) + val rights3 = executeSql(tested.getUserRights(internalId)) + val deleteRight3 = executeSql(tested.deleteUserRight(internalId, right3)) + val rights4 = executeSql(tested.getUserRights(internalId)) + rights1 shouldBe empty + addRight1 shouldBe true + addRight2 shouldBe true + addRight3 shouldBe true + rights2 should contain theSameElementsAs Seq(right1, right2, right3) + deleteRight2 shouldBe true + rights3 should contain theSameElementsAs Seq(right1, right3) + deleteRight3 shouldBe true + rights4 should contain theSameElementsAs Seq(right1) + } + + it should "add and delete a single right (userRightExists, addUserRight, deleteUserRight, getUserRights)" in { + val user1 = newUniqueUser() + val internalId = executeSql(tested.createUser(user1)) + // no rights + val rightExists0 = executeSql(tested.userRightExists(internalId, right1)) + val rights0 = executeSql(tested.getUserRights(internalId)) + // add one rights + val addRight = executeSql(tested.addUserRight(internalId, right1)) + val rightExists1 = executeSql(tested.userRightExists(internalId, right1)) + val rights1 = executeSql(tested.getUserRights(internalId)) + // delete + val deleteRight = executeSql(tested.deleteUserRight(internalId, right1)) + val rightExists2 = executeSql(tested.userRightExists(internalId, right1)) + val rights2 = executeSql(tested.getUserRights(internalId)) + // no rights + rightExists0 shouldBe false + rights0 shouldBe empty + // added right + addRight shouldBe true + rightExists1 shouldBe true + rights1 should contain theSameElementsAs Seq(right1) + // deleted right + deleteRight shouldBe true + rightExists2 shouldBe false + rights2 shouldBe empty + } + + private def newUniqueUser(emptyPrimaryParty: Boolean = false): User = { + val uuid = UUID.randomUUID.toString + val primaryParty = + if (emptyPrimaryParty) + None + else + Some(Ref.Party.assertFromString(s"primary_party_${uuid}")) + User( + id = Ref.UserId.assertFromString(s"user_id_${uuid}"), + primaryParty = primaryParty, + ) + } + +} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/usermanagement/CachedUserManagementStoreSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/usermanagement/CachedUserManagementStoreSpec.scala new file mode 100644 index 000000000000..0028724811bc --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/usermanagement/CachedUserManagementStoreSpec.scala @@ -0,0 +1,139 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.usermanagement + +import com.codahale.metrics.MetricRegistry +import com.daml.ledger.api.domain.{User, UserRight} +import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore +import com.daml.ledger.participant.state.index.v2.UserManagementStore.{UserInfo, UserNotFound} +import com.daml.ledger.resources.TestResourceContext +import com.daml.lf.data.Ref +import com.daml.metrics.Metrics +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.freespec.AsyncFreeSpec +import org.scalatest.matchers.should.Matchers + +class CachedUserManagementStoreSpec + extends AsyncFreeSpec + with TestResourceContext + with Matchers + with MockitoSugar + with ArgumentMatchersSugar { + + private val user = User( + id = Ref.UserId.assertFromString("user_id1"), + primaryParty = Some(Ref.Party.assertFromString("primary_party1")), + ) + private val right1 = UserRight.CanActAs(Ref.Party.assertFromString("party_id1")) + private val right2 = UserRight.ParticipantAdmin + private val right3 = UserRight.CanActAs(Ref.Party.assertFromString("party_id2")) + private val rights = Set(right1, right2) + private val userInfo = UserInfo(user, rights) + + "test cache population" in { + val delegate = spy(new InMemoryUserManagementStore()) + val tested = createTested(delegate) + + for { + _ <- tested.createUser(userInfo.user, userInfo.rights) + get1 <- tested.getUserInfo(user.id) + get2 <- tested.getUserInfo(user.id) + getUser <- tested.getUser(user.id) + listRights <- tested.listUserRights(user.id) + } yield { + verify(delegate, times(1)).createUser(userInfo.user, userInfo.rights) + verify(delegate, times(1)).getUserInfo(userInfo.user.id) + verifyNoMoreInteractions(delegate) + get1 shouldBe Right(userInfo) + get2 shouldBe Right(userInfo) + getUser shouldBe Right(userInfo.user) + listRights shouldBe Right(userInfo.rights) + } + } + + "test cache invalidation after every write method" in { + val delegate = spy(new InMemoryUserManagementStore()) + val tested = createTested(delegate) + + val userInfo = UserInfo(user, rights) + + for { + _ <- tested.createUser(userInfo.user, userInfo.rights) + get1 <- tested.getUserInfo(user.id) + _ <- tested.grantRights(user.id, Set(right1)) + get2 <- tested.getUserInfo(user.id) + _ <- tested.revokeRights(user.id, Set(right3)) + get3 <- tested.getUserInfo(user.id) + _ <- tested.deleteUser(user.id) + get4 <- tested.getUserInfo(user.id) + } yield { + val order = inOrder(delegate) + order.verify(delegate, times(1)).createUser(user, userInfo.rights) + order.verify(delegate, times(1)).getUserInfo(user.id) + order.verify(delegate, times(1)).grantRights(eqTo(user.id), any[Set[UserRight]]) + order.verify(delegate, times(1)).getUserInfo(userInfo.user.id) + order.verify(delegate, times(1)).revokeRights(eqTo(user.id), any[Set[UserRight]]) + order.verify(delegate, times(1)).getUserInfo(userInfo.user.id) + order.verify(delegate, times(1)).deleteUser(userInfo.user.id) + order.verify(delegate, times(1)).getUserInfo(userInfo.user.id) + order.verifyNoMoreInteractions() + get1 shouldBe Right(userInfo) + get2 shouldBe Right(userInfo) + get3 shouldBe Right(userInfo) + get4 shouldBe Left(UserNotFound(user.id)) + } + } + + "listing all users should not be cached" in { + val delegate = spy(new InMemoryUserManagementStore(createAdmin = false)) + val tested = createTested(delegate) + + for { + res0 <- tested.createUser(user, rights) + res1 <- tested.listUsers() + res2 <- tested.listUsers() + } yield { + val order = inOrder(delegate) + order.verify(delegate, times(1)).createUser(user, rights) + order.verify(delegate, times(2)).listUsers() + order.verifyNoMoreInteractions() + res0 shouldBe Right(()) + res1 shouldBe Right(Seq(user)) + res2 shouldBe Right(Seq(user)) + } + } + + "cache entries expire after a set time" in { + val delegate = spy(new InMemoryUserManagementStore()) + val tested = createTested(delegate) + + for { + create1 <- tested.createUser(user, rights) + get1 <- tested.getUserInfo(user.id) + get2 <- tested.getUserInfo(user.id) + get3 <- { + Thread.sleep(2000); tested.getUserInfo(user.id) + } + } yield { + val order = inOrder(delegate) + order.verify(delegate, times(1)).createUser(any[User], any[Set[UserRight]]) + order.verify(delegate, times(2)).getUserInfo(any[Ref.UserId]) + order.verifyNoMoreInteractions() + create1 shouldBe Right(()) + get1 shouldBe Right(userInfo) + get2 shouldBe Right(userInfo) + get3 shouldBe Right(userInfo) + } + } + + private def createTested(delegate: InMemoryUserManagementStore): CachedUserManagementStore = { + new CachedUserManagementStore( + delegate, + expiryAfterWriteInSeconds = 1, + maximumCacheSize = 10, + new Metrics(new MetricRegistry), + ) + } + +} diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/impl/inmemory/InMemoryUserManagementStore.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/impl/inmemory/InMemoryUserManagementStore.scala index 0e80a4a08638..529d4ef2e875 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/impl/inmemory/InMemoryUserManagementStore.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/impl/inmemory/InMemoryUserManagementStore.scala @@ -7,27 +7,31 @@ import com.daml.ledger.api.domain.{User, UserRight} import com.daml.ledger.participant.state.index.v2.UserManagementStore import com.daml.ledger.participant.state.index.v2.UserManagementStore._ import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.UserId import scala.collection.mutable import scala.concurrent.Future -class InMemoryUserManagementStore extends UserManagementStore { +class InMemoryUserManagementStore(createAdmin: Boolean = true) extends UserManagementStore { import InMemoryUserManagementStore._ // Underlying mutable map to keep track of UserInfo state. // Structured so we can use a ConcurrentHashMap (to more closely mimic a real implementation, where performance is key). // We synchronize on a private object (the mutable map), not the service (which could cause deadlocks). // (No need to mark state as volatile -- rely on synchronized to establish the JMM's happens-before relation.) - private val state: mutable.Map[Ref.UserId, UserInfo] = mutable.Map(AdminUser.toStateEntry) + private val state: mutable.Map[Ref.UserId, UserInfo] = mutable.Map() + if (createAdmin) { + state.put(AdminUser.user.id, AdminUser) + } + + override def getUserInfo(id: UserId): Future[Result[UserManagementStore.UserInfo]] = + withUser(id)(identity) override def createUser(user: User, rights: Set[UserRight]): Future[Result[Unit]] = withoutUser(user.id) { state.update(user.id, UserInfo(user, rights)) } - override def getUser(id: Ref.UserId): Future[Result[User]] = - withUser(id)(_.user) - override def deleteUser(id: Ref.UserId): Future[Result[Unit]] = withUser(id) { _ => state.remove(id) @@ -60,9 +64,6 @@ class InMemoryUserManagementStore extends UserManagementStore { effectivelyRevoked } - override def listUserRights(id: Ref.UserId): Future[Result[Set[UserRight]]] = - withUser(id)(_.rights) - def listUsers(): Future[Result[Users]] = withState { Right(state.values.map(_.user).toSeq) @@ -102,9 +103,7 @@ class InMemoryUserManagementStore extends UserManagementStore { } object InMemoryUserManagementStore { - case class UserInfo(user: User, rights: Set[UserRight]) { - def toStateEntry: (Ref.UserId, UserInfo) = user.id -> this - } + private val AdminUser = UserInfo( user = User(Ref.UserId.assertFromString("participant_admin"), None), rights = Set(UserRight.ParticipantAdmin), diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/UserManagementStore.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/UserManagementStore.scala index bf22bdbef01b..f830beb11025 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/UserManagementStore.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/UserManagementStore.scala @@ -6,14 +6,20 @@ package com.daml.ledger.participant.state.index.v2 import com.daml.ledger.api.domain.{User, UserRight} import com.daml.lf.data.Ref -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} trait UserManagementStore { import UserManagementStore._ - def createUser(user: User, rights: Set[UserRight]): Future[Result[Unit]] + // read access + + def getUserInfo(id: Ref.UserId): Future[Result[UserInfo]] + + def listUsers(): Future[Result[Users]] + + // write access - def getUser(id: Ref.UserId): Future[Result[User]] + def createUser(user: User, rights: Set[UserRight]): Future[Result[Unit]] def deleteUser(id: Ref.UserId): Future[Result[Unit]] @@ -21,16 +27,25 @@ trait UserManagementStore { def revokeRights(id: Ref.UserId, rights: Set[UserRight]): Future[Result[Set[UserRight]]] - def listUserRights(id: Ref.UserId): Future[Result[Set[UserRight]]] + // read helpers + + final def getUser(id: Ref.UserId): Future[Result[User]] = { + getUserInfo(id).map(_.map(_.user))(ExecutionContext.parasitic) + } + + final def listUserRights(id: Ref.UserId): Future[Result[Set[UserRight]]] = { + getUserInfo(id).map(_.map(_.rights))(ExecutionContext.parasitic) + } - def listUsers(): Future[Result[Users]] } object UserManagementStore { type Result[T] = Either[Error, T] type Users = Seq[User] - sealed trait Error + case class UserInfo(user: User, rights: Set[UserRight]) + + sealed trait Error extends RuntimeException final case class UserNotFound(userId: Ref.UserId) extends Error final case class UserExists(userId: Ref.UserId) extends Error } diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala index 349361b0f62b..2568590d2f5f 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala @@ -23,6 +23,7 @@ import com.daml.platform.configuration.{ IndexConfiguration, SubmissionConfiguration, } +import com.daml.platform.usermanagement.UserManagementConfig import com.daml.ports.Port import io.netty.handler.ssl.ClientAuth import scopt.OptionParser @@ -57,9 +58,15 @@ final case class Config[Extra]( enableInMemoryFanOutForLedgerApi: Boolean, extra: Extra, enableSelfServiceErrorCodes: Boolean, + userManagementConfig: UserManagementConfig, ) { def withTlsConfig(modify: TlsConfiguration => TlsConfiguration): Config[Extra] = copy(tlsConfig = Some(modify(tlsConfig.getOrElse(TlsConfiguration.Empty)))) + + def withUserManagementConfig( + modify: UserManagementConfig => UserManagementConfig + ): Config[Extra] = + copy(userManagementConfig = modify(userManagementConfig)) } object Config { @@ -96,6 +103,7 @@ object Config { maxDeduplicationDuration = None, extra = extra, enableSelfServiceErrorCodes = true, + userManagementConfig = UserManagementConfig.default, ) def ownerWithoutExtras(name: String, args: collection.Seq[String]): ResourceOwner[Config[Unit]] = @@ -643,6 +651,27 @@ object Config { "Enables gRPC error code compatibility mode to the pre-1.18 behaviour. This option is deprecated and will be removed in a future release." ) .action((_, config: Config[Extra]) => config.copy(enableSelfServiceErrorCodes = false)) + + opt[Int]("user-management-cache-expiry") + .optional() + .text( + s"Defaults to ${UserManagementConfig.default.cacheExpiryAfterWriteInSeconds} seconds. " + + // TODO participant user management: Update max delay to 2x the configured value when made use of in throttled stream authorization. + "Determines the maximum delay for propagating user management state changes." + ) + .action((value, config: Config[Extra]) => + config.withUserManagementConfig(_.copy(cacheExpiryAfterWriteInSeconds = value)) + ) + + opt[Int]("user-management-max-cache-size") + .optional() + .text( + s"Defaults to ${UserManagementConfig.default.maximumCacheSize} entries. " + + "Determines the maximum in-memory cache size for user management state." + ) + .action((value, config: Config[Extra]) => + config.withUserManagementConfig(_.copy(maximumCacheSize = value)) + ) } extraOptions(parser) parser diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index 1382d281a4f1..b769b06d8da3 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -15,7 +15,6 @@ import com.daml.ledger.api.v1.experimental_features.{ CommandDeduplicationPeriodSupport, CommandDeduplicationType, } -import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore import com.daml.ledger.participant.state.v2.metrics.{TimedReadService, TimedWriteService} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.lf.engine.{Engine, EngineConfig} @@ -26,6 +25,7 @@ import com.daml.platform.apiserver.{StandaloneApiServer, StandaloneIndexService} import com.daml.platform.configuration.ServerRole import com.daml.platform.indexer.StandaloneIndexerServer import com.daml.platform.server.api.validation.ErrorFactories +import com.daml.platform.usermanagement.PersistentUserManagementStore import com.daml.platform.store.{DbSupport, LfValueTranslationCache} import com.daml.ports.Port @@ -166,8 +166,13 @@ final class Runner[T <: ReadWriteService, Extra]( metrics = metrics, ) .acquire() - userManagementStore = - new InMemoryUserManagementStore // TODO persistence wiring comes here + userManagementStore = PersistentUserManagementStore.cached( + dbDispatcher = dbSupport.dbDispatcher, + metrics = metrics, + cacheExpiryAfterWriteInSeconds = + config.userManagementConfig.cacheExpiryAfterWriteInSeconds, + maximumCacheSize = config.userManagementConfig.maximumCacheSize, + )(servicesExecutionContext) indexService <- StandaloneIndexService( dbSupport = dbSupport, ledgerId = config.ledgerId, diff --git a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/ConfigSpec.scala b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/ConfigSpec.scala index e9ef91d2d1e7..0b4d31238267 100644 --- a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/ConfigSpec.scala +++ b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/ConfigSpec.scala @@ -57,6 +57,13 @@ final class ConfigSpec getEnvVar = getEnvVar, ) + private def configParserSimple(parameters: Seq[String] = Seq.empty): Option[Config[Unit]] = + configParser( + Seq( + dumpIndexMetadataCommand, + "some-jdbc-url", + ) ++ parameters + ) behavior of "Runner" it should "disable self service error codes when compatibility gRPC error codes flag is set" in { @@ -260,5 +267,37 @@ final class ConfigSpec config.tlsConfig.value.clientAuth shouldBe ClientAuth.REQUIRE } + it should "handle '--user-management-max-cache-size' flag correctly" in { + // missing cache size value + configParserSimple( + Seq("--user-management-max-cache-size") + ) shouldBe None + // default + configParserSimple().value.userManagementConfig.maximumCacheSize shouldBe 100 + // custom value + configParserSimple( + Seq( + "--user-management-max-cache-size", + "123", + ) + ).value.userManagementConfig.maximumCacheSize shouldBe 123 + } + + it should "handle '--user-management-cache-expiry' flag correctly" in { + // missing cache size value + configParserSimple( + Seq("--user-management-cache-expiry") + ) shouldBe None + // default + configParserSimple().value.userManagementConfig.cacheExpiryAfterWriteInSeconds shouldBe 5 + // custom value + configParserSimple( + Seq( + "--user-management-cache-expiry", + "123", + ) + ).value.userManagementConfig.cacheExpiryAfterWriteInSeconds shouldBe 123 + } + private def parsingFailure(): Nothing = fail("Config parsing failed.") } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala index ff0b23ebe7de..bbe10ed6e20e 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala @@ -54,6 +54,7 @@ import com.daml.platform.sandbox.stores.{InMemoryActiveLedgerState, SandboxIndex import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.services.time.TimeProviderType import com.daml.platform.store.{DbSupport, DbType, FlywayMigrations, LfValueTranslationCache} +import com.daml.platform.usermanagement.PersistentUserManagementStore import com.daml.ports.Port import scalaz.syntax.tag._ @@ -334,7 +335,17 @@ final class SandboxServer( case None => Resource.successful(None) } - userManagementStore = new InMemoryUserManagementStore // TODO persistence wiring comes here + userManagementStore = dbSupportOption match { + case Some(dbSupport) => + PersistentUserManagementStore.cached( + dbDispatcher = dbSupport.dbDispatcher, + metrics = metrics, + cacheExpiryAfterWriteInSeconds = + config.userManagementConfig.cacheExpiryAfterWriteInSeconds, + maximumCacheSize = config.userManagementConfig.maximumCacheSize, + )(servicesExecutionContext) + case None => new InMemoryUserManagementStore + } indexAndWriteService <- (dbSupportOption match { case Some(dbSupport: DbSupport) => SandboxIndexAndWriteService.postgres( diff --git a/ledger/sandbox-common/src/main/scala/platform/sandbox/cli/CommonCliBase.scala b/ledger/sandbox-common/src/main/scala/platform/sandbox/cli/CommonCliBase.scala index a2ac6681b1cd..3d4758fc2709 100644 --- a/ledger/sandbox-common/src/main/scala/platform/sandbox/cli/CommonCliBase.scala +++ b/ledger/sandbox-common/src/main/scala/platform/sandbox/cli/CommonCliBase.scala @@ -5,6 +5,7 @@ package com.daml.platform.sandbox.cli import java.io.File import java.time.Duration + import com.daml.buildinfo.BuildInfo import com.daml.jwt.JwtVerifierConfigurationCli import com.daml.ledger.api.auth.AuthServiceJWT @@ -19,6 +20,7 @@ import com.daml.platform.configuration.Readers._ import com.daml.platform.sandbox.cli.CommonCliBase._ import com.daml.platform.sandbox.config.{LedgerName, SandboxConfig} import com.daml.platform.services.time.TimeProviderType +import com.daml.platform.usermanagement.UserManagementConfig import com.daml.ports.Port import io.netty.handler.ssl.ClientAuth import scalaz.syntax.tag._ @@ -391,6 +393,27 @@ class CommonCliBase(name: LedgerName) { ) .action((_, config: SandboxConfig) => config.copy(enableSelfServiceErrorCodes = false)) + opt[Int]("user-management-cache-expiry") + .optional() + .text( + s"Defaults to ${UserManagementConfig.default.cacheExpiryAfterWriteInSeconds} seconds. " + + // TODO participant user management: Update max delay to 2x the configured value when made use of in throttled stream authorization. + "Determines the maximum delay for propagating user management state changes." + ) + .action((value, config: SandboxConfig) => + config.withUserManagementConfig(_.copy(cacheExpiryAfterWriteInSeconds = value)) + ) + + opt[Int]("user-management-max-cache-size") + .optional() + .text( + s"Defaults to ${UserManagementConfig.default.maximumCacheSize} entries. " + + "Determines the maximum in-memory cache size for user management state." + ) + .action((value, config: SandboxConfig) => + config.withUserManagementConfig(_.copy(maximumCacheSize = value)) + ) + com.daml.cliopts.Metrics.metricsReporterParse(this)( (setter, config) => config.copy(metricsReporter = setter(config.metricsReporter)), (setter, config) => diff --git a/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala b/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala index 7946c24288ca..e70f08c02b04 100644 --- a/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala +++ b/ledger/sandbox-common/src/main/scala/platform/sandbox/config/SandboxConfig.scala @@ -19,10 +19,12 @@ import com.daml.platform.configuration.{ } import com.daml.platform.services.time.TimeProviderType import com.daml.ports.Port - import java.io.File import java.nio.file.Path import java.time.Duration + +import com.daml.platform.usermanagement.UserManagementConfig + import scala.concurrent.duration.{DurationInt, FiniteDuration} /** Defines the basic configuration for running sandbox @@ -71,11 +73,17 @@ final case class SandboxConfig( sqlStartMode: Option[PostgresStartupMode], enableCompression: Boolean, enableSelfServiceErrorCodes: Boolean, + userManagementConfig: UserManagementConfig, ) { def withTlsConfig(modify: TlsConfiguration => TlsConfiguration): SandboxConfig = copy(tlsConfig = Some(modify(tlsConfig.getOrElse(TlsConfiguration.Empty)))) + def withUserManagementConfig( + modify: UserManagementConfig => UserManagementConfig + ): SandboxConfig = + copy(userManagementConfig = modify(userManagementConfig)) + lazy val initialLedgerConfiguration: InitialLedgerConfiguration = InitialLedgerConfiguration( Configuration.reasonableInitialConfiguration.copy( @@ -168,6 +176,7 @@ object SandboxConfig { sqlStartMode = Some(DefaultSqlStartupMode), enableCompression = false, enableSelfServiceErrorCodes = true, + userManagementConfig = UserManagementConfig.default, ) sealed abstract class EngineMode extends Product with Serializable diff --git a/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/cli/CommonCliSpecBase.scala b/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/cli/CommonCliSpecBase.scala index ed1273979f34..bc96e86cac70 100644 --- a/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/cli/CommonCliSpecBase.scala +++ b/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/cli/CommonCliSpecBase.scala @@ -364,6 +364,46 @@ abstract class CommonCliSpecBase( ) } + "handle '--user-management-max-cache-size' flag correctly" in { + // missing cache size value + checkOptionFail( + Array("--user-management-max-cache-size") + ) + // default + checkOption( + Array.empty, + _.withUserManagementConfig(_.copy(maximumCacheSize = 100)), + ) + // custom value + checkOption( + Array( + "--user-management-max-cache-size", + "123", + ), + _.withUserManagementConfig(_.copy(maximumCacheSize = 123)), + ) + } + + "handle '--user-management-cache-expiry' flag correctly" in { + // missing cache size value + checkOptionFail( + Array("--user-management-cache-expiry") + ) + // default + checkOption( + Array.empty, + _.withUserManagementConfig(_.copy(cacheExpiryAfterWriteInSeconds = 5)), + ) + // custom value + checkOption( + Array( + "--user-management-cache-expiry", + "123", + ), + _.withUserManagementConfig(_.copy(cacheExpiryAfterWriteInSeconds = 123)), + ) + } + } protected def checkOption( diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala index 94f4b39f9b21..f5b80fcbd03d 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/SandboxOnXRunner.scala @@ -19,7 +19,6 @@ import com.daml.ledger.api.v1.experimental_features.{ CommandDeduplicationType, } import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore import com.daml.ledger.participant.state.index.v2.IndexService import com.daml.ledger.participant.state.kvutils.app.{ Config, @@ -47,6 +46,7 @@ import com.daml.platform.configuration.{PartyConfiguration, ServerRole} import com.daml.platform.indexer.StandaloneIndexerServer import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.{DbSupport, LfValueTranslationCache} +import com.daml.platform.usermanagement.PersistentUserManagementStore import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} import scala.util.chaining._ @@ -166,13 +166,23 @@ object SandboxOnXRunner { translationCache, ) - indexService <- standaloneIndexService( - sharedEngine, - config, - apiServerConfig, - metrics, - translationCache, - servicesExecutionContext, + dbSupport: DbSupport <- DbSupport + .owner( + jdbcUrl = apiServerConfig.jdbcUrl, + serverRole = ServerRole.ApiServer, + connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, + connectionTimeout = apiServerConfig.databaseConnectionTimeout, + metrics = metrics, + ) + + indexService <- StandaloneIndexService( + ledgerId = config.ledgerId, + config = apiServerConfig, + metrics = metrics, + engine = sharedEngine, + servicesExecutionContext = servicesExecutionContext, + lfValueTranslationCache = translationCache, + dbSupport = dbSupport, ) timeServiceBackend = BridgeConfigProvider.timeServiceBackend(config) @@ -194,42 +204,12 @@ object SandboxOnXRunner { new TimedWriteService(writeService, metrics), indexerHealthChecks, timeServiceBackend, + dbSupport, ) } yield () } } - private def standaloneIndexService( - sharedEngine: Engine, - config: Config[BridgeConfig], - apiServerConfig: ApiServerConfig, - metrics: Metrics, - translationCache: LfValueTranslationCache.Cache, - servicesExecutionContext: ExecutionContextExecutorService, - )(implicit - loggingContext: LoggingContext, - materializer: Materializer, - ): ResourceOwner[IndexService] = - for { - dbSupport <- DbSupport - .owner( - jdbcUrl = apiServerConfig.jdbcUrl, - serverRole = ServerRole.ApiServer, - connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, - connectionTimeout = apiServerConfig.databaseConnectionTimeout, - metrics = metrics, - ) - indexService <- StandaloneIndexService( - ledgerId = config.ledgerId, - config = apiServerConfig, - metrics = metrics, - engine = sharedEngine, - servicesExecutionContext = servicesExecutionContext, - lfValueTranslationCache = translationCache, - dbSupport = dbSupport, - ) - } yield indexService - private def buildStandaloneApiServer( sharedEngine: Engine, indexService: IndexService, @@ -238,6 +218,7 @@ object SandboxOnXRunner { writeService: WriteService, healthChecksWithIndexer: HealthChecks, timeServiceBackend: Option[TimeServiceBackend], + dbSupport: DbSupport, )(implicit actorSystem: ActorSystem, loggingContext: LoggingContext, @@ -259,7 +240,12 @@ object SandboxOnXRunner { otherInterceptors = BridgeConfigProvider.interceptors(config), engine = sharedEngine, servicesExecutionContext = servicesExecutionContext, - userManagementStore = new InMemoryUserManagementStore, // TODO persistence wiring comes here + userManagementStore = PersistentUserManagementStore.cached( + dbDispatcher = dbSupport.dbDispatcher, + metrics = metrics, + cacheExpiryAfterWriteInSeconds = config.userManagementConfig.cacheExpiryAfterWriteInSeconds, + maximumCacheSize = config.userManagementConfig.maximumCacheSize, + )(servicesExecutionContext), commandDeduplicationFeatures = CommandDeduplicationFeatures.of( deduplicationPeriodSupport = Some( CommandDeduplicationPeriodSupport.of( diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index f44955aea3e6..f35d0c460fe1 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -25,7 +25,6 @@ import com.daml.ledger.api.v1.experimental_features.{ import com.daml.ledger.configuration.LedgerId import com.daml.ledger.on.sql.Database.InvalidDatabaseException import com.daml.ledger.on.sql.SqlLedgerReaderWriter -import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore import com.daml.ledger.participant.state.kvutils.api.{ KeyValueParticipantStateReader, KeyValueParticipantStateWriter, @@ -55,6 +54,7 @@ import com.daml.platform.sandboxnext.Runner._ import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.services.time.TimeProviderType import com.daml.platform.store.{DbSupport, LfValueTranslationCache} +import com.daml.platform.usermanagement.PersistentUserManagementStore import com.daml.ports.Port import com.daml.resources.ResettableResourceOwner import com.daml.telemetry.{DefaultTelemetry, SpanKind, SpanName} @@ -282,8 +282,13 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { connectionTimeout = apiServerConfig.databaseConnectionTimeout, metrics = metrics, ) - userManagementStore = - new InMemoryUserManagementStore // TODO persistence wiring comes here + userManagementStore = PersistentUserManagementStore.cached( + dbDispatcher = dbSupport.dbDispatcher, + metrics = metrics, + cacheExpiryAfterWriteInSeconds = + config.userManagementConfig.cacheExpiryAfterWriteInSeconds, + maximumCacheSize = config.userManagementConfig.maximumCacheSize, + )(servicesExecutionContext) indexService <- StandaloneIndexService( dbSupport = dbSupport, ledgerId = ledgerId, diff --git a/ledger/test-common/src/main/scala/com/digitalasset/platform/testing/LogCollector.scala b/ledger/test-common/src/main/scala/com/digitalasset/platform/testing/LogCollector.scala index 8f81f7f35109..71a6113e61f4 100644 --- a/ledger/test-common/src/main/scala/com/digitalasset/platform/testing/LogCollector.scala +++ b/ledger/test-common/src/main/scala/com/digitalasset/platform/testing/LogCollector.scala @@ -8,8 +8,8 @@ import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.AppenderBase import com.daml.platform.testing.LogCollector.Entry import com.daml.scalautil.Statement -import org.scalatest.matchers.should.Matchers import org.scalatest.Checkpoints.Checkpoint +import org.scalatest.matchers.should.Matchers import org.slf4j.Marker import scala.beans.BeanProperty @@ -74,7 +74,10 @@ final class LogCollector extends AppenderBase[ILoggingEvent] { trait LogCollectorAssertions { self: Matchers => - def assertLogEntry(actual: LogCollector.Entry, expected: LogCollector.ExpectedLogEntry): Unit = { + def assertLogEntry( + actual: LogCollector.Entry, + expected: LogCollector.ExpectedLogEntry, + ): Unit = { assertLogEntry(actual, expected.level, expected.msg, expected.markerRegex) } @@ -82,7 +85,7 @@ trait LogCollectorAssertions { self: Matchers => actual: LogCollector.Entry, expectedLogLevel: Level, expectedMsg: String, - expectedMarkerRegex: Option[String], + expectedMarkerRegex: Option[String] = None, ): Unit = { val cp = new Checkpoint cp { Statement.discard { actual.level shouldBe expectedLogLevel } }