Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Tasks / Struct #337

Merged
merged 14 commits into from
Sep 2, 2022
Prev Previous commit
Next Next commit
checkpoint
  • Loading branch information
bartelink committed Sep 1, 2022
commit 2fbb70b8f53f4d3964c673c0e700e47c669c7207
43 changes: 29 additions & 14 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
member _.Await() = workflow.Value

/// Synchronously check whether the value has been computed (and/or remains valid)
member x.IsValid(isExpired) =
member _.IsValid(isExpired) =
if not workflow.IsValueCreated then false else

let t = workflow.Value
if t = null || not t.IsCompleted || t.IsFaulted then false else
if not t.IsCompleted || t.IsFaulted then false else

match isExpired with
| ValueSome f -> not (f t.Result)
Expand All @@ -28,11 +28,11 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
let t = workflow.Value

// Determines if the last attempt completed, but failed; For TMI see https://stackoverflow.com/a/33946166/11635
if t = null || t.IsFaulted then Task.FromResult ValueNone
if t.IsFaulted then Task.FromResult ValueNone
else task {
let! (res : 'T) = t
match isExpired with
| ValueSome check when not (check res) -> return ValueNone
| ValueSome isExpired when isExpired res -> return ValueNone
| _ -> return ValueSome res }

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
Expand All @@ -41,22 +41,37 @@ type AsyncLazy<'T>(workflow : unit -> Task<'T>) =
type AsyncCacheCell<'T>(workflow : CancellationToken -> Task<'T>, ?isExpired : 'T -> bool) =

let isExpired = match isExpired with Some x -> ValueSome x | None -> ValueNone
let mutable cell = AsyncLazy(fun () -> null)

/// Synchronously check the value remains valid (to short-circuit an Async AwaitValue step where value not required)
member _.IsValid() = cell.IsValid(isExpired)
let mutable cell = Unchecked.defaultof<AsyncLazy<'T>>
let cellIsUninitialized () = obj.ReferenceEquals(null, cell)
let createFresh ct =
// Prepare to do the work, with cancellation under out control
let attemptLoad () = workflow ct
// we want the actual workflow (if it is ultimately to run) to be on the thread pool
let dispatch () = Task.Run<'T>(attemptLoad)
AsyncLazy dispatch
let mutable initialized = 0
let ensureInitialized ct =
if Interlocked.Exchange(&initialized, 1) = 1 then
// Concurrent calls will need to wait for the first one to initialize
while cellIsUninitialized () do Thread.SpinWait 20
Volatile.Read &cell
else // First ever caller generates the first task
let fresh = createFresh ct
Volatile.Write(&cell, fresh)
fresh

/// Synchronously check the value remains valid (to short-circuit an Await step where value not required)
member _.IsValid() = not (cellIsUninitialized ()) && cell.IsValid(isExpired)

/// Gets or asynchronously recomputes a cached value depending on expiry and availability
member _.Await(ct) = task {
// First, take a local copy of the current state
let current = cell
let current = ensureInitialized ct

match! current.TryAwaitValid(isExpired) with
| ValueSome res -> return res // ... if it's already / still valid, we're done
| ValueNone ->
// Prepare to do the work, with cancellation under out control
let attemptLoad () = workflow ct
// we want the actual workflow (if it is ultimately to run) to be on the thread pool
let dispatch () = Task.Run<'T>(attemptLoad)
// avoid unnecessary recomputation in cases where competing threads detect expiry;
// the first write attempt wins, and everybody else reads off that value
let _ = Interlocked.CompareExchange(&cell, AsyncLazy(dispatch), current)
let _ = Interlocked.CompareExchange(&cell, createFresh ct, current)
return! cell.Await() }
11 changes: 8 additions & 3 deletions tests/Equinox.CosmosStore.Integration/CacheCellTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ let ``AsyncCacheCell correctness`` () = async {
let mutable state = 0
let mutable expectedValue = 1
let cell = AsyncCacheCell((fun _ct -> task { return Interlocked.Increment &state }), fun value -> value <> expectedValue)

false =! cell.IsValid()

let! r = cell.Await(CancellationToken.None) |> Async.AwaitTask
true =! cell.IsValid()
1 =! r

let! accessResult = [|1 .. 100|] |> Array.map (fun _i -> cell.Await(CancellationToken.None) |> Async.AwaitTask) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 1) @>
true =! cell.IsValid()
Expand Down Expand Up @@ -57,7 +62,7 @@ let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
// If the runner is throwing, we want to be sure it doesn't place us in a failed state forever, per the semantics of Lazy<T>
// However, we _do_ want to be sure that the function only runs once
if initiallyThrowing then
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTask |> Async.Catch) |> Async.Parallel
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTaskCorrect |> Async.Catch) |> Async.Parallel
test <@ accessResult |> Array.forall (function Choice2Of2 (:? InvalidOperationException) -> true | _ -> false) @>
throwing <- false
false =! cell.IsValid()
Expand All @@ -68,7 +73,7 @@ let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {

expectedValue <- expectedValue + 1

let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTask) |> Async.Parallel
let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTaskCorrect) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 2) @>
true =! cell.IsValid()

Expand All @@ -78,7 +83,7 @@ let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
// but make the computation ultimately fail
throwing <- true
// All share the failure
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTask |> Async.Catch) |> Async.Parallel
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.Await(CancellationToken.None) |> Async.AwaitTaskCorrect |> Async.Catch) |> Async.Parallel
test <@ accessResult |> Array.forall (function Choice2Of2 (:? InvalidOperationException) -> true | _ -> false) @>
// Restore normality
throwing <- false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\src\Equinox.Core\Infrastructure.fs" Link="Infrastructure.fs" />
<Compile Include="..\..\samples\Store\Integration\TestOutput.fs" Link="TestOutput.fs" />
<Compile Include="CosmosFixtures.fs" />
<Compile Include="CosmosFixturesInfrastructure.fs" />
Expand Down