Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BatcherCache, BatcherDictionary #390

Merged
merged 10 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Batching`: `BatcherDictionary`, `BatcherCache` to host concurrent `Batchers` [#390](https://github.com/jet/equinox/pull/390)
- `Equinox.DeciderCore`: C# friendly equivalent of `Decider` (i.e. `Func` and `Task`) [#338](https://github.com/jet/equinox/pull/338)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
Expand All @@ -33,6 +34,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down
49 changes: 0 additions & 49 deletions src/Equinox.Core/AsyncBatchingGate.fs

This file was deleted.

10 changes: 5 additions & 5 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ open System.Threading
open System.Threading.Tasks

/// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
type AsyncLazy<'T>(startTask: Func<Task<'T>>) =

// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
let workflow = lazy workflow ()
let workflow = lazy startTask.Invoke()

/// Synchronously peek at what's been previously computed (if it's not Empty, or the last attempt Faulted).
member _.TryCompleted() =
Expand Down Expand Up @@ -37,9 +37,9 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
/// The first caller through the gate triggers a recomputation attempt if the previous attempt ended in failure
type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
type AsyncCacheCell<'T>(startWorkflow : Func<CancellationToken, Task<'T>>, [<O; D null>]?isExpired: Func<'T, bool>) =

let isValid = match isExpired with Some f -> f >> not | None -> fun _ -> true
let isValid = match isExpired with Some f -> not << f.Invoke | None -> fun _ -> true
let mutable cell = AsyncLazy<'T>.Empty

/// Synchronously check the value remains valid (to enable short-circuiting an Await step where value not required)
Expand All @@ -54,7 +54,7 @@ type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
| ValueSome res when isValid res -> return res
| _ ->
// Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
let newInstance = AsyncLazy<'T>(fun () -> workflow ct)
let newInstance = AsyncLazy<'T>(fun () -> startWorkflow.Invoke ct)
// If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
let _ = Interlocked.CompareExchange(&cell, newInstance, current)
return! cell.Await() }
109 changes: 109 additions & 0 deletions src/Equinox.Core/Batching.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Manages grouping of concurrent requests (typically within a projection scenario) into batches
// Typically to reduce contention on a target resource
namespace Equinox.Core.Batching

open Equinox.Core
open System
open System.Threading
open System.Threading.Tasks

/// Thread-safe coordinator that batches concurrent requests for a single <c>dispatch</> invocation such that:
/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via Await
/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion
type internal AsyncBatch<'Req, 'Res>() =
let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
let tryEnqueue item =
if queue.IsAddingCompleted then false
else
// there's a race between the IsAddingCompleted check outcome and the CompleteAdding
// sadly there's no way to detect without a try/catch
try queue.TryAdd(item)
with :? InvalidOperationException -> false
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>

/// Attempt to add a request to the flight
/// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
/// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, ct) =
if not (tryEnqueue req) then false else

// Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
let newInstance : Lazy<Task<'Res[]>> = lazy task {
do! Task.Delay(lingerMs, ct)
queue.CompleteAdding()
return! dispatch.Invoke(queue.ToArray(), ct) }
// If there are concurrent executions, the first through the gate wins; everybody else awaits the attempt the winner wrote
let _ = Interlocked.CompareExchange(&attempt, newInstance, null)
true

/// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful tryEnqueue)
member _.Await() = attempt.Value

/// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window
type Batcher<'Req, 'Res>(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, [<O; D null>]?linger: TimeSpan) =
let lingerMs = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
let mutable cell = AsyncBatch<'Req, 'Res>()

new (dispatch: 'Req[] -> Async<'Res[]>, ?linger) = Batcher((fun reqs ct -> dispatch reqs |> Async.startImmediateAsTask ct), ?linger = linger)

/// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
member x.ExecuteAsync(req, ct) = task {
let current = cell
// If current has not yet been dispatched, hop on and join
if current.TryAdd(req, dispatch, lingerMs, ct) then return! current.Await()
else // Any thread that discovers a batch in flight, needs to wait for it to conclude first
do! current.Await().ContinueWith<unit>(fun (_: Task) -> ()) // wait for, but don't observe the exception or result from the in-flight batch
// where competing threads discover a closed flight, we only want a single one to regenerate it
let _ = Interlocked.CompareExchange(&cell, AsyncBatch(), current)
return! x.ExecuteAsync(req, ct) } // but everyone attempts to merge their requests into the batch during the linger period

/// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
member x.Execute(req) = Async.call (fun ct -> x.ExecuteAsync(req, ct))

/// <summary>Thread Safe collection intended to manage a collection of <c>Batchers</c> (or instances of an equivalent type)
/// NOTE the memory usage is unbounded; if there are not a small stable number of entries, it's advised to employ a <c>BatcherCache</c></summary>
type BatcherDictionary<'Id, 'Entry>(create: Func<'Id, 'Entry>) =

// Its important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
// while it would be safe, there would be a risk of incurring the cost of multiple initialization loops
let entries = System.Collections.Concurrent.ConcurrentDictionary<'Id, Lazy<'Entry>>()
let build id = lazy create.Invoke id

member _.GetOrAdd(id: 'Id): 'Entry =
entries.GetOrAdd(id, build).Value

/// <summary>Thread Safe helper that maintains a set of <c>Batchers</c> (or instances of an equivalent type) within a MemoryCache
/// NOTE if the number of items is bounded, <c>BatcherDictionary</c> is significantly more efficient</summary>
type BatcherCache<'Id, 'Entry> private (cache: System.Runtime.Caching.MemoryCache, toKey: Func<'Id, string>, create: Func<'Id, 'Entry>, ?cacheWindow) =
let tryGet key =
match cache.Get key with
| null -> ValueNone
| existingEntry -> ValueSome (existingEntry :?> 'Entry)
let cacheWindow = defaultArg cacheWindow (TimeSpan.FromMinutes 1)
let cachePolicy = Caching.policySlidingExpiration cacheWindow ()
let addOrGet key entry =
match cache.AddOrGetExisting(key, entry, policy = cachePolicy) with
| null -> Ok entry
| existingEntry -> Error (existingEntry :?> 'Entry)

/// Stores entries in the supplied cache, with entries identified by keys of the form "$Batcher-{id}"
new(cache: System.Runtime.Caching.MemoryCache, createEntry: Func<'Id, 'Entry>, ?cacheWindow) =
let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id)
BatcherCache(cache, mapKey, createEntry, ?cacheWindow = cacheWindow)
/// Maintains the entries in an internal cache limited to the specified size, with entries identified by "{id}"
new(name, create: Func<'Id, 'Entry>, sizeMb: int, ?cacheWindow) =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb)
BatcherCache(new System.Runtime.Caching.MemoryCache(name, config), Func<'Id, string>(string), create, ?cacheWindow = cacheWindow)

member _.GetOrAdd(id : 'Id) : 'Entry =
// Optimise for low allocations on happy path
let key = toKey.Invoke(id)
match tryGet key with
| ValueSome entry -> entry
| ValueNone ->

match addOrGet key (create.Invoke id) with
| Ok entry -> entry
| Error entry -> entry
Loading