Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Tasks / Struct #337

Merged
merged 14 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup/review
  • Loading branch information
bartelink committed Sep 2, 2022
commit a7f6e33b06534ae3c60221614e69c3a1c8a925bf
10 changes: 5 additions & 5 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Stored Procedure | JavaScript code stored in a Container that (repeatedl

Term | Description
--------------------------|------------
Category | Group of Streams bearing a common prefix `{Category}-{StreamId}`
Category | Group of Streams bearing a common prefix `{category}-{streamId}`
Event | json or blob payload, together with an Event Type name representing an Event
EventStore | [Open source](https://eventstore.org) Event Sourcing-optimized data store server and programming model with powerful integrated projection facilities
Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal store roundtrips when there is a Cache miss
Expand Down Expand Up @@ -587,7 +587,7 @@ let toSnapshot state = [Event.Snapshotted (Array.ofList state)]
* The Service defines operations in business terms, neutral to any concrete
* store selection or implementation supplied only a `resolve` function that can
* be used to map from ids (as supplied to the `streamName` function) to an
* Equinox Decider; Typically the service should be a stateless Singleton
* Equinox.Decider; Typically the service should be a stateless Singleton
*)

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
Expand Down Expand Up @@ -689,13 +689,13 @@ Equinox’s Command Handling consists of < 200 lines including interfaces and
comments in https://github.com/jet/equinox/tree/master/src/Equinox - the
elements you'll touch in a normal application are:

- [`module Impl`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L34) -
- [`module Impl`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L33) -
internal implementation of Optimistic Concurrency Control / retry loop used
by `Decider`. It's recommended to at least scan this file as it defines the
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L7) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L110) -
used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream

