diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a4524558..dfdbe8377 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,7 +40,7 @@ The `Unreleased` section name is replaced by the expected version of next release
 - `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
 - `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
 - `Equinox.Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)
-- Replace `AsyncSeq` usage with `FSharp.Control.TaskSeq` v `0.4.0` [#361](https://github.com/jet/equinox/pull/361)
+- Replace `AsyncSeq` usage with `FSharp.Control.TaskSeq` v `0.4.0` [#361](https://github.com/jet/equinox/pull/361) [#391](https://github.com/jet/equinox/pull/391)
 - Raise `FSharp.Core` requirement to `6.0.7` [#337](https://github.com/jet/equinox/pull/337) [#362](https://github.com/jet/equinox/pull/362)
 - Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
 - Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310)
diff --git a/README.md b/README.md
index 2eed79113..728d6359d 100644
--- a/README.md
+++ b/README.md
@@ -78,8 +78,9 @@ _If you're looking to learn more about and/or discuss Event Sourcing and its my
 - `Box.Codec`: lightweight [non-serializing substitute equivalent to `NewtonsoftJson.Codec` for use in unit and integration tests](https://github.com/jet/FsCodec#boxcodec)
 - `Codec`: an explicitly coded pair of `encode` and `tryDecode` functions for when you need to customize
 - Caching using the .NET `MemoryCache` to:
-  - Minimize round trips (pluggable via [`ICache`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Cache.fs#L22) :pray: [@DSilence](https://github.com/jet/equinox/pull/161)
-  - Minimize latency and bandwidth / Request Charges by maintaining the folded state, without making the Domain Model folded state serializable
+  - Minimize round trips; consistent implementation across stores :pray: [@DSilence](https://github.com/jet/equinox/pull/161)
+  - Minimize latency and bandwidth / Request Charges by maintaining the folded state, without needing the Domain Model folded state to be serializable
+  - Enable read-through caching, coalescing concurrent reads via opt-in `LoadOption.AllowStale`
 - Mature and comprehensive logging (using [Serilog](https://github.com/serilog/serilog) internally), with optimal performance and pluggable integration with your app's hosting context (we ourselves typically feed log info to Splunk and the metrics embedded in the `Serilog.Events.LogEvent` Properties to Prometheus; see relevant tests for examples)
 - OpenTelemetry Integration (presently only implemented in `Equinox.Core` and `Equinox.MessageDb` ... `#help-wanted`)
 - **`Equinox.EventStore`, `Equinox.SqlStreamStore`: In-stream Rolling Snapshots**:
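For orientation on the read-through caching the new README bullet references, a minimal sketch of what opting in looks like at a call site (hypothetical `decider` and `render`; the `AllowStale` shape carrying a max age follows this PR's tests, so treat it as illustrative rather than the verbatim API):

```fsharp
open System

// Tolerate cached state up to 5s old: concurrent reads within that window coalesce
// onto a single store roundtrip instead of each paying for a load
let readDashboard (decider: Equinox.Decider<'event, 'state>) (render: 'state -> 'view) =
    decider.Query(render, Equinox.LoadOption.AllowStale(TimeSpan.FromSeconds 5.))
```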
diff --git a/diagrams/container.puml b/diagrams/container.puml
index 761d3c4b6..9ebc07210 100644
--- a/diagrams/container.puml
+++ b/diagrams/container.puml
@@ -38,7 +38,7 @@ frame "Consistent Event Stores" as stores <> {
 interface IStream <>
 rectangle "Equinox.Core" as core <> {
     rectangle "System.MemoryCache" <>
-    interface ICache <>
+    interface Cache <>
 }
 frame "SqlStreamStore" as ssss <> {
     rectangle "Equinox.SqlStreamStore.MsSql" <> as ssm
@@ -78,13 +78,13 @@ publishers <-- cr : can feed from
 publishers <-- er : can feed from
 ms .> IStream : implements
-es -> ICache
+es -> Cache
 es .> IStream : implements
 es -> esc
-cs --> ICache
+cs --> Cache
 cs .> IStream : implements
 cs <-> cc
-sss -> ICache
+sss -> Cache
 sss .> IStream : implements
 ssm -> sss : is a
diff --git a/src/Equinox.Core/AsyncCacheCell.fs b/src/Equinox.Core/AsyncCacheCell.fs
index 0ef68ebad..7cc1c1f86 100755
--- a/src/Equinox.Core/AsyncCacheCell.fs
+++ b/src/Equinox.Core/AsyncCacheCell.fs
@@ -1,5 +1,6 @@
 namespace Equinox.Core
 
+open System
 open System.Threading
 open System.Threading.Tasks
 
@@ -31,7 +32,7 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
     member _.Await() = workflow.Value
 
     /// Singleton Empty value
-    static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized AsyncLazy"))
+    static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(InvalidOperationException "Uninitialized AsyncLazy"))
 
 /// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
 /// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs
index d5633329e..f0b5a4a5d 100755
--- a/src/Equinox.Core/Cache.fs
+++ b/src/Equinox.Core/Cache.fs
@@ -3,6 +3,7 @@ namespace Equinox
 open Equinox.Core
 open Equinox.Core.Tracing
 open System
+open System.Threading
 
 type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
     let mutable currentToken = initialToken
diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj
index 23598df33..0dd09d7f9 100644
--- a/src/Equinox.Core/Equinox.Core.fsproj
+++ b/src/Equinox.Core/Equinox.Core.fsproj
@@ -8,10 +8,10 @@
+
-
diff --git a/src/Equinox.Core/Infrastructure.fs b/src/Equinox.Core/Infrastructure.fs
index abe5aecbf..0eed4d802 100755
--- a/src/Equinox.Core/Infrastructure.fs
+++ b/src/Equinox.Core/Infrastructure.fs
@@ -54,6 +54,9 @@ type Async with
 
 module Async =
 
    let inline startImmediateAsTask ct computation = Async.StartImmediateAsTask(computation, ct)
+    let inline call (f : System.Threading.CancellationToken -> Task<'T>) = async {
+        let! ct = Async.CancellationToken
+        return! f ct |> Async.AwaitTaskCorrect }
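The `Async.call` helper added above is the linchpin of this PR: it hoists the ambient cancellation token once, at the `Async` boundary, so the Task-based internals can take a `CancellationToken` parameter directly. A self-contained sketch of the same shape, using stock `Async.AwaitTask` where Equinox has its `AwaitTaskCorrect` variant (which additionally normalizes fault/cancellation propagation):

```fsharp
open System.Threading
open System.Threading.Tasks

/// Adapts a CancellationToken-accepting, Task-returning call into an Async,
/// flowing the ambient Async cancellation token into the Task-based API
let call (f: CancellationToken -> Task<'T>): Async<'T> = async {
    let! ct = Async.CancellationToken
    return! f ct |> Async.AwaitTask }

// Usage sketch: any Task-based client call becomes Async without manual ct plumbing
// let fetch (client: System.Net.Http.HttpClient) (url: string) =
//     call (fun ct -> client.GetStringAsync(url, ct))
```

The payoff shows throughout the rest of the diff: every `let! ct = Async.CancellationToken` / `Async.AwaitTaskCorrect` pair collapses into a single `Async.call` wrapper at the public-API edge.

module ValueTuple =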
diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs
index f9663965f..a5f097546 100644
--- a/src/Equinox.CosmosStore/CosmosStore.fs
+++ b/src/Equinox.CosmosStore/CosmosStore.fs
@@ -523,29 +523,26 @@ module Initialization =
     let private (|ThroughputProperties|) = function
         | Throughput.Manual rus -> ThroughputProperties.CreateManualThroughput(rus)
         | Throughput.Autoscale maxRus -> ThroughputProperties.CreateAutoscaleThroughput(maxRus)
-    let private createOrProvisionDatabase (client: CosmosClient) dName mode = async {
-        let! ct = Async.CancellationToken
-        let createDatabaseIfNotExists maybeTp = async {
-            let! r = client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = Option.toObj maybeTp, cancellationToken = ct) |> Async.AwaitTaskCorrect
-            return r.Database }
-        match mode with
-        | Provisioning.Container _ | Provisioning.Serverless -> return! createDatabaseIfNotExists None
-        | Provisioning.Database (ThroughputProperties tp) ->
-            let! d = createDatabaseIfNotExists (Some tp)
-            let! _ = d.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect
+    let private createDatabaseIfNotExists (client: CosmosClient) maybeTp dName = async {
+        let! r = Async.call (fun ct -> client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
+        return r.Database }
+    let private createOrProvisionDatabase (client: CosmosClient) dName = function
+        | Provisioning.Container _ | Provisioning.Serverless -> createDatabaseIfNotExists client None dName
+        | Provisioning.Database (ThroughputProperties tp) -> async {
+            let! d = createDatabaseIfNotExists client (Some tp) dName
+            let! _ = Async.call (fun ct -> d.ReplaceThroughputAsync(tp, cancellationToken = ct))
             return d }
-    let private createOrProvisionContainer (d: Database) (cName, pkPath, customizeContainer) mode = async {
+    let private createContainerIfNotExists (d: Database) cp maybeTp = async {
+        let! r = Async.call (fun ct -> d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
+        return r.Container }
+    let private createOrProvisionContainer (d: Database) (cName, pkPath, customizeContainer) mode =
         let cp = ContainerProperties(id = cName, partitionKeyPath = pkPath)
         customizeContainer cp
-        let! ct = Async.CancellationToken
-        let createContainerIfNotExists maybeTp = async {
-            let! r = d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct) |> Async.AwaitTaskCorrect
-            return r.Container }
         match mode with
-        | Provisioning.Database _ | Provisioning.Serverless -> return! createContainerIfNotExists None
-        | Provisioning.Container (ThroughputProperties throughput) ->
-            let! c = createContainerIfNotExists (Some throughput)
-            let! _ = c.ReplaceThroughputAsync(throughput, cancellationToken = ct) |> Async.AwaitTaskCorrect
+        | Provisioning.Database _ | Provisioning.Serverless -> createContainerIfNotExists d cp None
+        | Provisioning.Container (ThroughputProperties throughput) -> async {
+            let! c = createContainerIfNotExists d cp (Some throughput)
+            let! _ = Async.call (fun ct -> c.ReplaceThroughputAsync(throughput, cancellationToken = ct))
             return c }
 
     let private createStoredProcIfNotExists (c: Container) (name, body) ct: Task = task {
@@ -723,7 +720,7 @@ module internal Query =
         let! events =
             batchesBackward
             |> TaskSeq.collectSeq (fun (events, maybePos, r) ->
-                if maybeTipPos = None then maybeTipPos <- maybePos
+                if Option.isNone maybeTipPos then maybeTipPos <- maybePos
                 lastResponse <- Some events; ru <- ru + r
                 responseCount <- responseCount + 1
                 seq { for x in events -> struct (x, tryDecode x) })
@@ -1176,11 +1173,10 @@ type CosmosClientFactory
         | Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)
 
     /// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
-    member x.CreateAndInitialize(discovery: Discovery, containers) = async {
-        let! ct = Async.CancellationToken
+    member x.CreateAndInitialize(discovery: Discovery, containers) =
         match discovery with
-        | Discovery.AccountUriAndKey (accountUri = uri; key = key) -> return! CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct) |> Async.AwaitTaskCorrect
-        | Discovery.ConnectionString cs -> return! CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct) |> Async.AwaitTaskCorrect }
+        | Discovery.AccountUriAndKey (accountUri = uri; key = key) -> Async.call (fun ct -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct))
+        | Discovery.ConnectionString cs -> Async.call (fun ct -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct))
 
 /// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
 type CosmosStoreConnector
@@ -1404,6 +1400,7 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (resolveInner, empty
 namespace Equinox.CosmosStore.Core
 
 open System.Collections.Generic
+open System.Threading
 open System.Threading.Tasks
 open Equinox.Core
 open FsCodec
@@ -1470,9 +1467,8 @@ type EventsContext internal
 
     /// Establishes the current position of the stream in as efficient a manner as possible
     /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency validation in the case of an unchanged Tip)
-    member _.Sync(stream, [<O; D null>] ?position: Position): Async<Position> = async {
-        let! ct = Async.CancellationToken
-        let! Token.Unpack pos' = store.GetPosition(log, stream, ct, ?pos = position) |> Async.AwaitTaskCorrect
+    member _.Sync(stream, ct, [<O; D null>] ?position: Position): Task<Position> = task {
+        let! Token.Unpack pos' = store.GetPosition(log, stream, ct, ?pos = position)
         return pos' }
 
     /// Query (with MaxItems set to `queryMaxItems`) from the specified `Position`, allowing the reader to efficiently walk away from a running query
@@ -1501,9 +1497,8 @@ type EventsContext internal
 
     /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play
     /// NB Should be used sparingly; Equinox.Decider enables building equivalent idempotent handling with minimal code.
-    member x.NonIdempotentAppend(stream, events: IEventData<_>[]): Async<Position> = async {
-        let! ct = Async.CancellationToken
-        match! x.Sync(stream, Position.fromAppendAtEnd, events, ct) |> Async.AwaitTaskCorrect with
+    member x.NonIdempotentAppend(stream, events: IEventData<_>[], ct): Task<Position> = task {
+        match! x.Sync(stream, Position.fromAppendAtEnd, events, ct) with
         | AppendResult.Ok token -> return token
         | x -> return x |> sprintf "Conflict despite it being disabled %A" |> invalidOp }
 
@@ -1520,16 +1515,16 @@ type EventData() =
 
 module Events =
 
     let private (|PositionIndex|) (x: Position) = x.index
-    let private stripSyncResult (f: Task<AppendResult<Position>>): Async<AppendResult<int64>> = async {
-        match! f |> Async.AwaitTaskCorrect with
-        | AppendResult.Ok (PositionIndex index)-> return AppendResult.Ok index
+    let private stripSyncResult (f: Task<AppendResult<Position>>): Task<AppendResult<int64>> = task {
+        match! f with
+        | AppendResult.Ok (PositionIndex index) -> return AppendResult.Ok index
         | AppendResult.Conflict (PositionIndex index, events) -> return AppendResult.Conflict (index, events)
         | AppendResult.ConflictUnknown (PositionIndex index) -> return AppendResult.ConflictUnknown index }
-    let private stripPosition (f: Async<Position>): Async<int64> = async {
+    let private stripPosition (f: Task<Position>): Task<int64> = task {
         let! (PositionIndex index) = f
         return index }
-    let private dropPosition (f: Task<Position * ITimelineEvent<EventBody>[]>): Async<ITimelineEvent<EventBody>[]> = async {
-        let! _, xs = f |> Async.AwaitTaskCorrect
+    let private dropPosition (f: Task<Position * ITimelineEvent<EventBody>[]>): Task<ITimelineEvent<EventBody>[]> = task {
+        let! _, xs = f
         return xs }
     let (|MinPosition|) = function
         | 0L -> None
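The `(|PositionIndex|)` single-case active pattern above is what lets these helpers destructure directly inside `let!`/`match!`. A self-contained sketch of the technique (the `Position` record here is a stand-in for the store's type):

```fsharp
open System.Threading.Tasks

type Position = { index: int64 } // stand-in for the store's Position type

// single-case active pattern: matches always, extracting the index
let (|PositionIndex|) (x: Position) = x.index

let nextIndex (f: Task<Position>) = task {
    let! (PositionIndex index) = f // binds the int64 directly, no intermediate value
    return index }
```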
@@ -1550,31 +1545,28 @@ module Events =
     /// number of events to read is specified by batchSize
     /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest
     /// sequence number in the stream.
-    let get (ctx: EventsContext) (streamName: string) (MinPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount) |> dropPosition }
+    let get (ctx: EventsContext) (streamName: string) (MinPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> =
+        Async.call (fun ct -> ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount) |> dropPosition)
 
     /// Appends a batch of events to a stream at the specified expected sequence number.
     /// If the specified expected sequence number does not match the stream, the events are not appended
     /// and a failure is returned.
-    let append (ctx: EventsContext) (streamName: string) (index: int64) (events: IEventData<_>[]): Async<AppendResult<int64>> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Sync(ctx.StreamId streamName, Position.fromI index, events, ct) |> stripSyncResult }
+    let append (ctx: EventsContext) (streamName: string) (index: int64) (events: IEventData<_>[]): Async<AppendResult<int64>> =
+        Async.call (fun ct -> ctx.Sync(ctx.StreamId streamName, Position.fromI index, events, ct) |> stripSyncResult)
 
     /// Appends a batch of events to a stream at the present Position without any conflict checks.
     /// NB typically, it is recommended to ensure idempotency of operations by using the `append` and related API as
     /// this facilitates ensuring consistency is maintained, and yields reduced latency and Request Charges impacts
     /// (See equivalent APIs on `Context` that yield `Position` values)
     let appendAtEnd (ctx: EventsContext) (streamName: string) (events: IEventData<_>[]): Async<int64> =
-        ctx.NonIdempotentAppend(ctx.StreamId streamName, events) |> stripPosition
+        Async.call (fun ct -> ctx.NonIdempotentAppend(ctx.StreamId streamName, events, ct) |> stripPosition)
 
     /// Requests deletion of events up to and including the specified index.
     /// Due to the need to preserve ordering of data in the stream, only complete Batches will be removed.
     /// If the index is within the Tip, events are removed via an etag-checked update. Does not alter the unfolds held in the Tip, or remove the Tip itself.
     /// Returns count of events deleted this time, events that could not be deleted due to partial batches, and the stream's lowest remaining sequence number.
-    let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Prune(ctx.StreamId streamName, index, ct) |> Async.AwaitTaskCorrect }
+    let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> =
+        Async.call (fun ct -> ctx.Prune(ctx.StreamId streamName, index, ct))
 
     /// Returns an async sequence of events in the stream backwards starting from the specified sequence number,
     /// reading in batches of the specified size.
@@ -1588,10 +1580,9 @@ module Events =
     /// number of events to read is specified by batchSize
     /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest
     /// sequence number in the stream.
-    let getBackwards (ctx: EventsContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount, direction = Direction.Backward) |> dropPosition }
+    let getBackwards (ctx: EventsContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> =
+        Async.call (fun ct -> ctx.Read(ctx.StreamId streamName, ct, ?position = index, maxCount = maxCount, direction = Direction.Backward) |> dropPosition)
 
     /// Obtains the `index` from the current write Position
     let getNextIndex (ctx: EventsContext) (streamName: string): Async<int64> =
-        ctx.Sync(ctx.StreamId streamName) |> stripPosition
+        Async.call (fun ct -> ctx.Sync(ctx.StreamId streamName, ct = ct) |> stripPosition)
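Taken together, the `Events` module retains its `Async` facade while the underlying `EventsContext` is now `Task`/`CancellationToken`-based. A usage sketch (assumes a wired `EventsContext`; the stream name and event batch are placeholders):

```fsharp
open Equinox.CosmosStore.Core

// assumes ctx has been constructed against a CosmosStoreClient
let roundTrip (ctx: EventsContext) (events: FsCodec.IEventData<_>[]) = async {
    let! next = Events.getNextIndex ctx "Test-1"       // current write position
    match! Events.append ctx "Test-1" next events with // appends iff position unchanged
    | AppendResult.Ok _ -> ()
    | conflict -> failwithf "unexpected %A" conflict
    return! Events.get ctx "Test-1" 0L 100 }           // first 100 events, forward
```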
diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs
index 7560f6fe6..704f90398 100644
--- a/src/Equinox.DynamoStore/DynamoStore.fs
+++ b/src/Equinox.DynamoStore/DynamoStore.fs
@@ -1454,8 +1454,8 @@ module Events =
         match! f with
         | AppendResult.Ok (PositionIndex index)-> return AppendResult.Ok index
         | AppendResult.ConflictUnknown -> return AppendResult.ConflictUnknown }
-    let private stripPosition (f: Task<Position>): Async<int64> = async {
-        let! (PositionIndex index) = f |> Async.AwaitTaskCorrect
+    let private stripPosition (f: Async<Position>): Async<int64> = async {
+        let! (PositionIndex index) = f
         return index }
 
     /// Returns an async sequence of events in the stream starting at the specified sequence number,
@@ -1469,9 +1469,8 @@ module Events =
     /// number of events to read is specified by batchSize
     /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest
     /// sequence number in the stream.
-    let get (ctx: EventsContext) (streamName: string) (index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Read(ctx.StreamName streamName, ct, ?minIndex = (if index = 0 then None else Some index), maxCount = maxCount) |> Async.AwaitTaskCorrect }
+    let get (ctx: EventsContext) (streamName: string) (index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = Async.call <| fun ct ->
+        ctx.Read(ctx.StreamName streamName, ct, ?minIndex = (if index = 0 then None else Some index), maxCount = maxCount)
 
 #if APPEND_SUPPORT
     /// Appends a batch of events to a stream at the specified expected sequence number.
@@ -1485,9 +1484,8 @@ module Events =
     /// Due to the need to preserve ordering of data in the stream, only complete Batches will be removed.
     /// If the index is within the Tip, events are removed via an etag-checked update. Does not alter the unfolds held in the Tip, or remove the Tip itself.
     /// Returns count of events deleted this time, events that could not be deleted due to partial batches, and the stream's lowest remaining sequence number.
-    let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Prune(ctx.StreamName streamName, index, ct) |> Async.AwaitTaskCorrect }
+    let pruneUntil (ctx: EventsContext) (streamName: string) (index: int64): Async<int * int * int64> = Async.call <| fun ct ->
+        ctx.Prune(ctx.StreamName streamName, index, ct)
 
     /// Returns an async sequence of events in the stream backwards starting from the specified sequence number,
     /// reading in batches of the specified size.
@@ -1500,11 +1498,10 @@ module Events =
     /// number of events to read is specified by batchSize
     /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest
     /// sequence number in the stream.
-    let getBackwards (ctx: EventsContext) (streamName: string) (index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Read(ctx.StreamName streamName, ct, ?maxIndex = (match index with int64.MaxValue -> None | i -> Some (i + 1L)), maxCount = maxCount, direction = Direction.Backward) |> Async.AwaitTaskCorrect }
+    let getBackwards (ctx: EventsContext) (streamName: string) (index: int64) (maxCount: int): Async<ITimelineEvent<EventBody>[]> = Async.call <| fun ct ->
+        ctx.Read(ctx.StreamName streamName, ct, ?maxIndex = (match index with int64.MaxValue -> None | i -> Some (i + 1L)), maxCount = maxCount, direction = Direction.Backward)
 
     /// Obtains the `index` from the current write Position
-    let getNextIndex (ctx: EventsContext) (streamName: string): Async<int64> = async {
-        let! ct = Async.CancellationToken
-        return! ctx.Sync(ctx.StreamName streamName, ct) |> stripPosition }
+    let getNextIndex (ctx: EventsContext) (streamName: string): Async<int64> =
+        Async.call (fun ct -> ctx.Sync(ctx.StreamName streamName, ct))
+        |> stripPosition
diff --git a/src/Equinox.EventStoreDb/EventStoreDb.fs b/src/Equinox.EventStoreDb/EventStoreDb.fs
index 9b72e5492..3bbc8490f 100644
--- a/src/Equinox.EventStoreDb/EventStoreDb.fs
+++ b/src/Equinox.EventStoreDb/EventStoreDb.fs
@@ -1,10 +1,10 @@
 namespace Equinox.EventStoreDb
 
-open System.Threading
 open Equinox.Core
 open EventStore.Client
 open Serilog
 open System
+open System.Threading
 open System.Threading.Tasks
 
 type EventBody = ReadOnlyMemory<byte>
@@ -227,7 +227,7 @@ module ClientCodec =
 
     let timelineEvent (x: EventRecord): FsCodec.ITimelineEvent<EventBody> =
         // TOCONSIDER wire e.Metadata["$correlationId"] and ["$causationId"] into correlationId and causationId
-        // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata
+        // https://developers.eventstore.com/server/v21.10/streams.html#reserved-names
         let n, eu, ts = x.EventNumber, x.EventId, DateTimeOffset x.Created
         let et, data, meta = x.EventType, x.Data, x.Metadata
         let size = et.Length + data.Length + meta.Length
@@ -237,7 +237,7 @@ module ClientCodec =
 
     let eventData (x: FsCodec.IEventData<EventBody>) =
         // TOCONSIDER wire x.CorrelationId, x.CausationId into x.Meta.["$correlationId"] and .["$causationId"]
-        // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata
+        // https://developers.eventstore.com/server/v21.10/streams.html#reserved-names
         EventData(Uuid.FromGuid x.EventId, x.EventType, contentType = "application/json", data = x.Data, metadata = x.Meta)
 
 type Position = { streamVersion: int64; compactionEventNumber: int64 option; batchCapacityLimit: int option }
diff --git a/src/Equinox.MessageDb/MessageDb.fs b/src/Equinox.MessageDb/MessageDb.fs
index c52e153cd..ba115190b 100644
--- a/src/Equinox.MessageDb/MessageDb.fs
+++ b/src/Equinox.MessageDb/MessageDb.fs
@@ -208,7 +208,7 @@ module Read =
             | _ -> ()
             let batchLog = log |> Log.prop "batchIndex" batchCount
-            let! slice = readSlice pos batchCount batchLog ct |> Async.AwaitTaskCorrect
+            let! slice = readSlice pos batchCount batchLog ct
             version <- max version slice.LastVersion
             result.AddRange(slice.Messages)
             if not slice.IsEnd then
diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs
index 7970a32db..0f30ef69f 100644
--- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs
+++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs
@@ -15,6 +15,12 @@ type IEventStoreConnection = IStreamStore
 type ResolvedEvent = StreamMessage
 type StreamEventsSlice = ReadStreamPage
 
+[<AutoOpen>]
+module private Shims =
+
+    type StreamMessage with
+        member x.JsonData = x.GetJsonData() |> Async.AwaitTaskCorrect |> Async.RunSynchronously
+
 [<RequireQualifiedAccess>]
 type Direction = Forward | Backward with
     override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
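The `JsonData` shim funnels every sync-over-async `GetJsonData()` call through a single property, so the blocking wait is centralized and greppable rather than scattered across call sites. A minimal consumer sketch (`StreamMessage` comes from the `SqlStreamStore` package; this is fine for diagnostics paths, but a blocking wait like this is best kept off hot async paths):

```fsharp
// assumes the Shims extension above is in scope
let describe (m: SqlStreamStore.Streams.StreamMessage) =
    // JsonData blocks on GetJsonData(); acceptable for logging/size metrics
    sprintf "%s: %d chars of JSON" m.Type m.JsonData.Length
```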
@@ -50,8 +56,7 @@ module Log =
     let propResolvedEvents name (events: ResolvedEvent[]) (log: ILogger) =
         log |> propEvents name (seq {
             for x in events do
-                let data = x.GetJsonData() |> Async.AwaitTaskCorrect |> Async.RunSynchronously
-                yield System.Collections.Generic.KeyValuePair<_, _>(x.Type, data) })
+                yield System.Collections.Generic.KeyValuePair<_, _>(x.Type, x.JsonData) })
 
     let withLoggedRetries<'t> retryPolicy (contextLabel: string) (f: ILogger -> CancellationToken -> Task<'t>) log ct: Task<'t> =
         match retryPolicy with
@@ -172,9 +177,7 @@ module private Read =
         match direction with
         | Direction.Forward -> conn.ReadStreamForwards(streamName, int startPos, batchSize, ct)
         | Direction.Backward -> conn.ReadStreamBackwards(streamName, int startPos, batchSize, ct)
-    let (|ResolvedEventLen|) (x: StreamMessage) =
-        let data = x.GetJsonData() |> Async.AwaitTaskCorrect |> Async.RunSynchronously
-        match data, x.JsonMetadata with Log.StrLen bytes, Log.StrLen metaBytes -> bytes + metaBytes
+    let (|ResolvedEventLen|) (x: StreamMessage) = match x.JsonData, x.JsonMetadata with Log.StrLen bytes, Log.StrLen metaBytes -> bytes + metaBytes
 
     let private loggedReadSlice conn streamName direction batchSize startPos (log: ILogger) ct: Task<StreamEventsSlice> = task {
         let! t, slice = readSliceAsync conn streamName direction batchSize startPos |> Stopwatch.time ct
         let bytes, count = slice.Messages |> Array.sumBy (|ResolvedEventLen|), slice.Messages.Length
@@ -274,8 +277,7 @@ module UnionEncoderAdapters =
     let (|Bytes|) = function null -> null | (s: string) -> System.Text.Encoding.UTF8.GetBytes s
 
     let encodedEventOfResolvedEvent (e: StreamMessage): FsCodec.ITimelineEvent<byte[]> =
-        let (Bytes data) = e.GetJsonData() |> Async.AwaitTaskCorrect |> Async.RunSynchronously
-        let (Bytes meta) = e.JsonMetadata
+        let Bytes data, Bytes meta = e.JsonData, e.JsonMetadata
         let ts = e.CreatedUtc |> DateTimeOffset
         let inline len (xs: byte[]) = if xs = null then 0 else xs.Length
         let size = len data + len meta + e.Type.Length
@@ -345,9 +347,7 @@ type GatewaySyncResult = Written of StreamToken | ConflictUnknown
 
 type SqlStreamStoreContext(connection: SqlStreamStoreConnection, batchOptions: BatchOptions) =
 
-    let isResolvedEventEventType (tryDecode, predicate) (e: StreamMessage) =
-        let data = e.GetJsonData() |> Async.AwaitTaskCorrect |> Async.RunSynchronously
-        predicate (tryDecode data)
+    let isResolvedEventEventType (tryDecode, predicate) (e: StreamMessage) = predicate (tryDecode e.JsonData)
     let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType
 
     let conn requireLeader = if requireLeader then connection.WriteConnection else connection.ReadConnection
diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs
index 6dcd04175..c620343df 100755
--- a/src/Equinox/Decider.fs
+++ b/src/Equinox/Decider.fs
@@ -15,101 +15,89 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) =
 
     /// 1. Invoke the supplied interpret function with the present state to determine whether any write is to occur.
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
-    member _.Transact(interpret: 'state -> 'event list, ?load, ?attempts): Async<unit> = async {
-        let! ct = Async.CancellationToken
-        return! inner.Transact(interpret >> Seq.ofList, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+    member _.Transact(interpret: 'state -> 'event list, ?load, ?attempts): Async<unit> = Async.call <| fun ct ->
+        inner.Transact(interpret >> Seq.ofList, ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied interpret function with the present state
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Uses render to generate a 'view from the persisted final state
-    member _.Transact(interpret: 'state -> 'event list, render: 'state -> 'view, ?load, ?attempts): Async<'view> = async {
-        let! ct = Async.CancellationToken
-        return! inner.Transact(interpret >> Seq.ofList, render, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+    member _.Transact(interpret: 'state -> 'event list, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
+        inner.Transact(interpret >> Seq.ofList, render, ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied decide function with the present state, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yield result
-    member _.Transact(decide: 'state -> 'result * 'event list, ?load, ?attempts): Async<'result> = async {
-        let! ct = Async.CancellationToken
+    member _.Transact(decide: 'state -> 'result * 'event list, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
         let inline decide' s = let r, es = decide s in struct (r, Seq.ofList es)
-        return! inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied decide function with the present state, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yields a final 'view produced by mapResult from the 'result and/or the final persisted 'state
-    member _.Transact(decide: 'state -> 'result * 'event list, mapResult: 'result -> 'state -> 'view, ?load, ?attempts): Async<'view> = async {
-        let! ct = Async.CancellationToken
+    member _.Transact(decide: 'state -> 'result * 'event list, mapResult: 'result -> 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
         let inline decide' s = let r, es = decide s in struct (r, Seq.ofList es)
-        return! inner.Transact(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.Transact(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied decide function with the current complete context, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yields result
-    member _.TransactEx(decide: ISyncContext<'state> -> 'result * 'event list, ?load, ?attempts): Async<'result> = async {
-        let! ct = Async.CancellationToken
+    member _.TransactEx(decide: ISyncContext<'state> -> 'result * 'event list, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
         let inline decide' c = let r, es = decide c in struct (r, Seq.ofList es)
-        return! inner.TransactEx(decide = decide', ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactEx(decide = decide', ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied decide function with the current complete context, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yields a final 'view produced by mapResult from the 'result and/or the final persisted ISyncContext
     member _.TransactEx(decide: ISyncContext<'state> -> 'result * 'event list, mapResult: 'result -> ISyncContext<'state> -> 'view,
-                        ?load, ?attempts): Async<'view> = async {
-        let! ct = Async.CancellationToken
+                        ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
         let inline decide' c = let r, es = decide c in struct (r, Seq.ofList es)
-        return! inner.TransactEx(decide = decide', mapResult = mapResult, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactEx(decide = decide', mapResult = mapResult, ?load = load, ?attempts = attempts, ct = ct)
 
     /// Project from the folded 'state, but without executing a decision flow as Transact does
-    member _.Query(render: 'state -> 'view, ?load): Async<'view> = async {
-        let! ct = Async.CancellationToken
-        return! inner.Query(render, ?load = load, ct = ct) |> Async.AwaitTaskCorrect }
+    member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct ->
+        inner.Query(render, ?load = load, ct = ct)
 
     /// Project from the stream's complete context, but without executing a decision flow as TransactEx does
-    member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = async {
-        let! ct = Async.CancellationToken
-        return! inner.QueryEx(render, ?load = load, ct = ct) |> Async.AwaitTaskCorrect }
+    member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = Async.call <| fun ct ->
+        inner.QueryEx(render, ?load = load, ct = ct)
 
     /// 1. Invoke the supplied Async interpret function with the present state
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Uses render to generate a 'view from the persisted final state
-    member _.TransactAsync(interpret: 'state -> Async<'event list>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = async {
-        let! ct = Async.CancellationToken
+    member _.TransactAsync(interpret: 'state -> Async<'event list>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
        let inline interpret' s ct = task { let! es = Async.StartImmediateAsTask(interpret s, ct) in return Seq.ofList es }
-        return! inner.TransactAsync(interpret', render, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactAsync(interpret', render, ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied Async decide function with the present state, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yield result
-    member _.TransactAsync(decide: 'state -> Async<'result * 'event list>, ?load, ?attempts): Async<'result> = async {
-        let! ct = Async.CancellationToken
+    member _.TransactAsync(decide: 'state -> Async<'result * 'event list>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
         let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, Seq.ofList es) }
-        return! inner.TransactAsync(decide = decide', ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactAsync(decide = decide', ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yield result
-    member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event list>, ?load, ?attempts): Async<'result> = async {
-        let! ct = Async.CancellationToken
+    member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event list>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
         let decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, Seq.ofList es) }
-        return! inner.TransactExAsync(decide = decide', ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactExAsync(decide = decide', ?load = load, ?attempts = attempts, ct = ct)
 
     /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result
     /// 2. (if events yielded) Attempt to sync the yielded events to the stream.
     ///    (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.)
     /// 3. Yields a final 'view produced by mapResult from the 'result and/or the final persisted ISyncContext
     member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event list>, mapResult: 'result -> ISyncContext<'state> -> 'view,
-                             ?load, ?attempts): Async<'view> = async {
-        let! ct = Async.CancellationToken
+                             ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
         let inline decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, Seq.ofList es) }
-        return! inner.TransactExAsync(decide = decide', mapResult = mapResult, ?load = load, ?attempts = attempts, ct = ct) |> Async.AwaitTaskCorrect }
+        inner.TransactExAsync(decide = decide', mapResult = mapResult, ?load = load, ?attempts = attempts, ct = ct)
 
 /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
 /// For F#, the async and FSharpFunc signatures in Decider tend to work better, but the API set is equivalent
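For orientation, the consumer-side pattern all these `Transact`/`Query` overloads serve, sketched with a hypothetical counter aggregate (the `fold`/`evolve`/`interpret` naming follows Equinox's documented conventions):

```fsharp
type Event = Incremented | Decremented
type State = int
let initial: State = 0
let evolve state = function Incremented -> state + 1 | Decremented -> state - 1
let fold: State -> Event seq -> State = Seq.fold evolve

// interpret decides what (if anything) needs to be written; yielding an empty list
// makes the Transact a no-op, which is what keeps handlers idempotent under retries
let increment (decider: Equinox.Decider<Event, State>) =
    decider.Transact(fun state -> if state >= 10 then [] else [ Incremented ])
```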
diff --git a/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs b/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs
index a6fcee647..7dbf5df8c 100644
--- a/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs
+++ b/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs
@@ -21,11 +21,11 @@ let ``AsyncBatchingGate correctness`` () = async {
             0 =! concurrency
             return reqs }
 
-    let cell = AsyncBatchingGate(dispatch, linger = TimeSpan.FromMilliseconds 5)
+    let cell = AsyncBatchingGate(dispatch, linger = TimeSpan.FromMilliseconds 40)
     let! results = [1 .. 100] |> Seq.map cell.Execute |> Async.Parallel
     test <@ set (Seq.collect id results) = set [1 .. 100] @>
-    // Linger of 5ms makes this tend strongly to only be 1 batch
-    test <@ batches < 2 @>
+    // Linger of 40ms makes this tend strongly to only be 1 batch
+    1 =! batches
 }
 
 [<Fact>]
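What the adjusted test asserts, in isolation: `AsyncBatchingGate` holds concurrent `Execute` calls for up to `linger`, then services the whole batch with a single `dispatch`. A sketch under that assumption (numbers and the `printfn` are illustrative):

```fsharp
open System

// every request that arrives within the linger window shares one dispatch call
let dispatch (reqs: int[]) = async {
    printfn "dispatching a batch of %d requests" reqs.Length
    return reqs }

let gate = Equinox.Core.AsyncBatchingGate(dispatch, linger = TimeSpan.FromMilliseconds 40)

// 100 concurrent Execute calls will typically trigger exactly one dispatch
let run () = [1 .. 100] |> Seq.map gate.Execute |> Async.Parallel
```

The longer 40ms linger trades a little latency for a deterministic single batch, which is why the assertion can tighten from `batches < 2` to an exact `1 =! batches`.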
diff --git a/tests/Equinox.Core.Tests/CachingTests.fs b/tests/Equinox.Core.Tests/CachingTests.fs
index 4674b7273..ad588757e 100644
--- a/tests/Equinox.Core.Tests/CachingTests.fs
+++ b/tests/Equinox.Core.Tests/CachingTests.fs
@@ -28,8 +28,7 @@ type SpyCategory() =
     member x.Load(_log, _cat, _sid, _sn, _maxAge, _requireLeader, ct) = task {
         Interlocked.Increment &loads |> ignore
         do! Task.Delay(x.Delay, ct)
-        Interlocked.Increment &state |> ignore
-        return struct (mkToken(), state)
+        return struct (mkToken(), Interlocked.Increment &state)
     }
     member _.TrySync(_log, _cat, _sid, _sn, _ctx, _maybeInit, _originToken, originState, events, _ct) = task {
         return Equinox.Core.SyncResult.Written (mkToken(), originState + events.Length)
@@ -39,8 +38,7 @@ type SpyCategory() =
     member x.Reload(_log, _sn, _requireLeader, _streamToken, _baseState, ct) = task {
         Interlocked.Increment &reloads |> ignore
         do! Task.Delay(x.Delay, ct)
-        Interlocked.Increment &state |> ignore
-        return struct (mkToken(), state)
+        return struct (mkToken(), Interlocked.Increment &state)
     }
 
 let load sn maxAge (sut: Equinox.Core.ICategory<_, _, _>) =
@@ -75,10 +73,10 @@ type Tests() =
 
     let sn = Guid.NewGuid |> string
     let requireLoad () = load sn TimeSpan.Zero sut
-    let allowStale () = load sn TimeSpan.MaxValue sut
+    let anyCachedValue () = load sn TimeSpan.MaxValue sut
     let write () = write sn sut
 
-    let [<Fact>] ``requireLoad vs allowStale`` () = task {
+    let [<Fact>] ``requireLoad vs anyCachedValue`` () = task {
         let! struct (_token, state) = requireLoad ()
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @>
         let! struct (_token, state) = requireLoad () // This time, the cache entry should be used as the base state for the load
@@ -100,114 +98,116 @@ type Tests() =
 
         do! write ()
         // a stale read sees the written value
-        let! struct (_token, state) = allowStale ()
+        let! struct (_token, state) = anyCachedValue ()
         test <@ (expectedWriteState, 1, 4) = (state, cat.Loads, cat.Reloads) @>
         // a raw read updates the cache
         let! struct (_token, state) = requireLoad ()
         test <@ (6, 1, 5) = (state, cat.Loads, cat.Reloads) @>
         // and the stale read sees same directly
-        let! struct (_token, state) = allowStale ()
+        let! struct (_token, state) = anyCachedValue ()
         test <@ (6, 1, 5) = (state, cat.Loads, cat.Reloads) @> }
 
     let [<Fact>] ``requireLoad does not unify loads`` () = task {
-        cat.Delay <- TimeSpan.FromMilliseconds 50
+        cat.Delay <- TimeSpan.FromMilliseconds 20
         let t1 = requireLoad ()
         do! Task.Delay 10
         test <@ (1, 0) = (cat.Loads, cat.Reloads) @>
-        do! Task.Delay 60 // wait for the loaded value to get cached (50 should do, but MacOS CI...)
+        do! Task.Delay 60 // wait for the loaded value to get cached (35 should do, but MacOS CI cold start disagrees...)
         let! struct (_token, state) = requireLoad ()
         test <@ 2 = state && (1, 1) = (cat.Loads, cat.Reloads) @>
         let! struct (_token, state) = t1
         test <@ 1 = state && (1, 1) = (cat.Loads, cat.Reloads) @> }
 
-    let loadReadThrough toleranceMs = load sn (TimeSpan.FromMilliseconds toleranceMs) sut
+    let allowStale toleranceMs = load sn (TimeSpan.FromMilliseconds toleranceMs) sut
 
-    let [<Fact>] ``readThrough unifies compatible concurrent loads`` () = task {
+    let [<Fact>] ``allowStale unifies compatible concurrent loads`` () = task {
         cat.Delay <- TimeSpan.FromMilliseconds 50
-        let t1 = loadReadThrough 1
+        let t1 = allowStale 1
         do! Task.Delay 10
         test <@ (1, 0) = (cat.Loads, cat.Reloads) @>
-        let! struct (_token, state) = loadReadThrough 200
+        let! struct (_token, state) = allowStale 200
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @>
         let! struct (_token, state) = t1
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @> }
 
-    let [<Fact>] ``readThrough handles concurrent incompatible loads correctly`` () = task {
+    let [<Fact>] ``allowStale handles concurrent incompatible loads correctly`` () = task {
         cat.Delay <- TimeSpan.FromMilliseconds 50
-        let t1 = loadReadThrough 1
-        do! Task.Delay 5
-        test <@ (1, 0) = (cat.Loads, cat.Reloads) @>
-        let! struct (_token, state) = loadReadThrough 4
-        test <@ (2, 2, 0) = (state, cat.Loads, cat.Reloads) @>
+        let t1 = allowStale 1
+        do! Task.Delay 20 // Give the load a chance to start
+        let t2 = allowStale 1 // any cached value should be at least 50 old (and the overlapping call should not have started 45 late)
+        let! struct (_token, state) = t2
+        test <@ (2, 2) = (state, cat.Loads + cat.Reloads) @>
         let! struct (_token, state) = t1
-        test <@ (1, 2, 0) = (state, cat.Loads, cat.Reloads) @> }
+        test <@ (1, 2) = (state, cat.Loads + cat.Reloads) @> }
 
-    let [<Fact>] ``readThrough handles overlapped incompatible loads correctly`` () = task {
+    let [<Fact>] ``allowStale handles overlapped incompatible loads correctly`` () = task {
         cat.Delay <- TimeSpan.FromMilliseconds 50
-        let t1 = loadReadThrough 1
-        do! Task.Delay 10
+        let t1 = allowStale 1
+        do! Task.Delay 20
         test <@ (1, 0) = (cat.Loads, cat.Reloads) @>
         do! Task.Delay 50
-        let! struct (_token, state) = loadReadThrough 59
+        let! struct (_token, state) = allowStale 79
         test <@ (2, 1, 1) = (state, cat.Loads, cat.Reloads) @>
         let! struct (_token, state) = t1
         test <@ (1, 1, 1) = (state, cat.Loads, cat.Reloads) @> }
 
     let [<Fact>] ``readThrough scenarios`` () = task {
         let! struct (_token, state) = requireLoad ()
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @>
-        let! struct (_token, state) = loadReadThrough 1000 // Existing cached entry is used, as fresh enough
+        let! struct (_token, state) = allowStale 1000 // Existing cached entry is used, as fresh enough
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @>
-        do! Task.Delay 100
-        let! struct (_token, state) = loadReadThrough 1000 // Does not load, or extend lifetime
+        do! Task.Delay 50
+        let! struct (_token, state) = allowStale 1000 // Does not load, or extend lifetime
         test <@ (1, 1, 0) = (state, cat.Loads, cat.Reloads) @>
-        let! struct (_token, state) = loadReadThrough 50 // Triggers reload as delay of 100 above has rendered entry stale
+        let! struct (_token, state) = allowStale 50 // Triggers reload as delay of 50 above has rendered entry stale
         test <@ (2, 1, 1) = (state, cat.Loads, cat.Reloads) @>
+
         cat.Delay <- TimeSpan.FromMilliseconds 50
-        let t3 = requireLoad ()
-        do! Task.Delay 2 // Make the main read enter a delay state (of 500); ensure readThrough values are expired
+        let load1 = requireLoad ()
+        do! Task.Delay 10 // Make the load1 read enter a delay state (of 50)
         cat.Delay <- TimeSpan.FromMilliseconds 75 // Next read picks up the longer delay
         // These reads start after the first read so replace the older value in the cache
-        let t1 = loadReadThrough 1
-        let t2 = loadReadThrough 1 // NB this read overlaps with t1 task, ReadThrough should coalesce
-        let! struct (_t2, r2) = t2 // started last, awaited first - should be same result as r1 (and should be the winning cache entry)
-        let! struct (_t1, r1) = t1 // started first, but should be same as r2
-        let! struct (_t3, r3) = t3 // We awaited it last, but it returned a result first
-        test <@ 3 = r3 && 4 = r1 && (1, 3) = (cat.Loads, cat.Reloads) && r1 = r2 @>
-        let! struct (_token, state) = loadReadThrough 150 // Delay of 75 overlapped with delay of 50 should not have expired the entry
+        let load2 = allowStale 1 // NB this read overlaps with load1 task, ReadThrough should coalesce with next ...
+        let load3 = allowStale 10 // ... should wind up internally sharing with load2 (despite taking 75, it's valid if it starts within 10)
+        let! struct (_t1, r1) = load2 // should be reused by load3 (and should be the winning cache entry)
+        let! struct (_t2, r2) = load3 // requested last - should be same result as r1
+        let! struct (_t3, r3) = load1 // We awaited it last, but expect it to have completed first
+        test <@ (4, 4, 3) = (r1, r2, r3) && (1, 3) = (cat.Loads, cat.Reloads) @>
+        // NOTE While 90 should be fine in next statement, it's not fine on a cold CI rig, don't adjust!
+        let! struct (_token, state) = allowStale 200 // Delay of 75 overlapped with delay of 50+10 should not have expired the entry
         test <@ (4, 1, 3) = (state, cat.Loads, cat.Reloads) @> // The newer cache entry won
 
         cat.Delay <- TimeSpan.FromMilliseconds 10 // Reduce the delay, but we do want to overlap a write
-        let t4 = loadReadThrough 1000 // Delay of 1000 in t1/t2 should have aged the read result, so should trigger a read
-        do! Task.Delay 2
+        let t4 = allowStale 200 // Delay of 75 in load2/load3 should not have aged the read result beyond 200
+        do! Task.Delay 10 // ensure delay has been picked up, before...
         cat.Delay <- TimeSpan.Zero // no further delays required for the rest of the tests
         // Trigger a concurrent write (it should lose the cache update race)
         do! write ()
         let! struct (_token, state) = t4
-        test <@ (4, 1, 3) = (state, cat.Loads, cat.Reloads) @>
+        test <@ (4, 1, 3) = (state, cat.Loads, cat.Reloads) @> // read outcome is cached (does not see overlapping write)
 
         // a raw read updates the cache, even though it's very fresh
         let! struct (_token, state) = requireLoad ()
         test <@ (5, 1, 4) = (state, cat.Loads, cat.Reloads) @>
         // a readThrough re-uses that
-        let! struct (_token, state) = loadReadThrough 10
+        let! struct (_token, state) = allowStale 10
         test <@ (5, 1, 4) = (state, cat.Loads, cat.Reloads) @>
-        // allowStale is just a special case of read through in this implementation
+        // anyCachedValue is just a special case of read through in this implementation
         // ... so it works the same
-        let! struct (_token, state) = allowStale ()
+        let! struct (_token, state) = anyCachedValue ()
         test <@ (5, 1, 4) = (state, cat.Loads, cat.Reloads) @>
         // a write overwrites it (with an older value because our predicate is broken)
         do! write ()
-        // a readThrough sees the written value
-        let! struct (_token, state) = loadReadThrough 10
+        // an allowStale sees the written value
+        let! struct (_token, state) = allowStale 30
         test <@ (expectedWriteState, 1, 4) = (state, cat.Loads, cat.Reloads) @>
         // a raw read updates the cache
         let! struct (_token, state) = requireLoad ()
         test <@ (6, 1, 5) = (state, cat.Loads, cat.Reloads) @>
-        // and the readThrough / allowStale sees same
-        let! struct (_token, state) = loadReadThrough 10
+        // and the allowStale / anyCachedValue sees same
+        let! struct (_token, state) = allowStale 10
         test <@ (6, 1, 5) = (state, cat.Loads, cat.Reloads) @>
-        let! struct (_token, state) = allowStale ()
+        let! struct (_token, state) = anyCachedValue ()
         test <@ (6, 1, 5) = (state, cat.Loads, cat.Reloads) @> }
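The renames align the test vocabulary with the application-facing `LoadOption` cases: `TimeSpan.Zero` corresponds to a default full load, a finite max age to `AllowStale`, and `TimeSpan.MaxValue` to `AnyCachedValue`. Sketched at the `Decider` level (case names per Equinox 4.x; treat the exact shapes as an assumption):

```fsharp
open System

let freshest (decider: Equinox.Decider<'e, 's>) render =
    decider.Query(render)                                     // default: roundtrip to the store
let tolerant (decider: Equinox.Decider<'e, 's>) render =
    decider.Query(render, Equinox.LoadOption.AllowStale(TimeSpan.FromSeconds 1.)) // reuse cache if fresh enough
let any (decider: Equinox.Decider<'e, 's>) render =
    decider.Query(render, Equinox.LoadOption.AnyCachedValue)  // whatever the cache holds, any age
```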
diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
index 58318ffd0..e4381695f 100644
--- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
+++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
@@ -1,6 +1,6 @@
 module Equinox.CosmosStore.Integration.CosmosCoreIntegration
 
-open Equinox.Core // TaskSeq extensions
+open Equinox.Core // Async extensions
 open Equinox.CosmosStore.Core
 open FsCodec
 open FSharp.Control
@@ -146,18 +146,18 @@ type Tests(testOutputHelper) =
         let stream = ctx.StreamId streamName
 
         let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100
-        let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos, extrasCount))
+        let! _pos = Async.call (fun ct -> ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos, extrasCount), ct))
         test <@ [EqxAct.Append] = capture.ExternalCalls @>
         if eventsInTip then verifyRequestChargesMax 451 // 450.03
         else verifyRequestChargesMax 448 // 447.5 // 463.01 observed
         capture.Clear()
 
-        let! pos = ctx.Sync(stream, ?position = None)
+        let! pos = Async.call (fun ct -> ctx.Sync(stream, ct, ?position = None))
         test <@ [EqxAct.Tip] = capture.ExternalCalls @>
         verifyRequestChargesMax 5 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
         capture.Clear()
 
-        let! _pos = ctx.Sync(stream, pos)
+        let! _pos = Async.call (fun ct -> ctx.Sync(stream, ct, pos))
         test <@ [EqxAct.TipNotModified] = capture.ExternalCalls @>
         verifyRequestChargesMax 1 // for a 304 by definition - when an etag IfNotMatch is honored, you only pay one RU