Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
Reorg
  • Loading branch information
bartelink committed Mar 4, 2022
commit 675a073016a4d84dc63aaf88b413f1ee1cd635b0
86 changes: 26 additions & 60 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
@@ -1,68 +1,39 @@
namespace Equinox.Core

/// Store-specific opaque token to be used for synchronization purposes
type [<NoComparison>] StreamToken = { value : obj; version: int64 }

namespace Equinox

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
type ISyncContext<'state> =

/// Exposes the underlying Store's internal Version for the underlying stream.
/// An empty stream is Version 0; one with a single event is Version 1 etc.
/// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to
/// your `fold` function - the codec may opt to ignore it
abstract member Version : int64

/// The present State of the stream within the context of this Flow
abstract member State : 'state

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.Memento
abstract member CreateMemento : unit -> Core.StreamToken * 'state

/// Internal data structures/impl. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implementing a Store or a decorator thereof.
/// Internal data structures/impl. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implementing a Store or a decorator thereof.
/// i.e., if you're seeking to understand the main usage flows of the Equinox library, that's in Decider.fs, not here
namespace Equinox.Core

open Serilog

/// Internal type used to represent the outcome of a TrySync operation
[<NoEquality; NoComparison; RequireQualifiedAccess>]
type SyncResult<'state> =
/// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired)
| Written of StreamToken * 'state
/// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store
/// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain
| Conflict of Async<StreamToken * 'state>

/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =

/// Generate a stream token that represents a stream one believes to be empty to use as a Null Object when optimizing out the initial load roundtrip
abstract LoadEmpty : unit -> StreamToken * 'state

/// Obtain the state from the target stream
abstract Load : log: ILogger * allowStale : bool -> Async<StreamToken * 'state>
abstract Load : log: Serilog.ILogger * allowStale : bool -> Async<StreamToken * 'state>

/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
/// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry
abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async<SyncResult<'state>>
abstract TrySync : log: Serilog.ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async<SyncResult<'state>>

/// Internal type used to represent the outcome of a TrySync operation
and [<NoEquality; NoComparison; RequireQualifiedAccess>] SyncResult<'state> =
/// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired)
| Written of StreamToken * 'state
/// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store
/// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain
| Conflict of Async<StreamToken * 'state>

/// Store-specific opaque token to be used for synchronization purposes
and [<NoComparison>] StreamToken = { value : obj; version: int64 }

/// Internal implementation of the Store agnostic load + run/render. See Decider.fs for App-facing APIs.
module internal Flow =

