diff --git a/CHANGELOG.md b/CHANGELOG.md
index dfdbe8377..59bac8b24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 - `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
 - `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
 - `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
+- `Equinox.Batching`: `BatcherDictionary`, `BatcherCache` to host concurrent `Batchers` [#390](https://github.com/jet/equinox/pull/390)
 - `Equinox.DeciderCore`: C# friendly equivalent of `Decider` (i.e. `Func` and `Task`) [#338](https://github.com/jet/equinox/pull/338)
 - `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
 - `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
@@ -33,6 +34,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 - `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
 - `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
 - `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
+- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
 - `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
 - `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
 - `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
diff --git a/README.md b/README.md
index 728d6359d..53f58e445 100644
--- a/README.md
+++ b/README.md
@@ -165,7 +165,7 @@ The components within this repository are delivered as multi-targeted Nuget pack
 
 ## Data Store libraries
 
-- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
+- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
 - `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
 - `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
 - `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
diff --git a/src/Equinox.Core/AsyncBatchingGate.fs b/src/Equinox.Core/AsyncBatchingGate.fs
deleted file mode 100644
index 8d5415256..000000000
--- a/src/Equinox.Core/AsyncBatchingGate.fs
+++ /dev/null
@@ -1,49 +0,0 @@
-namespace Equinox.Core
-
-/// Thread-safe coordinator that batches concurrent requests for a single dispatch invocation such that:
-/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
-/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via AwaitResult
-/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion
-type internal AsyncBatch<'Req, 'Res>(dispatch: 'Req[] -> Async<'Res>, lingerMs: int) =
-    // Yes, naive impl in the absence of a cleaner way to have callers sharing the AwaitCompletion coordinate the adding
-    let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
-    let task = lazy if lingerMs = 0 then task { queue.CompleteAdding(); return! dispatch (queue.ToArray()) }
-                    else task { do! System.Threading.Tasks.Task.Delay lingerMs
-                                queue.CompleteAdding()
-                                return! dispatch (queue.ToArray()) }
-
-    /// Attempt to add a request to the flight
-    /// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
-    /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially after awaiting this.AwaitCompletion)
-    member _.TryAdd(item) =
-        if queue.IsAddingCompleted then
-            false
-        else
-            // there's a race between the IsAddingCompleted check outcome and the CompleteAdding
-            // sadly there's no way to detect without a try/catch
-            try queue.TryAdd(item)
-            with :? System.InvalidOperationException -> false
-
-    /// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful TryAdd)
-    member _.AwaitResult() = Async.AwaitTaskCorrect task.Value
-
-    /// Wait for dispatch to conclude (for any reason: ok/exn/cancel; we only care about the channel being clear)
-    member _.AwaitCompletion() =
-        Async.FromContinuations(fun (cont, _, _) ->
-            task.Value.ContinueWith(fun (_: System.Threading.Tasks.Task<'Res>) -> cont ())
-            |> ignore)
-
-/// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window
-type AsyncBatchingGate<'Req, 'Res>(dispatch: 'Req[] -> Async<'Res>, ?linger: System.TimeSpan) =
-    let linger = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
-    let mutable cell = AsyncBatch(dispatch, linger)
-
-    member x.Execute req =
-        let current = cell
-        // If current has not yet been dispatched, hop on and join
-        if current.TryAdd req then current.AwaitResult()
-        else async { // Any thread that discovers a batch in flight, needs to wait for it to conclude first
-            do! current.AwaitCompletion() // NOTE we don't observe any exception from the preceding batch
-            // where competing threads discover a closed flight, we only want a single one to regenerate it
-            let _ = System.Threading.Interlocked.CompareExchange(&cell, AsyncBatch(dispatch, linger), current)
-            return! x.Execute req }
diff --git a/src/Equinox.Core/AsyncCacheCell.fs b/src/Equinox.Core/AsyncCacheCell.fs
index 7cc1c1f86..ae78e4167 100755
--- a/src/Equinox.Core/AsyncCacheCell.fs
+++ b/src/Equinox.Core/AsyncCacheCell.fs
@@ -5,10 +5,10 @@ open System.Threading
 open System.Threading.Tasks
 
 /// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
-type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
+type AsyncLazy<'T>(startTask: Func<Task<'T>>) =
 
     // NOTE due to `Lazy` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
-    let workflow = lazy workflow ()
+    let workflow = lazy startTask.Invoke()
 
     /// Synchronously peek at what's been previously computed (if it's not Empty, or the last attempt Faulted).
     member _.TryCompleted() =
@@ -37,9 +37,9 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
 /// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
 /// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
 /// The first caller through the gate triggers a recomputation attempt if the previous attempt ended in failure
-type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
+type AsyncCacheCell<'T>(startWorkflow : Func<CancellationToken, Task<'T>>, [<O; D null>] ?isExpired: Func<'T, bool>) =
 
-    let isValid = match isExpired with Some f -> f >> not | None -> fun _ -> true
+    let isValid = match isExpired with Some f -> not << f.Invoke | None -> fun _ -> true
     let mutable cell = AsyncLazy<'T>.Empty
 
     /// Synchronously check the value remains valid (to enable short-circuiting an Await step where value not required)
@@ -54,7 +54,7 @@ type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
             | ValueSome res when isValid res -> return res
             | _ ->
                 // Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
-                let newInstance = AsyncLazy<'T>(fun () -> workflow ct)
+                let newInstance = AsyncLazy<'T>(fun () -> startWorkflow.Invoke ct)
                 // If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
                 let _ = Interlocked.CompareExchange(&cell, newInstance, current)
                 return! cell.Await() }
diff --git a/src/Equinox.Core/Batching.fs b/src/Equinox.Core/Batching.fs
new file mode 100644
index 000000000..0b8e359d1
--- /dev/null
+++ b/src/Equinox.Core/Batching.fs
@@ -0,0 +1,109 @@
+// Manages grouping of concurrent requests (typically within a projection scenario) into batches
+// Typically to reduce contention on a target resource
+namespace Equinox.Core.Batching
+
+open Equinox.Core
+open System
+open System.Threading
+open System.Threading.Tasks
+
+/// Thread-safe coordinator that batches concurrent requests for a single dispatch invocation such that:
+/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
+/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via Await
+/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion
+type internal AsyncBatch<'Req, 'Res>() =
+    let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
+    let tryEnqueue item =
+        if queue.IsAddingCompleted then false
+        else
+            // there's a race between the IsAddingCompleted check outcome and the CompleteAdding
+            // sadly there's no way to detect without a try/catch
+            try queue.TryAdd(item)
+            with :? InvalidOperationException -> false
+    let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>
+
+    /// Attempt to add a request to the flight
+    /// Succeeds during linger interval (which commences when the first caller triggers the workflow via Await)
+    /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
+    member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, ct) =
+        if not (tryEnqueue req) then false else
+
+        // Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
+        let newInstance : Lazy<Task<'Res[]>> = lazy task {
+            do! Task.Delay(lingerMs, ct)
+            queue.CompleteAdding()
+            return! dispatch.Invoke(queue.ToArray(), ct) }
+        // If there are concurrent executions, the first through the gate wins; everybody else awaits the attempt the winner wrote
+        let _ = Interlocked.CompareExchange(&attempt, newInstance, null)
+        true
+
+    /// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful tryEnqueue)
+    member _.Await() = attempt.Value
+
+/// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window
+type Batcher<'Req, 'Res>(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, [<O; D null>] ?linger: TimeSpan) =
+    let lingerMs = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
+    let mutable cell = AsyncBatch<'Req, 'Res>()
+
+    new (dispatch: 'Req[] -> Async<'Res[]>, ?linger) = Batcher((fun reqs ct -> dispatch reqs |> Async.startImmediateAsTask ct), ?linger = linger)
+
+    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
+    member x.ExecuteAsync(req, ct) = task {
+        let current = cell
+        // If current has not yet been dispatched, hop on and join
+        if current.TryAdd(req, dispatch, lingerMs, ct) then return! current.Await()
+        else // Any thread that discovers a batch in flight, needs to wait for it to conclude first
+            do! current.Await().ContinueWith(fun (_: Task) -> ()) // wait for, but don't observe the exception or result from the in-flight batch
+            // where competing threads discover a closed flight, we only want a single one to regenerate it
+            let _ = Interlocked.CompareExchange(&cell, AsyncBatch(), current)
+            return! x.ExecuteAsync(req, ct) } // but everyone attempts to merge their requests into the batch during the linger period
+
+    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
+    member x.Execute(req) = Async.call (fun ct -> x.ExecuteAsync(req, ct))
+
+/// Thread-safe collection intended to manage a collection of Batchers (or instances of an equivalent type)
+/// NOTE the memory usage is unbounded; if there are not a small stable number of entries, it's advised to employ a BatcherCache
+type BatcherDictionary<'Id, 'Entry>(create: Func<'Id, 'Entry>) =
+
+    // It's important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
+    // while it would be safe, there would be a risk of incurring the cost of multiple initialization loops
+    let entries = System.Collections.Concurrent.ConcurrentDictionary<'Id, Lazy<'Entry>>()
+    let build id = lazy create.Invoke id
+
+    member _.GetOrAdd(id: 'Id): 'Entry =
+        entries.GetOrAdd(id, build).Value
+
+/// Thread-safe helper that maintains a set of Batchers (or instances of an equivalent type) within a MemoryCache
+/// NOTE if the number of items is bounded, BatcherDictionary is significantly more efficient
+type BatcherCache<'Id, 'Entry> private (cache: System.Runtime.Caching.MemoryCache, toKey: Func<'Id, string>, create: Func<'Id, 'Entry>, ?cacheWindow) =
+    let tryGet key =
+        match cache.Get key with
+        | null -> ValueNone
+        | existingEntry -> ValueSome (existingEntry :?> 'Entry)
+    let cacheWindow = defaultArg cacheWindow (TimeSpan.FromMinutes 1)
+    let cachePolicy = Caching.policySlidingExpiration cacheWindow ()
+    let addOrGet key entry =
+        match cache.AddOrGetExisting(key, entry, policy = cachePolicy) with
+        | null -> Ok entry
+        | existingEntry -> Error (existingEntry :?> 'Entry)
+
+    /// Stores entries in the supplied cache, with entries identified by keys of the form "$Batcher-{id}"
"$Batcher-{id}" + new(cache: System.Runtime.Caching.MemoryCache, createEntry: Func<'Id, 'Entry>, ?cacheWindow) = + let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id) + BatcherCache(cache, mapKey, createEntry, ?cacheWindow = cacheWindow) + /// Maintains the entries in an internal cache limited to the specified size, with entries identified by "{id}" + new(name, create: Func<'Id, 'Entry>, sizeMb: int, ?cacheWindow) = + let config = System.Collections.Specialized.NameValueCollection(1) + config.Add("cacheMemoryLimitMegabytes", string sizeMb) + BatcherCache(new System.Runtime.Caching.MemoryCache(name, config), Func<'Id, string>(string), create, ?cacheWindow = cacheWindow) + + member _.GetOrAdd(id : 'Id) : 'Entry = + // Optimise for low allocations on happy path + let key = toKey.Invoke(id) + match tryGet key with + | ValueSome entry -> entry + | ValueNone -> + + match addOrGet key (create.Invoke id) with + | Ok entry -> entry + | Error entry -> entry diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index f0b5a4a5d..c536ad788 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -25,7 +25,7 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, if verifiedTimestamp < timestamp then // Don't count attempts to overwrite with stale state as verification verifiedTimestamp <- timestamp // Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first! - member x.ReadThrough(maxAge: TimeSpan, isStale, load) = task { + member x.ReadThrough(maxAge: TimeSpan, isStale, load: Func<_, _>) = task { let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp() let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp) @@ -40,8 +40,8 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, | _ -> // .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid) - let newInstance = AsyncLazy(load maybeBaseState) - let _ = System.Threading.Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState) + let newInstance = AsyncLazy(fun () -> load.Invoke maybeBaseState) + let _ = Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState) let! timestamp, (token, state as res) = cell.Await() x.MergeUpdates(isStale, timestamp, token, state) // merge observed result into the cache return res } @@ -71,25 +71,28 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) = config.Add("cacheMemoryLimitMegabytes", string sizeMb); Cache(new System.Runtime.Caching.MemoryCache(name, config)) // if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws) - member _.Load(key, maxAge, isStale, policy, loadOrReload) = task { - let loadOrReload maybeBaseState () = task { + member internal _.Load(key, maxAge, isStale, policy, loadOrReload, ct) = task { + let loadOrReload maybeBaseState = task { let act = System.Diagnostics.Activity.Current if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore let ts = System.Diagnostics.Stopwatch.GetTimestamp() - let! res = loadOrReload maybeBaseState + let! 
            return struct (ts, res) }
        if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
            let maybeBaseState = tryLoad key
-            let! timestamp, res = loadOrReload maybeBaseState ()
+            let! timestamp, res = loadOrReload maybeBaseState
            addOrMergeCacheEntry isStale key policy timestamp res
            return res
        else // ensure we have an entry in the cache for this key; coordinate retrieval through that
            let cacheSlot = getElseAddEmptyEntry key policy
            return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) }
    // Newer values get saved; equal values update the last retrieval timestamp
-    member _.Save(key, isStale, policy, timestamp, token, state) =
+    member internal _.Save(key, isStale, policy, timestamp, token, state) =
        addOrMergeCacheEntry isStale key policy timestamp (token, state)
+    /// Exposes the internal MemoryCache
+    member val Inner = inner
+
 type [<NoComparison; NoEquality; RequireQualifiedAccess>] CachingStrategy =
    /// Retain a single 'state per streamName.
    /// Each cache hit for a stream renews the retention period for the defined window.
diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs
index 1e7f57097..b5d640919 100644
--- a/src/Equinox.Core/Caching.fs
+++ b/src/Equinox.Core/Caching.fs
@@ -18,10 +18,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
     (category: 'cat, cache: Equinox.Cache, isStale, createKey, createOptions) =
     interface ICategory<'event, 'state, 'context> with
         member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task {
-            let loadOrReload = function
+            let loadOrReload ct = function
                 | ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
                 | ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
-            return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload) }
+            return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) }
         member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
             let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
             let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
@@ -35,9 +35,9 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
 
 let private mkKey prefix streamName = prefix + streamName
 
-let private policySlidingExpiration (slidingExpiration: TimeSpan) () =
+let internal policySlidingExpiration (slidingExpiration: TimeSpan) () =
     System.Runtime.Caching.CacheItemPolicy(SlidingExpiration = slidingExpiration)
-let private policyFixedTimeSpan (period: TimeSpan) () =
+let internal policyFixedTimeSpan (period: TimeSpan) () =
     let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period
     System.Runtime.Caching.CacheItemPolicy(AbsoluteExpiration = expirationPoint)
 let private mapStrategy = function
diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj
index 0dd09d7f9..43965761a 100644
--- a/src/Equinox.Core/Equinox.Core.fsproj
+++ b/src/Equinox.Core/Equinox.Core.fsproj
@@ -13,7 +13,7 @@
-    <Compile Include="AsyncBatchingGate.fs" />
+    <Compile Include="Batching.fs" />
diff --git a/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs b/tests/Equinox.Core.Tests/BatchingTests.fs
similarity index 82%
rename from tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs
rename to tests/Equinox.Core.Tests/BatchingTests.fs
index 7dbf5df8c..d20844fda 100644
--- a/tests/Equinox.Core.Tests/AsyncBatchingGateTests.fs
+++ b/tests/Equinox.Core.Tests/BatchingTests.fs
@@ -1,15 +1,15 @@
-module Equinox.Core.Tests.AsyncBatchingGateTests
+module Equinox.Core.Tests.BatchingTests
 
-open System
-open Equinox.Core
+open Equinox.Core.Batching
 open FsCheck.Xunit
 open Swensen.Unquote
+open System
 open System.Collections.Concurrent
 open System.Threading
 open Xunit
 
 [<Fact>]
-let ``AsyncBatchingGate correctness`` () = async {
+let ``Batcher correctness`` () = async {
     let mutable batches = 0
     let mutable active = 0
     let dispatch (reqs : int[]) = async {
@@ -21,15 +21,15 @@ let ``AsyncBatchingGate correctness`` () = async {
         0 =! concurrency
         return reqs
     }
-    let cell = AsyncBatchingGate(dispatch, linger = TimeSpan.FromMilliseconds 40)
+    let cell = Batcher(dispatch, linger = TimeSpan.FromMilliseconds 40)
     let! results = [1 .. 100] |> Seq.map cell.Execute |> Async.Parallel
     test <@ set (Seq.collect id results) = set [1 .. 100] @>
-    // Linger of 40ms makes this tend strongly to only be 1 batch
-    1 =! batches
+    // Linger of 40ms makes this tend strongly to only be 1 batch, but no guarantees
+    test <@ 1 <= batches && batches < 3 @>
 }
 
 [<Property>]
-let ``AsyncBatchingGate error handling`` shouldFail = async {
+let ``Batcher error handling`` shouldFail = async {
     let fails = ConcurrentBag() // Could be a ResizeArray per spec, but this removes all doubt
     let dispatch (reqs : int[]) = async {
         if shouldFail () then
@@ -37,7 +37,7 @@ let ``AsyncBatchingGate error handling`` shouldFail = async {
            failwith $"failing %A{reqs}"
        return reqs
    }
-    let cell = AsyncBatchingGate dispatch
+    let cell = Batcher dispatch
    let input = [1 .. 100]
    let! results = input |> Seq.map (cell.Execute >> Async.Catch) |> Async.Parallel
    let oks = results |> Array.choose (function Choice1Of2 r -> Some r | _ -> None)
diff --git a/tests/Equinox.Core.Tests/CachingTests.fs b/tests/Equinox.Core.Tests/CachingTests.fs
index ad588757e..0fa49fd6b 100644
--- a/tests/Equinox.Core.Tests/CachingTests.fs
+++ b/tests/Equinox.Core.Tests/CachingTests.fs
@@ -166,16 +166,16 @@ type Tests() =
         cat.Delay <- TimeSpan.FromMilliseconds 50
         let load1 = requireLoad ()
         do! Task.Delay 10 // Make the load1 read enter a delay state (of 50)
-        cat.Delay <- TimeSpan.FromMilliseconds 75 // Next read picks up the longer delay
+        cat.Delay <- TimeSpan.FromMilliseconds 90 // Next read picks up the longer delay
         // These reads start after the first read so replace the older value in the cache
         let load2 = allowStale 1 // NB this read overlaps with load1 task, ReadThrough should coalesce with next ...
-        let load3 = allowStale 10 // ... should wind up internally sharing with load2 (despite taking 75, it's valid if it starts within 10)
+        let load3 = allowStale 10 // ... should wind up internally sharing with load2 (despite taking 80, it's valid if it starts within 10)
         let! struct (_t1, r1) = load2 // should be reused by load3 (and should be the winning cache entry)
         let! struct (_t2, r2) = load3 // requested last - should be same result as r1
         let! struct (_t3, r3) = load1 // We awaited it last, but expect it to have completed first
         test <@ (4, 4, 3) = (r1, r2, r3) && (1, 3) = (cat.Loads, cat.Reloads) @>
         // NOTE While 90 should be fine in next statement, it's not fine on a cold CI rig, don't adjust!
-        let! struct (_token, state) = allowStale 200 // Delay of 75 overlapped with delay of 50+10 should not have expired the entry
+        let! struct (_token, state) = allowStale 200 // Delay of 90 overlapped with delay of 50+10 should not have expired the entry
         test <@ (4, 1, 3) = (state, cat.Loads, cat.Reloads) @> // The newer cache entry won
         cat.Delay <- TimeSpan.FromMilliseconds 10 // Reduce the delay, but we do want to overlap a write
         let t4 = allowStale 200 // Delay of 75 in load2/load3 should not have aged the read result beyond 200
diff --git a/tests/Equinox.Core.Tests/Equinox.Core.Tests.fsproj b/tests/Equinox.Core.Tests/Equinox.Core.Tests.fsproj
index 0f52847e8..c4e4ccc8c 100644
--- a/tests/Equinox.Core.Tests/Equinox.Core.Tests.fsproj
+++ b/tests/Equinox.Core.Tests/Equinox.Core.Tests.fsproj
@@ -7,7 +7,7 @@
-    <Compile Include="AsyncBatchingGateTests.fs" />
+    <Compile Include="BatchingTests.fs" />
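
For orientation, a minimal usage sketch against the `Batcher`, `BatcherDictionary` and `BatcherCache` surface this diff introduces; `fetchMany`, the tenant keys and the 20ms linger value are illustrative placeholders rather than anything defined by the library:

```fsharp
open System
open Equinox.Core.Batching

// Hypothetical loader that resolves many keys in a single roundtrip (e.g. a bulk query or a batched Get)
let fetchMany (ids: string[]): Async<string[]> = async {
    return ids |> Array.map (fun id -> $"state-of-%s{id}") }

// Calls arriving while a dispatch is pending (or within the linger window) coalesce into one fetchMany invocation
let batcher = Batcher(fetchMany, linger = TimeSpan.FromMilliseconds 20)

// Each caller awaits the shared dispatch and receives the result array for the entire batch it joined
let readViaBatcher (id: string): Async<string[]> = batcher.Execute id

// Where batching is partitioned (e.g. per tenant), BatcherDictionary retains one Batcher per key indefinitely ...
let perTenant = BatcherDictionary(fun (_tenant: string) -> Batcher(fetchMany, linger = TimeSpan.FromMilliseconds 20))
// ... while BatcherCache bounds retention via a size-capped MemoryCache with a sliding expiration window
let perTenantCapped = BatcherCache("Batchers", (fun (_tenant: string) -> Batcher fetchMany), sizeMb = 10)

let readForTenant tenant id: Async<string[]> = (perTenant.GetOrAdd tenant).Execute id
```

Note that `Execute` hands every participant the results of the whole batch it joined, mirroring the `Seq.collect id results` handling in the `Batcher correctness` test above.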