Skip to content

Commit

Permalink
Async cleanup + test flakiness reduction (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jun 6, 2023
1 parent 5cd8829 commit c454817
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 180 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
- `Equinox.Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)
- Replace `AsyncSeq` usage with `FSharp.Control.TaskSeq` v `0.4.0` [#361](https://github.com/jet/equinox/pull/361)
- Replace `AsyncSeq` usage with `FSharp.Control.TaskSeq` v `0.4.0` [#361](https://github.com/jet/equinox/pull/361) [#391](https://github.com/jet/equinox/pull/391)
- Raise `FSharp.Core` requirement to `6.0.7` [#337](https://github.com/jet/equinox/pull/337) [#33](https://github.com/jet/equinox/pull/362)
- Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310)
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ _If you're looking to learn more about and/or discuss Event Sourcing and it's my
- `Box.Codec`: lightweight [non-serializing substitute equivalent to `NewtonsoftJson.Codec` for use in unit and integration tests](https://github.com/jet/FsCodec#boxcodec)
- `Codec`: an explicitly coded pair of `encode` and `tryDecode` functions for when you need to customize
- Caching using the .NET `MemoryCache` to:
- Minimize round trips (pluggable via [`ICache`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Cache.fs#L22) :pray: [@DSilence](https://github.com/jet/equinox/pull/161)
- Minimize latency and bandwidth / Request Charges by maintaining the folded state, without making the Domain Model folded state serializable
- Minimize round trips; consistent implementation across stores :pray: [@DSilence](https://github.com/jet/equinox/pull/161)
- Minimize latency and bandwidth / Request Charges by maintaining the folded state, without needing the Domain Model folded state to be serializable
- Enable read through caching, coalescing concurrent reads via opt-in `LoadOption.AllowStale`
- Mature and comprehensive logging (using [Serilog](https://github.com/serilog/serilog) internally), with optimal performance and pluggable integration with your apps hosting context (we ourselves typically feed log info to Splunk and the metrics embedded in the `Serilog.Events.LogEvent` Properties to Prometheus; see relevant tests for examples)
- OpenTelemetry Integration (presently only implemented in `Equinox.Core` and `Equinox.MessageDb` ... `#help-wanted`)
- **`Equinox.EventStore`, `Equinox.SqlStreamStore`: In-stream Rolling Snapshots**:
Expand Down
8 changes: 4 additions & 4 deletions diagrams/container.puml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ frame "Consistent Event Stores" as stores <<Expanded>> {
interface IStream <<Component>>
rectangle "Equinox.Core" as core <<External>> {
rectangle "System.MemoryCache" <<External Container>>
interface ICache <<Component>>
interface Cache <<Component>>
}
frame "SqlStreamStore" as ssss <<Internal>> {
rectangle "Equinox.SqlStreamStore.MsSql" <<Component>> as ssm
Expand Down Expand Up @@ -78,13 +78,13 @@ publishers <-- cr : can feed from
publishers <-- er : can feed from

ms .> IStream : implements
es -> ICache
es -> Cache
es .> IStream : implements
es -> esc
cs --> ICache
cs --> Cache
cs .> IStream : implements
cs <-> cc
sss -> ICache
sss -> Cache
sss .> IStream : implements

ssm -> sss : is a
Expand Down
3 changes: 2 additions & 1 deletion src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Equinox.Core

open System
open System.Threading
open System.Threading.Tasks

Expand Down Expand Up @@ -31,7 +32,7 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
member _.Await() = workflow.Value

/// Singleton Empty value
static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized AsyncLazy"))
static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(InvalidOperationException "Uninitialized AsyncLazy"))

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
Expand Down
1 change: 1 addition & 0 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Equinox
open Equinox.Core
open Equinox.Core.Tracing
open System
open System.Threading

type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
let mutable currentToken = initialToken
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
<Compile Include="Tracing.fs" />
<Compile Include="Category.fs" />
<Compile Include="StopwatchInterval.fs" />
<Compile Include="Infrastructure.fs" />
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="Cache.fs" />
<Compile Include="Caching.fs" />
<Compile Include="Infrastructure.fs" />
<Compile Include="Retry.fs" />
<Compile Include="AsyncBatchingGate.fs" />
</ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions src/Equinox.Core/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Async with
module Async =

let inline startImmediateAsTask ct computation = Async.StartImmediateAsTask(computation, ct)
let inline call (f : System.Threading.CancellationToken -> Task<'T>) = async {
let! ct = Async.CancellationToken
return! f ct |> Async.AwaitTaskCorrect }

module ValueTuple =

Expand Down
91 changes: 41 additions & 50 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -523,29 +523,26 @@ module Initialization =
let private (|ThroughputProperties|) = function
| Throughput.Manual rus -> ThroughputProperties.CreateManualThroughput(rus)
| Throughput.Autoscale maxRus -> ThroughputProperties.CreateAutoscaleThroughput(maxRus)
let private createOrProvisionDatabase (client: CosmosClient) dName mode = async {
let! ct = Async.CancellationToken
let createDatabaseIfNotExists maybeTp = async {
let! r = client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = Option.toObj maybeTp, cancellationToken = ct) |> Async.AwaitTaskCorrect
return r.Database }
match mode with
| Provisioning.Container _ | Provisioning.Serverless -> return! createDatabaseIfNotExists None
| Provisioning.Database (ThroughputProperties tp) ->
let! d = createDatabaseIfNotExists (Some tp)
let! _ = d.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect
let private createDatabaseIfNotExists (client: CosmosClient) maybeTp dName = async {
let! r = Async.call (fun ct -> client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
return r.Database }
let private createOrProvisionDatabase (client: CosmosClient) dName = function
| Provisioning.Container _ | Provisioning.Serverless -> createDatabaseIfNotExists client None dName
| Provisioning.Database (ThroughputProperties tp) -> async {
let! d = createDatabaseIfNotExists client (Some tp) dName
let! _ = Async.call (fun ct -> d.ReplaceThroughputAsync(tp, cancellationToken = ct))
return d }
let private createOrProvisionContainer (d: Database) (cName, pkPath, customizeContainer) mode = async {
let private createContainerIfNotExists (d: Database) cp maybeTp = async {
let! r = Async.call (fun ct -> d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
return r.Container }
let private createOrProvisionContainer (d: Database) (cName, pkPath, customizeContainer) mode =
let cp = ContainerProperties(id = cName, partitionKeyPath = pkPath)
customizeContainer cp
let! ct = Async.CancellationToken
let createContainerIfNotExists maybeTp = async {
let! r = d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct) |> Async.AwaitTaskCorrect
return r.Container }
match mode with
| Provisioning.Database _ | Provisioning.Serverless -> return! createContainerIfNotExists None
| Provisioning.Container (ThroughputProperties throughput) ->
let! c = createContainerIfNotExists (Some throughput)
let! _ = c.ReplaceThroughputAsync(throughput, cancellationToken = ct) |> Async.AwaitTaskCorrect
| Provisioning.Database _ | Provisioning.Serverless -> createContainerIfNotExists d cp None
| Provisioning.Container (ThroughputProperties throughput) -> async {
let! c = createContainerIfNotExists d cp (Some throughput)
let! _ = Async.call (fun ct -> c.ReplaceThroughputAsync(throughput, cancellationToken = ct))
return c }

let private createStoredProcIfNotExists (c: Container) (name, body) ct: Task<float> = task {
Expand Down Expand Up @@ -723,7 +720,7 @@ module internal Query =
let! events =
batchesBackward
|> TaskSeq.collectSeq (fun (events, maybePos, r) ->
if maybeTipPos = None then maybeTipPos <- maybePos
if Option.isNone maybeTipPos then maybeTipPos <- maybePos
lastResponse <- Some events; ru <- ru + r
responseCount <- responseCount + 1
seq { for x in events -> struct (x, tryDecode x) })
Expand Down Expand Up @@ -1176,11 +1173,10 @@ type CosmosClientFactory
| Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)

/// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitialize(discovery: Discovery, containers) = async {
let! ct = Async.CancellationToken
member x.CreateAndInitialize(discovery: Discovery, containers) =
match discovery with
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> return! CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct) |> Async.AwaitTaskCorrect
| Discovery.ConnectionString cs -> return! CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct) |> Async.AwaitTaskCorrect }
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> Async.call (fun ct -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct))
| Discovery.ConnectionString cs -> Async.call (fun ct -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct))

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
Expand Down Expand Up @@ -1404,6 +1400,7 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (resolveInner, empty
namespace Equinox.CosmosStore.Core

open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
open Equinox.Core
open FsCodec
Expand Down Expand Up @@ -1470,9 +1467,8 @@ type EventsContext internal

/// Establishes the current position of the stream in as efficient a manner as possible
/// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency validation in the case of an unchanged Tip)
member _.Sync(stream, [<O; D null>] ?position: Position): Async<Position> = async {
let! ct = Async.CancellationToken
let! Token.Unpack pos' = store.GetPosition(log, stream, ct, ?pos = position) |> Async.AwaitTaskCorrect
member _.Sync(stream, ct, [<O; D null>] ?position: Position): Task<Position> = task {
let! Token.Unpack pos' = store.GetPosition(log, stream, ct, ?pos = position)
return pos' }

/// Query (with MaxItems set to `queryMaxItems`) from the specified `Position`, allowing the reader to efficiently walk away from a running query
Expand Down Expand Up @@ -1501,9 +1497,8 @@ type EventsContext internal

/// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play
/// NB Should be used sparingly; Equinox.Decider enables building equivalent equivalent idempotent handling with minimal code.
member x.NonIdempotentAppend(stream, events: IEventData<_>[]): Async<Position> = async {
let! ct = Async.CancellationToken
match! x.Sync(stream, Position.fromAppendAtEnd, events, ct) |> Async.AwaitTaskCorrect with
member x.NonIdempotentAppend(stream, events: IEventData<_>[], ct): Task<Position> = task {
match! x.Sync(stream, Position.fromAppendAtEnd, events, ct) with
| AppendResult.Ok token -> return token
| x -> return x |> sprintf "Conflict despite it being disabled %A" |> invalidOp }

Expand All @@ -1520,16 +1515,16 @@ type EventData() =
module Events =

let private (|PositionIndex|) (x: Position) = x.index
let private stripSyncResult (f: Task<AppendResult<Position>>): Async<AppendResult<int64>> = async {
match! f |> Async.AwaitTaskCorrect with
| AppendResult.Ok (PositionIndex index)-> return AppendResult.Ok index
let private stripSyncResult (f: Task<AppendResult<Position>>): Task<AppendResult<int64>> = task {
match! f with
| AppendResult.Ok (PositionIndex index) -> return AppendResult.Ok index
| AppendResult.Conflict (PositionIndex index, events) -> return AppendResult.Conflict (index, events)
| AppendResult.ConflictUnknown (PositionIndex index) -> return AppendResult.ConflictUnknown index }
let private stripPosition (f: Async<Position>): Async<int64> = async {
let private stripPosition (f: Task<Position>): Task<int64> = task {
let! (PositionIndex index) = f
return index }
let private dropPosition (f: Task<Position * ITimelineEvent<EventBody>[]>): Async<ITimelineEvent<EventBody>[]> = async {
let! _, xs = f |> Async.AwaitTaskCorrect
let private dropPosition (f: Task<Position * ITimelineEvent<EventBody>[]>): Task<ITimelineEvent<EventBody>[]> = task {
let! _, xs = f
return xs }
let (|MinPosition|) = function
| 0L -> None
Expand All @@ -1550,31 +1545,28 @@ module Events =
/// number of events to read is specified by batchSize
/// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest
/// sequence number in the stream.
let get (ctx: EventsContext) (streamName: string) (MinPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
let! ct = Async.CancellationToken
return! ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount) |> dropPosition }
let get (ctx: EventsContext) (streamName: string) (MinPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> =
Async.call (fun ct -> ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount) |> dropPosition)

/// Appends a batch of events to a stream at the specified expected sequence number.
/// If the specified expected sequence number does not match the stream, the events are not appended
/// and a failure is returned.
let append (ctx: EventsContext) (streamName: string) (index: int64) (events: IEventData<_>[]): Async<AppendResult<int64>> = async {
let! ct = Async.CancellationToken
return! ctx.Sync(ctx.StreamId streamName, Position.fromI index, events, ct) |> stripSyncResult }
let append (ctx: EventsContext) (streamName: string) (index: int64) (events: IEventData<_>[]): Async<AppendResult<int64>> =
Async.call (fun ct -> ctx.Sync(ctx.StreamId streamName, Position.fromI index, events, ct) |> stripSyncResult)

/// Appends a batch of events to a stream at the the present Position without any conflict checks.
/// NB typically, it is recommended to ensure idempotency of operations by using the `append` and related API as
/// this facilitates ensuring consistency is maintained, and yields reduced latency and Request Charges impacts
/// (See equivalent APIs on `Context` that yield `Position` values)
let appendAtEnd (ctx: EventsContext) (streamName: string) (events: IEventData<_>[]): Async<int64> =
ctx.NonIdempotentAppend(ctx.StreamId streamName, events) |> stripPosition
Async.call (fun ct -> ctx.NonIdempotentAppend(ctx.StreamId streamName, events, ct) |> stripPosition)

/// Requests deletion of events up and including the specified <c>index</c>.
/// Due to the need to preserve ordering of data in the stream, only complete Batches will be removed.
/// If the <c>index</c> is within the Tip, events are removed via an etag-checked update. Does not alter the unfolds held in the Tip, or remove the Tip itself.
/// Returns count of events deleted this time, events that could not be deleted due to partial batches, and the stream's lowest remaining sequence number.
let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> = async {
let! ct = Async.CancellationToken
return! ctx.Prune(ctx.StreamId streamName, index, ct) |> Async.AwaitTaskCorrect }
let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> =
Async.call (fun ct -> ctx.Prune(ctx.StreamId streamName, index, ct))

/// Returns an async sequence of events in the stream backwards starting from the specified sequence number,
/// reading in batches of the specified size.
Expand All @@ -1588,10 +1580,9 @@ module Events =
/// number of events to read is specified by batchSize
/// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest
/// sequence number in the stream.
let getBackwards (ctx: EventsContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
let! ct = Async.CancellationToken
return! ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount, direction = Direction.Backward) |> dropPosition }
let getBackwards (ctx: EventsContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> =
Async.call (fun ct -> ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount, direction = Direction.Backward) |> dropPosition)

/// Obtains the `index` from the current write Position
let getNextIndex (ctx: EventsContext) (streamName: string): Async<int64> =
ctx.Sync(ctx.StreamId streamName) |> stripPosition
Async.call (fun ct -> ctx.Sync(ctx.StreamId streamName, ct = ct) |> stripPosition)
Loading

0 comments on commit c454817

Please sign in to comment.