Its recommended to read the examples in conjunction with perusing the code in
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Domain/InventoryItem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ type Service internal (resolve : InventoryItemId -> Equinox.Decider<Events.Event
let decider = resolve itemId
decider.Query id

let create resolveStream =
Service(streamName >> resolveStream >> Equinox.Decider)
let create resolve =
Service(streamName >> resolve)
3 changes: 1 addition & 2 deletions samples/TodoBackend/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,4 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
let! state' = handle clientId (Command.Update item)
return List.find (fun x -> x.id = item.id) state' }

let create resolve =
Service(streamName >> resolve)
let create resolve = Service(streamName >> resolve)
2 changes: 1 addition & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ module EventStore =
// rig snapshots to be injected as events into the stream every `snapshotWindow` events
let accessStrategy = AccessStrategy.RollingSnapshots (Fold.isValid,Fold.snapshot)
let cat = EventStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = streamName >> cat.Resolve Log.log ()
let resolve = streamName >> Equinox.Decider.resolve log

module Cosmos =

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Favorites =
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let category = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create <| category.Resolve Log.log ()
create <| Equinox.Decider.resolve Log.log cat

let [<Literal>] appName = "equinox-tutorial"

Expand Down
4 changes: 2 additions & 2 deletions samples/Tutorial/Counter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Service internal (resolve : string -> Equinox.Decider<Event, State>) =
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let logEvents c s (events : FsCodec.ITimelineEvent<_>[]) =
log.Information("Committed to {categoryName}-{aggregateId}, events: {@events}", c, s, seq { for x in events -> x.EventType })
log.Information("Committed to {categoryName}-{streamId}, events: {@events}", c, s, seq { for x in events -> x.EventType })

(* We can integration test using an in-memory store
See other examples such as Cosmos.fsx to see how we integrate with CosmosDB and/or other concrete stores *)
Expand All @@ -93,7 +93,7 @@ let store = Equinox.MemoryStore.VolatileStore()
let _ = store.Committed.Subscribe(fun (c, s, xs) -> logEvents c s xs)
let codec = FsCodec.Box.Codec.Create()
let cat = Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
let resolve = cat.Resolve log ()
let resolve = cat |> Equinox.Decider.resolve log
let service = Service(streamName >> resolve)

let clientId = "ClientA"
Expand Down
4 changes: 2 additions & 2 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ let codec =
// Each store has a <Store>Category that is used to resolve IStream instances binding to a specific stream in a specific store
// ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above
let cat = Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
let decider = cat.Resolve log ()
let decider = Equinox.Decider.resolve log cat

// We hand the streamId to the resolver
let clientAStream = decider clientAFavoritesStreamName
Expand Down Expand Up @@ -159,7 +159,7 @@ type Service(deciderFor : string -> Handler) =
(* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *)
let streamFor (clientId: string) =
let streamIds = struct ("Favorites", clientId)
let decider = cat.Resolve log () streamIds
let decider = Equinox.Decider.resolve log cat streamIds
Handler(decider)

let service = Service(streamFor)
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ module Store =
open FulfilmentCenter

let category = CosmosStoreCategory(Store.context, Events.codec, Fold.fold, Fold.initial, Store.cacheStrategy, AccessStrategy.Unoptimized)
let resolve = category.Resolve Log.log ()
let resolve = Equinox.Decider.resolve Log.log category
let service = Service(streamName >> resolve)

let fc = "fc0"
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ module TodosCategory =

let access = AccessStrategy.Snapshot (isOrigin,snapshot)
let category = CosmosStoreCategory(Store.context, codec, fold, initial, Store.cacheStrategy, access=access)
let resolve = category.Resolve log ()
let resolve = Equinox.Decider.resolve log category

let service = Service(streamName >> TodosCategory.resolve)

Expand Down
3 changes: 1 addition & 2 deletions samples/Tutorial/Upload.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type Service internal (resolve : CompanyId * PurchaseOrderId -> Equinox.Decider<
let decider = resolve (companyId, purchaseOrderId)
decider.Transact(decide value)

let create resolve =
Service(streamName >> resolve)
let create resolve = Service(streamName >> resolve)

module Cosmos =

Expand Down
14 changes: 7 additions & 7 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ open System.Threading.Tasks
/// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
type AsyncLazy<'T>(workflow : unit -> Task<'T>) =

/// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
let workflow = lazy workflow ()

/// Synchronously check whether the value has been computed (and/or remains valid)
Expand All @@ -16,7 +17,7 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
if t.Status <> TaskStatus.RanToCompletion then false else

match isExpired with
| ValueSome f -> not (f t.Result)
| ValueSome isExpired -> not (isExpired t.Result)
| _ -> true

/// Used to rule out values where the computation yielded an exception or the result has now expired
Expand All @@ -32,7 +33,6 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
| _ -> return ValueSome res }

/// Await the outcome of the computation.
/// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this
member _.Await() = workflow.Value

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
Expand All @@ -41,6 +41,7 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
type AsyncCacheCell<'T>(workflow : CancellationToken -> Task<'T>, ?isExpired : 'T -> bool) =

let isExpired = match isExpired with Some x -> ValueSome x | None -> ValueNone
// we can't pre-initialize as we need the invocation to be tied to a CancellationToken
let mutable cell = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "AsyncCacheCell Not Yet initialized"))

/// Synchronously check the value remains valid (to short-circuit an Await step where value not required)
Expand All @@ -53,9 +54,8 @@ type AsyncCacheCell<'T>(workflow : CancellationToken -> Task<'T>, ?isExpired : '
match! current.TryAwaitValid(isExpired) with
| ValueSome res -> return res // ... if it's already / still valid, we're done
| ValueNone ->
// Prepare to do the work, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
let execute () = workflow ct
// avoid unnecessary recomputation in cases where competing threads detect expiry;
// the first write attempt wins, and everybody else reads off that value
let _ = Interlocked.CompareExchange(&cell, AsyncLazy execute, current)
// Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
let newInstance = AsyncLazy(fun () -> workflow ct)
// If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
let _ = Interlocked.CompareExchange(&cell, newInstance, current)
return! cell.Await() }
4 changes: 2 additions & 2 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type Cache(name, sizeMb : int) =
member _.UpdateIfNewer(key, options, entry) =
let policy = toPolicy options
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> Task.FromResult ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry; Task.FromResult ()
| null -> Task.FromResult()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry; Task.FromResult()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x

