CosmosStore.fs
1573 lines (1422 loc) · 105 KB
namespace Equinox.CosmosStore.Core
open Equinox.Core
open FSharp.Control
open FsCodec
open Microsoft.Azure.Cosmos
open Serilog
open System
open System.Collections.Generic
open System.Text.Json
open System.Threading
open System.Threading.Tasks
type EventBody = JsonElement
/// A single Domain Event from the array held in a Batch
[<NoEquality; NoComparison>]
type Event = // TODO for STJ v5: All fields required unless explicitly optional
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: DateTimeOffset // ISO 8601
/// The Case (Event Type); used to drive deserialization
c: string // required
/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB
d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work
/// Optional metadata, as UTF-8 encoded json, ready to emit directly
m: EventBody // TODO for STJ v5: Optional, not serialized if missing
/// Optional correlationId
correlationId: string // TODO for STJ v5: Optional, not serialized if missing
/// Optional causationId
causationId: string } // TODO for STJ v5: Optional, not serialized if missing
interface IEventData<EventBody> with
member x.EventType = x.c
member x.Data = x.d
member x.Meta = x.m
member _.EventId = Guid.Empty
member x.CorrelationId = x.correlationId
member x.CausationId = x.causationId
member x.Timestamp = x.t
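// An Event in a Batch's `e` array renders as JSON along these lines (illustrative values; the optional
// correlation/causation fields are omitted when missing):
//   { "t": "2023-01-01T00:00:00Z", "c": "OrderCreated", "d": { "orderId": 1 }, "m": null }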
/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
[<NoEquality; NoComparison>]
type Batch = // TODO for STJ v5: All fields required unless explicitly optional
{ /// CosmosDB-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
/// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string
/// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it
/// NB Tip uses a well known value here while it's actively 'open'
id: string // "{index}"
/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
/// base 'i' value for the Events held herein
i: int64 // {index}
// `i` value for successor batch (to facilitate identifying which Batch a given startPos is within)
n: int64 // {index}
/// The Domain Events (as opposed to Unfolded Events, see Tip) at this offset in the stream
e: Event[] }
static member internal PartitionKeyField = "p"
// As one cannot sort by the (mandatory) `id` field, we have an indexed `i` field for sort and range query use
// NB it's critical to also index the nominated PartitionKey field as RU costs increase (things degrade to scans) if you don't
// TOCONSIDER: indexing strategy was developed and tuned before composite key extensions in ~2021, which might potentially be more efficient
// a decent attempt at https://github.com/jet/equinox/issues/274 failed, but not 100% sure it's fundamentally impossible/wrong
static member internal IndexedPaths = [| Batch.PartitionKeyField; "i"; "n" |] |> Array.map (fun k -> $"/%s{k}/?")
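// i.e. IndexedPaths evaluates to [| "/p/?"; "/i/?"; "/n/?" |]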
/// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
[<NoEquality; NoComparison>]
type Unfold =
{ /// Base: Stream Position (Version) of State from which this Unfold Event was generated. An unfold from State Version 1 is i=1 and includes event i=1
i: int64
/// Generation datetime
t: DateTimeOffset // ISO 8601 // Not written by versions <= 2.0.0-rc9
/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
c: string // required
/// Event body - Json -> Deflate -> Base64 -> JsonElement
[<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
d: EventBody // required
/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
m: EventBody } // TODO for STJ v5: Optional, not serialized if missing
// Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering
static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |]
/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
/// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc
[<NoEquality; NoComparison>]
type Tip = // TODO for STJ v5: All fields required unless explicitly optional
{ /// Partition key, as per Batch
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
/// Document Id within partition, as per Batch
id: string // "{-1}" - Well known IdConstant used while this remains the pending batch
/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
/// base 'i' value for the Events held herein
i: int64
/// `i` value for successor batch (to facilitate identifying which Batch a given startPos is within)
n: int64 // {index}
/// Domain Events, will eventually move out to a Batch
e: Event[]
/// Compaction/Snapshot/Projection events - owned and managed by the sync stored proc
u: Unfold[] }
static member internal WellKnownDocumentId = "-1"
/// Position and Etag to which an operation is relative
[<NoComparison; NoEquality>]
type Position = { index: int64; etag: string option }
module internal Position =
let fromEtagOnly (value: string) = { index = -2L; etag = Some value }
let fromEtagAndIndex (etag, n) = { index = n; etag = Some etag }
/// NB a token without an etag is inefficient compared to fromEtagAndIndex, so paths to this should be minimized
let fromIndex (x: int64) = { index = x; etag = None }
/// If we have strong reason to suspect a stream is empty, we won't have an etag (and Writer Stored Procedure special cases this)
let fromKnownEmpty = fromIndex 0L
/// Blind append mode
let fromAppendAtEnd = fromIndex -1L // sic - needs to yield value -1 to trigger stored proc logic
/// Sentinel value we assign so we can reject attempts to sync without having known the context
let readOnly = { index = -2L; etag = None }
let isReadOnly (x: Position) = x.index = -2L && Option.isNone x.etag
/// If we encounter the tip (id=-1) item/document, we're interested in its etag so we can re-sync for 1 RU
let tryFromBatch (x: Batch) =
if x.id <> Tip.WellKnownDocumentId then None
else Some { index = x.n; etag = Option.ofObj x._etag }
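// Illustrative values (the etag shown is hypothetical):
//   fromIndex 3L = { index = 3L; etag = None }
//   fromEtagAndIndex ("\"0x1f\"", 3L) = { index = 3L; etag = Some "\"0x1f\"" }
//   fromKnownEmpty = { index = 0L; etag = None }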
[<RequireQualifiedAccess>]
type Direction = Forward | Backward with
    override this.ToString() = match this with Direction.Forward -> "Forward" | Direction.Backward -> "Backward"
[<AbstractClass; Sealed>]
type internal Enum private () =
static member Events(i: int64, e: Event[], ?minIndex, ?maxIndex): ITimelineEvent<EventBody> seq = seq {
let indexMin, indexMax = defaultArg minIndex 0L, defaultArg maxIndex Int64.MaxValue
for offset in 0..e.Length-1 do
let index = i + int64 offset
// If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page
if index >= indexMin && index < indexMax then
let x = e[offset]
yield FsCodec.Core.TimelineEvent.Create(index, x.c, x.d, x.m, Guid.Empty, x.correlationId, x.causationId, x.t) }
static member internal Events(t: Tip, ?minIndex, ?maxIndex): ITimelineEvent<EventBody> seq =
Enum.Events(t.i, t.e, ?minIndex = minIndex, ?maxIndex = maxIndex)
static member internal Events(b: Batch, ?minIndex, ?maxIndex) =
Enum.Events(b.i, b.e, ?minIndex = minIndex, ?maxIndex = maxIndex)
static member Unfolds(xs: Unfold[]): ITimelineEvent<EventBody> seq = seq {
for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold = true) }
static member EventsAndUnfolds(x: Tip, ?minIndex, ?maxIndex): ITimelineEvent<EventBody> seq =
Enum.Events(x, ?minIndex = minIndex, ?maxIndex = maxIndex)
|> Seq.append (Enum.Unfolds x.u)
// where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent'
|> Seq.sortBy (fun x -> x.Index, x.IsUnfold)
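// e.g. an event and an unfold that share Index 5 sort as (5, false) then (5, true), so the event is folded before the unfold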
type IRetryPolicy = abstract member Execute: (int -> CancellationToken -> Task<'T>) -> Task<'T>
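// A minimal IRetryPolicy sketch (illustrative, not part of the original source): retries up to maxAttempts
// times on exception, passing the attempt number through so Log.withLoggedRetries can tag the log context.
// A production implementation would more typically delegate to a resilience library such as Polly.
//   let retryUpTo maxAttempts: IRetryPolicy =
//       { new IRetryPolicy with
//           member _.Execute f = task {
//               let mutable attempt = 1
//               let mutable outcome = None
//               while Option.isNone outcome do
//                   try let! r = f attempt CancellationToken.None // simplification: no external CancellationToken is threaded through Execute
//                       outcome <- Some r
//                   with _ when attempt < maxAttempts -> attempt <- attempt + 1
//               return outcome.Value } }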
module Log =
/// <summary>Name of Property used for <c>Metric</c> in <c>LogEvent</c>s.</summary>
let [<Literal>] PropertyTag = "cosmosEvt"
[<NoEquality; NoComparison>]
type Measurement =
{ database: string; container: string; stream: string
interval: StopwatchInterval; bytes: int; count: int; ru: float }
member x.Category = x.stream |> StreamName.Internal.trust |> StreamName.Category.ofStreamName
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Metric =
/// Individual read request for the Tip
| Tip of Measurement
/// Individual read request for the Tip, not found
| TipNotFound of Measurement
/// Tip read with Single RU Request Charge due to correct use of etag in cache
| TipNotModified of Measurement
/// Summarizes a set of Responses for a given Read request
| Query of Direction * responses: int * Measurement
/// Individual read request in a Batch
/// Charges are rolled up into Query Metric (so do not double count)
| QueryResponse of Direction * Measurement
| SyncSuccess of Measurement
| SyncResync of Measurement
| SyncConflict of Measurement
/// Summarizes outcome of request to trim batches from head of a stream and events in Tip
/// Count in Measurement is number of batches (documents) deleted
/// Bytes in Measurement is number of events deleted
| Prune of responsesHandled: int * Measurement
/// Handled response from listing of batches in a stream
/// Charges are rolled up into the Prune Metric (so do not double count)
| PruneResponse of Measurement
/// Deleted an individual Batch
| Delete of Measurement
/// Trimmed the Tip
| Trim of Measurement
let [<return: Struct>] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption =
let mutable p = Unchecked.defaultof<_>
logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore
match p with Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone
/// Attach a property to the captured event record to hold the metric information
let internal event (value: Metric) = Internal.Log.withScalarProperty PropertyTag value
let internal prop name value (log: ILogger) = log.ForContext(name, value)
let internal propData name (events: #IEventData<EventBody> seq) (log: ILogger) =
let render (body: EventBody) = body.GetRawText()
let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) }
log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items))
let internal propEvents = propData "events"
let internal propDataUnfolds = Enum.Unfolds >> propData "unfolds"
let internal propStartPos (value: Position) log = prop "startPos" value.index log
let internal propStartEtag (value: Position) log = prop "startEtag" value.etag log
let withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel: string) (f: ILogger -> CancellationToken -> Task<'t>) log ct: Task<'t> =
match retryPolicy with
| None -> f log ct
| Some retryPolicy ->
let withLoggingContextWrapping count =
let log = if count = 1 then log else log |> prop contextLabel count
f log
retryPolicy.Execute withLoggingContextWrapping
let internal (|BlobLen|) (x: EventBody) = if x.ValueKind = JsonValueKind.Null then 0 else x.GetRawText().Length
let internal eventLen (x: #IEventData<_>) = let BlobLen bytes, BlobLen metaBytes = x.Data, x.Meta in bytes + metaBytes + 80
let internal batchLen = Seq.sumBy eventLen
[<RequireQualifiedAccess>]
type Operation = Tip | Tip404 | Tip304 | Query | Write | Resync | Conflict | Prune | Delete | Trim
let (|Op|QueryRes|PruneRes|) = function
| Metric.Tip s -> Op (Operation.Tip, s)
| Metric.TipNotFound s -> Op (Operation.Tip404, s)
| Metric.TipNotModified s -> Op (Operation.Tip304, s)
| Metric.Query (_, _, s) -> Op (Operation.Query, s)
| Metric.QueryResponse (direction, s) -> QueryRes (direction, s)
| Metric.SyncSuccess s -> Op (Operation.Write, s)
| Metric.SyncResync s -> Op (Operation.Resync, s)
| Metric.SyncConflict s -> Op (Operation.Conflict, s)
| Metric.Prune (_, s) -> Op (Operation.Prune, s)
| Metric.PruneResponse s -> PruneRes s
| Metric.Delete s -> Op (Operation.Delete, s)
| Metric.Trim s -> Op (Operation.Trim, s)
/// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and
/// the ChangeLog will mention changes, it's critical not to assume that the presence or nature of these helpers is stable
module InternalMetrics =
module Stats =
type internal Counter =
{ mutable minRu: float; mutable maxRu: float; mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { minRu = Double.MaxValue; maxRu = 0.; rux100 = 0L; count = 0L; ms = 0L }
member x.Ingest(ms, ru) =
Interlocked.Increment(&x.count) |> ignore
Interlocked.Add(&x.rux100, int64 (ru * 100.)) |> ignore
if ru < x.minRu then Interlocked.CompareExchange(&x.minRu, ru, x.minRu) |> ignore
if ru > x.maxRu then Interlocked.CompareExchange(&x.maxRu, ru, x.maxRu) |> ignore
Interlocked.Add(&x.ms, ms) |> ignore
type internal Counters() =
let buckets = System.Collections.Concurrent.ConcurrentDictionary<string, Counter>()
let create (_name: string) = Counter.Create()
member _.Ingest(bucket, ms, ru) = buckets.GetOrAdd(bucket, create).Ingest(ms, ru)
member _.Buckets = buckets.Keys
member _.TryBucket bucket = match buckets.TryGetValue bucket with true, t -> Some t | false, _ -> None
type Epoch() =
let epoch = System.Diagnostics.Stopwatch.StartNew()
member val internal Tip = Counters() with get, set
member val internal Read = Counters() with get, set
member val internal Write = Counters() with get, set
member val internal Resync = Counters() with get, set
member val internal Conflict = Counters() with get, set
member val internal Prune = Counters() with get, set
member val internal Delete = Counters() with get, set
member val internal Trim = Counters() with get, set
member _.Stop() = epoch.Stop()
member _.Elapsed = epoch.Elapsed
type LogSink(categorize) =
let bucket (x: Measurement) = if categorize then $"{x.container}/{x.Category}" else x.container
let (|BucketMsRu|) ({ interval = i; ru = ru } as m) = bucket m, int64 i.ElapsedMilliseconds, ru
static let mutable epoch = Epoch()
new() = LogSink(false)
static member Restart() =
let fresh = Epoch()
let outgoing = Interlocked.Exchange(&epoch, fresh)
outgoing.Stop()
outgoing
interface Serilog.Core.ILogEventSink with
member _.Emit logEvent =
match logEvent with
| MetricEvent cm ->
match cm with
| Op ((Operation.Tip | Operation.Tip404 | Operation.Tip304), BucketMsRu m) ->
epoch.Tip.Ingest m
| Op (Operation.Query, BucketMsRu m) -> epoch.Read.Ingest m
| QueryRes (_direction, _) -> ()
| Op (Operation.Write, BucketMsRu m) -> epoch.Write.Ingest m
| Op (Operation.Conflict, BucketMsRu m) -> epoch.Conflict.Ingest m
| Op (Operation.Resync, BucketMsRu m) -> epoch.Resync.Ingest m
| Op (Operation.Prune, BucketMsRu m) -> epoch.Prune.Ingest m
| PruneRes _ -> ()
| Op (Operation.Delete, BucketMsRu m) -> epoch.Delete.Ingest m
| Op (Operation.Trim, BucketMsRu m) -> epoch.Trim.Ingest m
| _ -> ()
/// Relies on feeding of metrics from Log through to Stats.LogSink
/// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant
let dump (log: ILogger) =
let res = Stats.LogSink.Restart()
let stats =
[|nameof res.Tip, res.Tip
nameof res.Read, res.Read
nameof res.Write, res.Write
nameof res.Resync, res.Resync
nameof res.Conflict, res.Conflict
nameof res.Prune, res.Prune
nameof res.Delete, res.Delete
nameof res.Trim, res.Trim |]
let isRead = function nameof res.Tip | nameof res.Read | nameof res.Prune -> true | _ -> false
let buckets = stats |> Seq.collect (fun (_n, stat) -> stat.Buckets) |> Seq.distinct |> Seq.sort |> Seq.toArray
if Array.isEmpty buckets then () else
let maxBucketLen = buckets |> Seq.map _.Length |> Seq.max
let duration = res.Elapsed.TotalSeconds
let mutable prevCat, catR, catW, catRRu, catWRu = null, 0L, 0L, 0., 0.
let inline rps count = if duration = 0. then 0L else float count/duration |> int64
let inline ups ru = if duration = 0. then 0. else ru/duration
let logOnCatChange (cat: string) =
if prevCat = null then prevCat <- cat
elif prevCat = cat then ()
else
let reqs = catR + catW
log.Information("{bucket} {count,6}r @ {rps,5:f0} r/s {rups,5:f0} RU/s ({rrups:f0}{r:l}/{wrups:f0}{w:l})",
prevCat.PadRight maxBucketLen, reqs, rps reqs, ups (catRRu + catWRu), ups catRRu, "R", ups catWRu, "W")
catR <- 0; catRRu <- 0; catW <- 0; catWRu <- 0; prevCat <- cat
for bucket in buckets do
let group = match bucket.IndexOf '/' with -1 -> bucket | i -> bucket.Substring(0, i)
group |> logOnCatChange
for act, counts in stats do
match counts.TryBucket bucket with
| Some stat when stat.count <> 0L ->
let ru = float stat.rux100 / 100.
let rut = if isRead act then catR <- catR + stat.count; catRRu <- catRRu + ru; "R"
else catW <- catW + stat.count; catWRu <- catWRu + ru; "W"
let inline avg x = x / float stat.count
log.Information("{bucket} {act,-8}{count,6}r {minRu,5:f1}-{maxRu,3:f0} {rut:l}RU{lat,5:f0} ms @ {rps,5:f0} r/s {rups,5:f0} RU/s Σ {ru,7:f0} avg={avgRu,4:f1}",
bucket.PadRight maxBucketLen, act, stat.count, stat.minRu, stat.maxRu, rut, avg (float stat.ms), rps stat.count, ups ru, ru, avg ru)
| _ -> ()
null |> logOnCatChange
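// Wiring sketch (illustrative; assumes Serilog's LoggerConfiguration):
//   let log = LoggerConfiguration().WriteTo.Sink(InternalMetrics.Stats.LogSink(categorize = true)).CreateLogger()
//   // ... issue some store operations against `log` ...
//   InternalMetrics.dump log // renders per-bucket counts, latencies and RU consumption, then restarts the epoch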
[<AutoOpen>]
module private MicrosoftAzureCosmosWrappers =
type ReadResult<'T> = Found of 'T | NotFound | NotModified
type Container with
member private container.DeserializeResponseBody<'T>(rm: ResponseMessage): 'T =
rm.EnsureSuccessStatusCode().Content
|> container.Database.Client.ClientOptions.Serializer.FromStream<'T>
member container.TryReadItem(partitionKey: PartitionKey, id: string, ct, ?options: ItemRequestOptions): Task<float * ReadResult<'T>> = task {
use! rm = container.ReadItemStreamAsync(id, partitionKey, requestOptions = Option.toObj options, cancellationToken = ct)
return rm.Headers.RequestCharge, rm.StatusCode |> function
| System.Net.HttpStatusCode.NotFound -> NotFound
| System.Net.HttpStatusCode.NotModified -> NotModified
| _ -> container.DeserializeResponseBody<'T>(rm) |> Found }
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison>]
type SyncResponse = { etag: string; n: int64; conflicts: Unfold[]; e: Event[] }
module internal SyncStoredProc =
let [<Literal>] name = "EquinoxEventsInTip4" // NB need to rename/number for any breaking change
let [<Literal>] body = """
// Manages the merging of the supplied Request Batch into the stream, potentially storing events in the Tip
// 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
// High level end-states:
// 1a if there is a Tip, but are only changes to the `u`nfolds (and no `e`vents) -> update Tip only
// 1b if there is a Tip, but incoming request includes an event -> generate a batch document + create empty Tip
// 2a if stream empty, but incoming request includes an event -> generate a batch document + create empty Tip
// 2b if no current Tip, and no events being written -> the incoming `req` becomes the Tip batch
function sync(req, expIndex, expEtag, maxEventsInTip, maxStringifyLen) {
if (!req) throw new Error("Missing req argument");
const collectionLink = __.getSelfLink();
const response = getContext().getResponse();
// Locate the Tip (-1) batch for this stream (which may not exist)
const tipDocId = __.getAltLink() + "/docs/" + req.id;
const isAccepted = __.readDocument(tipDocId, {}, function (err, current) {
// Verify we don't have a conflicting write
if (expIndex === -1) {
// For Any mode, we always do an append operation
executeUpsert(current);
} else if (!current && ((expIndex === -2 && expEtag !== null) || expIndex > 0)) {
// If there is no Tip page, the writer has no possible reason for writing at an index other than zero, nor can an etag expectation be fulfilled
response.setBody({ etag: null, n: 0, conflicts: [], e: [] });
} else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) {
// Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle;
// yielding [] triggers the client to go loading the events itself
// if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds
const recentEvents = expIndex < current.i ? [] : current.e.slice(expIndex - current.i);
response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [], e: recentEvents });
} else {
executeUpsert(current);
}
});
if (!isAccepted) throw new Error("readDocument not Accepted");
function executeUpsert(tip) {
function callback(err, doc) {
if (err) throw err;
response.setBody({ etag: doc._etag, n: doc.n, conflicts: null, e: [] });
}
function shouldCalveBatch(events) {
return events.length > maxEventsInTip || JSON.stringify(events).length > maxStringifyLen;
}
if (tip) {
Array.prototype.push.apply(tip.e, req.e);
tip.n = tip.i + tip.e.length;
// If there are events, calve them to their own batch (this behavior is to simplify CFP consumer impl)
if (shouldCalveBatch(tip.e)) {
const batch = { id: tip.i.toString(), p: tip.p, i: tip.i, n: tip.n, e: tip.e }
const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to remove Tip markings.");
tip.i = tip.n;
tip.e = [];
}
// TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches
// Replace all the unfolds
// TODO: should remove only unfolds being superseded
tip.u = req.u;
// As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback);
if (!isAccepted) throw new Error("Unable to replace Tip batch.");
} else {
// NOTE we write the batch first (more consistent RU cost than writing tip first)
if (shouldCalveBatch(req.e)) {
const batch = { id: "0", p: req.p, i: 0, n: req.e.length, e: req.e };
const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to create Batch 0.");
req.i = batch.n;
req.e = [];
} else {
req.i = 0;
}
req.n = req.i + req.e.length;
const isAccepted = __.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
}
}
}"""
[<RequireQualifiedAccess>]
type internal SyncExp = private | Version of int64 | Etag of string | Any
module internal SyncExp =
let fromVersion i =
if i < 0L then raise <| ArgumentOutOfRangeException(nameof i, i, "must be >= 0")
SyncExp.Version i
let fromVersionOrAppendAtEnd = function -1L -> SyncExp.Any | i -> fromVersion i
let fromEtag etag = SyncExp.Etag etag
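// e.g. fromVersionOrAppendAtEnd -1L = SyncExp.Any (blind append); fromVersionOrAppendAtEnd 3L = SyncExp.Version 3L; fromVersion -3L raises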
module internal Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Result =
| Written of Position
| Conflict of Position * events: ITimelineEvent<EventBody>[]
| ConflictUnknown of Position
let private run (container: Container, stream: string) (maxEventsInTip, maxStringifyLen) (exp, req: Tip, ct)
: Task<float*Result> = task {
let ep =
match exp with
| SyncExp.Version ev -> Position.fromIndex ev
| SyncExp.Etag et -> Position.fromEtagOnly et
| SyncExp.Any -> Position.fromAppendAtEnd
let args = [| box req; box ep.index; box (Option.toObj ep.etag); box maxEventsInTip; box maxStringifyLen |]
let! (res: Scripts.StoredProcedureExecuteResponse<SyncResponse>) =
container.Scripts.ExecuteStoredProcedureAsync<SyncResponse>(SyncStoredProc.name, PartitionKey stream, args, cancellationToken = ct)
let newPos = { index = res.Resource.n; etag = Option.ofObj res.Resource.etag }
match res.Resource.conflicts with
| null -> return res.RequestCharge, Result.Written newPos
// ConflictUnknown is to be yielded if we believe querying is going to be necessary (as there are no unfolds, and no relevant events in the Tip)
| [||] when res.Resource.e.Length = 0 && newPos.index > ep.index ->
return res.RequestCharge, Result.ConflictUnknown newPos
| unfolds -> // stored proc only returns events and unfolds with index >= req.i - no need to trim to a minIndex
let events = (Enum.Events(ep.index, res.Resource.e), Enum.Unfolds unfolds) ||> Seq.append |> Array.ofSeq
return res.RequestCharge, Result.Conflict (newPos, events) }
let private logged (container, stream) (maxEventsInTip, maxStringifyLen) (exp: SyncExp, req: Tip) (log: ILogger) ct: Task<Result> = task {
let! t, (ru, result) = (fun ct -> run (container, stream) (maxEventsInTip, maxStringifyLen) (exp, req, ct)) |> Stopwatch.time ct
let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug
let count, bytes = req.e.Length, if verbose then Enum.Events req |> Log.batchLen else 0
let log =
let inline mkMetric ru: Log.Measurement =
{ database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = bytes; count = count; ru = ru }
let inline propConflict log = log |> Log.prop "conflict" true |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in req.e -> x.c }))
if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log
|> match exp with
| SyncExp.Etag et -> Log.prop "expectedEtag" et
| SyncExp.Version ev -> Log.prop "expectedVersion" ev
| SyncExp.Any -> Log.prop "expectedVersion" -1
|> match result with
| Result.Written pos ->
Log.prop "nextExpectedVersion" pos >> Log.event (Log.Metric.SyncSuccess (mkMetric ru))
| Result.ConflictUnknown pos' ->
Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncConflict (mkMetric ru))
| Result.Conflict (pos', xs) ->
if verbose then Log.propData "conflicts" xs else id
>> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.Metric.SyncResync (mkMetric ru))
log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}",
"Sync", stream, count, req.u.Length, t.ElapsedMilliseconds, ru, bytes, exp)
return result }
let batch (log: ILogger) (retryPolicy, maxEventsInTip, maxStringifyLen) containerStream expBatch ct: Task<Result> =
let call = logged containerStream (maxEventsInTip, maxStringifyLen) expBatch
Log.withLoggedRetries retryPolicy "writeAttempt" call log ct
let private mkEvent (e: IEventData<_>) =
{ t = e.Timestamp
c = e.EventType; d = JsonElement.undefinedToNull e.Data; m = JsonElement.undefinedToNull e.Meta
correlationId = e.CorrelationId; causationId = e.CausationId }
let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = unfolds }
let mkUnfold baseIndex (compressor, x: IEventData<'F>): Unfold =
{ i = baseIndex; t = x.Timestamp
c = x.EventType; d = compressor x.Data; m = compressor x.Meta }
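// Usage sketch (illustrative; stream, events, version, compress and snapshotEvent are assumed bindings,
// where compress encodes an EventBody in the deflated form the Unfold converters expect):
//   let unfolds = [| Sync.mkUnfold version (compress, snapshotEvent) |]
//   let tip = Sync.mkBatch stream events unfolds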
module Initialization =
type [<RequireQualifiedAccess>] Throughput = Manual of rus: int | Autoscale of maxRus: int
type [<RequireQualifiedAccess>] Provisioning = Container of Throughput | Database of Throughput | Serverless
let private (|ThroughputProperties|) = function
| Throughput.Manual rus -> ThroughputProperties.CreateManualThroughput(rus)
| Throughput.Autoscale maxRus -> ThroughputProperties.CreateAutoscaleThroughput(maxRus)
let private createDatabaseIfNotExists (client: CosmosClient) maybeTp dName = async {
let! r = Async.call (fun ct -> client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
return r.Database }
let private createOrProvisionDatabase (client: CosmosClient) dName = function
| Provisioning.Container _ | Provisioning.Serverless -> createDatabaseIfNotExists client None dName
| Provisioning.Database (ThroughputProperties tp) -> async {
let! d = createDatabaseIfNotExists client (Some tp) dName
do! Async.call (fun ct -> d.ReplaceThroughputAsync(tp, cancellationToken = ct)) |> Async.Ignore
return d }
let private createContainerIfNotExists (d: Database) cp maybeTp = async {
let! r = Async.call (fun ct -> d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
let existed = r.StatusCode = Net.HttpStatusCode.OK
if existed then
do! Async.call (fun ct -> r.Container.ReplaceContainerAsync(cp, cancellationToken = ct)) |> Async.Ignore
return r.Container, existed }
let private createOrProvisionContainer (d: Database) (cName, pkPath, customizeContainer) mode =
let cp = ContainerProperties(id = cName, partitionKeyPath = pkPath)
customizeContainer cp
match mode with
| Provisioning.Database _ | Provisioning.Serverless -> async {
let! c, _existed = createContainerIfNotExists d cp None
return c }
| Provisioning.Container (ThroughputProperties throughput) -> async {
let! c, existed = createContainerIfNotExists d cp (Some throughput)
if existed then do! Async.call (fun ct -> c.ReplaceThroughputAsync(throughput, cancellationToken = ct)) |> Async.Ignore
return c }
let private createStoredProcIfNotExists (c: Container) (name, body) ct: Task<float> = task {
try let! r = c.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(id = name, body = body), cancellationToken = ct)
return r.RequestCharge
with :? CosmosException as ce when ce.StatusCode = System.Net.HttpStatusCode.Conflict -> return ce.RequestCharge }
let private applyBatchAndTipContainerProperties indexUnfolds (cp: ContainerProperties) =
cp.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
cp.IndexingPolicy.Automatic <- true
// We specify fields on a whitelist basis; generic querying is inapplicable, and indexing is far from free (write RU and latency)
// Given how long and variable the blacklist would be, we whitelist instead
cp.IndexingPolicy.ExcludedPaths.Add(ExcludedPath(Path="/*"))
for p in [| yield! Batch.IndexedPaths; if indexUnfolds then yield! Unfold.IndexedPaths |] do
cp.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = p))
let createSyncStoredProcIfNotExists (log: ILogger option) container ct = task {
let! t, ru = createStoredProcIfNotExists container (SyncStoredProc.name, SyncStoredProc.body) |> Stopwatch.time ct
match log with
| None -> ()
| Some log -> log.Information("Created stored procedure {procName} in {ms:f1}ms {ru}RU", SyncStoredProc.name, t.ElapsedMilliseconds, ru) }
let init log (client: CosmosClient) (dName, cName) mode indexUnfolds skipStoredProc ct = task {
let! d = createOrProvisionDatabase client dName mode
let! c = createOrProvisionContainer d (cName, $"/%s{Batch.PartitionKeyField}", applyBatchAndTipContainerProperties indexUnfolds) mode
if not skipStoredProc then
do! createSyncStoredProcIfNotExists (Some log) c ct }
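// Usage sketch (illustrative; assumes a CosmosClient client and a Serilog log):
//   do! Initialization.init log client ("db", "equinox")
//           (Initialization.Provisioning.Container (Initialization.Throughput.Autoscale 4000))
//           (*indexUnfolds*) true (*skipStoredProc*) false ct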
let private applyAuxContainerProperties (cp: ContainerProperties) =
// TL;DR no indexing of any kind; see https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/issues/142
cp.IndexingPolicy.Automatic <- false
cp.IndexingPolicy.IndexingMode <- IndexingMode.None
let initAux (client: CosmosClient) (dName, cName) mode = async {
let! d = createOrProvisionDatabase client dName mode
return! createOrProvisionContainer d (cName, "/id", applyAuxContainerProperties) mode } // per Cosmos team, Partition Key must be "/id"
/// Per Container, we need to ensure the stored procedure has been created exactly once (per process lifetime)
type internal ContainerInitializerGuard(container: Container, ?initContainer: Container -> CancellationToken -> Task<unit>) =
let initGuard = initContainer |> Option.map (fun init -> TaskCell<unit>(init container))
member val Container = container
/// Coordinates max of one in flight call to the init logic, retrying on next request if it fails. Calls after it has succeeded noop
member _.Initialize(ct): System.Threading.Tasks.ValueTask =
match initGuard with
| Some g when not (g.IsValid()) -> g.Await(ct) |> ValueTask.ofTask |> ValueTask.ignore
| _ -> System.Threading.Tasks.ValueTask.CompletedTask
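// e.g. (illustrative) ensuring the sync stored procedure exists before first use of a Container:
//   let guard = ContainerInitializerGuard(container, fun c ct -> Initialization.createSyncStoredProcIfNotExists (Some log) c ct)
//   do! guard.Initialize ct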
module internal Tip =
let private get (container: Container, stream: string) (maybePos: Position option) ct =
let ro = match maybePos with Some { etag = Some etag } -> ItemRequestOptions(IfNoneMatchEtag = etag) |> Some | _ -> None
container.TryReadItem<Tip>(PartitionKey stream, Tip.WellKnownDocumentId, ct, ?options = ro)
let private loggedGet (get: Container * string -> Position option -> CancellationToken -> Task<_>) (container, stream) (maybePos: Position option) (log: ILogger) ct = task {
let log = log |> Log.prop "stream" stream
let! t, (ru, res: ReadResult<Tip>) = get (container, stream) maybePos |> Stopwatch.time ct
let verbose = log.IsEnabled Events.LogEventLevel.Debug
let log bytes count (f: Log.Measurement -> _) = log |> Log.event (f { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
match res with
| ReadResult.NotModified ->
(log 0 0 Log.Metric.TipNotModified).Information("EqxCosmos {action:l} {stream} {res} {ms:f1}ms {ru}RU", "Tip", stream, 304, t.ElapsedMilliseconds, ru)
| ReadResult.NotFound ->
(log 0 0 Log.Metric.TipNotFound).Information("EqxCosmos {action:l} {stream} {res} {ms:f1}ms {ru}RU", "Tip", stream, 404, t.ElapsedMilliseconds, ru)
| ReadResult.Found tip ->
let log =
let count, bytes = tip.u.Length, if verbose then Enum.Unfolds tip.u |> Log.batchLen else 0
log bytes count Log.Metric.Tip
let log = if verbose then log |> Log.propDataUnfolds tip.u else log
let log = match maybePos with Some p -> log |> Log.propStartPos p |> Log.propStartEtag p | None -> log
let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n
log.Information("EqxCosmos {action:l} {stream} {res} {ms:f1}ms {ru}RU", "Tip", stream, 200, t.ElapsedMilliseconds, ru)
return ru, res }
type [<RequireQualifiedAccess; NoComparison; NoEquality>] Result = NotModified | NotFound | Found of Position * i: int64 * ITimelineEvent<EventBody>[]
/// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with Result.NotModified
let tryLoad (log: ILogger) retryPolicy containerStream (maybePos: Position option, maxIndex) ct: Task<Result> = task {
let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get containerStream maybePos) log ct
match res with
| ReadResult.NotModified -> return Result.NotModified
| ReadResult.NotFound -> return Result.NotFound
| ReadResult.Found tip ->
let minIndex = maybePos |> Option.map _.index
return Result.Found (Position.fromEtagAndIndex (tip._etag, tip.n), tip.i, Enum.EventsAndUnfolds(tip, ?maxIndex = maxIndex, ?minIndex = minIndex) |> Array.ofSeq) }
let tryFindOrigin (tryDecode: ITimelineEvent<EventBody> -> 'event voption, isOrigin: 'event -> bool) xs =
let stack = ResizeArray()
let isOrigin' (u: ITimelineEvent<EventBody>) =
match tryDecode u with
| ValueNone -> false
| ValueSome e ->
stack.Insert(0, e) // WalkResult always renders events ordered correctly - here we're aiming to align with Enum.EventsAndUnfolds
isOrigin e
xs |> Seq.tryFindBack isOrigin', stack.ToArray()
let tryHydrate (tryDecode: ITimelineEvent<EventBody> -> 'event voption, isOrigin: 'event -> bool) (unfolds: Unfold[], etag: string voption) =
match Enum.Unfolds unfolds |> tryFindOrigin (tryDecode, isOrigin) with
| Some u, events ->
let pos = match etag with ValueSome etag -> Position.fromEtagAndIndex (etag, u.Index) | ValueNone -> Position.readOnly
ValueSome (pos, events)
| None, _ -> ValueNone
module internal Query =
let feedIteratorMapTi (map: int -> StopwatchInterval -> FeedResponse<'t> -> 'u) (query: FeedIterator<'t>) ct: IAsyncEnumerable<'u> = taskSeq {
// earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it
use _ = query // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable
let mutable i = 0
while query.HasMoreResults do
let! t, (res: FeedResponse<'t>) = (fun ct -> query.ReadNextAsync(ct)) |> Stopwatch.time ct
yield map i t res
i <- i + 1 }
let private mkQuery (log: ILogger) (container: Container, stream: string) includeTip (maxItems: int) (direction: Direction, minIndex, maxIndex): FeedIterator<Batch> =
let order = if direction = Direction.Forward then "ASC" else "DESC"
let query =
let args = [
match minIndex with None -> () | Some x -> yield "c.n > @minPos", fun (q: QueryDefinition) -> q.WithParameter("@minPos", x)
match maxIndex with None -> () | Some x -> yield "c.i < @maxPos", fun (q: QueryDefinition) -> q.WithParameter("@maxPos", x) ]
let whereClause =
let notTip = sprintf "c.id!=\"%s\"" Tip.WellKnownDocumentId
let conditions = Seq.map fst args
if List.isEmpty args && includeTip then null
else "WHERE " + String.Join(" AND ", if includeTip then conditions else Seq.append conditions (Seq.singleton notTip))
let queryString = $"SELECT c.id, c.i, c._etag, c.n, c.e FROM c %s{whereClause} ORDER BY c.i %s{order}"
let prams = Seq.map snd args
(QueryDefinition queryString, prams) ||> Seq.fold (fun q wp -> q |> wp)
log.Debug("EqxCosmos Query {stream} {query}; n>{minIndex} i<{maxIndex}", stream, query.QueryText, Option.toNullable minIndex, Option.toNullable maxIndex)
container.GetItemQueryIterator<Batch>(query, requestOptions = QueryRequestOptions(PartitionKey = PartitionKey stream, MaxItemCount = maxItems))
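// e.g. for (direction = Direction.Backward, minIndex = Some 0L, maxIndex = None) with includeTip = false, the generated query is:
//   SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.n > @minPos AND c.id!="-1" ORDER BY c.i DESC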
// Unrolls the Batches in a response
// NOTE when reading backwards, the events are emitted in reverse Index order to suit the takeWhile consumption
let private mapPage direction (container: Container, streamName: string) (minIndex, maxIndex) (maxRequests: int option)
(log: ILogger) i t (res: FeedResponse<Batch>)
: ITimelineEvent<EventBody>[] * Position option * float =
let log = log |> Log.prop "batchIndex" i
match maxRequests with
| Some mr when i >= mr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded"
| _ -> ()
let batches, ru = Array.ofSeq res, res.RequestCharge
let unwrapBatch (b: Batch) =
Enum.Events(b, ?minIndex = minIndex, ?maxIndex = maxIndex)
|> if direction = Direction.Backward then Seq.rev else id
let events = batches |> Seq.collect unwrapBatch |> Array.ofSeq
let verbose = log.IsEnabled Events.LogEventLevel.Debug
let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = t; bytes = bytes; count = count; ru = ru }
let log = let evt = Log.Metric.QueryResponse (direction, reqMetric) in log |> Log.event evt
let log = if verbose then log |> Log.propEvents events else log
let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i })
(log|> Log.prop "bytes" bytes
|> match minIndex with None -> id | Some i -> Log.prop "minIndex" i
|> match maxIndex with None -> id | Some i -> Log.prop "maxIndex" i)
.Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms:f1}ms i={index} {ru}RU",
"Response", count, batches.Length, direction, t.ElapsedMilliseconds, index, ru)
let maybePosition = batches |> Array.tryPick Position.tryFromBatch
events, maybePosition, ru
let private logQuery direction (container: Container, streamName) interval (responsesCount, events: ITimelineEvent<EventBody>[]) n (ru: float) (log: ILogger) =
let verbose = log.IsEnabled Events.LogEventLevel.Debug
let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru }
let evt = Log.Metric.Query (direction, responsesCount, reqMetric)
let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB"
(log |> Log.prop "bytes" bytes |> Log.event evt).Information(
"EqxCosmos {action:l} {stream} v{n} {count}/{responses} {ms:f1}ms {ru}RU",
action, streamName, n, count, responsesCount, interval.ElapsedMilliseconds, ru)
let private calculateUsedVersusDroppedPayload stopIndex (xs: ITimelineEvent<EventBody>[]): int * int =
let mutable used, dropped = 0, 0
let mutable found = false
for x in xs do
let bytes = Log.eventLen x
if found then dropped <- dropped + bytes
else used <- used + bytes
if x.Index = stopIndex then found <- true
used, dropped
[<RequireQualifiedAccess; NoComparison; NoEquality>]
type ScanResult<'event> = { found: bool; minIndex: int64; next: int64; maybeTipPos: Position option; events: 'event[] }
let scanTip (tryDecode, isOrigin) (pos: Position, i: int64, xs: #ITimelineEvent<EventBody>[]): ScanResult<'event> =
let origin, events = xs |> Tip.tryFindOrigin (tryDecode, isOrigin)
{ found = Option.isSome origin; maybeTipPos = Some pos; minIndex = i; next = pos.index + 1L; events = events }
// Yields events in ascending Index order
let scan<'event> (log: ILogger) (container, stream) includeTip (maxItems: int) maxRequests direction
(tryDecode: ITimelineEvent<EventBody> -> 'event voption, isOrigin: 'event -> bool)
(minIndex, maxIndex, ct)
: Task<ScanResult<'event> option> = task {
let mutable found = false
let mutable responseCount = 0
let mergeBatches (log: ILogger) (batchesBackward: IAsyncEnumerable<ITimelineEvent<EventBody>[] * Position option * float>) = task {
let mutable lastResponse, maybeTipPos, ru = None, None, 0.
let! events =
batchesBackward
|> TaskSeq.collectSeq (fun (events, maybePos, r) ->
if Option.isNone maybeTipPos then maybeTipPos <- maybePos
lastResponse <- Some events; ru <- ru + r
responseCount <- responseCount + 1
seq { for x in events -> struct (x, tryDecode x) })
|> TaskSeq.takeWhileInclusive (function
| struct (x, ValueSome e) when isOrigin e ->
found <- true
match lastResponse with
| None -> log.Information("EqxCosmos Stop stream={stream} at={index} {case}", stream, x.Index, x.EventType)
| Some batch ->
let used, residual = batch |> calculateUsedVersusDroppedPayload x.Index
log.Information("EqxCosmos Stop stream={stream} at={index} {case} used={used} residual={residual}",
stream, x.Index, x.EventType, used, residual)
false
| _ -> true)
|> TaskSeq.toArrayAsync
return events, maybeTipPos, ru }
let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
let readLog = log |> Log.prop "direction" direction
let batches ct: IAsyncEnumerable<ITimelineEvent<EventBody>[] * Position option * float> =
let query = mkQuery readLog (container, stream) includeTip maxItems (direction, minIndex, maxIndex)
feedIteratorMapTi (mapPage direction (container, stream) (minIndex, maxIndex) maxRequests readLog) query ct
let! t, (events, maybeTipPos, ru) = (fun ct -> batches ct |> mergeBatches log) |> Stopwatch.time ct
let raws = Array.map ValueTuple.fst events
let decoded = if direction = Direction.Forward then Array.chooseV ValueTuple.snd events else let xs = Array.chooseV ValueTuple.snd events in Array.Reverse xs; xs
let minMax = (None, raws) ||> Array.fold (fun acc x -> let i = x.Index in Some (match acc with None -> i, i | Some (n, x) -> min n i, max x i))
let version =
match maybeTipPos, minMax with
| Some { index = max }, _
| _, Some (_, max) -> max + 1L
| None, None -> 0L
log |> logQuery direction (container, stream) t (responseCount, raws) version ru
match minMax, maybeTipPos with
| Some (i, m), _ -> return Some ({ found = found; minIndex = i; next = m + 1L; maybeTipPos = maybeTipPos; events = decoded }: ScanResult<_>)
| None, Some { index = tipI } -> return Some { found = found; minIndex = tipI; next = tipI; maybeTipPos = maybeTipPos; events = [||] }
| None, _ -> return None }
let walkLazy<'event> (log: ILogger) (container, stream) maxItems maxRequests
(tryDecode: ITimelineEvent<EventBody> -> 'event option, isOrigin: 'event -> bool)
(direction, minIndex, maxIndex, ct: CancellationToken)
: IAsyncEnumerable<'event[]> = taskSeq {
let query = mkQuery log (container, stream) true maxItems (direction, minIndex, maxIndex)
let readPage = mapPage direction (container, stream) (minIndex, maxIndex) maxRequests
let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
let readLog = log |> Log.prop "direction" direction
let query = feedIteratorMapTi (readPage readLog) query ct
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let allEvents = ResizeArray()
let mutable i, ru = 0, 0.
try let mutable ok = true
use e = query.GetAsyncEnumerator(ct)
while ok do
let batchLog = readLog |> Log.prop "batchIndex" i
match maxRequests with
| Some mr when i + 1 >= mr -> batchLog.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded"
| _ -> ()
let! more = e.MoveNextAsync()
if not more then ok <- false else // rest of block does not happen, while exits
let events, _pos, rus = e.Current
ru <- ru + rus
allEvents.AddRange(events)
let acc = ResizeArray()
for x in events do
match tryDecode x with
| Some e when isOrigin e ->
let used, residual = events |> calculateUsedVersusDroppedPayload x.Index
log.Information("EqxCosmos Stop stream={stream} at={index} {case} used={used} residual={residual}",
stream, x.Index, x.EventType, used, residual)
ok <- false
acc.Add e
| Some e -> acc.Add e
| None -> ()
i <- i + 1
yield acc.ToArray()
finally
let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let t = StopwatchInterval(startTicks, endTicks)
log |> logQuery direction (container, stream) t (i, allEvents.ToArray()) -1L ru }
/// Manages coalescing of spans of events obtained from various sources:
/// 1) Tip Data and/or Conflicting events
/// 2) Querying Primary for predecessors of what's obtained from 1
/// 3) Querying Archive for predecessors of what's obtained from 2
let load (log: ILogger) (minIndex, maxIndex) (tip: ScanResult<'event> option)
(primary: int64 option * int64 option * CancellationToken -> Task<ScanResult<'event> option>)
// Choice1Of2 -> indicates whether it's acceptable to ignore missing events; Choice2Of2 -> Fallback store
(fallback: Choice<bool, int64 option * int64 option * CancellationToken -> Task<ScanResult<'event> option>>) ct
: Task<Position * 'event[]> = task {
let minI = defaultArg minIndex 0L
match tip with
| Some { found = true; maybeTipPos = Some p; events = e } -> return p, e
| Some { minIndex = i; maybeTipPos = Some p; events = e } when i <= minI -> return p, e
| _ ->
let i, events, pos =
match tip with
| Some { minIndex = i; maybeTipPos = p; events = e } -> Some i, e, p
| None -> maxIndex, Array.empty, None
let! primary = primary (minIndex, i, ct)
let events, pos =
match primary with
| None -> events, pos |> Option.defaultValue Position.fromKnownEmpty
| Some p -> Array.append p.events events, pos |> Option.orElse p.maybeTipPos |> Option.defaultValue (Position.fromIndex p.next)
let inline logMissing (minIndex, maxIndex) message =
if log.IsEnabled Events.LogEventLevel.Debug then
(log|> fun log -> match minIndex with None -> log | Some mi -> log |> Log.prop "minIndex" mi
|> fun log -> match maxIndex with None -> log | Some mi -> log |> Log.prop "maxIndex" mi)
.Debug(message)
match primary, fallback with
| Some { found = true }, _ -> return pos, events // origin found in primary, no need to look in fallback
| Some { minIndex = i }, _ when i <= minI -> return pos, events // primary had required earliest event Index, no need to look at fallback
| None, _ when Option.isNone tip -> return pos, events // initial load where no documents present in stream
| _, Choice1Of2 allowMissing ->
logMissing (minIndex, i) "Origin event not found; no Archive Container supplied"
if allowMissing then return pos, events
else return failwith "Origin event not found; no Archive Container supplied"
| _, Choice2Of2 fallback ->
let maxIndex = match primary with Some p -> Some p.minIndex | None -> maxIndex // if no batches in primary, high water mark from tip is max
let! fallback = fallback (minIndex, maxIndex, ct)
let events =
match fallback with
| Some s -> Array.append s.events events
| None -> events
match fallback with
| Some { minIndex = i } when i <= minI -> ()
| Some { found = true } -> ()
| _ -> logMissing (minIndex, maxIndex) "Origin event not found in Archive Container"
return pos, events }
// Manages deletion of (full) Batches, and trimming of events in Tip, maintaining ordering guarantees by never updating non-Tip batches
// Additionally, the nature of the fallback algorithm requires that deletions be carried out in sequential order so as not to leave gaps
// NOTE: module is public so that query results can be deserialized into the BatchIndices type
module Prune =
type BatchIndices = { id: string; i: int64; n: int64 }
let until (log: ILogger) (container: Container, stream: string) (maxItems: int) indexInclusive ct: Task<int * int * int64> = task {
let log = log |> Log.prop "stream" stream
let deleteItem id count: Task<float> = task {
let ro = ItemRequestOptions(EnableContentResponseOnWrite = false) // https://devblogs.microsoft.com/cosmosdb/enable-content-response-on-write/
let! t, res = (fun ct -> container.DeleteItemAsync(id, PartitionKey stream, ro, ct)) |> Stopwatch.time ct
let rc, ms = res.RequestCharge, t.ElapsedMilliseconds
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = -1; count = count; ru = rc }
let log = let evt = Log.Metric.Delete reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {id} {ms:f1}ms {ru}RU", "Delete", id, ms, rc)
return rc }
let trimTip expectedI count = task {
match! container.TryReadItem<Tip>(PartitionKey stream, Tip.WellKnownDocumentId, ct) with
| _, ReadResult.NotModified -> return failwith "unexpected NotModified; no etag supplied"
| _, ReadResult.NotFound -> return failwith "unexpected NotFound"
| _, ReadResult.Found tip when tip.i <> expectedI -> return failwith $"Concurrent write detected; Expected i=%d{expectedI} actual=%d{tip.i}"
| tipRu, ReadResult.Found tip ->
let tip = { tip with i = tip.i + int64 count; e = Array.skip count tip.e }
let ro = ItemRequestOptions(EnableContentResponseOnWrite = false, IfMatchEtag = tip._etag)
let! t, updateRes = (fun ct -> container.ReplaceItemAsync(tip, tip.id, PartitionKey stream, ro, ct)) |> Stopwatch.time ct
let rc, ms = tipRu + updateRes.RequestCharge, t.ElapsedMilliseconds
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = -1; count = count; ru = rc }
let log = let evt = Log.Metric.Trim reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {count} {ms:f1}ms {ru}RU", "Trim", count, ms, rc)
return rc }
let log = log |> Log.prop "index" indexInclusive
let query: FeedIterator<BatchIndices> =
let qro = QueryRequestOptions(PartitionKey = PartitionKey stream, MaxItemCount = maxItems)
// sort by i to guarantee we don't ever leave an observable gap in the sequence
container.GetItemQueryIterator<_>("SELECT c.id, c.i, c.n FROM c ORDER by c.i", requestOptions = qro)
let mapPage i (t: StopwatchInterval) (page: FeedResponse<BatchIndices>) =
let batches, rc, ms = Array.ofSeq page, page.RequestCharge, t.ElapsedMilliseconds
let next = Array.tryLast batches |> Option.map _.n
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc }
let log = let evt = Log.Metric.PruneResponse reqMetric in log |> Log.prop "batchIndex" i |> Log.event evt
log.Information("EqxCosmos {action:l} {batches} {ms:f1}ms n={next} {ru}RU", "PruneResponse", batches.Length, ms, Option.toNullable next, rc)
batches, rc
let! pt, outcomes =
let isTip (x: BatchIndices) = x.id = Tip.WellKnownDocumentId
let isRelevant x = x.i <= indexInclusive || isTip x
let handle (batches: BatchIndices[], rc) = task {
let mutable delCharges, batchesDeleted, trimCharges, batchesTrimmed, eventsDeleted, eventsDeferred = 0., 0, 0., 0, 0, 0
let mutable lwm = None
for x in batches |> Seq.takeWhile (fun x -> isRelevant x || Option.isNone lwm) do
let batchSize = x.n - x.i |> int
let eligibleEvents = max 0 (min batchSize (int (indexInclusive + 1L - x.i)))
if isTip x then // Even if we remove the last event from the Tip, we need to retain a) unfolds b) position (n)
if eligibleEvents <> 0 then
let! charge = trimTip x.i eligibleEvents
trimCharges <- trimCharges + charge
batchesTrimmed <- batchesTrimmed + 1
eventsDeleted <- eventsDeleted + eligibleEvents
if lwm = None then
lwm <- Some (x.i + int64 eligibleEvents)
elif x.n <= indexInclusive + 1L then
let! charge = deleteItem x.id batchSize
delCharges <- delCharges + charge
batchesDeleted <- batchesDeleted + 1
eventsDeleted <- eventsDeleted + batchSize
else // can't update a non-Tip batch, or it'll be ordered wrong from a CFP perspective
eventsDeferred <- eventsDeferred + eligibleEvents
if lwm = None then
lwm <- Some x.i
return (rc, delCharges, trimCharges), lwm, (batchesDeleted + batchesTrimmed, eventsDeleted, eventsDeferred) }
let hasRelevantItems (batches, _rc) = batches |> Array.exists isRelevant
let loadOutcomes ct =
Query.feedIteratorMapTi mapPage query ct
|> TaskSeq.takeWhile hasRelevantItems
|> TaskSeq.mapAsync handle
|> TaskSeq.toArrayAsync
loadOutcomes |> Stopwatch.time ct
let mutable lwm, queryCharges, delCharges, trimCharges, responses, batches, eventsDeleted, eventsDeferred = None, 0., 0., 0., 0, 0, 0, 0
let accumulate ((qc, dc, tc), bLwm, (bCount, eDel, eDef)) =
lwm <- max lwm bLwm
queryCharges <- queryCharges + qc
delCharges <- delCharges + dc
trimCharges <- trimCharges + tc
responses <- responses + 1
batches <- batches + bCount
eventsDeleted <- eventsDeleted + eDel
eventsDeferred <- eventsDeferred + eDef
outcomes |> Array.iter accumulate
let reqMetric: Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = pt; bytes = eventsDeleted; count = batches; ru = queryCharges }
let log = let evt = Log.Metric.Prune (responses, reqMetric) in log |> Log.event evt
let lwm = lwm |> Option.defaultValue 0L // If we've seen no batches at all, then the write position is 0L
log.Information("EqxCosmos {action:l} {events}/{batches} lwm={lwm} {ms:f1}ms queryRu={queryRu} deleteRu={deleteRu} trimRu={trimRu}",
"Prune", eventsDeleted, batches, lwm, pt.ElapsedMilliseconds, queryCharges, delCharges, trimCharges)
return eventsDeleted, eventsDeferred, lwm }
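// Usage sketch (illustrative): physically delete whole Batches holding events with index <= 99, trimming any
// such events from the Tip; yields (eventsDeleted, eventsDeferred, lowWaterMark)
//   let! deleted, deferred, lwm = Prune.until log (container, stream) (*maxItems*) 32 99L ct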
[<NoComparison; NoEquality>]
type Token = { pos: Position }
module Token =