let query (load : Async<StreamToken * 'state>) (project: Equinox.ISyncContext<'state> -> 'result) : Async<'result> = async {
let! tokenAndState = load
let context = { new Equinox.ISyncContext<'state> with
member _.State = snd tokenAndState
member _.Version = (fst tokenAndState).version
member _.CreateMemento() = tokenAndState }
return project context }

/// Represents stream and folding state between the load and run/render phases
type SyncContext<'event, 'state>
( originState : StreamToken * 'state,
trySync : ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =
trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState

let trySyncOr log events (handleFailureResync : Async<StreamToken * 'state> -> Async<bool>) : Async<bool> = async {
Expand All @@ -74,55 +45,50 @@ module internal Flow =
tokenAndState <- token', streamState'
return true }

interface Equinox.ISyncContext<'state> with
member _.State = snd tokenAndState
member _.Version = (fst tokenAndState).version
member _.CreateMemento() = tokenAndState

member _.TryWithoutResync(log : ILogger, events) : Async<bool> =
member _.TryWithoutResync(log : Serilog.ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
member _.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
member _.TryOrResync(runResync, attemptNumber: int, log : Serilog.ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
trySyncOr log events resyncInPreparationForRetry
member _.CurrentTokenAndState = tokenAndState

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
let run (log : Serilog.ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(context : SyncContext<'event, 'state>)
(decide : Equinox.ISyncContext<'state> -> Async<'result * 'event list>)
(mapResult : 'result -> SyncContext<'event, 'state> -> 'view)
: Async<'view> =
(decide : StreamToken * 'state -> Async<'result * 'event list>)
(mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> =

if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")

/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide (context :> Equinox.ISyncContext<'state>)
let! result, events = decide context.CurrentTokenAndState
if List.isEmpty events then
log.Debug "No events generated"
return mapResult result context
return mapResult result context.CurrentTokenAndState
elif attempt = maxSyncAttempts then
// Special case: on final attempt, we won't be `resync`ing; we're giving up
let! committed = context.TryWithoutResync(log, events)
if not committed then
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
else
return mapResult result context
return mapResult result context.CurrentTokenAndState
else
let! committed = context.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return mapResult result context }
return mapResult result context.CurrentTokenAndState }

// Commence, processing based on the incoming state
loop 1
Expand Down
65 changes: 43 additions & 22 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,6 @@
open Equinox.Core
open System.Runtime.InteropServices

/// Store-agnostic Loading Options
[<NoComparison; NoEquality>]
type LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| RequireLoad
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)

/// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
type MaxResyncsExhaustedException(count) =
inherit exn(sprintf "Concurrency violation; aborting after %i attempts." count)
Expand All @@ -28,51 +14,86 @@ type Decider<'event, 'state>
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy,
?allowStale) =

let load : LoadOption<'state> option -> _ = function
let load : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
| None when allowStale = Some true -> fun log -> stream.Load(log, true)
| None | Some RequireLoad -> fun log -> stream.Load(log, false)
| Some AllowStale -> fun log -> stream.Load(log, true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
| Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) }
let query option = Flow.query (load option log)
let query option project = async {
let! tokenAndState = load option log
return project tokenAndState }
let transact option decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (load option) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
let (|Context|) (token, state) =
{ new ISyncContext<'state> with
member _.State = state
member _.Version = token.version
member _.CreateMemento() = token, state }

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
member _.Transact(interpret : 'state -> 'event list, ?option) : Async<unit> =
transact option (fun context -> async { return (), interpret context.State }) (fun () _context -> ())
transact option (fun (_token, state) -> async { return (), interpret state }) (fun () _context -> ())

/// 0. Invoke the supplied <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member _.Transact(decide : 'state -> 'result * 'event list, ?option) : Async<'result> =
transact option (fun context -> async { return decide context.State }) (fun result _context -> result)
transact option (fun (_token, state) -> async { return decide state }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member _.Transact(decide : 'state -> Async<'result * 'event list>, ?option) : Async<'result> =
transact option (fun context -> decide context.State) (fun result _context -> result)
transact option (fun (_token, state) -> decide state) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state (including extended context), holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final 'view from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the 'view
member _.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view, ?option) : Async<'view> =
transact option decide mapResult
transact option (fun (Context c) -> decide c) (fun r (Context c) -> mapResult r c)

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member _.Query(projection : 'state -> 'view, ?option) : Async<'view> =
query option (fun context -> projection context.State)
query option (fun (_token, state) -> projection state)

/// Project from the stream's <c>'state<c> (including extended context), without executing a decision flow as <c>Transact<c> does
member _.QueryEx(projection : ISyncContext<'state> -> 'view, ?option) : Async<'view> =
query option projection
query option (fun (Context c) -> projection c)

/// Store-agnostic Loading Options
and [<NoComparison; NoEquality>] LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| RequireLoad
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
and ISyncContext<'state> =

/// Exposes the underlying Store's internal Version for the underlying stream.
/// An empty stream is Version 0; one with a single event is Version 1 etc.
/// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to
/// your `fold` function - the codec may opt to ignore it
abstract member Version : int64

/// The present State of the stream within the context of this Flow
abstract member State : 'state

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.Memento
abstract member CreateMemento : unit -> StreamToken * 'state