diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d3d4f845..0ee4032ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,18 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed ### Fixed + +## [2.4.0] - 2020-12-03 + +### Added + +- `Cosmos`: Prometheus integration package as `master` [#266](https://github.com/jet/equinox/pull/266) [#267](https://github.com/jet/equinox/pull/267) + +### Fixed + +- `MemoryStore`: Serialize `Committed` events to guarantee consumption in event `Index` order re [#265](https://github.com/jet/equinox/issues/265) [#269](https://github.com/jet/equinox/pull/269) pray: [@fnipo](https://github.com/fnipo) +- `Cosmos`: Fix defaulting for `compressUnfolds` in C# [#261](https://github.com/jet/equinox/pull/261) + ## [2.3.0] - 2020-11-04 @@ -437,7 +449,8 @@ The `Unreleased` section name is replaced by the expected version of next releas (For information pertaining to earlier releases, see release notes in https://github.com/jet/equinox/releases and/or can someone please add it!) -[Unreleased]: https://github.com/jet/equinox/compare/2.3.0...HEAD +[Unreleased]: https://github.com/jet/equinox/compare/2.4.0...HEAD +[2.4.0]: https://github.com/jet/equinox/compare/2.3.0...2.4.0 [2.3.0]: https://github.com/jet/equinox/compare/2.3.0-rc2...2.3.0 [2.3.0-rc2]: https://github.com/jet/equinox/compare/2.3.0-rc1...2.3.0-rc2 [2.3.0-rc1]: https://github.com/jet/equinox/compare/2.2.0...2.3.0-rc1 diff --git a/Equinox.sln b/Equinox.sln index 95fa8ec40..1632e0172 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -76,6 +76,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Cosmos.Prometheus", "src\Equinox.Cosmos.Prometheus\Equinox.Cosmos.Prometheus.fsproj", "{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -182,6 +184,10 @@ Global {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU + {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/build.proj b/build.proj index d822cc84b..3e12b8e97 100644 --- a/build.proj +++ b/build.proj @@ -17,6 +17,7 @@ + diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 534a66de2..25d67d157 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -27,22 +27,22 @@ module EquinoxCosmosInterop = [] type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru - let flatten (evt : Log.Event) : FlatMetric = + let flatten (evt : Log.Metric) : FlatMetric = let action, metric, batches, ru = match evt with - | Log.Tip m -> "CosmosTip", m, None, m.ru - | Log.TipNotFound m -> "CosmosTip404", m, None, m.ru - | Log.TipNotModified m -> "CosmosTip302", m, None, m.ru - | Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru - | Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru - | Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru - | Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru - | Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru - | Log.SyncConflict m -> "CosmosSync409", m, None, m.ru - | Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru - | Log.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru - | Log.Delete m -> "CosmosDelete", m, None, m.ru - | Log.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru + | Log.Metric.Tip m -> "CosmosTip", m, None, m.ru + | Log.Metric.TipNotFound m -> "CosmosTip404", m, None, m.ru + | Log.Metric.TipNotModified m -> "CosmosTip302", m, None, m.ru + | Log.Metric.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru + | Log.Metric.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru + | Log.Metric.QueryResponse(Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru + | Log.Metric.QueryResponse (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru + | Log.Metric.SyncSuccess m -> "CosmosSync200", m, None, m.ru + | Log.Metric.SyncConflict m -> "CosmosSync409", m, None, m.ru + | Log.Metric.SyncResync m -> "CosmosSyncResync", m, None, m.ru + | Log.Metric.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru + | Log.Metric.Delete m -> "CosmosDelete", m, None, m.ru + | Log.Metric.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru { action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru } @@ -65,7 +65,7 @@ type SerilogMetricsExtractor(emit : string -> unit) = logEvent.Properties |> Seq.tryPick (function | KeyValue (k, SerilogScalar (:? Equinox.EventStore.Log.Event as m)) -> Some <| Choice1Of3 (k,m) - | KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as m)) -> Some <| Choice2Of3 (k,m) + | KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Metric as m)) -> Some <| Choice2Of3 (k,m) | _ -> None) |> Option.defaultValue (Choice3Of3 ()) let handleLogEvent logEvent = diff --git a/samples/Tutorial/Counter.fsx b/samples/Tutorial/Counter.fsx index 9cd5128cf..1c633f137 100644 --- a/samples/Tutorial/Counter.fsx +++ b/samples/Tutorial/Counter.fsx @@ -94,8 +94,6 @@ let store = Equinox.MemoryStore.VolatileStore() let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs) let codec = FsCodec.Box.Codec.Create() let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve -open Serilog -let log = LoggerConfiguration().WriteTo.Console().CreateLogger() let service = Service(log, resolve, maxAttempts=3) let clientId = "ClientA" service.Read(clientId) |> Async.RunSynchronously diff --git a/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs b/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs new file mode 100644 index 000000000..de991e86d --- /dev/null +++ b/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs @@ -0,0 +1,116 @@ +namespace Equinox.CosmosStore.Prometheus + +module private Impl = + + let baseName stat = "equinox_" + stat + let baseDesc desc = "Equinox CosmosDB " + desc + +module private Histograms = + + let labelNames = [| "facet"; "op"; "app"; "db"; "con"; "cat" |] + let private mkHistogram (cfg : Prometheus.HistogramConfiguration) name desc = + let h = Prometheus.Metrics.CreateHistogram(name, desc, cfg) + fun (facet : string, op : string) app (db, con, cat : string) s -> + h.WithLabels(facet, op, app, db, con, cat).Observe(s) + // Given we also have summary metrics with equivalent labels, we focus the bucketing on LAN latencies + let private sHistogram = + let sBuckets = [| 0.0005; 0.001; 0.002; 0.004; 0.008; 0.016; 0.5; 1.; 2.; 4.; 8. |] + let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames) + mkHistogram sCfg + let private ruHistogram = + let ruBuckets = Prometheus.Histogram.ExponentialBuckets(1., 2., 11) // 1 .. 1024 + let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames) + mkHistogram ruCfg + let sAndRuPair stat desc = + let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc + let observeS = sHistogram (baseName + "_seconds") (baseDesc + " latency") + let observeRu = ruHistogram (baseName + "_ru") (baseDesc + " charge") + fun (facet, op) app (db, con, cat, s : System.TimeSpan, ru) -> + observeS (facet, op) app (db, con, cat) s.TotalSeconds + observeRu (facet, op) app (db, con, cat) ru + +module private Summaries = + + let labelNames = [| "facet"; "app"; "db"; "con" |] + let private mkSummary (cfg : Prometheus.SummaryConfiguration) name desc = + let s = Prometheus.Metrics.CreateSummary(name, desc, cfg) + fun (facet : string) app (db, con) o -> s.WithLabels(facet, app, db, con).Observe(o) + let config = + let inline qep q e = Prometheus.QuantileEpsilonPair(q, e) + let objectives = [| qep 0.50 0.05; qep 0.95 0.01; qep 0.99 0.01 |] + Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames, MaxAge = System.TimeSpan.FromMinutes 1.) + let sAndRuPair stat desc = + let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc + let observeS = mkSummary config (baseName + "_seconds") (baseDesc + " latency") + let observeRu = mkSummary config (baseName + "_ru") (baseDesc + " charge") + fun facet app (db, con, s : System.TimeSpan, ru) -> + observeS facet app (db, con) s.TotalSeconds + observeRu facet app (db, con) ru + +module private Counters = + + let labelNames = [| "facet"; "op"; "outcome"; "app"; "db"; "con"; "cat" |] + let private mkCounter (cfg : Prometheus.CounterConfiguration) name desc = + let h = Prometheus.Metrics.CreateCounter(name, desc, cfg) + fun (facet : string, op : string, outcome : string) app (db, con, cat) c -> + h.WithLabels(facet, op, outcome, app, db, con, cat).Inc(c) + let config = Prometheus.CounterConfiguration(LabelNames = labelNames) + let total stat desc = + let name = Impl.baseName (stat + "_total") + let desc = Impl.baseDesc desc + mkCounter config name desc + let eventsAndBytesPair stat desc = + let observeE = total (stat + "_events") (desc + "Events") + let observeB = total (stat + "_bytes") (desc + "Bytes") + fun ctx app (db, con, cat, e, b) -> + observeE ctx app (db, con, cat) e + match b with None -> () | Some b -> observeB ctx app (db, con, cat) b + +module private Stats = + + let opHistogram = Histograms.sAndRuPair "op" "Operation" + let roundtripHistogram = Histograms.sAndRuPair "roundtrip" "Fragment" + let opSummary = Summaries.sAndRuPair "op_summary" "Operation Summary" + let roundtripSummary = Summaries.sAndRuPair "roundtrip_summary" "Fragment Summary" + let payloadCounters = Counters.eventsAndBytesPair "payload" "Payload, " + let cacheCounter = Counters.total "cache" "Cache" + + let observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) = + opHistogram (facet, op) app (db, con, cat, s, ru) + opSummary facet app (db, con, s, ru) + let observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, count, bytes) = + observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) + payloadCounters (facet, op, outcome) app (db, con, cat, float count, if bytes = -1 then None else Some (float bytes)) + + let inline (|CatSRu|) ({ interval = i; ru = ru } : Equinox.Cosmos.Store.Log.Measurement as m) = + let cat, _id = FsCodec.StreamName.splitCategoryAndId (FSharp.UMX.UMX.tag m.stream) + m.database, m.container, cat, i.Elapsed, ru + let observeRes (facet, _op as stat) app (CatSRu (db, con, cat, s, ru)) = + roundtripHistogram stat app (db, con, cat, s, ru) + roundtripSummary facet app (db, con, s, ru) + let observe_ stat app (CatSRu (db, con, cat, s, ru)) = + observeLatencyAndCharge stat app (db, con, cat, s, ru) + let observe (facet, op, outcome) app (CatSRu (db, con, cat, s, ru) as m) = + observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes) + let observeTip (facet, op, outcome, cacheOutcome) app (CatSRu (db, con, cat, s, ru) as m) = + observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes) + cacheCounter (facet, op, cacheOutcome) app (db, con, cat) 1. + +open Equinox.Cosmos.Store.Log + +type LogSink(app) = + interface Serilog.Core.ILogEventSink with + member __.Emit logEvent = logEvent |> function + | MetricEvent cm -> cm |> function + | Op (Operation.Tip, m) -> Stats.observeTip ("query", "tip", "ok", "200") app m + | Op (Operation.Tip404, m) -> Stats.observeTip ("query", "tip", "ok", "404") app m + | Op (Operation.Tip302, m) -> Stats.observeTip ("query", "tip", "ok", "302") app m + | Op (Operation.Query, m) -> Stats.observe ("query", "query", "ok") app m + | QueryRes (_direction, m) -> Stats.observeRes ("query", "queryPage") app m + | Op (Operation.Write, m) -> Stats.observe ("transact", "sync", "ok") app m + | Op (Operation.Conflict, m) -> Stats.observe ("transact", "conflict", "conflict") app m + | Op (Operation.Resync, m) -> Stats.observe ("transact", "resync", "conflict") app m + | Op (Operation.Prune, m) -> Stats.observe_ ("prune", "pruneQuery") app m + | PruneRes ( m) -> Stats.observeRes ("prune", "pruneQueryPage") app m + | Op (Operation.Delete, m) -> Stats.observe ("prune", "delete", "ok") app m + | _ -> () diff --git a/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj b/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj new file mode 100644 index 000000000..38fd20cb6 --- /dev/null +++ b/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj @@ -0,0 +1,30 @@ + + + + netstandard2.0;net461 + 5 + false + true + true + $(DefineConstants);NET461 + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 044c671bc..ccdfafffe 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -206,7 +206,7 @@ type internal Enum() = Enum.Events(b.i, b.e, startPos, direction) |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id static member Unfolds(xs: Unfold[]) : ITimelineEvent seq = seq { - for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold=true) } + for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold = true) } static member EventsAndUnfolds(x: Tip): ITimelineEvent seq = Enum.Events x |> Seq.append (Enum.Unfolds x.u) @@ -216,44 +216,50 @@ type internal Enum() = type IRetryPolicy = abstract member Execute: (int -> Async<'T>) -> Async<'T> module Log = + [] - type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int; ru: float } - [] - type Event = + type Measurement = + { database: string; container: string; stream: string + interval: StopwatchInterval; bytes: int; count: int; ru: float } + [] + type Metric = /// Individual read request for the Tip | Tip of Measurement /// Individual read request for the Tip, not found | TipNotFound of Measurement /// Tip read with Single RU Request Charge due to correct use of etag in cache | TipNotModified of Measurement + /// Summarizes a set of Responses for a given Read request | Query of Direction * responses: int * Measurement /// Individual read request in a Batch - /// Charges are rolled up into Query (so do not double count) - | Response of Direction * Measurement + /// Charges are rolled up into Query Metric (so do not double count) + | QueryResponse of Direction * Measurement + | SyncSuccess of Measurement | SyncResync of Measurement | SyncConflict of Measurement + + /// Summarizes outcome of request to trim batches from head of a stream and events in Tip + /// Count in Measurement is number of batches (documents) deleted + /// Bytes in Measurement is number of events deleted + | Prune of responsesHandled : int * Measurement /// Handled response from listing of batches in a stream - /// Charges are rolled up into Prune (so do not double count) + /// Charges are rolled up into the Prune Metric (so do not double count) | PruneResponse of Measurement /// Deleted an individual Batch | Delete of Measurement - /// Pruned batches from head of a stream - /// Count in Measurement is number of batches (documents) deleted - /// Bytes in Measurement is number of events deleted - | Prune of responsesHandled : int * Measurement - let prop name value (log : ILogger) = log.ForContext(name, value) - let propData name (events: #IEventData seq) (log : ILogger) = + let internal prop name value (log : ILogger) = log.ForContext(name, value) + let internal propData name (events: #IEventData seq) (log : ILogger) = let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) - let propEvents = propData "events" - let propDataUnfolds = Enum.Unfolds >> propData "unfolds" - let propStartPos (value : Position) log = prop "startPos" value.index log - let propStartEtag (value : Position) log = prop "startEtag" value.etag log + let internal propEvents = propData "events" + let internal propDataUnfolds = Enum.Unfolds >> propData "unfolds" + let internal propStartPos (value : Position) log = prop "startPos" value.index log + let internal propStartEtag (value : Position) log = prop "startEtag" value.etag log - let withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> = + let internal withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> = match retryPolicy with | None -> f log | Some retryPolicy -> @@ -261,75 +267,87 @@ module Log = let log = if count = 1 then log else log |> prop contextLabel count f log retryPolicy.Execute withLoggingContextWrapping - /// Attach a property to the log context to hold the metrics + /// Include a LogEvent property bearing metrics // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 - open Serilog.Events - let event (value : Event) (log : ILogger) = - let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value))) + let internal event (value : Metric) (log : ILogger) = + let enrich (e : Serilog.Events.LogEvent) = + e.AddPropertyIfAbsent(Serilog.Events.LogEventProperty("cosmosEvt", Serilog.Events.ScalarValue(value))) log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt }) - let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length - let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80 - let (|BatchLen|) = Seq.sumBy (|EventLen|) + let internal (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length + let internal (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80 + let internal (|BatchLen|) = Seq.sumBy (|EventLen|) + let internal (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function + | (:? Serilog.Events.ScalarValue as x) -> Some x.Value + | _ -> None + let (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric option = + match logEvent.Properties.TryGetValue("cosmosEvt") with + | true, SerilogScalar (:? Metric as e) -> Some e + | _ -> None + [] + type Operation = Tip | Tip404 | Tip302 | Query | Write | Resync | Conflict | Prune | Delete + let (|Op|QueryRes|PruneRes|) = function + | Metric.Tip s -> Op (Operation.Tip, s) + | Metric.TipNotFound s -> Op (Operation.Tip404, s) + | Metric.TipNotModified s -> Op (Operation.Tip302, s) + + | Metric.Query (_, _, s) -> Op (Operation.Query, s) + | Metric.QueryResponse (direction, s) -> QueryRes (direction, s) + + | Metric.SyncSuccess s -> Op (Operation.Write, s) + | Metric.SyncResync s -> Op (Operation.Resync, s) + | Metric.SyncConflict s -> Op (Operation.Conflict, s) + + | Metric.Prune (_, s) -> Op (Operation.Prune, s) + | Metric.PruneResponse s -> PruneRes s + | Metric.Delete s -> Op (Operation.Delete, s) /// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and /// the ChangeLog will mention changes, it's critical to not assume that the presence or nature of these helpers be considered stable module InternalMetrics = module Stats = - let inline (|Stats|) ({ interval = i; ru = ru }: Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds - - let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|CosmosDeleteRc|CosmosPruneRc|) = function - | Tip (Stats s) - | TipNotFound (Stats s) - | TipNotModified (Stats s) - | Query (_,_, (Stats s)) -> CosmosReadRc s - // slices are rolled up into batches so be sure not to double-count - | Response (_,(Stats s)) - // costs roll up into Prune operation so be sure not to double-count - | PruneResponse (Stats s) -> CosmosResponseRc s - | SyncSuccess (Stats s) - | SyncConflict (Stats s) -> CosmosWriteRc s - | SyncResync (Stats s) -> CosmosResyncRc s - | Delete (Stats s) -> CosmosDeleteRc s - | Prune (_, (Stats s)) -> CosmosPruneRc s - let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function - | (:? ScalarValue as x) -> Some x.Value - | _ -> None - let (|CosmosMetric|_|) (logEvent : LogEvent) : Event option = - match logEvent.Properties.TryGetValue("cosmosEvt") with - | true, SerilogScalar (:? Event as e) -> Some e - | _ -> None - type Counter = - { mutable rux100: int64; mutable count: int64; mutable ms: int64 } - static member Create() = { rux100 = 0L; count = 0L; ms = 0L } - member __.Ingest (ru, ms) = - System.Threading.Interlocked.Increment(&__.count) |> ignore - System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore - System.Threading.Interlocked.Add(&__.ms, ms) |> ignore + + type internal Counter = + { mutable rux100: int64; mutable count: int64; mutable ms: int64 } + static member Create() = { rux100 = 0L; count = 0L; ms = 0L } + member __.Ingest (ru, ms) = + System.Threading.Interlocked.Increment(&__.count) |> ignore + System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore + System.Threading.Interlocked.Add(&__.ms, ms) |> ignore + let inline private (|RcMs|) ({ interval = i; ru = ru }: Measurement) = + ru, let e = i.Elapsed in int64 e.TotalMilliseconds type LogSink() = static let epoch = System.Diagnostics.Stopwatch.StartNew() - static member val Read = Counter.Create() with get, set - static member val Write = Counter.Create() with get, set - static member val Resync = Counter.Create() with get, set - static member val Delete = Counter.Create() with get, set - static member val Prune = Counter.Create() with get, set + static member val internal Read = Counter.Create() with get, set + static member val internal Write = Counter.Create() with get, set + static member val internal Resync = Counter.Create() with get, set + static member val internal Conflict = Counter.Create() with get, set + static member val internal Prune = Counter.Create() with get, set + static member val internal Delete = Counter.Create() with get, set static member Restart() = LogSink.Read <- Counter.Create() LogSink.Write <- Counter.Create() LogSink.Resync <- Counter.Create() - LogSink.Delete <- Counter.Create() + LogSink.Conflict <- Counter.Create() LogSink.Prune <- Counter.Create() + LogSink.Delete <- Counter.Create() let span = epoch.Elapsed epoch.Restart() span interface Serilog.Core.ILogEventSink with - member __.Emit logEvent = logEvent |> function - | CosmosMetric (CosmosReadRc stats) -> LogSink.Read.Ingest stats - | CosmosMetric (CosmosWriteRc stats) -> LogSink.Write.Ingest stats - | CosmosMetric (CosmosResyncRc stats) -> LogSink.Resync.Ingest stats - | CosmosMetric (CosmosDeleteRc stats) -> LogSink.Delete.Ingest stats - | CosmosMetric (CosmosPruneRc stats) -> LogSink.Prune.Ingest stats - | CosmosMetric (CosmosResponseRc _) -> () // Costs are already included in others + member __.Emit logEvent = + match logEvent with + | MetricEvent cm -> + match cm with + | Op ((Operation.Tip | Operation.Tip404 | Operation.Tip302 | Operation.Query), RcMs m) -> + LogSink.Read.Ingest m + | QueryRes (_direction, _) -> () + | Op (Operation.Write, RcMs m) -> LogSink.Write.Ingest m + | Op (Operation.Conflict, RcMs m) -> LogSink.Conflict.Ingest m + | Op (Operation.Resync, RcMs m) -> LogSink.Resync.Ingest m + | Op (Operation.Prune, RcMs m) -> LogSink.Prune.Ingest m + | PruneRes ( _) -> () + | Op (Operation.Delete, RcMs m) -> LogSink.Delete.Ingest m | _ -> () /// Relies on feeding of metrics from Log through to Stats.LogSink @@ -339,8 +357,9 @@ module Log = [ "Read", Stats.LogSink.Read "Write", Stats.LogSink.Write "Resync", Stats.LogSink.Resync - "Delete", Stats.LogSink.Delete - "Prune", Stats.LogSink.Prune ] + "Conflict", Stats.LogSink.Conflict + "Prune", Stats.LogSink.Prune + "Delete", Stats.LogSink.Delete ] let mutable rows, totalCount, totalRc, totalMs = 0, 0L, 0., 0L let logActivity name count rc lat = log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms", @@ -363,6 +382,8 @@ module Log = type Container(client : Client.DocumentClient, databaseId, containerId) = let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(databaseId, containerId) member __.Client = client + member __.DatabaseId = databaseId + member __.ContainerId = containerId member __.CollectionUri = collectionUri [] @@ -507,12 +528,14 @@ function sync(req, expIndex, expEtag) { | [||] -> Result.ConflictUnknown newPos | xs -> Result.Conflict (newPos, Enum.Unfolds xs |> Array.ofSeq) } - let private logged (container,stream) (exp : Exp, req: Tip) (log : ILogger) + let private logged (container : Container, stream) (exp : Exp, req: Tip) (log : ILogger) : Async = async { let! t, (ru, result) = run (container,stream) (exp, req) |> Stopwatch.Time let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length let log = - let inline mkMetric ru : Log.Measurement = { stream = stream; interval = t; bytes = bytes; count = count; ru = ru } + let inline mkMetric ru : Log.Measurement = + { database = container.DatabaseId; container = container.ContainerId; stream = stream + interval = t; bytes = bytes; count = count; ru = ru } let inline propConflict log = log |> Log.prop "conflict" true |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in req.e -> x.c })) let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug (if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log) @@ -522,12 +545,12 @@ function sync(req, expIndex, expEtag) { | Exp.Any -> Log.prop "expectedVersion" -1 |> match result with | Result.Written pos -> - Log.prop "nextExpectedVersion" pos >> Log.event (Log.SyncSuccess (mkMetric ru)) + Log.prop "nextExpectedVersion" pos >> Log.event (Log.Metric.SyncSuccess (mkMetric ru)) | Result.ConflictUnknown pos' -> - Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru)) + Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncConflict (mkMetric ru)) | Result.Conflict (pos', xs) -> (if verbose then Log.propData "conflicts" xs else id) - >> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncResync (mkMetric ru)) + >> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncResync (mkMetric ru)) log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}", "Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp) return result } @@ -538,7 +561,7 @@ function sync(req, expIndex, expEtag) { let private mkEvent (e : IEventData<_>) = { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } - let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip = + let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip = { p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null e = Array.map mkEvent events; u = Array.ofSeq unfolds } let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq = @@ -627,19 +650,20 @@ module internal Tip = let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null let ro = Client.RequestOptions(PartitionKey=PartitionKey stream, AccessCondition = ac) container.TryReadItem(Tip.WellKnownDocumentId, ro) - let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async { + let private loggedGet (get : Container * string -> Position option -> Async<_>) (container : Container, stream) (maybePos: Position option) (log: ILogger) = async { let log = log |> Log.prop "stream" stream let! t, (ru, res : ReadResult) = get (container,stream) maybePos |> Stopwatch.Time - let log bytes count (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru }) + let log bytes count (f : Log.Measurement -> _) = + log |> Log.event (f { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = bytes; count = count; ru = ru }) match res with | ReadResult.NotModified -> - (log 0 0 Log.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru) + (log 0 0 Log.Metric.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.NotFound -> - (log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) + (log 0 0 Log.Metric.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.Found tip -> let log = let (Log.BatchLen bytes), count = Enum.Unfolds tip.u, tip.u.Length - log bytes count Log.Tip + log bytes count Log.Metric.Tip let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds tip.u let log = match maybePos with Some p -> log |> Log.propStartPos p |> Log.propStartEtag p | None -> log let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n @@ -654,7 +678,7 @@ module internal Tip = | ReadResult.NotFound -> return Result.NotFound | ReadResult.Found tip -> return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds tip |> Array.ofSeq) } - module internal Query = +module internal Query = open Microsoft.Azure.Documents.Linq open FSharp.Control let private mkQuery (container : Container, stream : string) maxItems (direction: Direction) startPos = @@ -670,15 +694,15 @@ module internal Tip = container.Client.CreateDocumentQuery(container.CollectionUri, query, qro).AsDocumentQuery() // Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index - let private handleResponse direction (streamName: string) startPos (query: IDocumentQuery) (log: ILogger) + let private handleResponse direction (container : Container, streamName: string) startPos (query: IDocumentQuery) (log: ILogger) : Async[] * Position option * float> = async { let! ct = Async.CancellationToken let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, ru = Array.ofSeq res, res.RequestCharge let events = batches |> Seq.collect (fun b -> Enum.Events(b, startPos, direction)) |> Array.ofSeq let (Log.BatchLen bytes), count = events, events.Length - let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count; ru = ru } - let log = let evt = Log.Response (direction, reqMetric) in log |> Log.event evt + let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = streamName; interval = t; bytes = bytes; count = count; ru = ru } + let log = let evt = Log.Metric.QueryResponse (direction, reqMetric) in log |> Log.event evt let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) (log |> (match startPos with Some pos -> Log.propStartPos pos | None -> id) |> Log.prop "bytes" bytes) @@ -703,10 +727,10 @@ module internal Tip = yield! loop (batchCount + 1) } loop 0 - let private logQuery direction batchSize streamName interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) = + let private logQuery direction batchSize (container : Container, streamName) interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length - let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } - let evt = Log.Event.Query (direction, responsesCount, reqMetric) + let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } + let evt = Log.Metric.Query (direction, responsesCount, reqMetric) let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB" (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize |> Log.event evt).Information( "EqxCosmos {action:l} {stream} v{n} {count}/{responses} {ms}ms rc={ru}", @@ -749,7 +773,7 @@ module internal Tip = |> AsyncSeq.toArrayAsync return events, maybeTipPos, ru } let query = mkQuery (container,stream) maxItems direction startPos - let pullSlice = handleResponse direction stream startPos + let pullSlice = handleResponse direction (container, stream) startPos let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream let readlog = log |> Log.prop "direction" direction @@ -758,7 +782,7 @@ module internal Tip = let raws, decoded = (Array.map fst events), (events |> Seq.choose snd |> Array.ofSeq) let pos = match maybeTipPos with Some p -> p | None -> Position.fromMaxIndex raws - log |> logQuery direction maxItems stream t (!responseCount,raws) pos.index ru + log |> logQuery direction maxItems (container, stream) t (!responseCount,raws) pos.index ru return pos, decoded } let walkLazy<'event> (log : ILogger) (container,stream) retryPolicy maxItems maxRequests direction startPos @@ -766,7 +790,7 @@ module internal Tip = : AsyncSeq<'event[]> = asyncSeq { let responseCount = ref 0 let query = mkQuery (container,stream) maxItems direction startPos - let pullSlice = handleResponse direction stream startPos + let pullSlice = handleResponse direction (container, stream) startPos let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream let mutable ru = 0. @@ -802,7 +826,7 @@ module internal Tip = finally let endTicks = System.Diagnostics.Stopwatch.GetTimestamp() let t = StopwatchInterval(startTicks, endTicks) - log |> logQuery direction maxItems stream t (!responseCount,allSlices.ToArray()) -1L ru } + log |> logQuery direction maxItems (container, stream) t (!responseCount,allSlices.ToArray()) -1L ru } // Manages deletion of (full) Batches, maintaining ordering guarantees by never updating non-Tip batches // Additionally, the nature of the fallback algorithm requires that deletions be carried out in sequential order so as not to leave gaps @@ -822,14 +846,14 @@ module Prune = let qo = Client.RequestOptions(PartitionKey = PartitionKey stream) let! t, res = container.Client.DeleteDocumentAsync(docLink, qo, cancellationToken=ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let rc, ms = res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds) - let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = count; ru = rc } - let log = let evt = Log.Delete reqMetric in log |> Log.event evt + let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = -1; count = count; ru = rc } + let log = let evt = Log.Metric.Delete reqMetric in log |> Log.event evt log.Information("EqxCosmos {action:l} {id} {ms}ms rc={ru}", "Delete", id, ms, rc) return rc } let log = log |> Log.prop "index" indexInclusive let query : IDocumentQuery = - // sort by i to guarantee we don't ever leave an observable gap in the sequence + // sort by i to guarantee we don't ever leave an observable gap in the sequence let q = SqlQuerySpec("SELECT c.id, c.i, c.n FROM c ORDER by c.i") let qro = Client.FeedOptions(PartitionKey = PartitionKey stream, MaxItemCount=Nullable maxItems) container.Client.CreateDocumentQuery<_>(container.CollectionUri, q, qro).AsDocumentQuery() @@ -840,8 +864,8 @@ module Prune = let! t, (res : Client.FeedResponse<_>) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, rc, ms = Array.ofSeq res, res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds) let next = (match batches with [||] -> None | xs -> Some (xs.[ xs.Length - 1])) |> Option.map (fun x -> x.n) |> Option.toNullable - let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc } - let log = let evt = Log.PruneResponse reqMetric in log |> Log.event evt + let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc } + let log = let evt = Log.Metric.PruneResponse reqMetric in log |> Log.event evt log.Information("EqxCosmos {action:l} {batches} {ms}ms n={next} rc={ru}", "PruneResponse", batches.Length, ms, next, rc) return Some ((rc, batches), x) } @@ -887,8 +911,8 @@ module Prune = batches <- batches + bCount eventsDeleted <- eventsDeleted + eDel eventsDeferred <- eventsDeferred + eDef - let reqMetric : Log.Measurement = { stream = stream; interval = pt; bytes = eventsDeleted; count = batches; ru = queryCharges } - let log = let evt = Log.Prune (responses, reqMetric) in log |> Log.event evt + let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = pt; bytes = eventsDeleted; count = batches; ru = queryCharges } + let log = let evt = Log.Metric.Prune (responses, reqMetric) in log |> Log.event evt let lwm = defaultArg lwm 0L // If we've seen no batches at all, then the write position is 0L log.Information("EqxCosmos {action:l} {events}/{batches} lwm={lwm} {ms}ms queryRu={queryRu} deleteRu={deleteRu} trimRu={trimRu}", "Prune", eventsDeleted, batches, lwm, (let e = pt.Elapsed in e.TotalMilliseconds), queryCharges, delCharges, trimCharges) @@ -1017,7 +1041,7 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven | LoadFromTokenResult.Found (token', events') -> return token', fold state events' } member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds): Async> = async { let state' = fold state (Seq.ofList events) - let encode e = codec.Encode(context,e) + let encode e = codec.Encode(context, e) let exp,events,eventsEncoded,projectionsEncoded = match mapUnfolds with | Choice1Of3 () -> Sync.Exp.Version pos.index, events, Seq.map encode events |> Array.ofSeq, Seq.empty @@ -1098,7 +1122,7 @@ type private Folder<'event, 'state, 'context> | Some tokenAndState when opt = Some AllowStale -> return tokenAndState | Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log } member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context) - : Async> = async { + : Async> = async { match! category.Sync((streamToken, state), events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token',state') -> return SyncResult.Written (token',state') } @@ -1199,7 +1223,7 @@ type Resolver<'event, 'state, 'context> ( context : Context, codec, fold, initial, caching, access, /// Compress Unfolds in Tip. Default: true. /// NOTE when set to false, requires Equinox.Cosmos Version >= 2.3.0 to be able to read - ?compressUnfolds) = + []?compressUnfolds) = let compressUnfolds = defaultArg compressUnfolds true let readCacheOption = match caching with @@ -1283,7 +1307,7 @@ type Connector []?gatewayModeMaxConnectionLimit, /// Connection mode (default: ConnectionMode.Gateway (lowest perf, least trouble)) []?mode : ConnectionMode, - /// consistency mode (default: ConsistencyLevel.Session) + /// consistency mode (default: ConsistencyLevel.Session) []?defaultConsistencyLevel : ConsistencyLevel, /// Retries for read requests, over and above those defined by the mandatory policies @@ -1422,7 +1446,7 @@ type Context /// Reads all Events from a `Position` in a given `direction` member __.Read(stream, ?position, ?maxCount, ?direction) : Async[]> = - __.GetInternal((stream, position), ?maxCount=maxCount, ?direction=direction) |> yieldPositionAndData + __.GetInternal((stream, position), ?maxCount = maxCount, ?direction = direction) |> yieldPositionAndData /// Appends the supplied batch of events, subject to a consistency check based on the `position` /// Callers should implement appropriate idempotent handling, or use Equinox.Stream for that purpose @@ -1454,7 +1478,7 @@ type Context /// Provides mechanisms for building `EventData` records to be supplied to the `Events` API type EventData() = /// Creates an Event record, suitable for supplying to Append et al - static member FromUtf8Bytes(eventType, data, ?meta) : IEventData<_> = FsCodec.Core.EventData.Create(eventType, data, ?meta=meta) :> _ + static member FromUtf8Bytes(eventType, data, ?meta) : IEventData<_> = FsCodec.Core.EventData.Create(eventType, data, ?meta = meta) :> _ /// Api as defined in the Equinox Specification /// Note the CosmosContext APIs can yield better performance due to the fact that a Position tracks the etag of the Stream's Tip diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 56cb5d10a..76d32a6b7 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -7,7 +7,7 @@ open Equinox open Equinox.Core open System.Runtime.InteropServices -/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary +/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary exception private WrongVersionException of streamName: string * expected: int * value: obj /// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary @@ -20,10 +20,21 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't /// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance type VolatileStore<'Format>() = - let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() + + let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() + + // Where TrySync attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' resulting from a successful Sync + // If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order let committed = Event<_>() + // Here we neuter that effect - the BatchingGate can end up with commits submitted out of order, but we serialize the raising of the events per stream + let publishBatches (commits : (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])[]) = async { + for streamName, events in commits |> Seq.groupBy fst do + committed.Trigger(streamName, events |> Seq.collect snd |> Seq.sortBy (fun x -> x.Index) |> Seq.toArray) } + let publishCommit = AsyncBatchingGate(publishBatches, System.TimeSpan.FromMilliseconds 2.) [] + /// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event
+ /// NOTE in some cases, two or more overlapping commits can be coalesced into a single Committed event member __.Committed : IEvent[]> = committed.Publish /// Loads state from a given stream @@ -33,20 +44,22 @@ type VolatileStore<'Format>() = member __.TrySync ( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult[]>, events: FsCodec.ITimelineEvent<'Format>[]) - : ConcurrentArraySyncResult[]> = + : Async[]>> = async { let seedStream _streamName = events let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) = match trySyncValue currentValue with | ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue) | ConcurrentDictionarySyncResult.Written value -> value - try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written - committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times - res - with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict + try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) + // we publish the event here, once, as `updateValue` can be invoked multiple times + do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events)) + return Written res + with WrongVersionException(_, _, conflictingValue) -> + return Conflict (unbox conflictingValue) } type Token = { streamVersion: int; streamName: string } -/// Internal implementation detail of MemoryStreamStore +/// Internal implementation detail of MemoryStore module private Token = let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken = @@ -64,36 +77,37 @@ module private Token = /// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!). type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) = - let (|Decode|) = Array.choose codec.TryDecode interface ICategory<'event, 'state, string, 'context> with member __.Load(_log, streamName, _opt) = async { match store.TryLoad streamName with | None -> return Token.ofEmpty streamName initial - | Some (Decode events) -> return Token.ofEventArray streamName fold initial events } + | Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) } member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async { let inline map i (e : FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp) - let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq + let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq let trySyncValue currentValue = if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion) else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq) - match store.TrySync(token.streamName, trySyncValue, encoded) with + match! store.TrySync(token.streamName, trySyncValue, encoded) with + | ConcurrentArraySyncResult.Written _ -> + return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events | ConcurrentArraySyncResult.Conflict conflictingEvents -> let resync = async { let version = Token.tokenOfArray token.streamName conflictingEvents let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq return version, fold state (successorEvents |> Seq.choose codec.TryDecode) } - return SyncResult.Conflict resync - | ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events } + return SyncResult.Conflict resync } type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) = let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial) let resolveStream streamName context = Stream.create category streamName None context + member __.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context : 'context) = match FsCodec.StreamName.toString streamName, option with - | sn, (None|Some AllowStale) -> resolveStream sn context + | sn, (None | Some AllowStale) -> resolveStream sn context | sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context) /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) = - Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context) + Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context) diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 42e1c5fb2..ca55cf77a 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -256,7 +256,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 0L expected res test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> let queryRoundTripsAndItemCounts = function - | EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Forward, responses, { count = c })) -> Some (responses,c) + | EqxEvent (Equinox.Cosmos.Store.Log.Metric.Query (Equinox.Cosmos.Store.Direction.Forward, responses, { count = c })) -> Some (responses,c) | _ -> None // validate that, because we stopped after 1 item, we only needed one trip (which contained 4 events) [1,4] =! capture.ChooseCalls queryRoundTripsAndItemCounts @@ -322,7 +322,7 @@ type Tests(testOutputHelper) = test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> // validate that, despite only requesting max 1 item, we only needed one trip, bearing 5 items (from which one item was omitted) let queryRoundTripsAndItemCounts = function - | EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c) + | EqxEvent (Equinox.Cosmos.Store.Log.Metric.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c) | _ -> None [1,5] =! capture.ChooseCalls queryRoundTripsAndItemCounts verifyRequestChargesMax 4 // 3.24 // WAS 3 // 2.98 diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs index 42e4b4e50..c448016df 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs @@ -59,42 +59,45 @@ module SerilogHelpers = | Append | Resync | Conflict | PruneResponse | Delete | Prune let (|EqxAction|) = function - | Event.Tip _ -> EqxAct.Tip - | Event.TipNotFound _ -> EqxAct.TipNotFound - | Event.TipNotModified _ -> EqxAct.TipNotModified - | Event.Response (Direction.Forward,_) -> EqxAct.ResponseForward - | Event.Response (Direction.Backward,_) -> EqxAct.ResponseBackward - | Event.Query (Direction.Forward,_,_) -> EqxAct.QueryForward - | Event.Query (Direction.Backward,_,_) -> EqxAct.QueryBackward - | Event.SyncSuccess _ -> EqxAct.Append - | Event.SyncResync _ -> EqxAct.Resync - | Event.SyncConflict _ -> EqxAct.Conflict - | Event.PruneResponse _ -> EqxAct.PruneResponse - | Event.Delete _ -> EqxAct.Delete - | Event.Prune _ -> EqxAct.Prune - let inline (|Stats|) ({ ru = ru }: Equinox.Cosmos.Store.Log.Measurement) = ru - let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|CosmosDeleteRc|CosmosPruneRc|) = function - | Event.Tip (Stats s) - | Event.TipNotFound (Stats s) - | Event.TipNotModified (Stats s) - // slices are rolled up into batches so be sure not to double-count - | Event.PruneResponse (Stats s) - | Event.Response (_,Stats s) -> CosmosResponseRc s - | Event.Query (_,_, (Stats s)) -> CosmosReadRc s - | Event.SyncSuccess (Stats s) - | Event.SyncConflict (Stats s) -> CosmosWriteRc s - | Event.SyncResync (Stats s) -> CosmosResyncRc s - | Event.Delete (Stats s) -> CosmosDeleteRc s - | Event.Prune (_, (Stats s)) -> CosmosPruneRc s + | Metric.Tip _ -> EqxAct.Tip + | Metric.TipNotFound _ -> EqxAct.TipNotFound + | Metric.TipNotModified _ -> EqxAct.TipNotModified + + | Metric.Query (Direction.Forward, _, _) -> EqxAct.QueryForward + | Metric.Query (Direction.Backward, _, _) -> EqxAct.QueryBackward + | Metric.QueryResponse (Direction.Forward, _) -> EqxAct.ResponseForward + | Metric.QueryResponse (Direction.Backward, _) -> EqxAct.ResponseBackward + + | Metric.SyncSuccess _ -> EqxAct.Append + | Metric.SyncResync _ -> EqxAct.Resync + | Metric.SyncConflict _ -> EqxAct.Conflict + + | Metric.Prune _ -> EqxAct.Prune + | Metric.PruneResponse _ -> EqxAct.PruneResponse + | Metric.Delete _ -> EqxAct.Delete + let (|Load|Write|Resync|Prune|Delete|Response|) = function + | Metric.Tip s + | Metric.TipNotFound s + | Metric.TipNotModified s + + | Metric.Query (_, _, s) -> Load s + | Metric.QueryResponse (_, s) -> Response s + + | Metric.SyncSuccess s + | Metric.SyncConflict s -> Write s + | Metric.SyncResync s -> Resync s + + | Metric.Prune (_, s) -> Prune s + | Metric.PruneResponse s -> Response s + | Metric.Delete s -> Delete s + let inline (|Rc|) ({ ru = ru }: Equinox.Cosmos.Store.Log.Measurement) = ru /// Facilitates splitting between events with direct charges vs synthetic events Equinox generates to avoid double counting - let (|CosmosRequestCharge|EquinoxChargeRollup|) = function - | CosmosResponseRc _ -> - EquinoxChargeRollup - | CosmosReadRc rc | CosmosWriteRc rc | CosmosResyncRc rc | CosmosDeleteRc rc | CosmosPruneRc rc as e -> - CosmosRequestCharge (e,rc) - let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Store.Log.Event option = + let (|TotalRequestCharge|ResponseBreakdown|) = function + | Load (Rc rc) | Write (Rc rc) | Resync (Rc rc) | Delete (Rc rc) | Prune (Rc rc) as e -> TotalRequestCharge (e, rc) + | Response _ -> ResponseBreakdown + let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Store.Log.Metric option = logEvent.Properties.Values |> Seq.tryPick (function - | SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as e) -> Some e + | SerilogScalar (:? Equinox.Cosmos.Store.Log.Metric as e) -> Some e | _ -> None) let (|HasProp|_|) (name : string) (e : LogEvent) : LogEventPropertyValue option = @@ -113,7 +116,7 @@ module SerilogHelpers = member __.Clear () = captured.Clear() member __.ChooseCalls chooser = captured |> Seq.choose chooser |> List.ofSeq member __.ExternalCalls = __.ChooseCalls (function EqxEvent (EqxAction act) -> Some act | _ -> None) - member __.RequestCharges = __.ChooseCalls (function EqxEvent (CosmosRequestCharge e) -> Some e | _ -> None) + member __.RequestCharges = __.ChooseCalls (function EqxEvent (TotalRequestCharge e) -> Some e | _ -> None) type TestsWithLogCapture(testOutputHelper) = let log, capture = TestsWithLogCapture.CreateLoggerWithCapture testOutputHelper @@ -124,6 +127,7 @@ type TestsWithLogCapture(testOutputHelper) = let capture = LogCaptureBuffer() let logger = Serilog.LoggerConfiguration() + .MinimumLevel.Debug() .WriteTo.Seq("http://localhost:5341") .WriteTo.Sink(testOutput) .WriteTo.Sink(capture)