Skip to content

Commit

Permalink
Remove optionality from Cosmos caching #104
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 4, 2019
1 parent f166b8b commit f468420
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 23 deletions.
4 changes: 2 additions & 2 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Es (gateway, cache, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve
| Storage.StorageConfig.Cosmos (gateway, cache, unfolds, databaseId, collectionId) ->
| Storage.StorageConfig.Cosmos (gateway, caching, unfolds, databaseId, collectionId) ->
let store = Equinox.Cosmos.CosmosStore(gateway, databaseId, collectionId)
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None
Equinox.Cosmos.CosmosResolver<'event,'state>(store, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve
Equinox.Cosmos.CosmosResolver<'event,'state>(store, codec, fold, initial, caching, ?access = accessStrategy).Resolve

type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)
Expand Down
6 changes: 3 additions & 3 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ let defaultBatchSize = 500
type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
| Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.Cosmos.CosmosGateway * Equinox.Cosmos.CachingStrategy option * unfolds: bool * databaseId: string * collectionId: string
| Cosmos of Equinox.Cosmos.CosmosGateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * collectionId: string

module MemoryStore =
let config () =
Expand Down Expand Up @@ -129,6 +129,6 @@ module Cosmos =
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn (defaultBatchSize,pageSize), cacheStrategy, unfolds, dbName, collName)
4 changes: 2 additions & 2 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesResolver(gateway, codec, fold, initial).Resolve

let resolveCosmosStreamWithProjection gateway =
CosmosResolver(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
CosmosResolver(gateway, codec, fold, initial).Resolve
CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching).Resolve

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ let resolveStreamGesWithoutAccessStrategy gateway =
GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve

let resolveStreamCosmosWithKnownEventTypeSemantics gateway =
CosmosResolver(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Resolve
CosmosResolver(gateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.AnyKnownEventType).Resolve
let resolveStreamCosmosWithoutCustomAccessStrategy gateway =
CosmosResolver(gateway defaultBatchSize, codec, fold, initial).Resolve
CosmosResolver(gateway defaultBatchSize, codec, fold, initial, CachingStrategy.NoCaching).Resolve

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceCosmos gateway log =
let resolveStream = CosmosResolver(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
let resolveStream = CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Store =

module FavoritesCategory =
let codec = Equinox.UnionCodec.JsonUtf8.Create<Favorites.Event>(Newtonsoft.Json.JsonSerializerSettings())
let resolve = CosmosResolver(Store.store, codec, Favorites.fold, Favorites.initial, caching=Store.cacheStrategy).Resolve
let resolve = CosmosResolver(Store.store, codec, Favorites.fold, Favorites.initial, Store.cacheStrategy).Resolve

let service = Favorites.Service(Log.log, FavoritesCategory.resolve)

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ module Store =
module TodosCategory =
let codec = Equinox.UnionCodec.JsonUtf8.Create<Event>(Newtonsoft.Json.JsonSerializerSettings())
let access = Equinox.Cosmos.AccessStrategy.Snapshot (isOrigin,compact)
let resolve = CosmosResolver(Store.store, codec, fold, initial, access=access, caching=Store.cacheStrategy).Resolve
let resolve = CosmosResolver(Store.store, codec, fold, initial, Store.cacheStrategy, access=access).Resolve

let service = Service(log, TodosCategory.resolve)

Expand Down
17 changes: 12 additions & 5 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,13 @@ type CosmosStore(gateway: CosmosGateway, collections: CosmosCollections, [<O; D(

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
/// Do not apply any caching strategy for this Stream.
/// NB opting not to leverage caching when using CosmosDb can have significant implications for the scalability
/// of your application, both in terms of latency and running costs.
/// While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy`
/// [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides
/// a direct benefit in terms of the number of Request Unit (RU)s that need to be provisioned to your CosmosDb instances.
| NoCaching
/// Retain a single 'state per streamName, together with the associated etag
/// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
/// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip
Expand All @@ -890,11 +897,11 @@ type AccessStrategy<'event,'state> =
/// Trust every event type as being an origin
| AnyKnownEventType

type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, [<O; D(null)>]?access, [<O; D(null)>]?caching) =
type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, caching, [<O; D(null)>]?access) =
let readCacheOption =
match caching with
| None -> None
| Some (CachingStrategy.SlidingWindow(cache, _)) -> Some(cache, null)
| CachingStrategy.NoCaching -> None
| CachingStrategy.SlidingWindow(cache, _) -> Some(cache, null)
let isOrigin, projectOption =
match access with
| None -> (fun _ -> false), None
Expand All @@ -905,8 +912,8 @@ type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, [
let folder = Folder<'event, 'state>(cosmosCat, fold, initial, isOrigin, ?unfold=projectOption, ?readCache = readCacheOption)
let category : Store.ICategory<_,_,CollectionStream> =
match caching with
| None -> folder :> _
| Some (CachingStrategy.SlidingWindow(cache, window)) ->
| CachingStrategy.NoCaching -> folder :> _
| CachingStrategy.SlidingWindow(cache, window) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let resolveStream (streamId, maybeCollectionInitializationGate) =
Expand Down
12 changes: 6 additions & 6 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,32 @@ module Cart =
let codec = genCodec<Domain.Cart.Events.Event>()
let createServiceWithoutOptimization connection batchSize log =
let store = createCosmosStore connection batchSize
let resolveStream = CosmosResolver(store, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.Cart.Service(log, resolveStream)
let createServiceWithoutOptimizationAndMaxItems connection batchSize maxEventsPerSlice log =
let store = createCosmosStoreWithMaxEventsPerSlice connection batchSize maxEventsPerSlice
let resolveStream = CosmosResolver(store, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.Cart.Service(log, resolveStream)
let projection = "Compacted",snd snapshot
let createServiceWithProjection connection batchSize log =
let store = createCosmosStore connection batchSize
let resolveStream = CosmosResolver(store, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
Backend.Cart.Service(log, resolveStream)
let createServiceWithProjectionAndCaching connection batchSize log cache =
let store = createCosmosStore connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream = CosmosResolver(store, codec, fold, initial, AccessStrategy.Snapshot snapshot, sliding20m).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve
Backend.Cart.Service(log, resolveStream)

module ContactPreferences =
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate =
let gateway = createGateway defaultBatchSize
let resolveStream = CosmosResolver(gateway, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.ContactPreferences.Service(log, resolveStream)
let createService createGateway log =
let resolveStream = CosmosResolver(createGateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Resolve
let resolveStream = CosmosResolver(createGateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.AnyKnownEventType).Resolve
Backend.ContactPreferences.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down

0 comments on commit f468420

Please sign in to comment.