ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits on Mar 4, 2022
Remove stream name from Tokens
bartelink committed Mar 4, 2022
commit ec10f993d05bdfa2ee0ad3a18087881e80784427
16 changes: 2 additions & 14 deletions src/Equinox.Core/StoreCategory.fs
@@ -8,20 +8,8 @@ type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'e
category.Load(log, streamId, opt)

member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, token, originState, events, context)

/// Handles the case where some earlier processing has loaded or determined the state of a stream, allowing us to avoid a read roundtrip
type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
let mutable preloadedTokenAndState = Some memento
interface IStream<'event, 'state> with
member _.Load log =
match preloadedTokenAndState with
| Some value -> async { preloadedTokenAndState <- None; return value }
| None -> inner.Load log

member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
inner.TrySync(log, token, originState, events)
category.TrySync(log, streamId, token, originState, events, context)

module Stream =

let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _
let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _
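The InitializedStream wrapper removed above (together with its Stream.ofMemento constructor) let a caller that already held a (StreamToken, 'state) pair skip the initial store read: the memento sits in a mutable option that is served by the first Load and cleared, with later Loads falling through to the inner stream. A minimal standalone sketch of that consume-once pattern (the type and member names here are illustrative, not the library's API):

    // Serves a preloaded value exactly once, then defers to the underlying loader.
    type PreloadedLoader<'t>(inner : unit -> Async<'t>, preloaded : 't) =
        let mutable pending = Some preloaded
        member _.Load() =
            match pending with
            | Some value -> async { pending <- None; return value }  // first call: hand back the memento
            | None -> inner ()                                        // later calls: read from the store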
2 changes: 1 addition & 1 deletion src/Equinox.Core/Types.fs
@@ -15,7 +15,7 @@ type ICategory<'event, 'state, 'streamId, 'context> =
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>
abstract TrySync : log: ILogger * streamName : 'streamId * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>

/// Represents a time measurement of a computation that includes stopwatch tick metadata
[<NoEquality; NoComparison>]
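The doc comment above spells out the contract callers of TrySync work against: Written carries the updated token and state, while Conflict carries a lazily evaluated re-read of the conflicting stream state so the proposed decision can be reconsidered. A hedged sketch of a caller loop honouring that contract, assuming the SyncResult shape used elsewhere in this commit (decide is an illustrative stand-in for domain logic, and retries are unbounded for brevity):

    let rec sync (stream : IStream<'event, 'state>) (decide : 'state -> 'event list) log (token, state) = async {
        match! stream.TrySync(log, token, state, decide state) with
        | SyncResult.Written (token', state') -> return token', state'
        | SyncResult.Conflict resync ->
            let! conflictingTokenAndState = resync       // store-optimal lazy re-read of the conflicting state
            return! sync stream decide log conflictingTokenAndState }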
63 changes: 31 additions & 32 deletions src/Equinox.CosmosStore/CosmosStore.fs
@@ -973,12 +973,12 @@ module Prune =
return eventsDeleted, eventsDeferred, lwm
}

type [<NoComparison>] Token = { stream : string; pos : Position }
type [<NoComparison>] Token = { pos : Position }
module Token =

let create stream pos : StreamToken = { value = box { stream = stream; pos = pos }; version = pos.index }
let (|Unpack|) (token : StreamToken) : string*Position = let t = unbox<Token> token.value in t.stream, t.pos
let supersedes (Unpack (_, currentPos)) (Unpack (_, xPos)) =
let create pos : StreamToken = { value = box { pos = pos }; version = pos.index }
let (|Unpack|) (token : StreamToken) : Position = let t = unbox<Token> token.value in t.pos
let supersedes (Unpack currentPos) (Unpack xPos) =
let currentVersion, newVersion = currentPos.index, xPos.index
let currentETag, newETag = currentPos.etag, xPos.etag
newVersion > currentVersion || currentETag <> newETag
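supersedes treats the candidate token as newer when either its version has advanced or its etag differs from the current one (e.g. the Tip document was rewritten in place without the version moving). A small worked illustration over a hypothetical position record carrying only the two fields the comparison inspects:

    // Hypothetical minimal position record; the store's real Position carries more than this
    type Pos = { index : int64; etag : string option }
    let supersedes (current : Pos) (candidate : Pos) =
        candidate.index > current.index || candidate.etag <> current.etag

    let cached    = { index = 3L; etag = Some "\"0x1F\"" }
    let rewritten = { index = 3L; etag = Some "\"0x20\"" }   // same version, different etag
    let isNewer = supersedes cached rewritten                 // true: the candidate supersedes the cached token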
@@ -1045,22 +1045,22 @@ type StoreClient(container : Container, fallback : Container option, query : Que

let log = log |> Log.prop "stream" stream
let! pos, events = Query.load log (minIndex, maxIndex) tip (walk log container) walkFallback
return Token.create stream pos, events }
return Token.create pos, events }
member _.ReadLazy(log, batching : QueryOptions, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex) : AsyncSeq<'event[]> =
Query.walkLazy log (container, stream) batching.MaxItems batching.MaxRequests (tryDecode, isOrigin) (direction, minIndex, maxIndex)

member store.Load(log, (stream, maybePos), (tryDecode, isOrigin), checkUnfolds : bool) : Async<StreamToken * 'event[]> =
if not checkUnfolds then store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin))
else async {
match! loadTip log stream maybePos with
| Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty
| Tip.Result.NotFound -> return Token.create Position.fromKnownEmpty, Array.empty
| Tip.Result.NotModified -> return invalidOp "Not applicable"
| Tip.Result.Found (pos, i, xs) -> return! store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip = (pos, i, xs)) }
member _.GetPosition(log, stream, ?pos) : Async<StreamToken> = async {
match! loadTip log stream pos with
| Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty
| Tip.Result.NotModified -> return Token.create stream pos.Value
| Tip.Result.Found (pos, _i, _unfoldsAndEvents) -> return Token.create stream pos }
| Tip.Result.NotFound -> return Token.create Position.fromKnownEmpty
| Tip.Result.NotModified -> return Token.create pos.Value
| Tip.Result.Found (pos, _i, _unfoldsAndEvents) -> return Token.create pos }
member store.Reload(log, (stream, pos), (tryDecode, isOrigin), ?preview): Async<LoadFromTokenResult<'event>> =
let read tipContent = async {
let! res = store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), minIndex = pos.index, tip = tipContent)
@@ -1069,16 +1069,16 @@ type StoreClient(container : Container, fallback : Container option, query : Que
| Some (pos, i, xs) -> read (pos, i, xs)
| None -> async {
match! loadTip log stream (Some pos) with
| Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.fromKnownEmpty, Array.empty)
| Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create Position.fromKnownEmpty, Array.empty)
| Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged
| Tip.Result.Found (pos, i, xs) -> return! read (pos, i, xs) }

member internal _.Sync(log, stream, exp, batch : Tip) : Async<InternalSyncResult> = async {
if Array.isEmpty batch.e && Array.isEmpty batch.u then invalidOp "Must write either events or unfolds."
match! Sync.batch log (tip.WriteRetryPolicy, tip.MaxEvents, tip.MaxJsonLength) (container, stream) (exp, batch) with
| Sync.Result.Conflict (pos', events) -> return InternalSyncResult.Conflict (pos', events)
| Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos')
| Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') }
| Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create pos')
| Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create pos') }

member _.Prune(log, stream, index) =
Prune.until log (container, stream) query.MaxItems index
@@ -1087,14 +1087,13 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
member _.Load(log, stream, initial, checkUnfolds, fold, isOrigin) : Async<StreamToken * 'state> = async {
let! token, events = store.Load(log, (stream, None), (codec.TryDecode, isOrigin), checkUnfolds)
return token, fold initial events }
member _.Reload(log, (Token.Unpack (stream, pos) as streamToken), state, fold, isOrigin, ?preloaded) : Async<StreamToken * 'state> = async {
match! store.Reload(log, (stream, pos), (codec.TryDecode, isOrigin), ?preview = preloaded) with
member _.Reload(log, streamName, (Token.Unpack pos as streamToken), state, fold, isOrigin, ?preloaded) : Async<StreamToken * 'state> = async {
match! store.Reload(log, (streamName, pos), (codec.TryDecode, isOrigin), ?preview = preloaded) with
| LoadFromTokenResult.Unchanged -> return streamToken, state
| LoadFromTokenResult.Found (token', events) -> return token', fold state events }
member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) : Async<SyncResult<'state>> = async {
member cat.Sync(log, streamName, (Token.Unpack pos as streamToken), state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) : Async<SyncResult<'state>> = async {
let state' = fold state (Seq.ofList events)
let encode e = codec.Encode(context, e)
let (Token.Unpack (stream, pos)) = token
let exp, events, eventsEncoded, projectionsEncoded =
match mapUnfolds with
| Choice1Of3 () -> SyncExp.Version pos.index, events, Seq.map encode events |> Array.ofSeq, Seq.empty
@@ -1105,10 +1104,10 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE
let baseIndex = pos.index + int64 (List.length events)
let compressor = if compressUnfolds then JsonCompressedBase64Converter.Compress else JsonHelper.fixup
let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded
let batch = Sync.mkBatch stream eventsEncoded projections
match! store.Sync(log, stream, exp, batch) with
| InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', pos.index, tipEvents)))
| InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin))
let batch = Sync.mkBatch streamName eventsEncoded projections
match! store.Sync(log, streamName, exp, batch) with
| InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, streamName, streamToken, state, fold, isOrigin, (pos', pos.index, tipEvents)))
| InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, streamName, streamToken, state, fold, isOrigin))
| InternalSyncResult.Written token' -> return SyncResult.Written (token', state') }

module internal Caching =
@@ -1141,10 +1140,10 @@ module internal Caching =
match! tryReadCache streamName with
| None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when opt = Some Equinox.AllowStale -> return tokenAndState // read already updated TTL, no need to write
| Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, (Token.Unpack (streamName, _) as streamToken), state, events : 'event list, context)
| Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context)
: Async<SyncResult<'state>> = async {
match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
| SyncResult.Conflict resync ->
return SyncResult.Conflict (cache streamName resync)
| SyncResult.Written (token', state') ->
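The read path above is a read-through cache with an AllowStale short-circuit: a miss performs a full Load and caches the result, a hit under Equinox.AllowStale is returned without touching the store (the cache read has already refreshed the TTL), and any other hit is revalidated via Reload using the cached token, with the outcome written back to the cache. A condensed sketch of that decision (tryReadCache, loadFromStore, revalidate and cache are illustrative stand-ins for the members used above):

    let load opt streamName = async {
        match! tryReadCache streamName with
        | None -> return! loadFromStore streamName |> cache streamName           // miss: full read, then cache
        | Some tokenAndState when opt = Some Equinox.AllowStale ->
            return tokenAndState                                                 // stale is acceptable: skip the roundtrip
        | Some (token, state) -> return! revalidate (token, state) |> cache streamName }  // hit: cheap token/etag reload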
@@ -1449,17 +1448,17 @@ type CosmosStoreCategory<'event, 'state, 'context>
Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _
categories.GetOrAdd(categoryName, createCategory)

let resolveStream (StreamName.CategoryAndId (categoryName, streamId)) opt ctx =
let resolveStream (StreamName.CategoryAndId (categoryName, streamId) as sn) opt ctx =
let container, streamId, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId)
let category = resolveCategory (categoryName, container)
{ new IStream<'event, 'state> with
member _.Load log = category.Load(log, streamId, opt)
member _.TrySync(log : ILogger, token : StreamToken, originState : 'state, events : 'event list) =
match maybeContainerInitializationGate with
| None -> category.TrySync(log, token, originState, events, ctx)
| None -> category.TrySync(log, StreamName.toString sn, token, originState, events, ctx)
| Some init -> async {
do! init ()
return! category.TrySync(log, token, originState, events, ctx) } }
return! category.TrySync(log, StreamName.toString sn, token, originState, events, ctx) } }
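maybeContainerInitializationGate lets the first sync against a container run a one-time initialization step before the write proceeds, while later syncs take the None path and skip it. One common way to build such a run-at-most-once async gate is sketched below; this is a general pattern, not necessarily how ResolveContainerClientAndStreamIdAndInit implements it:

    // Wraps an initialization workflow so it executes at most once; concurrent and
    // subsequent callers all await the same underlying Task.
    let initGate (init : Async<unit>) : unit -> Async<unit> =
        let once = lazy (Async.StartAsTask init)
        fun () -> Async.AwaitTask once.Value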

member _.Resolve
( streamName : StreamName,
@@ -1496,7 +1495,7 @@ type EventsContext internal
false

let yieldPositionAndData res = async {
let! Token.Unpack (_, pos'), data = res
let! Token.Unpack pos', data = res
return pos', data }

let getRange direction startPos =
@@ -1523,7 +1522,7 @@ type EventsContext internal
let direction = defaultArg direction Direction.Forward
if maxCount = Some 0 then
// Search semantics include the first hit so we need to special case this anyway
return Token.create stream (defaultArg startPos Position.fromKnownEmpty), Array.empty
return Token.create (defaultArg startPos Position.fromKnownEmpty), Array.empty
else
let isOrigin =
match maxCount with
@@ -1536,8 +1535,8 @@ type EventsContext internal

/// Establishes the current position of the stream in as efficient a manner as possible
/// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks)
member _.Sync(stream, [<O; D null>] ?position : Position) : Async<Position> = async {
let! (Token.Unpack (_, pos')) = store.GetPosition(log, stream, ?pos = position)
member __.Sync(stream, [<O; D null>] ?position : Position) : Async<Position> = async {
let! Token.Unpack pos' = store.GetPosition(log, stream, ?pos = position)
return pos' }

/// Query (with MaxItems set to `queryMaxItems`) from the specified `Position`, allowing the reader to efficiently walk away from a running query
@@ -1560,9 +1559,9 @@ type EventsContext internal
| Some init -> do! init ()
let batch = Sync.mkBatch stream events Seq.empty
match! store.Sync(log, stream, SyncExp.Version position.index, batch) with
| InternalSyncResult.Written (Token.Unpack (_, pos)) -> return AppendResult.Ok pos
| InternalSyncResult.Written (Token.Unpack pos) -> return AppendResult.Ok pos
| InternalSyncResult.Conflict (pos, events) -> return AppendResult.Conflict (pos, events)
| InternalSyncResult.ConflictUnknown (Token.Unpack (_, pos)) -> return AppendResult.ConflictUnknown pos }
| InternalSyncResult.ConflictUnknown (Token.Unpack pos) -> return AppendResult.ConflictUnknown pos }
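On this write path, SyncExp.Version position.index makes the append conditional on the stream still being at the supplied position, and the three AppendResult cases report success, a conflict accompanied by the competing events, or a conflict whose events were not returned. A hedged sketch of a caller loop over that result (tryAppend and decide are illustrative stand-ins; a real caller would bound retries and typically consult the conflicting events before re-deciding):

    let rec appendLoop tryAppend (decide : Position -> 'event[]) pos = async {
        match! tryAppend pos (decide pos) with
        | AppendResult.Ok pos' -> return pos'                                         // written at the expected position
        | AppendResult.ConflictUnknown pos' -> return! appendLoop tryAppend decide pos'
        | AppendResult.Conflict (pos', _conflictingEvents) -> return! appendLoop tryAppend decide pos' }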

/// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play
/// NB Should be used sparingly; Equinox.Decider enables building equivalent idempotent handling with minimal code.