member _.TryGet key =
Expand Down
13 changes: 6 additions & 7 deletions src/Equinox.Core/Category.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,22 @@ type Category<'event, 'state, 'context>(
resolveInner : struct (string * string) -> struct (Core.ICategory<'event, 'state, 'context> * string * (CancellationToken -> Task<unit>) voption),
empty : struct (Core.StreamToken * 'state)) =

member _.Stream(log : Serilog.ILogger, context : 'context, categoryName, aggregateId) =
member _.Stream(log : Serilog.ILogger, context : 'context, categoryName, streamId) =
let struct (inner, streamName, init) = resolveInner (categoryName, streamId)
{ new Core.IStream<'event, 'state> with
member _.LoadEmpty() =
empty
member x.Load(allowStale, ct) =
let struct (inner, streamName, _init) = resolveInner (categoryName, aggregateId)
inner.Load(log, categoryName, aggregateId, streamName, allowStale, ct)
inner.Load(log, categoryName, streamId, streamName, allowStale, ct)
member _.TrySync(attempt, (token, originState), events, ct) =
let struct (inner, streamName, init) = resolveInner (categoryName, aggregateId)
let log = if attempt = 1 then log else log.ForContext("attempts", attempt)
inner.TrySync(log, categoryName, aggregateId, streamName, context, init, token, originState, events, ct) }
inner.TrySync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) }

module Stream =

let resolveWithContext (ctx : 'context) log (cat : Category<'event, 'state, 'context>) : struct (string * string) -> Core.IStream<'event, 'state> =
fun struct (categoryName, aggregateId) ->
cat.Stream(log, ctx, categoryName, aggregateId)
fun struct (categoryName, streamId) ->
cat.Stream(log, ctx, categoryName, streamId)

let resolve log (cat : Category<'event, 'state, unit>) =
resolveWithContext () log cat
Expand Down
8 changes: 4 additions & 4 deletions src/Equinox.Core/Types.fs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
namespace Equinox.Core

open System.Threading
open System.Threading.Tasks
open Serilog
open System
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

/// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type
type ICategory<'event, 'state, 'context> =
/// Obtain the state from the target stream
abstract Load : log: ILogger * categoryName: string * aggregateId: string * streamName: string * allowStale: bool * ct: CancellationToken -> Task<struct (StreamToken * 'state)>
abstract Load : log: ILogger * categoryName: string * streamId: string * streamName: string * allowStale: bool * ct: CancellationToken -> Task<struct (StreamToken * 'state)>

/// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
/// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
abstract TrySync : log: ILogger * categoryName: string * aggregateId: string * streamName: string * 'context * maybeInit: (CancellationToken -> Task<unit>) voption
abstract TrySync : log: ILogger * categoryName: string * streamId: string * streamName: string * 'context * maybeInit: (CancellationToken -> Task<unit>) voption
* StreamToken * 'state * events: 'event list * CancellationToken -> Task<SyncResult<'state>>

/// Represents a time measurement of a computation that includes stopwatch tick metadata
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
namespace Equinox.CosmosStore.Core

open System.Threading
open Equinox.Core
open FsCodec
open FSharp.Control
open Microsoft.Azure.Cosmos
open Serilog
open System
open System.Text.Json
open System.Threading
open System.Threading.Tasks

type EventBody = JsonElement
Expand Down
3 changes: 1 addition & 2 deletions src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.0.0" PrivateAssets="All" />

<!-- FSharp.AWS.DynamoDB has 4.7.2 as a minimum requirement, but we need tasks -->
<PackageReference Include="FSharp.Core" Version="6.0.0" />

<PackageReference Include="FsCodec" Version="3.0.0-rc.5" />
<PackageReference Include="FsCodec" Version="3.0.0-rc.5" />
<PackageReference Include="FSharp.AWS.DynamoDB" Version="0.11.1-beta" />
<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.23" />
</ItemGroup>
Expand Down
Loading