Skip to content

Commit

Permalink
Add BatcherCache, BatcherDictionary (#390)
Browse files Browse the repository at this point in the history
* Align AsyncBatchingGate with CacheCell
* Expose inner Cache, hide Load/Save
* Tweak cache test cache timings
* Add BatcherCache, BatcherDictionary
  • Loading branch information
bartelink authored Jun 6, 2023
1 parent c454817 commit 0faa64a
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 81 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Batching`: `BatcherDictionary`, `BatcherCache` to host concurrent `Batchers` [#390](https://github.com/jet/equinox/pull/390)
- `Equinox.DeciderCore`: C# friendly equivalent of `Decider` (i.e. `Func` and `Task`) [#338](https://github.com/jet/equinox/pull/338)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
Expand All @@ -33,6 +34,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down
49 changes: 0 additions & 49 deletions src/Equinox.Core/AsyncBatchingGate.fs

This file was deleted.

10 changes: 5 additions & 5 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ open System.Threading
open System.Threading.Tasks

/// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
type AsyncLazy<'T>(startTask: Func<Task<'T>>) =

// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
let workflow = lazy workflow ()
let workflow = lazy startTask.Invoke()

/// Synchronously peek at what's been previously computed (if it's not Empty, or the last attempt Faulted).
member _.TryCompleted() =
Expand Down Expand Up @@ -37,9 +37,9 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
/// The first caller through the gate triggers a recomputation attempt if the previous attempt ended in failure
type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
type AsyncCacheCell<'T>(startWorkflow : Func<CancellationToken, Task<'T>>, [<O; D null>]?isExpired: Func<'T, bool>) =

let isValid = match isExpired with Some f -> f >> not | None -> fun _ -> true
let isValid = match isExpired with Some f -> not << f.Invoke | None -> fun _ -> true
let mutable cell = AsyncLazy<'T>.Empty

/// Synchronously check the value remains valid (to enable short-circuiting an Await step where value not required)
Expand All @@ -54,7 +54,7 @@ type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
| ValueSome res when isValid res -> return res
| _ ->
// Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
let newInstance = AsyncLazy<'T>(fun () -> workflow ct)
let newInstance = AsyncLazy<'T>(fun () -> startWorkflow.Invoke ct)
// If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
let _ = Interlocked.CompareExchange(&cell, newInstance, current)
return! cell.Await() }
109 changes: 109 additions & 0 deletions src/Equinox.Core/Batching.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Manages grouping of concurrent requests (typically within a projection scenario) into batches
// Typically to reduce contention on a target resource
namespace Equinox.Core.Batching

open Equinox.Core
open System
open System.Threading
open System.Threading.Tasks

/// Thread-safe coordinator that batches concurrent requests for a single <c>dispatch</> invocation such that:
/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via Await
/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion
type internal AsyncBatch<'Req, 'Res>() =
let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
let tryEnqueue item =
if queue.IsAddingCompleted then false
else
// there's a race between the IsAddingCompleted check outcome and the CompleteAdding
// sadly there's no way to detect without a try/catch
try queue.TryAdd(item)
with :? InvalidOperationException -> false
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>

/// Attempt to add a request to the flight
/// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
/// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, ct) =
if not (tryEnqueue req) then false else

// Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
let newInstance : Lazy<Task<'Res[]>> = lazy task {
do! Task.Delay(lingerMs, ct)
queue.CompleteAdding()
return! dispatch.Invoke(queue.ToArray(), ct) }
// If there are concurrent executions, the first through the gate wins; everybody else awaits the attempt the winner wrote
let _ = Interlocked.CompareExchange(&attempt, newInstance, null)
true

/// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful tryEnqueue)
member _.Await() = attempt.Value

/// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window
type Batcher<'Req, 'Res>(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, [<O; D null>]?linger: TimeSpan) =
let lingerMs = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
let mutable cell = AsyncBatch<'Req, 'Res>()

new (dispatch: 'Req[] -> Async<'Res[]>, ?linger) = Batcher((fun reqs ct -> dispatch reqs |> Async.startImmediateAsTask ct), ?linger = linger)

/// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
member x.ExecuteAsync(req, ct) = task {
let current = cell
// If current has not yet been dispatched, hop on and join
if current.TryAdd(req, dispatch, lingerMs, ct) then return! current.Await()
else // Any thread that discovers a batch in flight, needs to wait for it to conclude first
do! current.Await().ContinueWith<unit>(fun (_: Task) -> ()) // wait for, but don't observe the exception or result from the in-flight batch
// where competing threads discover a closed flight, we only want a single one to regenerate it
let _ = Interlocked.CompareExchange(&cell, AsyncBatch(), current)
return! x.ExecuteAsync(req, ct) } // but everyone attempts to merge their requests into the batch during the linger period

/// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
member x.Execute(req) = Async.call (fun ct -> x.ExecuteAsync(req, ct))

/// <summary>Thread Safe collection intended to manage a collection of <c>Batchers</c> (or instances of an equivalent type)
/// NOTE the memory usage is unbounded; if there are not a small stable number of entries, it's advised to employ a <c>BatcherCache</c></summary>
type BatcherDictionary<'Id, 'Entry>(create: Func<'Id, 'Entry>) =

// Its important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
// while it would be safe, there would be a risk of incurring the cost of multiple initialization loops
let entries = System.Collections.Concurrent.ConcurrentDictionary<'Id, Lazy<'Entry>>()
let build id = lazy create.Invoke id

member _.GetOrAdd(id: 'Id): 'Entry =
entries.GetOrAdd(id, build).Value

/// <summary>Thread Safe helper that maintains a set of <c>Batchers</c> (or instances of an equivalent type) within a MemoryCache
/// NOTE if the number of items is bounded, <c>BatcherDictionary</c> is significantly more efficient</summary>
type BatcherCache<'Id, 'Entry> private (cache: System.Runtime.Caching.MemoryCache, toKey: Func<'Id, string>, create: Func<'Id, 'Entry>, ?cacheWindow) =
let tryGet key =
match cache.Get key with
| null -> ValueNone
| existingEntry -> ValueSome (existingEntry :?> 'Entry)
let cacheWindow = defaultArg cacheWindow (TimeSpan.FromMinutes 1)
let cachePolicy = Caching.policySlidingExpiration cacheWindow ()
let addOrGet key entry =
match cache.AddOrGetExisting(key, entry, policy = cachePolicy) with
| null -> Ok entry
| existingEntry -> Error (existingEntry :?> 'Entry)

/// Stores entries in the supplied cache, with entries identified by keys of the form "$Batcher-{id}"
new(cache: System.Runtime.Caching.MemoryCache, createEntry: Func<'Id, 'Entry>, ?cacheWindow) =
let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id)
BatcherCache(cache, mapKey, createEntry, ?cacheWindow = cacheWindow)
/// Maintains the entries in an internal cache limited to the specified size, with entries identified by "{id}"
new(name, create: Func<'Id, 'Entry>, sizeMb: int, ?cacheWindow) =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb)
BatcherCache(new System.Runtime.Caching.MemoryCache(name, config), Func<'Id, string>(string), create, ?cacheWindow = cacheWindow)

member _.GetOrAdd(id : 'Id) : 'Entry =
// Optimise for low allocations on happy path
let key = toKey.Invoke(id)
match tryGet key with
| ValueSome entry -> entry
| ValueNone ->

match addOrGet key (create.Invoke id) with
| Ok entry -> entry
| Error entry -> entry
Loading

0 comments on commit 0faa64a

Please sign in to comment.