Add BatcherCache, BatcherDictionary #390

Merged: 10 commits, Jun 6, 2023
Changes from 1 commit
Align AsyncBatchingGate with CacheCell
bartelink committed Jun 6, 2023
commit 660d5fd77ff3a4e6bd367dd2d598fe1bf562f52d
73 changes: 42 additions & 31 deletions src/Equinox.Core/AsyncBatchingGate.fs
@@ -1,49 +1,60 @@
 namespace Equinox.Core
 
+open System
+open System.Threading
+open System.Threading.Tasks
+
 /// Thread-safe coordinator that batches concurrent requests for a single <c>dispatch</> invocation such that:
 /// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
-/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via AwaitResult
+/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via Await
 /// - callers whose TryAdd has been denied can await the completion of the in-flight batch via AwaitCompletion
-type internal AsyncBatch<'Req, 'Res>(dispatch: 'Req[] -> Async<'Res>, lingerMs: int) =
+// Yes, naive impl in the absence of a cleaner way to have callers sharing the AwaitCompletion coordinate the adding
+type internal AsyncBatch<'Req, 'Res>() =
     let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
-    let task = lazy if lingerMs = 0 then task { queue.CompleteAdding(); return! dispatch (queue.ToArray()) }
-                    else task { do! System.Threading.Tasks.Task.Delay lingerMs
-                                queue.CompleteAdding()
-                                return! dispatch (queue.ToArray()) }
-
-    /// Attempt to add a request to the flight
-    /// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
-    /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially after awaiting this.AwaitCompletion)
-    member _.TryAdd(item) =
-        if queue.IsAddingCompleted then
-            false
+    let tryEnqueue item =
+        if queue.IsAddingCompleted then false
         else
             // there's a race between the IsAddingCompleted check outcome and the CompleteAdding
             // sadly there's no way to detect without a try/catch
             try queue.TryAdd(item)
-            with :? System.InvalidOperationException -> false
+            with :? InvalidOperationException -> false
+    let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>
 
-    /// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful TryAdd)
-    member _.AwaitResult() = Async.AwaitTaskCorrect task.Value
+    /// Attempt to add a request to the flight
+    /// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
+    /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
+    member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, ct) =
+        if not (tryEnqueue req) then false else
+
+        // Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
+        let newInstance : Lazy<Task<'Res[]>> = lazy task {
+            do! Task.Delay(lingerMs, ct)
+            queue.CompleteAdding()
+            return! dispatch.Invoke(queue.ToArray(), ct) }
+        // If there are concurrent executions, the first through the gate wins; everybody else awaits the attempt the winner wrote
+        let _ = Interlocked.CompareExchange(&attempt, newInstance, null)
+        true
 
-    /// Wait for dispatch to conclude (for any reason: ok/exn/cancel; we only care about the channel being clear)
-    member _.AwaitCompletion() =
-        Async.FromContinuations(fun (cont, _, _) ->
-            task.Value.ContinueWith(fun (_: System.Threading.Tasks.Task<'Res>) -> cont ())
-            |> ignore)
+    /// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful tryEnqueue)
+    member _.Await() = attempt.Value
 
 /// Manages concurrent work such that requests arriving while a batch is in flight converge to wait for the next window
-type AsyncBatchingGate<'Req, 'Res>(dispatch: 'Req[] -> Async<'Res>, ?linger: System.TimeSpan) =
-    let linger = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
-    let mutable cell = AsyncBatch(dispatch, linger)
+type AsyncBatchingGate<'Req, 'Res>(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, [<O; D null>]?linger: TimeSpan) =
+    let lingerMs = match linger with None -> 1 | Some x -> int x.TotalMilliseconds
+    let mutable cell = AsyncBatch<'Req, 'Res>()
+    new (dispatch: 'Req[] -> Async<'Res[]>, ?linger) = AsyncBatchingGate((fun reqs ct -> Async.startImmediateAsTask ct (dispatch reqs)), ?linger = linger)
 
-    member x.Execute req =
+    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
+    member x.ExecuteAsync(req, ct) = task {
         let current = cell
         // If current has not yet been dispatched, hop on and join
-        if current.TryAdd req then current.AwaitResult()
-        else async { // Any thread that discovers a batch in flight, needs to wait for it to conclude first
-            do! current.AwaitCompletion() // NOTE we don't observe any exception from the preceding batch
+        if current.TryAdd(req, dispatch, lingerMs, ct) then return! current.Await()
+        else // Any thread that discovers a batch in flight, needs to wait for it to conclude first
+            do! current.Await().ContinueWith<unit>(fun (_: Task) -> ()) // don't observe the exception or result from the in-flight batch
             // where competing threads discover a closed flight, we only want a single one to regenerate it
-            let _ = System.Threading.Interlocked.CompareExchange(&cell, AsyncBatch(dispatch, linger), current)
-            return! x.Execute req }
+            let _ = Interlocked.CompareExchange(&cell, AsyncBatch(), current)
+            return! x.ExecuteAsync(req, ct) }
+
+    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
+    member x.Execute(req) = async {
+        let! ct = Async.CancellationToken
+        return! x.ExecuteAsync(req, ct) |> Async.AwaitTaskCorrect }
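
For orientation, here is a minimal consumer sketch of the reshaped gate (the dispatch body and values are illustrative, not part of this commit): concurrent ExecuteAsync callers landing within one linger window are coalesced into a single dispatch, and every admitted caller observes that batch's shared result (or shared failure).

open System
open System.Threading
open System.Threading.Tasks
open Equinox.Core

// Illustrative batched load: a single roundtrip services every request admitted to the window
let gate =
    AsyncBatchingGate<int, string>(
        (fun (reqs: int[]) (_ct: CancellationToken) -> task { return reqs |> Array.map string }),
        TimeSpan.FromMilliseconds 10.)

let demo (ct: CancellationToken) = task {
    // Requests issued together typically join the same flight; each receives the full 'Res[] of that dispatch
    let t1 = gate.ExecuteAsync(1, ct)
    let t2 = gate.ExecuteAsync(2, ct)
    let! r1 = t1
    let! r2 = t2
    return r1, r2 }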
10 changes: 5 additions & 5 deletions src/Equinox.Core/AsyncCacheCell.fs
@@ -5,10 +5,10 @@ open System.Threading
 open System.Threading.Tasks
 
 /// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
-type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
+type AsyncLazy<'T>(startTask: Func<Task<'T>>) =
 
     // NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
-    let workflow = lazy workflow ()
+    let workflow = lazy startTask.Invoke()
 
     /// Synchronously peek at what's been previously computed (if it's not Empty, or the last attempt Faulted).
     member _.TryCompleted() =
@@ -37,9 +37,9 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
 /// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
 /// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
 /// The first caller through the gate triggers a recomputation attempt if the previous attempt ended in failure
-type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
+type AsyncCacheCell<'T>(startWorkflow : Func<CancellationToken, Task<'T>>, [<O; D null>]?isExpired: Func<'T, bool>) =
 
-    let isValid = match isExpired with Some f -> f >> not | None -> fun _ -> true
+    let isValid = match isExpired with Some f -> not << f.Invoke | None -> fun _ -> true
     let mutable cell = AsyncLazy<'T>.Empty
 
     /// Synchronously check the value remains valid (to enable short-circuiting an Await step where value not required)
Expand All @@ -54,7 +54,7 @@ type AsyncCacheCell<'T>(workflow, ?isExpired: 'T -> bool) =
         | ValueSome res when isValid res -> return res
         | _ ->
             // Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
-            let newInstance = AsyncLazy<'T>(fun () -> workflow ct)
+            let newInstance = AsyncLazy<'T>(fun () -> startWorkflow.Invoke ct)
             // If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
             let _ = Interlocked.CompareExchange(&cell, newInstance, current)
             return! cell.Await() }
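
As a reading aid, a sketch of how a consumer might drive the revised AsyncCacheCell (the token type and expiry rule are illustrative; it also assumes the cell's public entry point is the Await(ct) member this hunk sits within):

open System
open System.Threading
open System.Threading.Tasks
open Equinox.Core

type AuthToken = { value: string; expiry: DateTimeOffset }

let cell =
    AsyncCacheCell<AuthToken>(
        (fun (_ct: CancellationToken) -> task { // stand-in for a real acquisition roundtrip
            return { value = "fresh"; expiry = DateTimeOffset.UtcNow.AddMinutes 5. } }),
        isExpired = fun t -> t.expiry <= DateTimeOffset.UtcNow)

// The first caller through triggers the workflow; callers overlapping with it share the same attempt
// (including its failure, if it faults; the next caller after a fault triggers a fresh attempt)
let getToken (ct: CancellationToken) = task {
    let! token = cell.Await ct
    return token.value }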
14 changes: 7 additions & 7 deletions src/Equinox.Core/Cache.fs
@@ -25,7 +25,7 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
         if verifiedTimestamp < timestamp then // Don't count attempts to overwrite with stale state as verification
             verifiedTimestamp <- timestamp
     // Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first!
-    member x.ReadThrough(maxAge: TimeSpan, isStale, load) = task {
+    member x.ReadThrough(maxAge: TimeSpan, isStale, load: Func<_, _>) = task {
         let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
         let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
         let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp)
Expand All @@ -40,8 +40,8 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
         | _ ->
 
             // .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid)
-            let newInstance = AsyncLazy(load maybeBaseState)
-            let _ = System.Threading.Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
+            let newInstance = AsyncLazy(fun () -> load.Invoke maybeBaseState)
+            let _ = Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
             let! timestamp, (token, state as res) = cell.Await()
             x.MergeUpdates(isStale, timestamp, token, state) // merge observed result into the cache
             return res }
@@ -71,16 +71,16 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) =
         config.Add("cacheMemoryLimitMegabytes", string sizeMb);
         Cache(new System.Runtime.Caching.MemoryCache(name, config))
     // if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
-    member _.Load(key, maxAge, isStale, policy, loadOrReload) = task {
-        let loadOrReload maybeBaseState () = task {
+    member _.Load(key, maxAge, isStale, policy, loadOrReload, ct) = task {
+        let loadOrReload maybeBaseState = task {
             let act = System.Diagnostics.Activity.Current
             if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
             let ts = System.Diagnostics.Stopwatch.GetTimestamp()
-            let! res = loadOrReload maybeBaseState
+            let! res = loadOrReload ct maybeBaseState
             return struct (ts, res) }
         if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
             let maybeBaseState = tryLoad key
-            let! timestamp, res = loadOrReload maybeBaseState ()
+            let! timestamp, res = loadOrReload maybeBaseState
             addOrMergeCacheEntry isStale key policy timestamp res
             return res
         else // ensure we have an entry in the cache for this key; coordinate retrieval through that
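One detail worth noting in CacheEntry.ReadThrough above: freshness is measured against monotonic Stopwatch timestamps rather than wall-clock time. Stopwatch.TicksToSeconds is an Equinox-internal helper; the following standalone expansion of the isWithinMaxAge check is an assumption based on its name:

open System

// Assumed equivalent of the isWithinMaxAge check in CacheEntry.ReadThrough:
// deriving age from Stopwatch ticks means system clock adjustments cannot skew the maxAge comparison
let isWithinMaxAge (maxAge: TimeSpan) (cachedValueTimestamp: int64) =
    let now = System.Diagnostics.Stopwatch.GetTimestamp()
    let elapsedSeconds = float (now - cachedValueTimestamp) / float System.Diagnostics.Stopwatch.Frequency
    elapsedSeconds <= maxAge.TotalSeconds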
4 changes: 2 additions & 2 deletions src/Equinox.Core/Caching.fs
@@ -18,10 +18,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
     (category: 'cat, cache: Equinox.Cache, isStale, createKey, createOptions) =
     interface ICategory<'event, 'state, 'context> with
         member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task {
-            let loadOrReload = function
+            let loadOrReload ct = function
                 | ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
                 | ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
-            return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload) }
+            return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) }
         member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
             let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
             let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
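The loadOrReload passed to cache.Load above folds the load-versus-reload decision into a single function of the (optional) cached base state; a simplified standalone rendering of that shape (types and names illustrative):

// ValueNone (no usable cached state) demands a full load;
// ValueSome supplies the token/state pair a cheaper reload can resume from
let loadOrReload (load: unit -> Async<'state>) (reload: 'token * 'state -> Async<'state>) = function
    | ValueNone -> load ()
    | ValueSome (struct (token, state)) -> reload (token, state)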