Skip to content

Commit

Permalink
Clarify GES token management
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 5, 2017
1 parent a9437f6 commit f684211
Showing 1 changed file with 39 additions and 35 deletions.
74 changes: 39 additions & 35 deletions src/Foldunk.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ module private Read =
"{ExternalCall} {Action} {Stream} {Version} {SliceLength} {TotalPayloadSize} {Latency}",
true, action, streamName, startPos, batchSize, payloadSize, t.Elapsed)
return slice }
let readBatches (log : Serilog.ILogger) (readSlice : Serilog.ILogger -> int -> Async<StreamEventsSlice>)
(maxPermittedBatchReads : int option) (startPosition : int) : AsyncSeq<int option * ResolvedEvent[]> =
let readBatches (log : Serilog.ILogger) (readSlice : Serilog.ILogger -> int -> Async<StreamEventsSlice>) (maxPermittedBatchReads : int option) (startPosition : int)
: AsyncSeq<int option * ResolvedEvent[]> =
let rec loop batchCount pos = asyncSeq {
match maxPermittedBatchReads with
| Some mpbr when batchCount >= mpbr -> invalidOp "batch Limit exceeded"
Expand All @@ -55,7 +55,8 @@ module private Read =
yield! loop (batchCount + 1) slice.NextEventNumber
| x -> raise <| System.ArgumentOutOfRangeException("SliceReadStatus", x, "Unknown result value") }
loop 0 startPosition
let loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log startPosition : Async<int * ResolvedEvent[]> = async {
let loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log startPosition
: Async<int * ResolvedEvent[]> =
let mergeBatches (batches: AsyncSeq<int option * ResolvedEvent[]>) = async {
let versionFromStream = ref None
let! (events : ResolvedEvent[]) =
Expand All @@ -67,51 +68,54 @@ module private Read =
return version, events }
let readSlice log streamPos = loggedReadSlice log conn streamName Direction.Forward batchSize streamPos
let batches : AsyncSeq<int option * ResolvedEvent[]> = readBatches log readSlice maxPermittedBatchReads startPosition
return! mergeBatches batches }
mergeBatches batches

module private GesEventSumAdapters =
open Foldunk.EventSum
let encodedEventOfResolvedEvent (x : ResolvedEvent) =
module private EventSumAdapters =
let private encodedEventOfResolvedEvent (x : ResolvedEvent) : EventSum.EncodedEvent<byte[]> =
{ EventType = x.Event.EventType; Payload = x.Event.Data }
let private encodeBatchToEventDataBatch (events : EncodedEvent<byte[]> seq) : EventData[] =
let eventDataOfEncodedEvent x = EventData(Guid.NewGuid(), x.EventType, (*isJson*) true, x.Payload, [||])
[| for e in events -> eventDataOfEncodedEvent e |]
let encodeEvents (codec : EventSum.IEventSumEncoder<'event, byte[]>) (events : 'event seq) : EventData[] =
events |> Seq.map codec.Encode |> encodeBatchToEventDataBatch
let decodeKnownEvents (codec : EventSum.IEventSumEncoder<'event, byte[]>) (events : ResolvedEvent[]) : 'event seq =
events |> Seq.map encodedEventOfResolvedEvent |> Seq.choose codec.TryDecode
let private eventDataOfEncodedEvent (x : EventSum.EncodedEvent<byte[]>) =
EventData(Guid.NewGuid(), x.EventType, (*isJson*) true, x.Payload, [||])
let encodeEvents (codec : EventSum.IEventSumEncoder<'event, byte[]>) (xs : 'event seq) : EventData[] =
xs |> Seq.map codec.Encode |> Seq.map eventDataOfEncodedEvent |> Seq.toArray
let decodeKnownEvents (codec : EventSum.IEventSumEncoder<'event, byte[]>) (xs : ResolvedEvent[]) : 'event seq =
xs |> Seq.map encodedEventOfResolvedEvent |> Seq.choose codec.TryDecode

type Token = { streamVersion: int }
module private Token =
let private create version : Token =
{ streamVersion = version }
let ofVersion version : Token =
create version

type GesStreamStore(conn, batchSize, ?maxPermittedBatchReads) =
member __.Load streamName log : Async<int * ResolvedEvent[]> =
Read.loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log 0
member __.LoadFromVersion streamName log version : Async<int * ResolvedEvent[]> =
Read.loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log version
member __.TrySync streamName log version (encodedEvents: EventData array) : Async<Result<int, unit>> = async {
member __.LoadBatched streamName log : Async<Token * ResolvedEvent[]> = async {
let! version, events = Read.loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log 0
return Token.ofVersion version, events }
member __.LoadFromToken streamName log token : Async<Token * ResolvedEvent[]> = async {
let! version, events = Read.loadForwardsFrom conn batchSize maxPermittedBatchReads streamName log (token.streamVersion + 1)
return Token.ofVersion version, events }
member __.TrySync streamName log token (encodedEvents: EventData array) : Async<Result<Token, unit>> = async {
try
let! wr = Write.loggedWriteEvents conn log streamName version encodedEvents
return Ok wr.NextExpectedVersion
let! wr = Write.loggedWriteEvents conn log streamName token.streamVersion encodedEvents
return Ok (Token.ofVersion wr.NextExpectedVersion)
with :? EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex ->
log.Information(ex, "TrySync WrongExpectedVersionException")
return Error () }

type GesToken = { streamVersion: int }

type GesEventStreamAdapter<'state, 'event>(store : GesStreamStore, codec : EventSum.IEventSumEncoder<'event, byte[]>) =
let tokenOfVersion version = box { streamVersion = version }

interface IEventStream<'state, 'event> with
interface IEventStream<'state,'event> with
member __.Load streamName log : Async<StreamState<'state, 'event>> = async {
let! version, events = store.Load streamName log
return GesEventSumAdapters.decodeKnownEvents codec events |> StreamState.ofTokenAndEvents (tokenOfVersion version) }
let! token, events = store.LoadBatched streamName log
return EventSumAdapters.decodeKnownEvents codec events |> StreamState.ofTokenAndEvents (box token) }
member __.TrySync streamName log (token, snapshotState) (events : 'event list, proposedState: 'state) = async {
let encodedEvents : EventData[] = GesEventSumAdapters.encodeEvents codec events
let! syncRes = store.TrySync streamName log (unbox token).streamVersion encodedEvents
let encodedEvents : EventData[] = EventSumAdapters.encodeEvents codec events
let! syncRes = store.TrySync streamName log (unbox token) encodedEvents
match syncRes with
| Error () ->
let resync = async {
let! token, events = store.LoadFromVersion streamName log ((unbox token).streamVersion + 1)
let successorEvents = GesEventSumAdapters.decodeKnownEvents codec events |> List.ofSeq
return StreamState.ofTokenSnapshotAndEvents (tokenOfVersion token) snapshotState successorEvents }
let! token', events = store.LoadFromToken streamName log (unbox token)
let successorEvents = EventSumAdapters.decodeKnownEvents codec events |> List.ofSeq
return StreamState.ofTokenSnapshotAndEvents (box token') snapshotState successorEvents }
return Error resync
| Ok version' ->
return Ok (StreamState.ofTokenAndKnownState (tokenOfVersion version') proposedState) }
| Ok token' ->
return Ok (StreamState.ofTokenAndKnownState (box token') proposedState) }

0 comments on commit f684211

Please sign in to comment.