diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d3d4f845..0ee4032ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,18 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed
### Fixed
+
+## [2.4.0] - 2020-12-03
+
+### Added
+
+- `Cosmos`: Prometheus integration package as `master` [#266](https://github.com/jet/equinox/pull/266) [#267](https://github.com/jet/equinox/pull/267)
+
+### Fixed
+
+- `MemoryStore`: Serialize `Committed` events to guarantee consumption in event `Index` order re [#265](https://github.com/jet/equinox/issues/265) [#269](https://github.com/jet/equinox/pull/269) pray: [@fnipo](https://github.com/fnipo)
+- `Cosmos`: Fix defaulting for `compressUnfolds` in C# [#261](https://github.com/jet/equinox/pull/261)
+
## [2.3.0] - 2020-11-04
@@ -437,7 +449,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
(For information pertaining to earlier releases, see release notes in https://github.com/jet/equinox/releases and/or can someone please add it!)
-[Unreleased]: https://github.com/jet/equinox/compare/2.3.0...HEAD
+[Unreleased]: https://github.com/jet/equinox/compare/2.4.0...HEAD
+[2.4.0]: https://github.com/jet/equinox/compare/2.3.0...2.4.0
[2.3.0]: https://github.com/jet/equinox/compare/2.3.0-rc2...2.3.0
[2.3.0-rc2]: https://github.com/jet/equinox/compare/2.3.0-rc1...2.3.0-rc2
[2.3.0-rc1]: https://github.com/jet/equinox/compare/2.2.0...2.3.0-rc1
diff --git a/Equinox.sln b/Equinox.sln
index 95fa8ec40..1632e0172 100644
--- a/Equinox.sln
+++ b/Equinox.sln
@@ -76,6 +76,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}"
EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Cosmos.Prometheus", "src\Equinox.Cosmos.Prometheus\Equinox.Cosmos.Prometheus.fsproj", "{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -182,6 +184,10 @@ Global
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/build.proj b/build.proj
index d822cc84b..3e12b8e97 100644
--- a/build.proj
+++ b/build.proj
@@ -17,6 +17,7 @@
+
diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs
index 534a66de2..25d67d157 100644
--- a/samples/Store/Integration/LogIntegration.fs
+++ b/samples/Store/Integration/LogIntegration.fs
@@ -27,22 +27,22 @@ module EquinoxCosmosInterop =
[]
type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with
override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
- let flatten (evt : Log.Event) : FlatMetric =
+ let flatten (evt : Log.Metric) : FlatMetric =
let action, metric, batches, ru =
match evt with
- | Log.Tip m -> "CosmosTip", m, None, m.ru
- | Log.TipNotFound m -> "CosmosTip404", m, None, m.ru
- | Log.TipNotModified m -> "CosmosTip302", m, None, m.ru
- | Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
- | Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
- | Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
- | Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
- | Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru
- | Log.SyncConflict m -> "CosmosSync409", m, None, m.ru
- | Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru
- | Log.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
- | Log.Delete m -> "CosmosDelete", m, None, m.ru
- | Log.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
+ | Log.Metric.Tip m -> "CosmosTip", m, None, m.ru
+ | Log.Metric.TipNotFound m -> "CosmosTip404", m, None, m.ru
+ | Log.Metric.TipNotModified m -> "CosmosTip302", m, None, m.ru
+ | Log.Metric.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
+ | Log.Metric.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
+ | Log.Metric.QueryResponse(Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
+ | Log.Metric.QueryResponse (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
+ | Log.Metric.SyncSuccess m -> "CosmosSync200", m, None, m.ru
+ | Log.Metric.SyncConflict m -> "CosmosSync409", m, None, m.ru
+ | Log.Metric.SyncResync m -> "CosmosSyncResync", m, None, m.ru
+ | Log.Metric.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
+ | Log.Metric.Delete m -> "CosmosDelete", m, None, m.ru
+ | Log.Metric.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches
interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }
@@ -65,7 +65,7 @@ type SerilogMetricsExtractor(emit : string -> unit) =
logEvent.Properties
|> Seq.tryPick (function
| KeyValue (k, SerilogScalar (:? Equinox.EventStore.Log.Event as m)) -> Some <| Choice1Of3 (k,m)
- | KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as m)) -> Some <| Choice2Of3 (k,m)
+ | KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Metric as m)) -> Some <| Choice2Of3 (k,m)
| _ -> None)
|> Option.defaultValue (Choice3Of3 ())
let handleLogEvent logEvent =
diff --git a/samples/Tutorial/Counter.fsx b/samples/Tutorial/Counter.fsx
index 9cd5128cf..1c633f137 100644
--- a/samples/Tutorial/Counter.fsx
+++ b/samples/Tutorial/Counter.fsx
@@ -94,8 +94,6 @@ let store = Equinox.MemoryStore.VolatileStore()
let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs)
let codec = FsCodec.Box.Codec.Create()
let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
-open Serilog
-let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let service = Service(log, resolve, maxAttempts=3)
let clientId = "ClientA"
service.Read(clientId) |> Async.RunSynchronously
diff --git a/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs b/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs
new file mode 100644
index 000000000..de991e86d
--- /dev/null
+++ b/src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs
@@ -0,0 +1,116 @@
+namespace Equinox.CosmosStore.Prometheus
+
+module private Impl =
+
+ let baseName stat = "equinox_" + stat
+ let baseDesc desc = "Equinox CosmosDB " + desc
+
+module private Histograms =
+
+ let labelNames = [| "facet"; "op"; "app"; "db"; "con"; "cat" |]
+ let private mkHistogram (cfg : Prometheus.HistogramConfiguration) name desc =
+ let h = Prometheus.Metrics.CreateHistogram(name, desc, cfg)
+ fun (facet : string, op : string) app (db, con, cat : string) s ->
+ h.WithLabels(facet, op, app, db, con, cat).Observe(s)
+ // Given we also have summary metrics with equivalent labels, we focus the bucketing on LAN latencies
+ let private sHistogram =
+ let sBuckets = [| 0.0005; 0.001; 0.002; 0.004; 0.008; 0.016; 0.5; 1.; 2.; 4.; 8. |]
+ let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames)
+ mkHistogram sCfg
+ let private ruHistogram =
+ let ruBuckets = Prometheus.Histogram.ExponentialBuckets(1., 2., 11) // 1 .. 1024
+ let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames)
+ mkHistogram ruCfg
+ let sAndRuPair stat desc =
+ let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
+ let observeS = sHistogram (baseName + "_seconds") (baseDesc + " latency")
+ let observeRu = ruHistogram (baseName + "_ru") (baseDesc + " charge")
+ fun (facet, op) app (db, con, cat, s : System.TimeSpan, ru) ->
+ observeS (facet, op) app (db, con, cat) s.TotalSeconds
+ observeRu (facet, op) app (db, con, cat) ru
+
+module private Summaries =
+
+ let labelNames = [| "facet"; "app"; "db"; "con" |]
+ let private mkSummary (cfg : Prometheus.SummaryConfiguration) name desc =
+ let s = Prometheus.Metrics.CreateSummary(name, desc, cfg)
+ fun (facet : string) app (db, con) o -> s.WithLabels(facet, app, db, con).Observe(o)
+ let config =
+ let inline qep q e = Prometheus.QuantileEpsilonPair(q, e)
+ let objectives = [| qep 0.50 0.05; qep 0.95 0.01; qep 0.99 0.01 |]
+ Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames, MaxAge = System.TimeSpan.FromMinutes 1.)
+ let sAndRuPair stat desc =
+ let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
+ let observeS = mkSummary config (baseName + "_seconds") (baseDesc + " latency")
+ let observeRu = mkSummary config (baseName + "_ru") (baseDesc + " charge")
+ fun facet app (db, con, s : System.TimeSpan, ru) ->
+ observeS facet app (db, con) s.TotalSeconds
+ observeRu facet app (db, con) ru
+
+module private Counters =
+
+ let labelNames = [| "facet"; "op"; "outcome"; "app"; "db"; "con"; "cat" |]
+ let private mkCounter (cfg : Prometheus.CounterConfiguration) name desc =
+ let h = Prometheus.Metrics.CreateCounter(name, desc, cfg)
+ fun (facet : string, op : string, outcome : string) app (db, con, cat) c ->
+ h.WithLabels(facet, op, outcome, app, db, con, cat).Inc(c)
+ let config = Prometheus.CounterConfiguration(LabelNames = labelNames)
+ let total stat desc =
+ let name = Impl.baseName (stat + "_total")
+ let desc = Impl.baseDesc desc
+ mkCounter config name desc
+ let eventsAndBytesPair stat desc =
+ let observeE = total (stat + "_events") (desc + "Events")
+ let observeB = total (stat + "_bytes") (desc + "Bytes")
+ fun ctx app (db, con, cat, e, b) ->
+ observeE ctx app (db, con, cat) e
+ match b with None -> () | Some b -> observeB ctx app (db, con, cat) b
+
+module private Stats =
+
+ let opHistogram = Histograms.sAndRuPair "op" "Operation"
+ let roundtripHistogram = Histograms.sAndRuPair "roundtrip" "Fragment"
+ let opSummary = Summaries.sAndRuPair "op_summary" "Operation Summary"
+ let roundtripSummary = Summaries.sAndRuPair "roundtrip_summary" "Fragment Summary"
+ let payloadCounters = Counters.eventsAndBytesPair "payload" "Payload, "
+ let cacheCounter = Counters.total "cache" "Cache"
+
+ let observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) =
+ opHistogram (facet, op) app (db, con, cat, s, ru)
+ opSummary facet app (db, con, s, ru)
+ let observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, count, bytes) =
+ observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru)
+ payloadCounters (facet, op, outcome) app (db, con, cat, float count, if bytes = -1 then None else Some (float bytes))
+
+ let inline (|CatSRu|) ({ interval = i; ru = ru } : Equinox.Cosmos.Store.Log.Measurement as m) =
+ let cat, _id = FsCodec.StreamName.splitCategoryAndId (FSharp.UMX.UMX.tag m.stream)
+ m.database, m.container, cat, i.Elapsed, ru
+ let observeRes (facet, _op as stat) app (CatSRu (db, con, cat, s, ru)) =
+ roundtripHistogram stat app (db, con, cat, s, ru)
+ roundtripSummary facet app (db, con, s, ru)
+ let observe_ stat app (CatSRu (db, con, cat, s, ru)) =
+ observeLatencyAndCharge stat app (db, con, cat, s, ru)
+ let observe (facet, op, outcome) app (CatSRu (db, con, cat, s, ru) as m) =
+ observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
+ let observeTip (facet, op, outcome, cacheOutcome) app (CatSRu (db, con, cat, s, ru) as m) =
+ observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
+ cacheCounter (facet, op, cacheOutcome) app (db, con, cat) 1.
+
+open Equinox.Cosmos.Store.Log
+
+type LogSink(app) =
+ interface Serilog.Core.ILogEventSink with
+ member __.Emit logEvent = logEvent |> function
+ | MetricEvent cm -> cm |> function
+ | Op (Operation.Tip, m) -> Stats.observeTip ("query", "tip", "ok", "200") app m
+ | Op (Operation.Tip404, m) -> Stats.observeTip ("query", "tip", "ok", "404") app m
+ | Op (Operation.Tip302, m) -> Stats.observeTip ("query", "tip", "ok", "302") app m
+ | Op (Operation.Query, m) -> Stats.observe ("query", "query", "ok") app m
+ | QueryRes (_direction, m) -> Stats.observeRes ("query", "queryPage") app m
+ | Op (Operation.Write, m) -> Stats.observe ("transact", "sync", "ok") app m
+ | Op (Operation.Conflict, m) -> Stats.observe ("transact", "conflict", "conflict") app m
+ | Op (Operation.Resync, m) -> Stats.observe ("transact", "resync", "conflict") app m
+ | Op (Operation.Prune, m) -> Stats.observe_ ("prune", "pruneQuery") app m
+ | PruneRes ( m) -> Stats.observeRes ("prune", "pruneQueryPage") app m
+ | Op (Operation.Delete, m) -> Stats.observe ("prune", "delete", "ok") app m
+ | _ -> ()
diff --git a/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj b/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj
new file mode 100644
index 000000000..38fd20cb6
--- /dev/null
+++ b/src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj
@@ -0,0 +1,30 @@
+
+
+
+ netstandard2.0;net461
+ 5
+ false
+ true
+ true
+ $(DefineConstants);NET461
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index 044c671bc..ccdfafffe 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -206,7 +206,7 @@ type internal Enum() =
Enum.Events(b.i, b.e, startPos, direction)
|> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id
static member Unfolds(xs: Unfold[]) : ITimelineEvent seq = seq {
- for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold=true) }
+ for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold = true) }
static member EventsAndUnfolds(x: Tip): ITimelineEvent seq =
Enum.Events x
|> Seq.append (Enum.Unfolds x.u)
@@ -216,44 +216,50 @@ type internal Enum() =
type IRetryPolicy = abstract member Execute: (int -> Async<'T>) -> Async<'T>
module Log =
+
[]
- type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int; ru: float }
- []
- type Event =
+ type Measurement =
+ { database: string; container: string; stream: string
+ interval: StopwatchInterval; bytes: int; count: int; ru: float }
+ []
+ type Metric =
/// Individual read request for the Tip
| Tip of Measurement
/// Individual read request for the Tip, not found
| TipNotFound of Measurement
/// Tip read with Single RU Request Charge due to correct use of etag in cache
| TipNotModified of Measurement
+
/// Summarizes a set of Responses for a given Read request
| Query of Direction * responses: int * Measurement
/// Individual read request in a Batch
- /// Charges are rolled up into Query (so do not double count)
- | Response of Direction * Measurement
+ /// Charges are rolled up into Query Metric (so do not double count)
+ | QueryResponse of Direction * Measurement
+
| SyncSuccess of Measurement
| SyncResync of Measurement
| SyncConflict of Measurement
+
+ /// Summarizes outcome of request to trim batches from head of a stream and events in Tip
+ /// Count in Measurement is number of batches (documents) deleted
+ /// Bytes in Measurement is number of events deleted
+ | Prune of responsesHandled : int * Measurement
/// Handled response from listing of batches in a stream
- /// Charges are rolled up into Prune (so do not double count)
+ /// Charges are rolled up into the Prune Metric (so do not double count)
| PruneResponse of Measurement
/// Deleted an individual Batch
| Delete of Measurement
- /// Pruned batches from head of a stream
- /// Count in Measurement is number of batches (documents) deleted
- /// Bytes in Measurement is number of events deleted
- | Prune of responsesHandled : int * Measurement
- let prop name value (log : ILogger) = log.ForContext(name, value)
- let propData name (events: #IEventData seq) (log : ILogger) =
+ let internal prop name value (log : ILogger) = log.ForContext(name, value)
+ let internal propData name (events: #IEventData seq) (log : ILogger) =
let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes
let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) }
log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items))
- let propEvents = propData "events"
- let propDataUnfolds = Enum.Unfolds >> propData "unfolds"
- let propStartPos (value : Position) log = prop "startPos" value.index log
- let propStartEtag (value : Position) log = prop "startEtag" value.etag log
+ let internal propEvents = propData "events"
+ let internal propDataUnfolds = Enum.Unfolds >> propData "unfolds"
+ let internal propStartPos (value : Position) log = prop "startPos" value.index log
+ let internal propStartEtag (value : Position) log = prop "startEtag" value.etag log
- let withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> =
+ let internal withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> =
match retryPolicy with
| None -> f log
| Some retryPolicy ->
@@ -261,75 +267,87 @@ module Log =
let log = if count = 1 then log else log |> prop contextLabel count
f log
retryPolicy.Execute withLoggingContextWrapping
- /// Attach a property to the log context to hold the metrics
+ /// Include a LogEvent property bearing metrics
// Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
- open Serilog.Events
- let event (value : Event) (log : ILogger) =
- let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value)))
+ let internal event (value : Metric) (log : ILogger) =
+ let enrich (e : Serilog.Events.LogEvent) =
+ e.AddPropertyIfAbsent(Serilog.Events.LogEventProperty("cosmosEvt", Serilog.Events.ScalarValue(value)))
log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt })
- let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length
- let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80
- let (|BatchLen|) = Seq.sumBy (|EventLen|)
+ let internal (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length
+ let internal (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80
+ let internal (|BatchLen|) = Seq.sumBy (|EventLen|)
+ let internal (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
+ | (:? Serilog.Events.ScalarValue as x) -> Some x.Value
+ | _ -> None
+ let (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric option =
+ match logEvent.Properties.TryGetValue("cosmosEvt") with
+ | true, SerilogScalar (:? Metric as e) -> Some e
+ | _ -> None
+ []
+ type Operation = Tip | Tip404 | Tip302 | Query | Write | Resync | Conflict | Prune | Delete
+ let (|Op|QueryRes|PruneRes|) = function
+ | Metric.Tip s -> Op (Operation.Tip, s)
+ | Metric.TipNotFound s -> Op (Operation.Tip404, s)
+ | Metric.TipNotModified s -> Op (Operation.Tip302, s)
+
+ | Metric.Query (_, _, s) -> Op (Operation.Query, s)
+ | Metric.QueryResponse (direction, s) -> QueryRes (direction, s)
+
+ | Metric.SyncSuccess s -> Op (Operation.Write, s)
+ | Metric.SyncResync s -> Op (Operation.Resync, s)
+ | Metric.SyncConflict s -> Op (Operation.Conflict, s)
+
+ | Metric.Prune (_, s) -> Op (Operation.Prune, s)
+ | Metric.PruneResponse s -> PruneRes s
+ | Metric.Delete s -> Op (Operation.Delete, s)
/// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and
/// the ChangeLog will mention changes, it's critical to not assume that the presence or nature of these helpers be considered stable
module InternalMetrics =
module Stats =
- let inline (|Stats|) ({ interval = i; ru = ru }: Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds
-
- let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|CosmosDeleteRc|CosmosPruneRc|) = function
- | Tip (Stats s)
- | TipNotFound (Stats s)
- | TipNotModified (Stats s)
- | Query (_,_, (Stats s)) -> CosmosReadRc s
- // slices are rolled up into batches so be sure not to double-count
- | Response (_,(Stats s))
- // costs roll up into Prune operation so be sure not to double-count
- | PruneResponse (Stats s) -> CosmosResponseRc s
- | SyncSuccess (Stats s)
- | SyncConflict (Stats s) -> CosmosWriteRc s
- | SyncResync (Stats s) -> CosmosResyncRc s
- | Delete (Stats s) -> CosmosDeleteRc s
- | Prune (_, (Stats s)) -> CosmosPruneRc s
- let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function
- | (:? ScalarValue as x) -> Some x.Value
- | _ -> None
- let (|CosmosMetric|_|) (logEvent : LogEvent) : Event option =
- match logEvent.Properties.TryGetValue("cosmosEvt") with
- | true, SerilogScalar (:? Event as e) -> Some e
- | _ -> None
- type Counter =
- { mutable rux100: int64; mutable count: int64; mutable ms: int64 }
- static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
- member __.Ingest (ru, ms) =
- System.Threading.Interlocked.Increment(&__.count) |> ignore
- System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
- System.Threading.Interlocked.Add(&__.ms, ms) |> ignore
+
+ type internal Counter =
+ { mutable rux100: int64; mutable count: int64; mutable ms: int64 }
+ static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
+ member __.Ingest (ru, ms) =
+ System.Threading.Interlocked.Increment(&__.count) |> ignore
+ System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
+ System.Threading.Interlocked.Add(&__.ms, ms) |> ignore
+ let inline private (|RcMs|) ({ interval = i; ru = ru }: Measurement) =
+ ru, let e = i.Elapsed in int64 e.TotalMilliseconds
type LogSink() =
static let epoch = System.Diagnostics.Stopwatch.StartNew()
- static member val Read = Counter.Create() with get, set
- static member val Write = Counter.Create() with get, set
- static member val Resync = Counter.Create() with get, set
- static member val Delete = Counter.Create() with get, set
- static member val Prune = Counter.Create() with get, set
+ static member val internal Read = Counter.Create() with get, set
+ static member val internal Write = Counter.Create() with get, set
+ static member val internal Resync = Counter.Create() with get, set
+ static member val internal Conflict = Counter.Create() with get, set
+ static member val internal Prune = Counter.Create() with get, set
+ static member val internal Delete = Counter.Create() with get, set
static member Restart() =
LogSink.Read <- Counter.Create()
LogSink.Write <- Counter.Create()
LogSink.Resync <- Counter.Create()
- LogSink.Delete <- Counter.Create()
+ LogSink.Conflict <- Counter.Create()
LogSink.Prune <- Counter.Create()
+ LogSink.Delete <- Counter.Create()
let span = epoch.Elapsed
epoch.Restart()
span
interface Serilog.Core.ILogEventSink with
- member __.Emit logEvent = logEvent |> function
- | CosmosMetric (CosmosReadRc stats) -> LogSink.Read.Ingest stats
- | CosmosMetric (CosmosWriteRc stats) -> LogSink.Write.Ingest stats
- | CosmosMetric (CosmosResyncRc stats) -> LogSink.Resync.Ingest stats
- | CosmosMetric (CosmosDeleteRc stats) -> LogSink.Delete.Ingest stats
- | CosmosMetric (CosmosPruneRc stats) -> LogSink.Prune.Ingest stats
- | CosmosMetric (CosmosResponseRc _) -> () // Costs are already included in others
+ member __.Emit logEvent =
+ match logEvent with
+ | MetricEvent cm ->
+ match cm with
+ | Op ((Operation.Tip | Operation.Tip404 | Operation.Tip302 | Operation.Query), RcMs m) ->
+ LogSink.Read.Ingest m
+ | QueryRes (_direction, _) -> ()
+ | Op (Operation.Write, RcMs m) -> LogSink.Write.Ingest m
+ | Op (Operation.Conflict, RcMs m) -> LogSink.Conflict.Ingest m
+ | Op (Operation.Resync, RcMs m) -> LogSink.Resync.Ingest m
+ | Op (Operation.Prune, RcMs m) -> LogSink.Prune.Ingest m
+ | PruneRes ( _) -> ()
+ | Op (Operation.Delete, RcMs m) -> LogSink.Delete.Ingest m
| _ -> ()
/// Relies on feeding of metrics from Log through to Stats.LogSink
@@ -339,8 +357,9 @@ module Log =
[ "Read", Stats.LogSink.Read
"Write", Stats.LogSink.Write
"Resync", Stats.LogSink.Resync
- "Delete", Stats.LogSink.Delete
- "Prune", Stats.LogSink.Prune ]
+ "Conflict", Stats.LogSink.Conflict
+ "Prune", Stats.LogSink.Prune
+ "Delete", Stats.LogSink.Delete ]
let mutable rows, totalCount, totalRc, totalMs = 0, 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
@@ -363,6 +382,8 @@ module Log =
type Container(client : Client.DocumentClient, databaseId, containerId) =
let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(databaseId, containerId)
member __.Client = client
+ member __.DatabaseId = databaseId
+ member __.ContainerId = containerId
member __.CollectionUri = collectionUri
[]
@@ -507,12 +528,14 @@ function sync(req, expIndex, expEtag) {
| [||] -> Result.ConflictUnknown newPos
| xs -> Result.Conflict (newPos, Enum.Unfolds xs |> Array.ofSeq) }
- let private logged (container,stream) (exp : Exp, req: Tip) (log : ILogger)
+ let private logged (container : Container, stream) (exp : Exp, req: Tip) (log : ILogger)
: Async = async {
let! t, (ru, result) = run (container,stream) (exp, req) |> Stopwatch.Time
let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length
let log =
- let inline mkMetric ru : Log.Measurement = { stream = stream; interval = t; bytes = bytes; count = count; ru = ru }
+ let inline mkMetric ru : Log.Measurement =
+ { database = container.DatabaseId; container = container.ContainerId; stream = stream
+ interval = t; bytes = bytes; count = count; ru = ru }
let inline propConflict log = log |> Log.prop "conflict" true |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in req.e -> x.c }))
let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug
(if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log)
@@ -522,12 +545,12 @@ function sync(req, expIndex, expEtag) {
| Exp.Any -> Log.prop "expectedVersion" -1
|> match result with
| Result.Written pos ->
- Log.prop "nextExpectedVersion" pos >> Log.event (Log.SyncSuccess (mkMetric ru))
+ Log.prop "nextExpectedVersion" pos >> Log.event (Log.Metric.SyncSuccess (mkMetric ru))
| Result.ConflictUnknown pos' ->
- Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru))
+ Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncConflict (mkMetric ru))
| Result.Conflict (pos', xs) ->
(if verbose then Log.propData "conflicts" xs else id)
- >> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncResync (mkMetric ru))
+ >> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncResync (mkMetric ru))
log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}",
"Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp)
return result }
@@ -538,7 +561,7 @@ function sync(req, expIndex, expEtag) {
let private mkEvent (e : IEventData<_>) =
{ t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId }
- let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
+ let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = Array.ofSeq unfolds }
let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
@@ -627,19 +650,20 @@ module internal Tip =
let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null
let ro = Client.RequestOptions(PartitionKey=PartitionKey stream, AccessCondition = ac)
container.TryReadItem(Tip.WellKnownDocumentId, ro)
- let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async {
+ let private loggedGet (get : Container * string -> Position option -> Async<_>) (container : Container, stream) (maybePos: Position option) (log: ILogger) = async {
let log = log |> Log.prop "stream" stream
let! t, (ru, res : ReadResult) = get (container,stream) maybePos |> Stopwatch.Time
- let log bytes count (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
+ let log bytes count (f : Log.Measurement -> _) =
+ log |> Log.event (f { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
match res with
| ReadResult.NotModified ->
- (log 0 0 Log.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
+ (log 0 0 Log.Metric.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.NotFound ->
- (log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
+ (log 0 0 Log.Metric.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.Found tip ->
let log =
let (Log.BatchLen bytes), count = Enum.Unfolds tip.u, tip.u.Length
- log bytes count Log.Tip
+ log bytes count Log.Metric.Tip
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds tip.u
let log = match maybePos with Some p -> log |> Log.propStartPos p |> Log.propStartEtag p | None -> log
let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n
@@ -654,7 +678,7 @@ module internal Tip =
| ReadResult.NotFound -> return Result.NotFound
| ReadResult.Found tip -> return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds tip |> Array.ofSeq) }
- module internal Query =
+module internal Query =
open Microsoft.Azure.Documents.Linq
open FSharp.Control
let private mkQuery (container : Container, stream : string) maxItems (direction: Direction) startPos =
@@ -670,15 +694,15 @@ module internal Tip =
container.Client.CreateDocumentQuery(container.CollectionUri, query, qro).AsDocumentQuery()
// Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index
- let private handleResponse direction (streamName: string) startPos (query: IDocumentQuery) (log: ILogger)
+ let private handleResponse direction (container : Container, streamName: string) startPos (query: IDocumentQuery) (log: ILogger)
: Async[] * Position option * float> = async {
let! ct = Async.CancellationToken
let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let batches, ru = Array.ofSeq res, res.RequestCharge
let events = batches |> Seq.collect (fun b -> Enum.Events(b, startPos, direction)) |> Array.ofSeq
let (Log.BatchLen bytes), count = events, events.Length
- let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count; ru = ru }
- let log = let evt = Log.Response (direction, reqMetric) in log |> Log.event evt
+ let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = streamName; interval = t; bytes = bytes; count = count; ru = ru }
+ let log = let evt = Log.Metric.QueryResponse (direction, reqMetric) in log |> Log.event evt
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events
let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i })
(log |> (match startPos with Some pos -> Log.propStartPos pos | None -> id) |> Log.prop "bytes" bytes)
@@ -703,10 +727,10 @@ module internal Tip =
yield! loop (batchCount + 1) }
loop 0
- let private logQuery direction batchSize streamName interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) =
+ let private logQuery direction batchSize (container : Container, streamName) interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) =
let (Log.BatchLen bytes), count = events, events.Length
- let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru }
- let evt = Log.Event.Query (direction, responsesCount, reqMetric)
+ let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru }
+ let evt = Log.Metric.Query (direction, responsesCount, reqMetric)
let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB"
(log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize |> Log.event evt).Information(
"EqxCosmos {action:l} {stream} v{n} {count}/{responses} {ms}ms rc={ru}",
@@ -749,7 +773,7 @@ module internal Tip =
|> AsyncSeq.toArrayAsync
return events, maybeTipPos, ru }
let query = mkQuery (container,stream) maxItems direction startPos
- let pullSlice = handleResponse direction stream startPos
+ let pullSlice = handleResponse direction (container, stream) startPos
let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query)
let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
let readlog = log |> Log.prop "direction" direction
@@ -758,7 +782,7 @@ module internal Tip =
let raws, decoded = (Array.map fst events), (events |> Seq.choose snd |> Array.ofSeq)
let pos = match maybeTipPos with Some p -> p | None -> Position.fromMaxIndex raws
- log |> logQuery direction maxItems stream t (!responseCount,raws) pos.index ru
+ log |> logQuery direction maxItems (container, stream) t (!responseCount,raws) pos.index ru
return pos, decoded }
let walkLazy<'event> (log : ILogger) (container,stream) retryPolicy maxItems maxRequests direction startPos
@@ -766,7 +790,7 @@ module internal Tip =
: AsyncSeq<'event[]> = asyncSeq {
let responseCount = ref 0
let query = mkQuery (container,stream) maxItems direction startPos
- let pullSlice = handleResponse direction stream startPos
+ let pullSlice = handleResponse direction (container, stream) startPos
let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query)
let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
let mutable ru = 0.
@@ -802,7 +826,7 @@ module internal Tip =
finally
let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let t = StopwatchInterval(startTicks, endTicks)
- log |> logQuery direction maxItems stream t (!responseCount,allSlices.ToArray()) -1L ru }
+ log |> logQuery direction maxItems (container, stream) t (!responseCount,allSlices.ToArray()) -1L ru }
// Manages deletion of (full) Batches, maintaining ordering guarantees by never updating non-Tip batches
// Additionally, the nature of the fallback algorithm requires that deletions be carried out in sequential order so as not to leave gaps
@@ -822,14 +846,14 @@ module Prune =
let qo = Client.RequestOptions(PartitionKey = PartitionKey stream)
let! t, res = container.Client.DeleteDocumentAsync(docLink, qo, cancellationToken=ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let rc, ms = res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds)
- let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = count; ru = rc }
- let log = let evt = Log.Delete reqMetric in log |> Log.event evt
+ let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = -1; count = count; ru = rc }
+ let log = let evt = Log.Metric.Delete reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {id} {ms}ms rc={ru}", "Delete", id, ms, rc)
return rc
}
let log = log |> Log.prop "index" indexInclusive
let query : IDocumentQuery =
- // sort by i to guarantee we don't ever leave an observable gap in the sequence
+ // sort by i to guarantee we don't ever leave an observable gap in the sequence
let q = SqlQuerySpec("SELECT c.id, c.i, c.n FROM c ORDER by c.i")
let qro = Client.FeedOptions(PartitionKey = PartitionKey stream, MaxItemCount=Nullable maxItems)
container.Client.CreateDocumentQuery<_>(container.CollectionUri, q, qro).AsDocumentQuery()
@@ -840,8 +864,8 @@ module Prune =
let! t, (res : Client.FeedResponse<_>) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let batches, rc, ms = Array.ofSeq res, res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds)
let next = (match batches with [||] -> None | xs -> Some (xs.[ xs.Length - 1])) |> Option.map (fun x -> x.n) |> Option.toNullable
- let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc }
- let log = let evt = Log.PruneResponse reqMetric in log |> Log.event evt
+ let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc }
+ let log = let evt = Log.Metric.PruneResponse reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {batches} {ms}ms n={next} rc={ru}", "PruneResponse", batches.Length, ms, next, rc)
return Some ((rc, batches), x)
}
@@ -887,8 +911,8 @@ module Prune =
batches <- batches + bCount
eventsDeleted <- eventsDeleted + eDel
eventsDeferred <- eventsDeferred + eDef
- let reqMetric : Log.Measurement = { stream = stream; interval = pt; bytes = eventsDeleted; count = batches; ru = queryCharges }
- let log = let evt = Log.Prune (responses, reqMetric) in log |> Log.event evt
+ let reqMetric : Log.Measurement = { database = container.DatabaseId; container = container.ContainerId; stream = stream; interval = pt; bytes = eventsDeleted; count = batches; ru = queryCharges }
+ let log = let evt = Log.Metric.Prune (responses, reqMetric) in log |> Log.event evt
let lwm = defaultArg lwm 0L // If we've seen no batches at all, then the write position is 0L
log.Information("EqxCosmos {action:l} {events}/{batches} lwm={lwm} {ms}ms queryRu={queryRu} deleteRu={deleteRu} trimRu={trimRu}",
"Prune", eventsDeleted, batches, lwm, (let e = pt.Elapsed in e.TotalMilliseconds), queryCharges, delCharges, trimCharges)
@@ -1017,7 +1041,7 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven
| LoadFromTokenResult.Found (token', events') -> return token', fold state events' }
member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds): Async> = async {
let state' = fold state (Seq.ofList events)
- let encode e = codec.Encode(context,e)
+ let encode e = codec.Encode(context, e)
let exp,events,eventsEncoded,projectionsEncoded =
match mapUnfolds with
| Choice1Of3 () -> Sync.Exp.Version pos.index, events, Seq.map encode events |> Array.ofSeq, Seq.empty
@@ -1098,7 +1122,7 @@ type private Folder<'event, 'state, 'context>
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log }
member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context)
- : Async> = async {
+ : Async> = async {
match! category.Sync((streamToken, state), events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }
@@ -1199,7 +1223,7 @@ type Resolver<'event, 'state, 'context>
( context : Context, codec, fold, initial, caching, access,
/// Compress Unfolds in Tip. Default: true.
/// NOTE when set to false, requires Equinox.Cosmos Version >= 2.3.0 to be able to read
- ?compressUnfolds) =
+ []?compressUnfolds) =
let compressUnfolds = defaultArg compressUnfolds true
let readCacheOption =
match caching with
@@ -1283,7 +1307,7 @@ type Connector
[]?gatewayModeMaxConnectionLimit,
/// Connection mode (default: ConnectionMode.Gateway (lowest perf, least trouble))
[]?mode : ConnectionMode,
- /// consistency mode (default: ConsistencyLevel.Session)
+ /// consistency mode (default: ConsistencyLevel.Session)
[]?defaultConsistencyLevel : ConsistencyLevel,
/// Retries for read requests, over and above those defined by the mandatory policies
@@ -1422,7 +1446,7 @@ type Context
/// Reads all Events from a `Position` in a given `direction`
member __.Read(stream, ?position, ?maxCount, ?direction) : Async[]> =
- __.GetInternal((stream, position), ?maxCount=maxCount, ?direction=direction) |> yieldPositionAndData
+ __.GetInternal((stream, position), ?maxCount = maxCount, ?direction = direction) |> yieldPositionAndData
/// Appends the supplied batch of events, subject to a consistency check based on the `position`
/// Callers should implement appropriate idempotent handling, or use Equinox.Stream for that purpose
@@ -1454,7 +1478,7 @@ type Context
/// Provides mechanisms for building `EventData` records to be supplied to the `Events` API
type EventData() =
/// Creates an Event record, suitable for supplying to Append et al
- static member FromUtf8Bytes(eventType, data, ?meta) : IEventData<_> = FsCodec.Core.EventData.Create(eventType, data, ?meta=meta) :> _
+ static member FromUtf8Bytes(eventType, data, ?meta) : IEventData<_> = FsCodec.Core.EventData.Create(eventType, data, ?meta = meta) :> _
/// Api as defined in the Equinox Specification
/// Note the CosmosContext APIs can yield better performance due to the fact that a Position tracks the etag of the Stream's Tip
diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs
index 56cb5d10a..76d32a6b7 100644
--- a/src/Equinox.MemoryStore/MemoryStore.fs
+++ b/src/Equinox.MemoryStore/MemoryStore.fs
@@ -7,7 +7,7 @@ open Equinox
open Equinox.Core
open System.Runtime.InteropServices
-/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
+/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
exception private WrongVersionException of streamName: string * expected: int * value: obj
/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
@@ -20,10 +20,21 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't
/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =
- let streams = System.Collections.Concurrent.ConcurrentDictionary[]>()
+
+ let streams = System.Collections.Concurrent.ConcurrentDictionary[]>()
+
+ // Where TrySync attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' resulting from a successful Sync
+ // If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order
let committed = Event<_>()
+ // Here we neuter that effect - the BatchingGate can end up with commits submitted out of order, but we serialize the raising of the events per stream
+ let publishBatches (commits : (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])[]) = async {
+ for streamName, events in commits |> Seq.groupBy fst do
+ committed.Trigger(streamName, events |> Seq.collect snd |> Seq.sortBy (fun x -> x.Index) |> Seq.toArray) }
+ let publishCommit = AsyncBatchingGate(publishBatches, System.TimeSpan.FromMilliseconds 2.)
[]
+ /// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event
+ /// NOTE in some cases, two or more overlapping commits can be coalesced into a single Committed event
member __.Committed : IEvent[]> = committed.Publish
/// Loads state from a given stream
@@ -33,20 +44,22 @@ type VolatileStore<'Format>() =
member __.TrySync
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult[]>,
events: FsCodec.ITimelineEvent<'Format>[])
- : ConcurrentArraySyncResult[]> =
+ : Async[]>> = async {
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
- try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
- committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times
- res
- with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict
+ try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
+ // we publish the event here, once, as `updateValue` can be invoked multiple times
+ do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
+ return Written res
+ with WrongVersionException(_, _, conflictingValue) ->
+ return Conflict (unbox conflictingValue) }
type Token = { streamVersion: int; streamName: string }
-/// Internal implementation detail of MemoryStreamStore
+/// Internal implementation detail of MemoryStore
module private Token =
let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
@@ -64,36 +77,37 @@ module private Token =
/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
- let (|Decode|) = Array.choose codec.TryDecode
interface ICategory<'event, 'state, string, 'context> with
member __.Load(_log, streamName, _opt) = async {
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
- | Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
+ | Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
- let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq
+ let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
- match store.TrySync(token.streamName, trySyncValue, encoded) with
+ match! store.TrySync(token.streamName, trySyncValue, encoded) with
+ | ConcurrentArraySyncResult.Written _ ->
+ return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
- return SyncResult.Conflict resync
- | ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }
+ return SyncResult.Conflict resync }
type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context
+
member __.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context : 'context) =
match FsCodec.StreamName.toString streamName, option with
- | sn, (None|Some AllowStale) -> resolveStream sn context
+ | sn, (None | Some AllowStale) -> resolveStream sn context
| sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)
/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) =
- Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
+ Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context)
diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
index 42e1c5fb2..ca55cf77a 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
@@ -256,7 +256,7 @@ type Tests(testOutputHelper) =
verifyCorrectEvents 0L expected res
test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @>
let queryRoundTripsAndItemCounts = function
- | EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Forward, responses, { count = c })) -> Some (responses,c)
+ | EqxEvent (Equinox.Cosmos.Store.Log.Metric.Query (Equinox.Cosmos.Store.Direction.Forward, responses, { count = c })) -> Some (responses,c)
| _ -> None
// validate that, because we stopped after 1 item, we only needed one trip (which contained 4 events)
[1,4] =! capture.ChooseCalls queryRoundTripsAndItemCounts
@@ -322,7 +322,7 @@ type Tests(testOutputHelper) =
test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @>
// validate that, despite only requesting max 1 item, we only needed one trip, bearing 5 items (from which one item was omitted)
let queryRoundTripsAndItemCounts = function
- | EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c)
+ | EqxEvent (Equinox.Cosmos.Store.Log.Metric.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c)
| _ -> None
[1,5] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 4 // 3.24 // WAS 3 // 2.98
diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs
index 42e4b4e50..c448016df 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs
@@ -59,42 +59,45 @@ module SerilogHelpers =
| Append | Resync | Conflict
| PruneResponse | Delete | Prune
let (|EqxAction|) = function
- | Event.Tip _ -> EqxAct.Tip
- | Event.TipNotFound _ -> EqxAct.TipNotFound
- | Event.TipNotModified _ -> EqxAct.TipNotModified
- | Event.Response (Direction.Forward,_) -> EqxAct.ResponseForward
- | Event.Response (Direction.Backward,_) -> EqxAct.ResponseBackward
- | Event.Query (Direction.Forward,_,_) -> EqxAct.QueryForward
- | Event.Query (Direction.Backward,_,_) -> EqxAct.QueryBackward
- | Event.SyncSuccess _ -> EqxAct.Append
- | Event.SyncResync _ -> EqxAct.Resync
- | Event.SyncConflict _ -> EqxAct.Conflict
- | Event.PruneResponse _ -> EqxAct.PruneResponse
- | Event.Delete _ -> EqxAct.Delete
- | Event.Prune _ -> EqxAct.Prune
- let inline (|Stats|) ({ ru = ru }: Equinox.Cosmos.Store.Log.Measurement) = ru
- let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|CosmosDeleteRc|CosmosPruneRc|) = function
- | Event.Tip (Stats s)
- | Event.TipNotFound (Stats s)
- | Event.TipNotModified (Stats s)
- // slices are rolled up into batches so be sure not to double-count
- | Event.PruneResponse (Stats s)
- | Event.Response (_,Stats s) -> CosmosResponseRc s
- | Event.Query (_,_, (Stats s)) -> CosmosReadRc s
- | Event.SyncSuccess (Stats s)
- | Event.SyncConflict (Stats s) -> CosmosWriteRc s
- | Event.SyncResync (Stats s) -> CosmosResyncRc s
- | Event.Delete (Stats s) -> CosmosDeleteRc s
- | Event.Prune (_, (Stats s)) -> CosmosPruneRc s
+ | Metric.Tip _ -> EqxAct.Tip
+ | Metric.TipNotFound _ -> EqxAct.TipNotFound
+ | Metric.TipNotModified _ -> EqxAct.TipNotModified
+
+ | Metric.Query (Direction.Forward, _, _) -> EqxAct.QueryForward
+ | Metric.Query (Direction.Backward, _, _) -> EqxAct.QueryBackward
+ | Metric.QueryResponse (Direction.Forward, _) -> EqxAct.ResponseForward
+ | Metric.QueryResponse (Direction.Backward, _) -> EqxAct.ResponseBackward
+
+ | Metric.SyncSuccess _ -> EqxAct.Append
+ | Metric.SyncResync _ -> EqxAct.Resync
+ | Metric.SyncConflict _ -> EqxAct.Conflict
+
+ | Metric.Prune _ -> EqxAct.Prune
+ | Metric.PruneResponse _ -> EqxAct.PruneResponse
+ | Metric.Delete _ -> EqxAct.Delete
+ let (|Load|Write|Resync|Prune|Delete|Response|) = function
+ | Metric.Tip s
+ | Metric.TipNotFound s
+ | Metric.TipNotModified s
+
+ | Metric.Query (_, _, s) -> Load s
+ | Metric.QueryResponse (_, s) -> Response s
+
+ | Metric.SyncSuccess s
+ | Metric.SyncConflict s -> Write s
+ | Metric.SyncResync s -> Resync s
+
+ | Metric.Prune (_, s) -> Prune s
+ | Metric.PruneResponse s -> Response s
+ | Metric.Delete s -> Delete s
+ let inline (|Rc|) ({ ru = ru }: Equinox.Cosmos.Store.Log.Measurement) = ru
/// Facilitates splitting between events with direct charges vs synthetic events Equinox generates to avoid double counting
- let (|CosmosRequestCharge|EquinoxChargeRollup|) = function
- | CosmosResponseRc _ ->
- EquinoxChargeRollup
- | CosmosReadRc rc | CosmosWriteRc rc | CosmosResyncRc rc | CosmosDeleteRc rc | CosmosPruneRc rc as e ->
- CosmosRequestCharge (e,rc)
- let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Store.Log.Event option =
+ let (|TotalRequestCharge|ResponseBreakdown|) = function
+ | Load (Rc rc) | Write (Rc rc) | Resync (Rc rc) | Delete (Rc rc) | Prune (Rc rc) as e -> TotalRequestCharge (e, rc)
+ | Response _ -> ResponseBreakdown
+ let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Store.Log.Metric option =
logEvent.Properties.Values |> Seq.tryPick (function
- | SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as e) -> Some e
+ | SerilogScalar (:? Equinox.Cosmos.Store.Log.Metric as e) -> Some e
| _ -> None)
let (|HasProp|_|) (name : string) (e : LogEvent) : LogEventPropertyValue option =
@@ -113,7 +116,7 @@ module SerilogHelpers =
member __.Clear () = captured.Clear()
member __.ChooseCalls chooser = captured |> Seq.choose chooser |> List.ofSeq
member __.ExternalCalls = __.ChooseCalls (function EqxEvent (EqxAction act) -> Some act | _ -> None)
- member __.RequestCharges = __.ChooseCalls (function EqxEvent (CosmosRequestCharge e) -> Some e | _ -> None)
+ member __.RequestCharges = __.ChooseCalls (function EqxEvent (TotalRequestCharge e) -> Some e | _ -> None)
type TestsWithLogCapture(testOutputHelper) =
let log, capture = TestsWithLogCapture.CreateLoggerWithCapture testOutputHelper
@@ -124,6 +127,7 @@ type TestsWithLogCapture(testOutputHelper) =
let capture = LogCaptureBuffer()
let logger =
Serilog.LoggerConfiguration()
+ .MinimumLevel.Debug()
.WriteTo.Seq("http://localhost:5341")
.WriteTo.Sink(testOutput)
.WriteTo.Sink(capture)