Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Tasks / Struct #337

Merged
merged 14 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,30 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoStore`/`DynamoStore.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196)
- `eqx dump`: `-s` flag is now optional

### Changed

- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in [`FsCodec` #82](https://github.com/jet/FsCodec/pull/82), and use `task` in hot paths [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
- `Equinox.Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)
- Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310)
- Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>`, see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- `FSharp.Core` requirement to `6.0.0` [#337](https://github.com/jet/equinox/pull/337)
- Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)

### Removed
Expand Down
41 changes: 14 additions & 27 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Stored Procedure | JavaScript code stored in a Container that (repeatedl

Term | Description
--------------------------|------------
Category | Group of Streams bearing a common prefix `{Category}-{StreamId}`
Category | Group of Streams bearing a common prefix `{category}-{streamId}`
Event | json or blob payload, together with an Event Type name representing an Event
EventStore | [Open source](https://eventstore.org) Event Sourcing-optimized data store server and programming model with powerful integrated projection facilities
Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal store roundtrips when there is a Cache miss
Expand Down Expand Up @@ -308,7 +308,7 @@ module Aggregate
(* StreamName section *)

let [<Literal>] Category = "category"
let streamName id = FsCodec.StreamName.create Category (Id.toString id)
let streamName id = struct (Category, Id.toString id)

(* Optionally, Helpers/Types *)

Expand Down Expand Up @@ -372,11 +372,7 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State)
let decider = resolve id
decider.Transact(decideX inputs)

let create resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 3)
Service(resolve)
let create resolve = Service(streamName >> resolve)
```

- `Service`'s constructor is `internal`; `create` is the main way in which one
Expand Down Expand Up @@ -543,10 +539,8 @@ brevity, that implements all the relevant functions above:
```fsharp
(* Event stream naming + schemas *)

let [<Literal>] Category =
"Favorites"
let streamName (id : ClientId) =
FsCodec.StreamName.create Category (ClientId.toString id)
let [<Literal>] Category = "Favorites"
let streamName (id : ClientId) = struct (Category, ClientId.toString id)

type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
Expand Down Expand Up @@ -593,7 +587,7 @@ let toSnapshot state = [Event.Snapshotted (Array.ofList state)]
* The Service defines operations in business terms, neutral to any concrete
* store selection or implementation supplied only a `resolve` function that can
* be used to map from ids (as supplied to the `streamName` function) to an
* Equinox Stream typically the service should be a stateless Singleton
* Equinox.Decider; Typically the service should be a stateless Singleton
*)

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
Expand All @@ -615,10 +609,8 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
member _.List clientId : Async<Events.Favorited []> =
read clientId

let create resolveStream : Service =
let resolve id =
Equinox.Decider(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve : Service =
Service(streamName >> resolve)
```

<a name="api"></a>
Expand Down Expand Up @@ -697,13 +689,13 @@ Equinox’s Command Handling consists of < 200 lines including interfaces and
comments in https://github.com/jet/equinox/tree/master/src/Equinox - the
elements you'll touch in a normal application are:

- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L34) -
- [`module Impl`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L33) -
internal implementation of Optimistic Concurrency Control / retry loop used
by `Decider`. It's recommended to at least scan this file as it defines the
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L7) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L110) -
used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream

Its recommended to read the examples in conjunction with perusing the code in
Expand Down Expand Up @@ -846,11 +838,7 @@ type Service internal (resolve : string -> Equinox.Decider<Events.Event, Fold.St
let decider = resolve clientId
decider.Query id

let create resolve =
let resolve clientId =
let streamName = streamName clientId
Equinox.Decider(log, resolve streamName, maxAttempts = 3)
Service(resolve)
let create resolve = Service(streamName >> resolve)
```

`Read` above will do a roundtrip to the Store in order to fetch the most recent
Expand Down Expand Up @@ -921,9 +909,8 @@ result in you ending up with a model that's potentially both:

- the `resolve` parameter affords one a sufficient
[_seam_](http://www.informit.com/articles/article.aspx?p=359417) that
facilitates testing independently with a mocked or stubbed `IStream` (without
adding any references), or a `MemoryStore` (which does necessitate a
reference to a separate Assembly for clarity) as desired.
facilitates testing independently with `MemoryStore` (which does necessitate a
reference to a separate Assembly] as desired.

### Todo[Backend] walkthrough

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Core library

- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol. ([depends](https://www.fuget.org/packages/Equinox) on `FsCodec` (for the `StreamName` type-contract), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol and application-level API surface. ([depends](https://www.fuget.org/packages/Equinox) only on `FSharp.Core` v `6.0.0`

## Serialization support

Expand All @@ -150,7 +150,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`)
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.AsyncSeq` >= `2.0.23`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="6.0.0" />

<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.1.1" />
Expand Down
33 changes: 17 additions & 16 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
module Samples.Infrastructure.Services

open Domain
open Equinox
open FsCodec.SystemTextJson.Interop // use ToJsonElementCodec because we are doing an overkill example
open Microsoft.Extensions.DependencyInjection
open System

type StreamResolver(storage) =
member _.Resolve
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, _>,
type Store(store) =
member _.Category
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, unit>,
fold: 'state -> 'event seq -> 'state,
initial : 'state,
snapshot : ('event -> bool) * ('state -> 'event)) =
match storage with
snapshot : ('event -> bool) * ('state -> 'event)) : Category<'event, 'state, unit> =
match store with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial).Resolve
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
| Storage.StorageConfig.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy).Resolve
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)

type ServiceBuilder(storageConfig, handlerLog) =
let cat = StreamResolver(storageConfig)
let store = Store storageConfig

member _.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
Favorites.create handlerLog (cat.Resolve(Favorites.Events.codec,fold,initial,snapshot))
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot
Favorites.create <| store.Category(Favorites.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
SavedForLater.create 50 handlerLog (cat.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact
SavedForLater.create 50 <| store.Category(SavedForLater.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
TodoBackend.create handlerLog (cat.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
TodoBackend.create <| store.Category(TodoBackend.Events.codec, fold, initial, snapshot).Resolve handlerLog

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand Down
9 changes: 3 additions & 6 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Domain.Cart

let streamName (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id)
let streamName (id: CartId) = struct ("Cart", CartId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand Down Expand Up @@ -164,8 +164,5 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)

let create log resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(log, stream, maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
7 changes: 3 additions & 4 deletions samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Domain.ContactPreferences

type Id = Id of email: string
let streamName (Id email) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64
let streamName (Id email) = struct ("ContactPreferences", email) // TODO hash >> base64

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -56,6 +56,5 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State>
let decider = resolve email
decider.Query(id, Equinox.AllowStale)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
Loading