Skip to content

Commit

Permalink
Fix Version inconsistencies
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 11, 2021
1 parent 487c5b6 commit a8a3fa5
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 49 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- `ISyncContext.Version`: documented value as being `0`-based, rather than whatever a given store happens to use internally (which happens to align with the native version representation in `Equinox.Cosmos`) [#278](https://github.com/jet/equinox/pull/282)
- `MemoryStore` / `SqlStreamStore` / `EventStore`: aligned implementations to represent `Version` in a store-neutral manner per the documentation change [#278](https://github.com/jet/equinox/pull/282)

### Removed
### Fixed

- `Cosmos` / `ISyncContext.Version`: fixed erroneous `0` value when re-reading with caching but without snapshots in `Cosmos` store [#278](https://github.com/jet/equinox/pull/282)

<a name="2.5.0"></a>
## [2.5.0] - 2021-02-24


### Added

- `Stream.TransactEx`: extended signature, replacing `TransactAsyncEx`. Provides the `ISyncContext` both before and after the `decide` so pre-flight checks can be performed (as `master` [#263](https://github.com/jet/equinox/pull/263)) [#277](https://github.com/jet/equinox/pull/277)
Expand Down
19 changes: 9 additions & 10 deletions samples/Store/Backend/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ type Service(log, resolve, ?maxAttempts) =

let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 2)

let execute clientId command : Async<unit> =
member __.Execute(clientId, command) =
let stream = resolve clientId
stream.Transact(Commands.interpret command)
let read clientId : Async<Events.Favorited []> =
let stream = resolve clientId
stream.Query id

member __.Execute(clientId, command) =
execute clientId command

member __.Favorite(clientId, skus) =
execute clientId (Command.Favorite(DateTimeOffset.Now, skus))
__.Execute(clientId, Command.Favorite(DateTimeOffset.Now, skus))

member __.Unfavorite(clientId, sku) =
execute clientId (Command.Unfavorite sku)
__.Execute(clientId, Command.Unfavorite sku)

member __.List clientId : Async<Events.Favorited []> =
read clientId
let stream = resolve clientId
stream.Query(id)

member __.ListWithVersion clientId : Async<int64 * Events.Favorited []> =
let stream = resolve clientId
stream.QueryEx(fun ctx -> ctx.Version, ctx.State)
6 changes: 3 additions & 3 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ module Fold =
let snapshot state = Events.Snapshotted { net = state }

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId

module Commands =
let interpret command (state : Fold.State) =
Expand All @@ -56,4 +56,4 @@ module Commands =
yield Events.Favorited { date = date; skuId = skuId } ]
| Unfavorite skuId ->
if doesntHave skuId then [] else
[ Events.Unfavorited { skuId = skuId } ]
[ Events.Unfavorited { skuId = skuId } ]
58 changes: 46 additions & 12 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,44 @@ let createServiceGes gateway log =
let resolve = EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve
Backend.Favorites.Service(log, resolve)

let createServiceCosmos gateway log =
let createServiceCosmosSnapshotsUncached gateway log =
let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve
Backend.Favorites.Service(log, resolve)

let createServiceCosmosRollingState gateway log =
let createServiceCosmosRollingStateUncached gateway log =
let access = Cosmos.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot
let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
Backend.Favorites.Service(log, resolve)

let createServiceCosmosUnoptimizedButCached gateway log =
let access = Cosmos.AccessStrategy.Unoptimized
let caching =
let cache = Equinox.Cache ("name", 10)
Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolve = Cosmos.Resolver(gateway, codec, fold, initial, caching, access).Resolve
Backend.Favorites.Service(log, resolve)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
let createLog () = createLogger testOutput

let act (service : Backend.Favorites.Service) (clientId, command) = async {
do! service.Execute(clientId, command)
let! items = service.List clientId
let! version, items = service.ListWithVersion clientId

match command with
| Domain.Favorites.Favorite (_,skuIds) ->
| Domain.Favorites.Favorite (_, skuIds) ->
test <@ skuIds |> List.forall (fun skuId -> items |> Array.exists (function { skuId = itemSkuId} -> itemSkuId = skuId)) @>
| _ ->
test <@ Array.isEmpty items @> }
| Domain.Favorites.Unfavorite _ ->
test <@ Array.isEmpty items @>
return version, items }

[<AutoData>]
let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async {
let log, store = createLog (), createMemoryStore ()
let service = createServiceMemory log store
do! act service args
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
Expand All @@ -54,23 +64,47 @@ type Tests(testOutputHelper) =
let! conn = connectToLocalEventStoreNode log
let gateway = createGesGateway conn defaultBatchSize
let service = createServiceGes gateway log
do! act service args
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createCosmosContext conn defaultBatchSize
let service = createServiceCosmos gateway log
do! act service args
let service = createServiceCosmosSnapshotsUncached gateway log
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with caching`` (clientId, cmd) = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createCosmosContext conn defaultBatchSize
let service = createServiceCosmosUnoptimizedButCached gateway log
let clientId = clientId () // generate a fresh one per test so repeated runs start from a stable base
let! version, items = act service (clientId, cmd)
version =! items.LongLength

// Validate consecutive Unoptimized Cached reads yield the correct Version
// TODO represent this more directly as a Cosmos Integration test
let service2 = createServiceCosmosUnoptimizedButCached gateway log
let! rereadVersion, items = service2.ListWithVersion clientId
rereadVersion =! version
rereadVersion =! items.LongLength
let! rereadVersion2, items = service2.ListWithVersion clientId
rereadVersion2 =! version
rereadVersion2 =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createCosmosContext conn defaultBatchSize
let service = createServiceCosmosRollingState gateway log
do! act service args
let service = createServiceCosmosRollingStateUncached gateway log
let! version, _items = act service args
version =! 0L
}
13 changes: 10 additions & 3 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,13 @@ module internal Query =
let batches : AsyncSeq<ITimelineEvent<byte[]>[] * Position option * float> = run readlog retryingLoggingReadSlice maxRequests query
let! t, (events, maybeTipPos, ru) = mergeBatches log batches |> Stopwatch.Time
let raws, decoded = (Array.map fst events), (events |> Seq.choose snd |> Array.ofSeq)
let pos = match maybeTipPos with Some p -> p | None -> Position.fromMaxIndex raws
let pos =
// Hack-fix covered by test in FavoritesIntegration.
// This case is implemented differently in V3
match maybeTipPos, startPos with
| Some p, _ -> p
| None, Some startPos when Array.isEmpty raws -> startPos
| None, _ -> Position.fromMaxIndex raws

log |> logQuery direction maxItems (container, stream) t (!responseCount,raws) pos.index ru
return pos, decoded }
Expand Down Expand Up @@ -1012,8 +1018,9 @@ type Gateway(conn : Connection, batching : BatchingPolicy) =
| Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create (container,stream) Position.fromKnownEmpty,Array.empty)
| Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged
| Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return LoadFromTokenResult.Found (Token.create (container,stream) pos, span)
| _ -> let! res = __.Read log (container,stream) Direction.Forward (Some pos) (tryDecode,isOrigin)
return LoadFromTokenResult.Found res }
| Tip.Result.Found _ ->
let! res = __.Read log (container,stream) Direction.Forward (Some pos) (tryDecode,isOrigin)
return LoadFromTokenResult.Found res }
member __.CreateSyncStoredProcIfNotExists log container =
Sync.Initialization.createSyncStoredProcIfNotExists log container
member __.Sync log containerStream (exp, batch: Tip): Async<InternalSyncResult> = async {
Expand Down
4 changes: 3 additions & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
// In this impl, the StreamVersion matches the EventStore StreamVersion in being -1-based
// Version however is the representation that needs to align with ISyncContext.Version
version = streamVersion + 1L }

