Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
inline type SyncState
  • Loading branch information
bartelink committed Mar 4, 2022
commit 8ade251d47cd01cd1c4d149cb8893b6b9c9df101
82 changes: 26 additions & 56 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,71 +30,41 @@ and [<NoComparison>] StreamToken = { value : obj; version: int64 }
/// Internal implementation of the Optimistic Concurrency Control loop within which a decider function runs. See Decider.fs for App-facing APIs.
module internal Flow =

/// Represents current token and state within the OCC retry loop
type SyncContext<'event, 'state>
( originState : StreamToken * 'state,
trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =

let mutable tokenAndState = originState

let trySyncOr log events (handleFailureResync : Async<StreamToken * 'state> -> Async<bool>) : Async<bool> = async {
let! res = let token, state = tokenAndState in trySync (log,token,state,events)
match res with
| SyncResult.Conflict resync ->
return! handleFailureResync resync
| SyncResult.Written (token', streamState') ->
tokenAndState <- token', streamState'
return true }

member _.CurrentTokenAndState = tokenAndState
member _.TryWithoutResync(log : Serilog.ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
member _.TryOrResync(runResync, attemptNumber: int, log : Serilog.ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
trySyncOr log events resyncInPreparationForRetry

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : Serilog.ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(context : SyncContext<'event, 'state>)
/// 2c. if saved without conflict, exit with updated state
let run (log : Serilog.ILogger)
(originState : StreamToken * 'state)
(decide : StreamToken * 'state -> Async<'result * 'event list>)
(trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>)
(maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> =

if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")

/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'view> = async {
// Runs one decision loop, potentially recursing with resynced state if there's a conflict on the write
let rec loop (token, state) attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide context.CurrentTokenAndState
if List.isEmpty events then
match! decide (token, state) with
| result, [] ->
log.Debug "No events generated"
return mapResult result context.CurrentTokenAndState
elif attempt = maxSyncAttempts then
// Special case: on final attempt, we won't be `resync`ing; we're giving up
let! committed = context.TryWithoutResync(log, events)
if not committed then
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
else
return mapResult result context.CurrentTokenAndState
else
let! committed = context.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return mapResult result context.CurrentTokenAndState }

return mapResult result (token, state)
| result, events ->
match! trySync (log, token, state, events) with
| SyncResult.Conflict resync ->
if attempt <> maxSyncAttempts then
let! streamState' = resyncRetryPolicy log attempt resync
log.Debug "Resyncing and retrying"
return! loop streamState' (attempt + 1)
else
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
| SyncResult.Written (token', streamState') ->
return mapResult result (token', streamState') }
// Commence, processing based on the incoming state
loop 1
loop originState 1

let transact load (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let transact log load (maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) decide trySync mapResult : Async<'result> = async {
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) context decide mapResult }
return! run log streamState decide trySync (maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) mapResult }
2 changes: 1 addition & 1 deletion src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Decider<'event, 'state>
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (fetch maybeOption) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
Flow.transact log (fetch maybeOption) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) decide stream.TrySync mapResult
let (|Context|) (token, state) =
{ new ISyncContext<'state> with
member _.State = state
Expand Down