Skip to content

Commit

Permalink
ISyncContext.Version fixes/changes as v2 #282
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 12, 2021
1 parent fc34509 commit 1655cb3
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 37 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Equinox`: change `createAttemptsExhaustedException` to allow any `exn`-derived `type` [#275](https://github.com/jet/equinox/pull/275)

<a name="2.5.1"></a>
## [2.5.1] - 2021-03-12

### Changed

- `ISyncContext.Version`: documented value as being `0`-based, rather than whatever a given store happens to use internally (which happens to align with the native version representation in `Equinox.Cosmos`) [#282](https://github.com/jet/equinox/pull/282)
- `MemoryStore` / `SqlStreamStore` / `EventStore`: aligned implementations to represent `Version` in a store-neutral manner per the documentation change [#282](https://github.com/jet/equinox/pull/282)

### Removed
### Fixed

- `Cosmos` / `ISyncContext.Version`: fixed erroneous `0` value when re-reading with caching but without snapshots in `Cosmos` store [#282](https://github.com/jet/equinox/pull/282)

<a name="2.5.0"></a>
## [2.5.0] - 2021-02-24

Expand Down Expand Up @@ -533,7 +546,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
(For information pertaining to earlier releases, see release notes in https://github.com/jet/equinox/releases and/or can someone please add it!)

[Unreleased]: https://github.com/jet/equinox/compare/3.0.0-beta.1...HEAD
[3.0.0-beta.1]: https://github.com/jet/equinox/compare/2.5.0...3.0.0-beta.1
[3.0.0-beta.1]: https://github.com/jet/equinox/compare/2.5.1...3.0.0-beta.1
[2.5.1]: https://github.com/jet/equinox/compare/2.5.0...2.5.1
[2.5.0]: https://github.com/jet/equinox/compare/2.4.0...2.5.0
[2.4.0]: https://github.com/jet/equinox/compare/2.3.0...2.4.0
[2.3.0]: https://github.com/jet/equinox/compare/2.3.0-rc2...2.3.0
Expand Down
10 changes: 6 additions & 4 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
let execute clientId command : Async<unit> =
let decider = resolve clientId
decider.Transact(interpret command)
let read clientId : Async<Events.Favorited []> =
let decider = resolve clientId
decider.Query id

member __.Execute(clientId, command) =
execute clientId command
Expand All @@ -76,7 +73,12 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
execute clientId (Command.Unfavorite sku)

member __.List clientId : Async<Events.Favorited []> =
read clientId
let decider = resolve clientId
decider.Query(id)

member __.ListWithVersion clientId : Async<int64 * Events.Favorited []> =
let decider = resolve clientId
decider.QueryEx(fun ctx -> ctx.Version, ctx.State)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Expand Down
56 changes: 44 additions & 12 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,44 @@ let createServiceGes log context =
let cat = EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot)
Favorites.create log cat.Resolve

let createServiceCosmos log context =
let createServiceCosmosSnapshotsUncached log context =
let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot)
Favorites.create log cat.Resolve

let createServiceCosmosRollingState log context =
let createServiceCosmosRollingStateUncached log context =
let access = CosmosStore.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot
let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, access)
Favorites.create log cat.Resolve

let createServiceCosmosUnoptimizedButCached log context =
let access = CosmosStore.AccessStrategy.Unoptimized
let caching =
let cache = Equinox.Cache ("name", 10)
CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, caching, access)
Favorites.create log cat.Resolve

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
let createLog () = createLogger testOutput

let act (service : Favorites.Service) (clientId, command) = async {
do! service.Execute(clientId, command)
let! items = service.List clientId
let! version, items = service.ListWithVersion clientId

match command with
| Domain.Favorites.Favorite (_,skuIds) ->
| Domain.Favorites.Favorite (_, skuIds) ->
test <@ skuIds |> List.forall (fun skuId -> items |> Array.exists (function { skuId = itemSkuId} -> itemSkuId = skuId)) @>
| _ ->
test <@ Array.isEmpty items @> }
| Domain.Favorites.Unfavorite _ ->
test <@ Array.isEmpty items @>
return version, items }

[<AutoData>]
let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async {
let log, store = createLog (), createMemoryStore ()
let service = createServiceMemory log store
do! act service args
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
Expand All @@ -55,21 +65,43 @@ type Tests(testOutputHelper) =
let! client = connectToLocalEventStoreNode log
let context = createContext client defaultBatchSize
let service = createServiceGes log context
do! act service args
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with caching`` (clientId, cmd) = Async.RunSynchronously <| async {
let log = createLog ()
let context = createPrimaryContext log defaultQueryMaxItems
let service = createServiceCosmosUnoptimizedButCached log context
let clientId = clientId () // generate a fresh one per test so repeated runs start from a stable base
let! version, items = act service (clientId, cmd)
version =! items.LongLength

// Validate consecutive Unoptimized Cached reads yield the correct Version
// TODO represent this more directly as a Cosmos Integration test
let service2 = createServiceCosmosUnoptimizedButCached log context
let! rereadVersion, items = service2.ListWithVersion clientId
rereadVersion =! version
rereadVersion =! items.LongLength
let! rereadVersion2, items = service2.ListWithVersion clientId
rereadVersion2 =! version
rereadVersion2 =! items.LongLength
}
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
let context = createPrimaryContext log defaultQueryMaxItems
let service = createServiceCosmos log context
do! act service args
let service = createServiceCosmosSnapshotsUncached log context
let! version, items = act service args
version =! items.LongLength
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async {
let log = createLog ()
let context = createPrimaryContext log defaultQueryMaxItems
let service = createServiceCosmosRollingState log context
do! act service args
let service = createServiceCosmosRollingStateUncached log context
let! version, _items = act service args
version =! 0L
}
4 changes: 3 additions & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
// In this impl, the StreamVersion matches the EventStore StreamVersion in being -1-based
// Version however is the representation that needs to align with ISyncContext.Version
version = streamVersion + 1L }

/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
Expand Down
34 changes: 17 additions & 17 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ open Equinox.Core
open System.Runtime.InteropServices

/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
exception private WrongVersionException of streamName: string * expected: int * value: obj
exception private WrongVersionException of value : obj

/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
[<NoEquality; NoComparison>]
type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict of int
type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict

/// Response type for VolatileStore.TrySync to communicate the outcome and updated state of a stream
[<NoEquality; NoComparison>]
Expand Down Expand Up @@ -46,30 +46,30 @@ type VolatileStore<'Format>() =
events: FsCodec.ITimelineEvent<'Format>[])
: Async<ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]>> = async {
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
let updateValue _streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Conflict -> raise <| WrongVersionException (box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
// we publish the event here, once, as `updateValue` can be invoked multiple times
do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
return Written res
with WrongVersionException(_, _, conflictingValue) ->
with WrongVersionException conflictingValue ->
return Conflict (unbox conflictingValue) }

type Token = { streamVersion: int; streamName: string }
type Token = { streamName : string; eventCount : int }

/// Internal implementation detail of MemoryStore
module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
{ value = box { streamName = streamName; streamVersion = streamVersion }
version = int64 streamVersion }
let private streamTokenOfEventCount streamName (eventCount : int) : StreamToken =
{ value = box { streamName = streamName; eventCount = eventCount }
version = int64 eventCount }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial
let tokenOfArray streamName (value: 'event array) = Array.length value - 1 |> streamTokenOfIndex streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value - 1 |> streamTokenOfIndex streamName
let ofEmpty streamName initial = streamTokenOfEventCount streamName 0, initial
let tokenOfArray streamName (value: 'event array) = Array.length value |> streamTokenOfEventCount streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value |> streamTokenOfEventCount streamName
/// Represent a known array of events (without a known folded State)
let ofEventArray streamName fold initial (events: 'event array) = tokenOfArray streamName events, fold initial (Seq.ofArray events)
/// Represent a known array of Events together with the associated state
Expand All @@ -85,18 +85,18 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq
let encoded = events |> Seq.mapi (fun i e -> map (token.eventCount + i) (codec.Encode(context, e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
if Array.length currentValue <> token.eventCount then ConcurrentDictionarySyncResult.Conflict
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match! store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Written _ ->
return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
let token' = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip token.eventCount |> List.ofSeq
return token', fold state (successorEvents |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync }

type MemoryStoreCategory<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
Expand Down
4 changes: 3 additions & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
// In this impl, the StreamVersion matches the SqlStreamStore (and EventStore) StreamVersion in being -1-based
// Version however is the representation that needs to align with ISyncContext.Version
version = streamVersion + 1L }
/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
create None None streamName streamVersion
Expand Down
5 changes: 4 additions & 1 deletion src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ type ISyncContext<'state> =
/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method
abstract member CreateMemento : unit -> StreamToken * 'state

/// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented)
/// Exposes the underlying Store's internal Version for the underlying stream.
/// An empty stream is Version 0; one with a single event is Version 1 etc.
/// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to
/// your `fold` function - the codec may opt to ignore it
abstract member Version : int64

/// The present State of the stream within the context of this Flow
Expand Down

0 comments on commit 1655cb3

Please sign in to comment.