/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
Expand Down
34 changes: 17 additions & 17 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ open Equinox.Core
open System.Runtime.InteropServices

/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
exception private WrongVersionException of streamName: string * expected: int * value: obj
exception private WrongVersionException of value : obj

/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
[<NoEquality; NoComparison>]
type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict of int
type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict

/// Response type for VolatileStore.TrySync to communicate the outcome and updated state of a stream
[<NoEquality; NoComparison>]
Expand Down Expand Up @@ -46,30 +46,30 @@ type VolatileStore<'Format>() =
events: FsCodec.ITimelineEvent<'Format>[])
: Async<ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]>> = async {
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
let updateValue _streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Conflict -> raise <| WrongVersionException (box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
// we publish the event here, once, as `updateValue` can be invoked multiple times
do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
return Written res
with WrongVersionException(_, _, conflictingValue) ->
with WrongVersionException conflictingValue ->
return Conflict (unbox conflictingValue) }

type Token = { streamVersion: int; streamName: string }
type Token = { streamName : string; eventCount : int }

/// Internal implementation detail of MemoryStore
module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
{ value = box { streamName = streamName; streamVersion = streamVersion }
version = int64 streamVersion }
let private streamTokenOfEventCount streamName (eventCount : int) : StreamToken =
{ value = box { streamName = streamName; eventCount = eventCount }
version = int64 eventCount }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial
let tokenOfArray streamName (value: 'event array) = Array.length value - 1 |> streamTokenOfIndex streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value - 1 |> streamTokenOfIndex streamName
let ofEmpty streamName initial = streamTokenOfEventCount streamName 0, initial
let tokenOfArray streamName (value: 'event array) = Array.length value |> streamTokenOfEventCount streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value |> streamTokenOfEventCount streamName
/// Represent a known array of events (without a known folded State)
let ofEventArray streamName fold initial (events: 'event array) = tokenOfArray streamName events, fold initial (Seq.ofArray events)
/// Represent a known array of Events together with the associated state
Expand All @@ -85,18 +85,18 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq
let encoded = events |> Seq.mapi (fun i e -> map (token.eventCount + i) (codec.Encode(context, e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
if Array.length currentValue <> token.eventCount then ConcurrentDictionarySyncResult.Conflict
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match! store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Written _ ->
return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
let token' = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip token.eventCount |> List.ofSeq
return token', fold state (successorEvents |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync }

type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
Expand Down
4 changes: 3 additions & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
// In this impl, the StreamVersion matches the SqlStreamStore (and EventStore) StreamVersion in being -1-based
// Version however is the representation that needs to align with ISyncContext.Version
version = streamVersion + 1L }
/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
create None None streamName streamVersion
Expand Down
5 changes: 4 additions & 1 deletion src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type ISyncContext<'state> =
/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method
abstract member CreateMemento : unit -> StreamToken * 'state

/// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented)
/// Exposes the underlying Store's internal Version for the underlying stream.
/// An empty stream is Version 0; one with a single event is Version 1 etc.
/// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to
/// your `fold` function - the codec may opt to ignore it
abstract member Version : int64

/// The present State of the stream within the context of this Flow
Expand Down

0 comments on commit a8a3fa5

Please sign in to comment.