Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
Inline Flow.run
  • Loading branch information
bartelink committed Mar 4, 2022
commit 4429d6ec9113e1baf71ec4cea0cc129e2b0b2c44
37 changes: 0 additions & 37 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,3 @@ and [<NoEquality; NoComparison; RequireQualifiedAccess>] SyncResult<'state> =

/// Store-specific opaque token to be used for synchronization purposes
and [<NoComparison>] StreamToken = { value : obj; version: int64 }

/// Internal implementation of the Optimistic Concurrency Control loop within which a decider function runs. See Decider.fs for App-facing APIs.
module internal Flow =

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
/// 2c. if saved without conflict, exit with updated state
let transact (originState : StreamToken * 'state)
(decide : StreamToken * 'state -> Async<'result * 'event list>)
(log : Serilog.ILogger)
(trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>)
(maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> =

// Runs one decision loop, potentially recursing with resynced state if there's a conflict on the write
let rec loop (token, state) attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
match! decide (token, state) with
| result, [] ->
log.Debug "No events generated"
return mapResult result (token, state)
| result, events ->
match! trySync (log, token, state, events) with
| SyncResult.Conflict resync ->
if attempt <> maxSyncAttempts then
let! streamState' = resyncRetryPolicy log attempt resync
log.Debug "Resyncing and retrying"
return! loop streamState' (attempt + 1)
else
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
| SyncResult.Written (token', streamState') ->
return mapResult result (token', streamState') }
// Commence, processing based on the incoming state
loop originState 1
37 changes: 27 additions & 10 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Equinox

open Equinox.Core
open System.Runtime.InteropServices

/// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
Expand All @@ -9,12 +8,12 @@ type MaxResyncsExhaustedException(count) =

/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
( log, stream : Core.IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

do if maxAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxAttempts", maxAttempts, "should be >= 1")
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<Core.StreamToken * 'state>) = function
| None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false)
| Some AllowStale -> fun log -> stream.Load(log, allowStale = true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
Expand All @@ -23,12 +22,30 @@ type Decider<'event, 'state>
let! tokenAndState = fetch maybeOption log
return project tokenAndState }
let transact maybeOption decide mapResult = async {
let! tokenAndState = fetch maybeOption log
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let! originTokenAndState = fetch maybeOption log
let resyncRetryPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
return! Flow.transact tokenAndState decide log stream.TrySync (maxAttempts, resyncPolicy, createAttemptsExhaustedException) mapResult }
let (|Context|) (token, state) =
let createMaxAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
let rec loop (token, state) attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
match! decide (token, state) with
| result, [] ->
log.Debug "No events generated"
return mapResult result (token, state)
| result, events ->
match! stream.TrySync (log, token, state, events) with
| Core.SyncResult.Conflict resync ->
if attempt <> maxAttempts then
let! streamState' = resyncRetryPolicy log attempt resync
log.Debug "Resyncing and retrying"
return! loop streamState' (attempt + 1)
else
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
| Core.SyncResult.Written (token', streamState') ->
return mapResult result (token', streamState') }
return! loop originTokenAndState 1 }
let (|Context|) (token : Core.StreamToken, state) =
{ new ISyncContext<'state> with
member _.State = state
member _.Version = token.version
Expand Down Expand Up @@ -81,7 +98,7 @@ and [<NoComparison; NoEquality>] LoadOption<'state> =
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet, and we will be generating events
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)
| FromMemento of memento : (Core.StreamToken * 'state)

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
and ISyncContext<'state> =
Expand All @@ -96,4 +113,4 @@ and ISyncContext<'state> =
abstract member State : 'state

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.FromMemento
abstract member CreateMemento : unit -> StreamToken * 'state
abstract member CreateMemento : unit -> Core.StreamToken * 'state