Skip to content

Commit

Permalink
[User management] Persistence with caching (#12344)
Browse files Browse the repository at this point in the history
Adding
 - `PersistentUserManagementStore` and `CachedUserManagementStore`,
 - `UserManagementStorageBackendTemplate` and sql migrations,
 - CLI flags: `--user-management-max-cache-size` and `--user-management-cache-expiry`;
 And wiring `PersistentUserManagementStore` where before we had `InMemoryUserManagementStore`.
  • Loading branch information
pbatko-da authored Jan 18, 2022
1 parent f07791e commit 4fe6e53
Show file tree
Hide file tree
Showing 43 changed files with 1,551 additions and 162 deletions.
6 changes: 6 additions & 0 deletions ledger/caching/src/main/scala/com/daml/caching/Cache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ abstract class Cache[Key, Value] {
*/
def getIfPresent(key: Key): Option[Value]

/** Discard any cached value for the key.
*
* The behavior of this operation is undefined for an entry that is being loaded and is otherwise not present.
*/
def invalidate(key: Key): Unit

/** Transform values when reading from or writing to the cache.
*
* Optionally allows the mapping to discard values by returning [[None]] when transforming before
Expand Down
34 changes: 30 additions & 4 deletions ledger/caching/src/main/scala/com/daml/caching/CaffeineCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package com.daml.caching
import com.daml.metrics.CacheMetrics
import com.github.benmanes.caffeine.{cache => caffeine}

import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

object CaffeineCache {

Expand All @@ -31,16 +33,27 @@ object CaffeineCache {

override def getOrAcquire(key: Key, acquire: Key => Value): Value =
cache.get(key, key => acquire(key))

override def invalidate(key: Key): Unit = cache.invalidate(key)
}

final class AsyncLoadingCacheScala[Key <: AnyRef, Value <: AnyRef](
cache: caffeine.AsyncLoadingCache[Key, Value],
cacheMetrics: CacheMetrics,
) {
installMetrics(cacheMetrics, cache.synchronous())

def get(key: Key): Future[Value] = FutureConverters.toScala(cache.get(key))

def invalidate(key: Key): Unit = cache.synchronous().invalidate(key)

}

private final class InstrumentedCaffeineCache[Key <: AnyRef, Value <: AnyRef](
cache: caffeine.Cache[Key, Value],
metrics: CacheMetrics,
) extends ConcurrentCache[Key, Value] {
metrics.registerSizeGauge(() => cache.estimatedSize())
metrics.registerWeightGauge(() =>
cache.policy().eviction().asScala.flatMap(_.weightedSize.asScala).getOrElse(0)
)
installMetrics(metrics, cache)

private val delegate = new SimpleCaffeineCache(cache)

Expand All @@ -52,6 +65,19 @@ object CaffeineCache {

override def getOrAcquire(key: Key, acquire: Key => Value): Value =
delegate.getOrAcquire(key, acquire)

override def invalidate(key: Key): Unit =
delegate.invalidate(key)
}

private def installMetrics[Key <: AnyRef, Value <: AnyRef](
metrics: CacheMetrics,
cache: caffeine.Cache[Key, Value],
): Unit = {
metrics.registerSizeGauge(() => cache.estimatedSize())
metrics.registerWeightGauge(() =>
cache.policy().eviction().asScala.flatMap(_.weightedSize.asScala).getOrElse(0)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ private[caching] final class MappedCache[Key, Value, NewValue](

override def getIfPresent(key: Key): Option[NewValue] =
delegate.getIfPresent(key).map(mapAfterReading)

override def invalidate(key: Key): Unit =
delegate.invalidate(key)
}
2 changes: 2 additions & 0 deletions ledger/caching/src/main/scala/com/daml/caching/NoCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ final class NoCache[Key, Value] private[caching] extends ConcurrentCache[Key, Va
override def getIfPresent(key: Key): Option[Value] = None

override def getOrAcquire(key: Key, acquire: Key => Value): Value = acquire(key)

override def invalidate(key: Key): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ final class MapBackedCacheForTesting[Key, Value](store: ConcurrentMap[Key, Value

override def getOrAcquire(key: Key, acquire: Key => Value): Value =
store.computeIfAbsent(key, acquire(_))

override def invalidate(key: Key): Unit = store.remove(key): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ object ErrorCategory {
*/
@Description(
"""The mutable state of the system does not satisfy the preconditions required to execute the request.
|We consider the whole Daml ledger including ledger config, parties, packages, and command
|We consider the whole Daml ledger including ledger config, parties, packages, users and command
|deduplication to be mutable system state. Thus all Daml interpretation errors are reported
|as this error or one of its specializations."""
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ class ErrorFactoriesSpec
expectedDetails: Seq[ErrorDetails.ErrorDetail],
expectedLogEntry: ExpectedLogEntry,
): Unit = {
assertError[this.type, this.type](
val _ = assertError[this.type, this.type](
actual = statusRuntimeException,
expectedCode,
expectedMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ object Assertions {
}
}

def assertEquals[T](actual: T, expected: T): Unit = {
try {
MUnit.assertEquals(actual, expected)
} catch {
case e: ComparisonFailException =>
throw AssertionErrorWithPreformattedMessage(
e.message,
s"two objects are supposed to be equal but they are not",
)
}
}

def assertSameElements[T](actual: Iterable[T], expected: Iterable[T]): Unit = {
assert(
actual.toSet == expected.toSet,
s"Actual |${actual.mkString(", ")}| should have the same elements as (expected): |${expected.mkString(", ")}|",
)
}

/** Asserts GRPC error codes depending on the self-service error codes feature in the Ledger API. */
def assertGrpcError(
participant: ParticipantTestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,20 @@ private[testtool] final class ParticipantTestContext private[participant] (

private[this] val identifierPrefix =
s"$applicationId-$endpointId-$identifierSuffix"
private[this] def nextId(idType: String): () => String =
Identification.indexSuffix(s"$identifierPrefix-$idType")

private[this] val nextPartyHintId: () => String = nextId("party")
private[this] val nextCommandId: () => String = nextId("command")
private[this] val nextSubmissionId: () => String = nextId("submission")
private[this] def nextIdGenerator(name: String, lowerCase: Boolean = false): () => String = {
val f = Identification.indexSuffix(s"$identifierPrefix-$name")
if (lowerCase)
() => f().toLowerCase
else
f
}

private[this] val nextPartyHintId: () => String = nextIdGenerator("party")
private[this] val nextCommandId: () => String = nextIdGenerator("command")
private[this] val nextSubmissionId: () => String = nextIdGenerator("submission")
private[this] val workflowId: String = s"$applicationId-$identifierSuffix"
val nextKeyId: () => String = nextId("key")
val nextKeyId: () => String = nextIdGenerator("key")
val nextUserId: () => String = nextIdGenerator("user", lowerCase = true)

override def toString: String = s"participant $endpointId"

Expand Down
Loading

0 comments on commit 4fe6e53

Please sign in to comment.