Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove options from Resolve
  • Loading branch information
bartelink committed Mar 4, 2022
commit ab6179c59c932229fd1fb3e6dfece24fc885f517
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313)
- `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ elements you'll touch in a normal application are:
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type ResolveOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
used to specify optimization overrides to be applied when
`resolve` hydrates a `Decider`

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ and Equinox will supply the _initial_ value for the `project` function to render

> Side note: the original question is for a read operation, but there's an interesting consideration if we are doing a `Transact`. Say,
> for instance, that there's a PUT API endpoint where the code would register a fresh customer order for the customer in its order list
> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.ResolveOption` to
> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.LoadOption` to
> hint that it's worth operating on the assumption that the stream is empty. When the internal sync operation attempts to perform the write,
> that assumption will be tested; every write is always version checked.
> In the scenario where we are dealing with a rerun of an attempt to create an order (lets say the call timed out, but the processing actually
Expand Down
8 changes: 4 additions & 4 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
state', acc @ events)
#endif

type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve : CartId * Equinox.LoadOption option -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
Expand All @@ -159,11 +159,11 @@ type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equino
let decider = resolve (cartId,None)
decider.Query id
member _.ReadStale cartId =
let decider = resolve (cartId,Some Equinox.ResolveOption.AllowStale)
let decider = resolve (cartId,Some Equinox.LoadOption.AllowStale)
decider.Query id

let create log resolveStream =
let resolve (id, opt) =
let stream = resolveStream (streamName id, opt)
Equinox.Decider(log, stream, maxAttempts = 3)
let stream = resolveStream (streamName id)
Equinox.Decider(log, stream, maxAttempts = 3, ?defaultOption = opt)
Service(resolve)
10 changes: 5 additions & 5 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<byte[]>()
let createServiceMemory log store =
Cart.create log (fun (id,opt) -> MemoryStore.MemoryStoreCategory(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))
Cart.create log (MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial).Resolve)

let codec = Cart.Events.codec
let codecStj = Cart.Events.codecStj

let resolveGesStreamWithRollingSnapshots context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveGesStreamWithoutCustomAccessStrategy context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial).Resolve(id,?option=opt)
EventStore.EventStoreCategory(context, codec, fold, initial).Resolve

let resolveCosmosStreamWithSnapshotStrategy context =
fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve
let resolveCosmosStreamWithoutCustomAccessStrategy context =
fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand Down
11 changes: 5 additions & 6 deletions src/Equinox.Core/StoreCategory.fs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
/// Low level stream builders, generally consumed via Store-specific Stream Builders that layer policies such as Caching in at the Category level
namespace Equinox.Core
module Equinox.Core.Stream

/// Represents a specific stream in a ICategory
type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context) =
type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, context) =
interface IStream<'event, 'state> with
member _.Load log =
member _.Load(log, opt) =
category.Load(log, streamId, opt)

member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, streamId, token, originState, events, context)

module Stream =

let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _
let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId context : IStream<'event, 'state> =
Stream(category, streamId, context) :> _
2 changes: 1 addition & 1 deletion src/Equinox.Core/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ open System.Diagnostics
/// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type
type ICategory<'event, 'state, 'streamId, 'context> =
/// Obtain the state from the target stream
abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async<StreamToken * 'state>
abstract Load : log: ILogger * 'streamId * LoadOption -> Async<StreamToken * 'state>

/// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
/// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
Expand Down
16 changes: 7 additions & 9 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ module internal Caching =
member _.Load(log, streamName, opt) : Async<StreamToken * 'state> = async {
match! tryReadCache streamName with
| None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when opt = Some Equinox.AllowStale -> return tokenAndState // read already updated TTL, no need to write
| Some tokenAndState when opt = Equinox.AllowStale -> return tokenAndState // read already updated TTL, no need to write
| Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context)
: Async<SyncResult<'state>> = async {
Expand Down Expand Up @@ -1380,14 +1380,14 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
| SlidingWindow of ICache * window : TimeSpan
/// Retain a single 'state per streamName, together with the associated etag.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Typically combined with `Equinox.ResolveOption.AllowStale` to minimize loads.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Typically combined with `Equinox.LoadOption.AllowStale` to minimize loads.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
| FixedTimeSpan of ICache * period : TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
Expand Down Expand Up @@ -1448,11 +1448,11 @@ type CosmosStoreCategory<'event, 'state, 'context>
Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _
categories.GetOrAdd(categoryName, createCategory)

let resolveStream (StreamName.CategoryAndId (categoryName, streamId) as sn) opt ctx =
let resolveStream (StreamName.CategoryAndId (categoryName, streamId) as sn) ctx =
let container, streamId, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId)
let category = resolveCategory (categoryName, container)
{ new IStream<'event, 'state> with
member _.Load log = category.Load(log, streamId, opt)
member _.Load(log, opt) = category.Load(log, streamId, opt)
member _.TrySync(log : ILogger, token : StreamToken, originState : 'state, events : 'event list) =
match maybeContainerInitializationGate with
| None -> category.TrySync(log, StreamName.toString sn, token, originState, events, ctx)
Expand All @@ -1462,11 +1462,9 @@ type CosmosStoreCategory<'event, 'state, 'context>

member _.Resolve
( streamName : StreamName,
/// Resolver options
[<O; D null>]?option,
/// Context to be passed to IEventCodec
[<O; D null>]?context) =
resolveStream streamName option context
resolveStream streamName context

namespace Equinox.CosmosStore.Core

Expand Down
10 changes: 5 additions & 5 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log streamName
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some tokenAndState when opt = AllowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }

member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
Expand All @@ -574,11 +574,11 @@ type CachingStrategy =
/// Retain a single 'state per streamName.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
| SlidingWindow of ICache * window : TimeSpan
/// Retain a single 'state per streamName.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
| FixedTimeSpan of ICache * period : TimeSpan
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache.
/// Semantics are identical to <c>SlidingWindow</c>.
Expand Down Expand Up @@ -618,8 +618,8 @@ type EventStoreCategory<'event, 'state, 'context>
| Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder

member _.Resolve(streamName : FsCodec.StreamName, [<O; D null>] ?option, [<O; D null>] ?context) =
Stream.create category (FsCodec.StreamName.toString streamName) option context
member _.Resolve(streamName : FsCodec.StreamName, [<O; D null>] ?context) =
Stream.create category (FsCodec.StreamName.toString streamName) context

type private SerilogAdapter(log : ILogger) =
interface EventStore.ClientAPI.ILogger with
Expand Down
6 changes: 2 additions & 4 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,

type MemoryStoreCategory<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event, 'Format, 'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context

member _.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None | Some AllowStale) -> resolveStream sn context
member _.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?context : 'context) =
Stream.create category (FsCodec.StreamName.toString streamName) context
10 changes: 5 additions & 5 deletions src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log streamName
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some tokenAndState when opt = AllowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync(log, fold, streamName, streamToken, initialState, events, context)
Expand All @@ -533,11 +533,11 @@ type CachingStrategy =
/// Retain a single 'state per streamName.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
| SlidingWindow of ICache * window : TimeSpan
/// Retain a single 'state per streamName
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Unless <c>ResolveOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
| FixedTimeSpan of ICache * period : TimeSpan
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache.
/// Semantics are identical to <c>SlidingWindow</c>.
Expand Down Expand Up @@ -573,8 +573,8 @@ type SqlStreamStoreCategory<'event, 'state, 'context>
Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder
| Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder
member _.Resolve(streamName : FsCodec.StreamName, [<O; D null>]?option, [<O; D null>]?context) =
Stream.create category (FsCodec.StreamName.toString streamName) option context
member _.Resolve(streamName : FsCodec.StreamName, [<O; D null>]?context) =
Stream.create category (FsCodec.StreamName.toString streamName) context

[<AbstractClass>]
type ConnectorBase([<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
Expand Down
Loading