Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
Polish Core
  • Loading branch information
bartelink committed Mar 4, 2022
commit 2c7bfb6a5f4d7d81d8c6632f3feeac2143997788
7 changes: 4 additions & 3 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ and [<NoEquality; NoComparison; RequireQualifiedAccess>] SyncResult<'state> =
/// Store-specific opaque token to be used for synchronization purposes
and [<NoComparison>] StreamToken = { value : obj; version: int64 }

/// Internal implementation of the Store agnostic load + run/render. See Decider.fs for App-facing APIs.
/// Internal implementation of the Optimistic Concurrency Control loop within which a decider function runs. See Decider.fs for App-facing APIs.
module internal Flow =

/// Represents stream and folding state between the load and run/render phases
/// Represents current token and state within the OCC retry loop
type SyncContext<'event, 'state>
( originState : StreamToken * 'state,
trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =

let mutable tokenAndState = originState

let trySyncOr log events (handleFailureResync : Async<StreamToken * 'state> -> Async<bool>) : Async<bool> = async {
Expand All @@ -45,6 +46,7 @@ module internal Flow =
tokenAndState <- token', streamState'
return true }

member _.CurrentTokenAndState = tokenAndState
member _.TryWithoutResync(log : Serilog.ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
member _.TryOrResync(runResync, attemptNumber: int, log : Serilog.ILogger, events) : Async<bool> =
Expand All @@ -53,7 +55,6 @@ module internal Flow =
tokenAndState <- streamState'
return false }
trySyncOr log events resyncInPreparationForRetry
member _.CurrentTokenAndState = tokenAndState

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
Expand Down
30 changes: 15 additions & 15 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy,
?allowStale) =
[<Optional; DefaultParameterValue(null)>] ?allowStale) =

let load : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
| None when allowStale = Some true -> fun log -> stream.Load(log, true)
| None | Some RequireLoad -> fun log -> stream.Load(log, false)
| Some AllowStale -> fun log -> stream.Load(log, true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
| None when allowStale = Some true -> fun log -> stream.Load(log, allowStale = true)
| None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false)
| Some AllowStale -> fun log -> stream.Load(log, allowStale = true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
| Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) }
let query option project = async {
let! tokenAndState = load option log
let query maybeOption project = async {
let! tokenAndState = fetch maybeOption log
return project tokenAndState }
let transact option decide mapResult =
let transact maybeOption decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (load option) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
Flow.transact (fetch maybeOption) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
let (|Context|) (token, state) =
{ new ISyncContext<'state> with
member _.State = state
Expand Down Expand Up @@ -72,13 +72,13 @@ type Decider<'event, 'state>

/// Store-agnostic Loading Options
and [<NoComparison; NoEquality>] LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
/// Default policy; Obtain latest state from store based on consistency level configured
| RequireLoad
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet, and we will be generating events
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)
Expand All @@ -88,12 +88,12 @@ and ISyncContext<'state> =

/// Exposes the underlying Store's internal Version for the underlying stream.
/// An empty stream is Version 0; one with a single event is Version 1 etc.
/// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to
/// your `fold` function - the codec may opt to ignore it
/// It's important to consider that this Version is more authoritative than counting the events seen, or adding 1 to
/// the `Index` of the last event passed to your `fold` function - the codec may opt to ignore events
abstract member Version : int64

/// The present State of the stream within the context of this Flow
abstract member State : 'state

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.Memento
/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.FromMemento
abstract member CreateMemento : unit -> StreamToken * 'state