Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
Tidy LoadOptions
  • Loading branch information
bartelink committed Mar 4, 2022
commit 16afb4fa543591abbc088cccd412f3e6e63aa297
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
return interpretMany Fold.fold (Seq.map interpret commands) state }
#endif
let decider = resolve cartId
decider.Transact(interpret, (if optimistic then Equinox.AllowStale else Equinox.Load))
decider.Transact(interpret, (if optimistic then Equinox.AllowStale else Equinox.RequireLoad))

member x.ExecuteManyAsync(cartId, optimistic, commands : Command seq, ?prepare) : Async<unit> =
x.Run(cartId, optimistic, commands, ?prepare=prepare) |> Async.Ignore
8 changes: 2 additions & 6 deletions src/Equinox.Core/StoreCategory.fs
Original file line number Diff line number Diff line change
@@ -6,12 +6,8 @@ namespace Equinox.Core
type private Stream<'event, 'state, 'context>(category : ICategory<'event, 'state, string, 'context>, streamId: string, empty : StreamToken * 'state, ?context : 'context, ?init : unit -> Async<unit>) =

interface IStream<'event, 'state> with
member _.Load(log, opt) =
match opt with
| Equinox.LoadOption.AssumeEmpty -> async { return empty }
| Equinox.LoadOption.FromMemento (streamToken, state) -> async { return (streamToken, state) }
| Equinox.LoadOption.AllowStale -> category.Load(log, streamId, true)
| Equinox.LoadOption.Load -> category.Load(log, streamId, false)
member _.LoadEmpty() = empty
member _.Load(log, allowStale) = category.Load(log, streamId, allowStale)
member _.TrySync(log, token: StreamToken, originState: 'state, events: 'event list) =
let sync = category.TrySync(log, streamId, token, originState, events, context)
match init with
26 changes: 7 additions & 19 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
@@ -5,20 +5,6 @@ type [<NoComparison>] StreamToken = { value : obj; version: int64 }

namespace Equinox

/// Store-agnostic Loading Options
[<NoComparison; NoEquality>]
type LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| Load
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (Core.StreamToken * 'state)

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
type ISyncContext<'state> =

@@ -52,8 +38,10 @@ type SyncResult<'state> =
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =

abstract LoadEmpty : unit -> StreamToken * 'state

/// Obtain the state from the target stream
abstract Load : log: ILogger * opt : Equinox.LoadOption<'state> -> Async<StreamToken * 'state>
abstract Load : log: ILogger * allowStale : bool -> Async<StreamToken * 'state>

/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
@@ -131,12 +119,12 @@ module internal Flow =
// Commence, processing based on the incoming state
loop 1

let transact opt (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = stream.Load(log, opt)
let transact load (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) context decide mapResult }

let query opt (stream : IStream<'event, 'state>, log : ILogger) (project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load(log, opt)
let query load (stream : IStream<'event, 'state>, log : ILogger) (project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return project context }
29 changes: 23 additions & 6 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
@@ -3,6 +3,20 @@
open Equinox.Core
open System.Runtime.InteropServices

/// Store-agnostic Loading Options
[<NoComparison; NoEquality>]
type LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| RequireLoad
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)

/// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
type MaxResyncsExhaustedException(count) =
inherit exn(sprintf "Concurrency violation; aborting after %i attempts." count)
@@ -12,19 +26,22 @@ type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy,
?defaultOption) =
?allowStale) =

let resolveOptions : LoadOption<'state> option -> LoadOption<'state> = function
| None -> defaultArg defaultOption LoadOption.Load
| Some o -> o
let load : LoadOption<'state> option -> _ = function
| None when allowStale = Some true -> fun log -> stream.Load(log, true)
| None | Some RequireLoad -> fun log -> stream.Load(log, false)
| Some AllowStale -> fun log -> stream.Load(log, true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
| Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) }

let transact maybeOverride decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (resolveOptions maybeOverride) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
Flow.transact (load maybeOverride) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult

let query option args = Flow.query (resolveOptions option) args
let query option args = Flow.query (load option) args

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
2 changes: 1 addition & 1 deletion tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
@@ -410,7 +410,7 @@ module Dump =
with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise()
let readStream (streamName : FsCodec.StreamName) = async {
let stream = cat.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName
let! _token, events = stream.Load(storeLog, Equinox.LoadOption.Load)
let! _token, events = stream.Load(storeLog, allowStale = false)
let mutable prevTs = None
for x in events |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do
let ty,render = if x.IsUnfold then "U", render Newtonsoft.Json.Formatting.Indented else "E", render fo