Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Prev Previous commit
Next Next commit
Simplify Query
  • Loading branch information
bartelink committed Mar 4, 2022
commit 012971a2e477cbaec714958e70e263125e586ac3
3 changes: 1 addition & 2 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
// NOTE not a real world example - used for an integration test; TODO get a better example where it's actually relevant
member _.UnfavoriteWithPostVersion(clientId, sku) =
let decider = resolve clientId
decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }),
fun _r c -> c.Version)
decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }), fun _r c -> c.Version)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Expand Down
13 changes: 8 additions & 5 deletions src/Equinox/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ type IStream<'event, 'state> =
/// Internal implementation of the Store agnostic load + run/render. See Decider.fs for App-facing APIs.
module internal Flow =

let query (load : Async<StreamToken * 'state>) (project: Equinox.ISyncContext<'state> -> 'result) : Async<'result> = async {
let! tokenAndState = load
let context = { new Equinox.ISyncContext<'state> with
member _.State = snd tokenAndState
member _.Version = (fst tokenAndState).version
member _.CreateMemento() = tokenAndState }
return project context }

/// Represents stream and folding state between the load and run/render phases
type SyncContext<'event, 'state>
( originState : StreamToken * 'state,
Expand Down Expand Up @@ -123,8 +131,3 @@ module internal Flow =
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) context decide mapResult }

let query load (stream : IStream<'event, 'state>, log : ILogger) (project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return project context }
12 changes: 5 additions & 7 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ type Decider<'event, 'state>
| Some AllowStale -> fun log -> stream.Load(log, true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
| Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) }

let transact maybeOverride decide mapResult =
let query option = Flow.query (load option log)
let transact option decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (load maybeOverride) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult

let query option args = Flow.query (load option) args
Flow.transact (load option) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
Expand Down Expand Up @@ -73,8 +71,8 @@ type Decider<'event, 'state>

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member _.Query(projection : 'state -> 'view, ?option) : Async<'view> =
query option (stream, log) (fun context -> projection (context :> ISyncContext<'state>).State)
query option (fun context -> projection context.State)

/// Project from the stream's <c>'state<c> (including extended context), without executing a decision flow as <c>Transact<c> does
member _.QueryEx(projection : ISyncContext<'state> -> 'view, ?option) : Async<'view> =
query option (stream, log) projection
query option projection