Skip to content

Commit

Permalink
Inline Flow.transact
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 4, 2022
1 parent 0487027 commit e968459
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
6 changes: 4 additions & 2 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Decider<'event, 'state>
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

do if maxAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxAttempts", maxAttempts, "should be >= 1")
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
| None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false)
| Some AllowStale -> fun log -> stream.Load(log, allowStale = true)
Expand All @@ -21,11 +22,12 @@ type Decider<'event, 'state>
let query maybeOption project = async {
let! tokenAndState = fetch maybeOption log
return project tokenAndState }
let transact maybeOption decide mapResult =
let transact maybeOption decide mapResult = async {
let! tokenAndState = fetch maybeOption log
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact log (fetch maybeOption) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) decide stream.TrySync mapResult
return! Flow.transact tokenAndState decide log stream.TrySync (maxAttempts, resyncPolicy, createAttemptsExhaustedException) mapResult }
let (|Context|) (token, state) =
{ new ISyncContext<'state> with
member _.State = state
Expand Down
9 changes: 2 additions & 7 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ module internal Flow =
/// 2a. if no changes required, exit with known state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
/// 2c. if saved without conflict, exit with updated state
let run (log : Serilog.ILogger)
(originState : StreamToken * 'state)
let transact (originState : StreamToken * 'state)
(decide : StreamToken * 'state -> Async<'result * 'event list>)
(log : Serilog.ILogger)
(trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>)
(maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> =
Expand All @@ -63,8 +63,3 @@ module internal Flow =
return mapResult result (token', streamState') }
// Commence, processing based on the incoming state
loop originState 1

let transact log load (maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) decide trySync mapResult : Async<'result> = async {
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
let! streamState = load log
return! run log streamState decide trySync (maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) mapResult }

0 comments on commit e968459

Please sign in to comment.