Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Tasks / Struct #337

Merged
merged 14 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Perf wip
  • Loading branch information
bartelink committed Sep 1, 2022
commit 9705c52c064e078fdc3839d4b74fe78c5317cc77
8 changes: 2 additions & 6 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ module Aggregate
(* StreamName section *)

let [<Literal>] Category = "category"
let streamName id = FsCodec.StreamName.create Category (Id.toString id)
let streamName id = struct (Category, Id.toString id)

(* Optionally, Helpers/Types *)

Expand Down Expand Up @@ -372,11 +372,7 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State)
let decider = resolve id
decider.Transact(decideX inputs)

let create resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 3)
Service(resolve)
let create resolve = Service(streamName >> resolve (Serilog.Log.ForContext<Service>()) ())
```

- `Service`'s constructor is `internal`; `create` is the main way in which one
Expand Down
24 changes: 24 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Integra
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Prometheus", "src\Equinox.DynamoStore.Prometheus\Equinox.DynamoStore.Prometheus.fsproj", "{A9AF41B3-AB28-4296-B4A4-B90DA7821476}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FsCodec", "..\FsCodec\src\FsCodec\FsCodec.fsproj", "{F36E7938-EA60-4B57-AF15-2A67994A4C54}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FsCodec.SystemTextJson", "..\FsCodec\src\FsCodec.SystemTextJson\FsCodec.SystemTextJson.fsproj", "{AC221574-BCE8-40BB-A556-3C65DB883F39}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FsCodec.NewtonsoftJson", "..\FsCodec\src\FsCodec.NewtonsoftJson\FsCodec.NewtonsoftJson.fsproj", "{1C994EA1-B81B-40AB-9CAC-969AC07903B5}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FsCodec.Box", "..\FsCodec\src\FsCodec.Box\FsCodec.Box.fsproj", "{EC86BF4B-3BA3-4D99-8DEB-DD09D7561C24}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -227,6 +235,22 @@ Global
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.Build.0 = Release|Any CPU
{F36E7938-EA60-4B57-AF15-2A67994A4C54}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F36E7938-EA60-4B57-AF15-2A67994A4C54}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F36E7938-EA60-4B57-AF15-2A67994A4C54}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F36E7938-EA60-4B57-AF15-2A67994A4C54}.Release|Any CPU.Build.0 = Release|Any CPU
{AC221574-BCE8-40BB-A556-3C65DB883F39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AC221574-BCE8-40BB-A556-3C65DB883F39}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AC221574-BCE8-40BB-A556-3C65DB883F39}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AC221574-BCE8-40BB-A556-3C65DB883F39}.Release|Any CPU.Build.0 = Release|Any CPU
{1C994EA1-B81B-40AB-9CAC-969AC07903B5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1C994EA1-B81B-40AB-9CAC-969AC07903B5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1C994EA1-B81B-40AB-9CAC-969AC07903B5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1C994EA1-B81B-40AB-9CAC-969AC07903B5}.Release|Any CPU.Build.0 = Release|Any CPU
{EC86BF4B-3BA3-4D99-8DEB-DD09D7561C24}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EC86BF4B-3BA3-4D99-8DEB-DD09D7561C24}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EC86BF4B-3BA3-4D99-8DEB-DD09D7561C24}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EC86BF4B-3BA3-4D99-8DEB-DD09D7561C24}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
33 changes: 17 additions & 16 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
module Samples.Infrastructure.Services

open Domain
open Equinox
open FsCodec.SystemTextJson.Interop // use ToJsonElementCodec because we are doing an overkill example
open Microsoft.Extensions.DependencyInjection
open System

type StreamResolver(storage) =
member _.Resolve
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, _>,
type Store(store) =
member _.Category
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, unit>,
fold: 'state -> 'event seq -> 'state,
initial : 'state,
snapshot : ('event -> bool) * ('state -> 'event)) =
match storage with
snapshot : ('event -> bool) * ('state -> 'event)) : Category<'event, 'state, unit> =
match store with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial).Resolve
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
| Storage.StorageConfig.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy).Resolve
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)

type ServiceBuilder(storageConfig, handlerLog) =
let cat = StreamResolver(storageConfig)
let store = Store storageConfig

member _.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
Favorites.create handlerLog (cat.Resolve(Favorites.Events.codec,fold,initial,snapshot))
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot
Favorites.create <| store.Category(Favorites.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
SavedForLater.create 50 handlerLog (cat.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact
SavedForLater.create 50 <| store.Category(SavedForLater.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
TodoBackend.create handlerLog (cat.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
TodoBackend.create <| store.Category(TodoBackend.Events.codec, fold, initial, snapshot).Resolve handlerLog

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand Down
9 changes: 3 additions & 6 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Domain.Cart

let streamName (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id)
let streamName (id: CartId) = struct ("Cart", CartId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand Down Expand Up @@ -164,8 +164,5 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)

let create log resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(log, stream, maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
7 changes: 3 additions & 4 deletions samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Domain.ContactPreferences

type Id = Id of email: string
let streamName (Id email) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64
let streamName (Id email) = struct ("ContactPreferences", email) // TODO hash >> base64

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -56,6 +56,5 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State>
let decider = resolve email
decider.Query(id, Equinox.AllowStale)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
8 changes: 5 additions & 3 deletions samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<!-- <PackageReference Include="FSharp.Core" Version="4.5.4" />-->

<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.4" />
<!-- <PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.4" />-->
<!-- <PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.4" />-->
<ProjectReference Include="..\..\..\..\FsCodec\src\FsCodec.SystemTextJson\FsCodec.SystemTextJson.fsproj" />
<ProjectReference Include="..\..\..\..\FsCodec\src\FsCodec.NewtonsoftJson\FsCodec.NewtonsoftJson.fsproj" />

<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>
Expand Down
7 changes: 3 additions & 4 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Domain.Favorites

let streamName (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id)
let streamName (id: ClientId) = struct ("Favorites", ClientId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -77,6 +77,5 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
let decider = resolve clientId
decider.TransactEx((fun c -> (), decideUnfavorite sku c.State), fun () c -> c.Version)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
6 changes: 2 additions & 4 deletions samples/Store/Domain/InventoryItem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Domain.InventoryItem

open System

let streamName (id : InventoryItemId) = FsCodec.StreamName.create "InventoryItem" (InventoryItemId.toString id)
let streamName (id : InventoryItemId) = struct ("InventoryItem", InventoryItemId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -67,6 +67,4 @@ type Service internal (resolve : InventoryItemId -> Equinox.Decider<Events.Event
decider.Query id

let create resolveStream =
let resolve id =
Equinox.Decider(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
Service(streamName >> resolveStream >> Equinox.Decider)
7 changes: 3 additions & 4 deletions samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
open System
open System.Collections.Generic

let streamName (id: ClientId) = FsCodec.StreamName.create "SavedForLater" (ClientId.toString id)
let streamName (id: ClientId) = struct ("SavedForLater", ClientId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -142,6 +142,5 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
let! state = read clientId
return! execute targetId (Merge state) }

let create maxSavedItems log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve, maxSavedItems)
let create maxSavedItems resolve =
Service(streamName >> resolve, maxSavedItems)
36 changes: 19 additions & 17 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<ReadOnlyMemory<byte>>()
let createServiceMemory log store =
Cart.create log (MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial).Resolve)
MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial)
|> Decider.resolve Serilog.Log.Logger |> Cart.create


let codec = Cart.Events.codec
let codecJe = Cart.Events.codecJe

let resolveGesStreamWithRollingSnapshots context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveGesStreamWithoutCustomAccessStrategy context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial).Resolve
let categoryGesStreamWithRollingSnapshots context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
let categoryGesStreamWithoutCustomAccessStrategy context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial)

let resolveCosmosStreamWithSnapshotStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve
let resolveCosmosStreamWithoutCustomAccessStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve
let categoryCosmosStreamWithSnapshotStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot)
let categoryCosmosStreamWithoutCustomAccessStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand All @@ -51,35 +53,35 @@ type Tests(testOutputHelper) =
do! act service args
}

let arrangeEs connect choose resolveStream = async {
let arrangeEs connect choose createCategory = async {
let client = connect log
let context = choose client defaultBatchSize
return Cart.create log (resolveStream context) }
return Cart.create (createCategory context |> Decider.resolve log) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrangeEs connectToLocalEventStoreNode createContext resolveGesStreamWithoutCustomAccessStrategy
let! service = arrangeEs connectToLocalEventStoreNode createContext categoryGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrangeEs connectToLocalEventStoreNode createContext resolveGesStreamWithRollingSnapshots
let! service = arrangeEs connectToLocalEventStoreNode createContext categoryGesStreamWithRollingSnapshots
do! act service args
}

let arrangeCosmos connect resolveStream =
let arrangeCosmos connect createCategory =
let context : CosmosStore.CosmosStoreContext = connect log defaultQueryMaxItems
Cart.create log (resolveStream context)
Cart.create (createCategory context |> Decider.resolve log)

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let service = arrangeCosmos createPrimaryContext resolveCosmosStreamWithoutCustomAccessStrategy
let service = arrangeCosmos createPrimaryContext categoryCosmosStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async {
let service = arrangeCosmos createPrimaryContext resolveCosmosStreamWithSnapshotStrategy
let service = arrangeCosmos createPrimaryContext categoryCosmosStreamWithSnapshotStrategy
do! act service args
}
4 changes: 2 additions & 2 deletions samples/Store/Integration/CodecIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ let codec = FsCodec.NewtonsoftJson.Codec.Create()

[<AutoData(MaxTest=100)>]
let ``Can roundtrip, rendering correctly`` (x: SimpleDu) =
let serialized = codec.Encode(None, x)
let serialized = codec.Encode((), x)
let d = serialized.Data
render x =! if d.IsEmpty then null else System.Text.Encoding.UTF8.GetString(d.Span)
let adapted = FsCodec.Core.TimelineEvent.Create(-1L, serialized.EventType, d)
let deserialized = codec.TryDecode adapted |> Option.get
let deserialized = codec.TryDecode adapted |> ValueOption.get
deserialized =! x
Loading