diff --git a/Directory.Build.props b/Directory.Build.props
index 27b579099..24c2ea368 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -22,6 +22,9 @@
$(NoWarn);FS2003;NU5105
+
+
+ 1.1
diff --git a/Equinox.sln b/Equinox.sln
index 661569631..553b25ee0 100644
--- a/Equinox.sln
+++ b/Equinox.sln
@@ -58,6 +58,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{8F3E
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Todo", "Todo", "{8CDE1CC3-8619-44DE-8B4D-4102CE476C35}"
EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos.Projection", "src\Equinox.Cosmos.Projection\Equinox.Cosmos.Projection.fsproj", "{D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}"
+EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Kafka", "src\Equinox.Projection.Kafka\Equinox.Projection.Kafka.fsproj", "{F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}"
+EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Kafka.Integration", "tests\Equinox.Projection.Kafka.Integration\Equinox.Projection.Kafka.Integration.fsproj", "{C8383D84-80F5-46B8-B59B-48B2359E6A37}"
+EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Codec", "src\Equinox.Projection.Codec\Equinox.Projection.Codec.fsproj", "{65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}"
+EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Integration", "tests\Equinox.Projection.Integration\Equinox.Projection.Integration.fsproj", "{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -128,6 +138,26 @@ Global
{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Release|Any CPU.Build.0 = Release|Any CPU
+ {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/build.proj b/build.proj
index e2b9ab63b..aa2ff0bf9 100644
--- a/build.proj
+++ b/build.proj
@@ -19,8 +19,11 @@
+
+
+
@@ -30,6 +33,8 @@
+
+
diff --git a/global.json b/global.json
index 455cd36d7..21e27cede 100644
--- a/global.json
+++ b/global.json
@@ -1,5 +1,5 @@
{
"sdk": {
- "version": "2.1.402"
+ "version": "2.1.500"
}
}
\ No newline at end of file
diff --git a/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs b/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs
new file mode 100644
index 000000000..fdfbf3a9d
--- /dev/null
+++ b/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs
@@ -0,0 +1,112 @@
+namespace Equinox.Cosmos.Projection
+
+open Equinox.Cosmos.Projection
+open Equinox.Store.Infrastructure
+open Microsoft.Azure.Documents
+open Microsoft.Azure.Documents.Client
+open Microsoft.Azure.Documents.ChangeFeedProcessor
+open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing
+open Serilog
+open System
+open System.Collections.Generic
+
+type ChangeFeedObserver() =
+ static member Create(log: ILogger, onChange : IChangeFeedObserverContext -> IReadOnlyList<Document> -> Async<unit>, ?dispose: unit -> unit) =
+ { new IChangeFeedObserver with
+ member __.ProcessChangesAsync(ctx, docs, ct) = (UnitTaskBuilder ct) {
+ try do! onChange ctx docs
+ with e ->
+ log.Warning(e, "Range {partitionKeyRangeId} Handler Threw", ctx.PartitionKeyRangeId)
+ do! Async.Raise e }
+ member __.OpenAsync ctx = UnitTaskBuilder() {
+ log.Information("Range {partitionKeyRangeId} Assigned", ctx.PartitionKeyRangeId) }
+ member __.CloseAsync (ctx, reason) = UnitTaskBuilder() {
+ log.Information("Range {partitionKeyRangeId} Revoked {reason}", ctx.PartitionKeyRangeId, reason) }
+ interface IDisposable with
+ member __.Dispose() =
+ match dispose with Some f -> f () | None -> () }
+
+type ChangeFeedObserverFactory =
+ static member FromFunction (f : unit -> #IChangeFeedObserver) =
+ { new IChangeFeedObserverFactory with member __.CreateObserver () = f () :> _ }
+
+type CosmosCollection = { database: string; collection: string }
+
+/// Wraps the [Azure CosmosDb ChangeFeedProcessor library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet)
+type ChangeFeedProcessor =
+ static member Start
+ ( log : ILogger, endpoint : Uri, accountKey : string, connectionPolicy : ConnectionPolicy, source : CosmosCollection,
+ /// The aux, non-partitioned collection holding the partition leases.
+ // Aux coll should always read from the write region to keep the number of write conflicts to a minimum when the SDK
+ // updates the leases. Since the non-write region(s) might lag behind due to us using non-strong consistency, during
+ // failover we are likely to reprocess some messages, but that's okay since we are idempotent.
+ aux : CosmosCollection,
+ createObserver : unit -> IChangeFeedObserver,
+ ?leaseOwnerId : string,
+ /// Identifier to disambiguate multiple independent feed processor positions (akin to a 'consumer group')
+ ?leasePrefix : string,
+ /// (NB Only applies if this is the first time this leasePrefix is presented)
+ /// Specify `true` to request starting of projection from the present write position.
+ /// Default: false (projecting all events from start beforehand)
+ ?forceSkipExistingEvents : bool,
+ /// Limit on items to take in a batch when querying for changes (in addition to 4MB response size limit). Default 1000
+ /// NB While a larger limit may deliver better throughput, it's important to balance that concern against the risk of throttling
+ ?cfBatchSize : int,
+ /// Frequency to check for partitions without a processor. Default 1s
+ ?leaseAcquireInterval : TimeSpan,
+ /// Frequency to renew leases held by processors under our control. Default 3s
+ ?leaseRenewInterval : TimeSpan,
+ /// Duration to take lease when acquired/renewed. Default 10s
+ ?leaseTtl : TimeSpan,
+ /// Delay before re-polling a partition after its backlog has been drained
+ ?feedPollDelay : TimeSpan,
+ /// Continuously fed per-partition lag information until the parent Async completes
+ /// The callback should Async.Sleep until the next update is desired
+ ?reportLagAndAwaitNextEstimation) = async {
+
+ let leaseOwnerId = defaultArg leaseOwnerId (ChangeFeedProcessor.mkLeaseOwnerIdForProcess())
+ let cfBatchSize = defaultArg cfBatchSize 1000
+ let feedPollDelay = defaultArg feedPollDelay (TimeSpan.FromSeconds 1.)
+ let leaseAcquireInterval = defaultArg leaseAcquireInterval (TimeSpan.FromSeconds 1.)
+ let leaseRenewInterval = defaultArg leaseRenewInterval (TimeSpan.FromSeconds 3.)
+ let leaseTtl = defaultArg leaseTtl (TimeSpan.FromSeconds 10.)
+
+ let inline s (x : TimeSpan) = x.TotalSeconds
+ log.Information("Processing Lease acquireS={leaseAcquireIntervalS:n0} ttlS={ttlS:n0} renewS={renewS:n0} feedPollDelayS={feedPollDelayS:n0}",
+ s leaseAcquireInterval, s leaseTtl, s leaseRenewInterval, s feedPollDelay)
+
+ let builder =
+ let feedProcessorOptions =
+ ChangeFeedProcessorOptions(
+ StartFromBeginning = not (defaultArg forceSkipExistingEvents false),
+ MaxItemCount = Nullable cfBatchSize,
+ LeaseAcquireInterval = leaseAcquireInterval, LeaseExpirationInterval = leaseTtl, LeaseRenewInterval = leaseRenewInterval,
+ FeedPollDelay = feedPollDelay)
+ leasePrefix |> Option.iter (fun lp -> feedProcessorOptions.LeasePrefix <- lp + ":")
+ let mk d c = DocumentCollectionInfo(Uri = endpoint, DatabaseName = d, CollectionName = c, MasterKey = accountKey, ConnectionPolicy = connectionPolicy)
+ ChangeFeedProcessorBuilder()
+ .WithHostName(leaseOwnerId)
+ .WithFeedCollection(mk source.database source.collection)
+ .WithLeaseCollection(mk aux.database aux.collection)
+ .WithProcessorOptions(feedProcessorOptions)
+ match reportLagAndAwaitNextEstimation with
+ | None -> ()
+ | Some lagMonitorCallback ->
+ let! estimator = builder.BuildEstimatorAsync() |> Async.AwaitTaskCorrect
+ let rec emitLagMetrics () = async {
+ let! remainingWork = estimator.GetEstimatedRemainingWorkPerPartitionAsync() |> Async.AwaitTaskCorrect
+ do! lagMonitorCallback <| List.ofSeq (seq { for r in remainingWork -> int (r.PartitionKeyRangeId.Trim([|'"'|])), r.RemainingWork } |> Seq.sortBy fst)
+ return! emitLagMetrics () }
+ let! _ = Async.StartChild(emitLagMetrics ()) in ()
+ let! processor = builder.WithObserverFactory(ChangeFeedObserverFactory.FromFunction createObserver).BuildAsync() |> Async.AwaitTaskCorrect
+ do! processor.StartAsync() |> Async.AwaitTaskCorrect
+ return processor }
+ static member private mkLeaseOwnerIdForProcess() =
+ // If k>1 processes share an owner id, then they will compete for the same partitions.
+ // In that scenario, redundant processing happens on the contested partitions, but checkpointing is only performed by one of the consumers.
+ // Including the processId should eliminate the possibility that a broken process manager causes a k>1 scenario to happen.
+ // The only downside is that, upon redeploy, the lease expiration / TTL needs to elapse before a successor consumer can pick up a lease.
+ let processName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name
+ let processId = System.Diagnostics.Process.GetCurrentProcess().Id
+ let hostName = System.Environment.MachineName
+ sprintf "%s-%s-%d" hostName processName processId
\ No newline at end of file
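
To show how the above is intended to be hosted, here is a minimal, hypothetical wiring sketch (not part of the diff); the endpoint, key and collection names are placeholders:

    // Hypothetical hosting sketch for ChangeFeedProcessor.Start; all connection details are placeholders
    open Equinox.Cosmos.Projection
    open Microsoft.Azure.Documents.Client
    open Serilog
    open System

    let runProjector (log : ILogger) = async {
        let source = { database = "db"; collection = "equinox" }        // monitored collection
        let aux    = { database = "db"; collection = "equinox-aux" }    // lease collection (see notes above)
        let createObserver () =
            ChangeFeedObserver.Create(log, fun ctx docs -> async {
                log.Information("Range {range}: {count} documents", ctx.PartitionKeyRangeId, docs.Count) })
        let! processor =
            ChangeFeedProcessor.Start(
                log, Uri "https://PLACEHOLDER.documents.azure.com", "PLACEHOLDER-key", ConnectionPolicy.Default,
                source, aux, createObserver, leasePrefix = "sample")
        // ... await a shutdown signal, then:
        do! processor.StopAsync() |> Async.AwaitTask }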
diff --git a/src/Equinox.Cosmos.Projection/DocumentParser.fs b/src/Equinox.Cosmos.Projection/DocumentParser.fs
new file mode 100644
index 000000000..21d03981f
--- /dev/null
+++ b/src/Equinox.Cosmos.Projection/DocumentParser.fs
@@ -0,0 +1,33 @@
+namespace Equinox.Cosmos.Projection
+
+open Microsoft.Azure.Documents
+open System
+
+[]
+module DocumentParser =
+ type Document with
+ member document.Cast<'T>() =
+ let tmp = new Document()
+ tmp.SetPropertyValue("content", document)
+ tmp.GetPropertyValue<'T>("content")
+ /// Determines whether this document represents an index page [and hence should not be expected to contain any events]
+ let isIndex (d : Document) = d.Id = "-1"
+ type IEvent =
+ inherit Equinox.Cosmos.Store.IIndexedEvent
+ abstract member Stream : string
+ abstract member TimeStamp : DateTimeOffset
+ /// Infers whether the document represents a valid Event-Batch
+ let enumEvents (d : Document) = seq {
+ if not (isIndex d)
+ && d.GetPropertyValue("p") <> null && d.GetPropertyValue("i") <> null
+ && d.GetPropertyValue("n") <> null && d.GetPropertyValue("e") <> null then
+ let batch = d.Cast<Equinox.Cosmos.Store.Batch>()
+ yield! batch.e |> Seq.mapi (fun offset x ->
+ { new IEvent with
+ member __.Index = batch.i + int64 offset
+ member __.IsUnfold = false
+ member __.EventType = x.c
+ member __.Data = x.d
+ member __.Meta = x.m
+ member __.TimeStamp = x.t
+ member __.Stream = batch.p } ) }
\ No newline at end of file
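
As an illustration of how the parser above might be combined with an observer, the following sketch (illustrative only) flattens a change feed batch into simple tuples:

    // Illustrative only: turn a change feed batch into (stream, index, eventType) tuples via DocumentParser.enumEvents
    open Equinox.Cosmos.Projection
    open Microsoft.Azure.Documents
    open System.Collections.Generic

    let summarize (docs : IReadOnlyList<Document>) =
        docs
        |> Seq.collect DocumentParser.enumEvents
        |> Seq.map (fun e -> e.Stream, e.Index, e.EventType)
        |> Seq.toArray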
diff --git a/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj b/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj
new file mode 100644
index 000000000..4617df044
--- /dev/null
+++ b/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj
@@ -0,0 +1,32 @@
+
+
+
+ netstandard2.0;net461
+ 5
+ false
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Equinox.Cosmos.Projection/Infrastructure.fs b/src/Equinox.Cosmos.Projection/Infrastructure.fs
new file mode 100644
index 000000000..273ac2ae8
--- /dev/null
+++ b/src/Equinox.Cosmos.Projection/Infrastructure.fs
@@ -0,0 +1,50 @@
+namespace Equinox.Cosmos.Projection
+
+open System
+open System.Threading
+open System.Threading.Tasks
+
+[<AutoOpen>]
+module Impl =
+ type Async with
+ /// Re-raise an exception so that the current stacktrace is preserved
+ static member Raise(e : #exn) : Async<'T> = Async.FromContinuations (fun (_,ec,_) -> ec e)
+ static member Sleep(t : TimeSpan) : Async<unit> = Async.Sleep(int t.TotalMilliseconds)
+
+ /// eagerly cancels the parent workflow without waiting for cooperative cancellation by the child computation.
+ static member TimeoutAfterEager (timeout : TimeSpan) (workflow : Async<'T>) : Async<'T> = async {
+ let! ct = Async.CancellationToken
+ return! Async.FromContinuations(fun (sk,ek,ck) ->
+ let mutable latch = 0
+ let inline protect k t = if Interlocked.Increment &latch = 1 then k t
+ let cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
+ do cts.CancelAfter timeout
+ let _ = let t = cts.Token in t.Register(fun () -> protect ek (TimeoutException("async workflow has timed out")))
+ Async.StartWithContinuations(workflow, protect sk, protect ek, protect ck, cts.Token))
+ }
+
+[<AbstractClass>]
+type AsyncBuilderAbstract() =
+ member __.Zero() = async.Zero()
+ member __.Return t = async.Return t
+ member __.ReturnFrom t = async.ReturnFrom t
+ member __.Bind(f,g) = async.Bind(f,g)
+ member __.Combine(f,g) = async.Combine(f,g)
+ member __.Delay f = async.Delay f
+ member __.While(c,b) = async.While(c,b)
+ member __.For(xs,b) = async.For(xs,b)
+ member __.Using(d,b) = async.Using(d,b)
+ member __.TryWith(b,e) = async.TryWith(b,e)
+ member __.TryFinally(b,f) = async.TryFinally(b,f)
+
+type TaskBuilder(?ct : CancellationToken) =
+ inherit AsyncBuilderAbstract()
+ member __.Run f : Task<'T> = Async.StartAsTask(f, ?cancellationToken = ct)
+
+type UnitTaskBuilder(?ct : CancellationToken, ?timeout : TimeSpan) =
+ inherit AsyncBuilderAbstract()
+
+ member __.Run workflow : Task =
+ let wrapped = match timeout with None -> workflow | Some t -> Async.TimeoutAfterEager t workflow
+ let task = Async.StartAsTask(wrapped, ?cancellationToken = ct)
+ task :> _
\ No newline at end of file
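
The builders above exist so that the Task-returning members required by the ChangeFeedProcessor SDK interfaces can be written as F# async blocks; a minimal sketch, assuming a caller-supplied CancellationToken and an arbitrary 30s timeout:

    // Sketch only: adapt an Async<unit> into the Task shape the SDK interfaces expect
    open Equinox.Cosmos.Projection
    open System
    open System.Threading
    open System.Threading.Tasks

    let asTask (ct : CancellationToken) (work : Async<unit>) : Task =
        (UnitTaskBuilder(ct, timeout = TimeSpan.FromSeconds 30.)) { do! work }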
diff --git a/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj b/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj
new file mode 100644
index 000000000..1bd774858
--- /dev/null
+++ b/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj
@@ -0,0 +1,28 @@
+
+
+
+ netstandard2.0;net461
+ 5
+ false
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Equinox.Projection.Codec/FeedValidator.fs b/src/Equinox.Projection.Codec/FeedValidator.fs
new file mode 100644
index 000000000..bdb176125
--- /dev/null
+++ b/src/Equinox.Projection.Codec/FeedValidator.fs
@@ -0,0 +1,110 @@
+module Equinox.Projection.Validation
+
+open System.Collections.Generic
+
+module private Option =
+ let defaultValue def option = defaultArg option def
+
+/// Represents the categorisation of an item being ingested
+type IngestResult =
+ /// The item is a correct item at the tail of the known sequence
+ | New
+ /// Consistent as far as we know (but this Validator has not seen the head)
+ | Ok
+ /// The item represents a duplicate of an earlier item
+ | Duplicate
+ /// The item is beyond the tail of the stream and likely represents a gap
+ | Gap
+
+/// Represents the present state of a given stream
+type StreamState =
+ /// We've observed the stream from the start
+ | All of pos: int
+ /// We've not observed the stream from the start
+ | Partial of min: int * pos: int
+
+module StreamState =
+ let combine (state : StreamState option) index : IngestResult*StreamState =
+ match state, index with
+ | None, 0 -> New, All 0
+ | None, x -> Ok, Partial (x,x)
+ | Some (All x), i when i <= x -> Duplicate, All x
+ | Some (All x), i when i = x + 1 -> New, All i
+ | Some (All x), _ -> Gap, All x
+ | Some (Partial (min=1; pos=pos)), 0 -> Duplicate, All pos
+ | Some (Partial (min=min; pos=x)), i when i <= min -> Duplicate, Partial (i, x)
+ | Some (Partial (min=min; pos=x)), i when i = x + 1 -> Ok, Partial (min, i)
+ | Some (Partial (min=min)), i -> New, Partial (min, i)
+
+
+type FeedStats = { complete: int; partial: int }
+
+/// Maintains the state of a set of streams being ingested into a processor for consistency checking purposes
+/// - to determine whether an incoming event on a stream should be considered a duplicate and hence not processed
+/// - to allow inconsistencies to be logged
+type FeedValidator() =
+ let streamStates = System.Collections.Generic.Dictionary<string, StreamState>()
+
+ /// Thread-safe operation to a) classify and b) record the state change implied by a newly encountered message
+ member __.Ingest(stream, index) : IngestResult * StreamState =
+ lock streamStates <| fun () ->
+ let state =
+ match streamStates.TryGetValue stream with
+ | true, state -> Some state
+ | false, _ -> None
+ let (res, state') = StreamState.combine state index
+ streamStates.[stream] <- state'
+ res, state'
+
+ /// Counts of streams being tracked, split into complete vs partial
+ member __.Stats =
+ lock streamStates <| fun () ->
+ let raw = streamStates |> Seq.countBy (fun x -> match x.Value with All _ -> true | Partial _ -> false) |> Seq.toList
+ { complete = raw |> List.tryPick (function (true,c) -> Some c | (false,_) -> None) |> Option.defaultValue 0
+ partial = raw |> List.tryPick (function (false,c) -> Some c | (true,_) -> None) |> Option.defaultValue 0 }
+
+type [] StreamSummary = { mutable fresh : int; mutable ok : int; mutable dup : int; mutable gap : int; mutable complete: bool }
+
+type BatchStats = { fresh : int; ok : int; dup : int; gap : int; categories : int; streams : BatchStreamStats } with
+ member s.TotalStreams = let s = s.streams in s.complete + s.incomplete
+
+and BatchStreamStats = { complete: int; incomplete: int }
+
+/// Used to establish aggregate stats for a batch of inputs for logging purposes
+/// The Ingested inputs are passed to the supplied validator in order to classify them
+type BatchValidator(validator : FeedValidator) =
+ let streams = System.Collections.Generic.Dictionary<string, StreamSummary>()
+ let streamSummary (streamName : string) =
+ match streams.TryGetValue streamName with
+ | true, acc -> acc
+ | false, _ -> let t = { fresh = 0; ok = 0; dup = 0; gap = 0; complete = false } in streams.[streamName] <- t; t
+
+ /// Collate into Feed Validator and Batch stats
+ member __.TryIngest(stream, index) : IngestResult =
+ let res, state = validator.Ingest(stream, index)
+ let streamStats = streamSummary stream
+ match state with
+ | All _ -> streamStats.complete <- true
+ | Partial _ -> streamStats.complete <- false
+ match res with
+ | New -> streamStats.fresh <- streamStats.fresh + 1
+ | Ok -> streamStats.ok <- streamStats.ok + 1
+ | Duplicate -> streamStats.dup <- streamStats.dup + 1
+ | Gap -> streamStats.gap <- streamStats.gap + 1
+ res
+
+ member __.Enum() : IEnumerable<KeyValuePair<string, StreamSummary>> = upcast streams
+
+ member __.Stats : BatchStats =
+ let mutable fresh, ok, dup, gap, complete, incomplete = 0, 0, 0, 0, 0, 0
+ let cats = HashSet()
+ for KeyValue(k,v) in streams do
+ fresh <- fresh + v.fresh
+ ok <- ok + v.ok
+ dup <- dup + v.dup
+ gap <- gap + v.gap
+ match k.IndexOf('-') with
+ | -1 -> ()
+ | i -> cats.Add(k.Substring(0, i)) |> ignore
+ if v.complete then complete <- complete + 1 else incomplete <- incomplete + 1
+ { fresh = fresh; ok = ok; dup = dup; gap = gap; categories = cats.Count; streams = { complete = complete; incomplete = incomplete } }
\ No newline at end of file
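
A short sketch of the intended flow, assuming a long-lived FeedValidator shared across batches and a per-batch BatchValidator (stream names, indices and the handler are invented for illustration):

    // Sketch: classify incoming (stream, index) pairs, skipping duplicates, and log per-batch stats
    open Equinox.Projection.Validation
    open Serilog

    let feedValidator = FeedValidator()

    let processBatch (log : ILogger) (handle : string -> int -> unit) (items : (string * int) seq) =
        let batch = BatchValidator(feedValidator)
        for stream, index in items do
            match batch.TryIngest(stream, index) with
            | Duplicate -> ()                    // already seen; don't reprocess
            | _ -> handle stream index           // New / Ok / Gap: hand to the real handler
        let s = batch.Stats
        log.Information("Batch: fresh {fresh} ok {ok} dup {dup} gap {gap} streams {streams}",
            s.fresh, s.ok, s.dup, s.gap, s.TotalStreams)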
diff --git a/src/Equinox.Projection.Codec/RenderedEvent.fs b/src/Equinox.Projection.Codec/RenderedEvent.fs
new file mode 100644
index 000000000..57b2a378c
--- /dev/null
+++ b/src/Equinox.Projection.Codec/RenderedEvent.fs
@@ -0,0 +1,29 @@
+namespace Equinox.Projection
+
+open Newtonsoft.Json
+open System
+
+module Codec =
+ /// Default rendition of an event when being projected to Kafka
+ type [] RenderedEvent =
+ { /// Stream Name
+ s: string
+
+ /// Index within stream
+ i: int64
+
+ /// Event Type associated with event data in `d`
+ c: string
+
+ /// Timestamp of original write
+ t: DateTimeOffset // ISO 8601
+
+ /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
+ // TOCONSIDER if we don't inline `h`, we need to inline this
+ [)>]
+ d: byte[] // required
+
+ /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing)
+ [)>]
+ []
+ m: byte[] }
\ No newline at end of file
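
For orientation, a hypothetical example of constructing and serializing such an event before handing it to the Kafka producer (all field values invented; the converters on `d`/`m` shape how the byte[] payloads are written):

    // Hypothetical: build and serialize a RenderedEvent (values are made up)
    open Equinox.Projection
    open Newtonsoft.Json
    open System
    open System.Text

    let sample : Codec.RenderedEvent =
        {   s = "Favorites-clientA"
            i = 0L
            c = "Favorited"
            t = DateTimeOffset.UtcNow
            d = Encoding.UTF8.GetBytes "{\"skuId\":\"sku-1\"}"
            m = null }

    let key, value = sample.s, JsonConvert.SerializeObject sample   // keyed by stream to preserve per-stream ordering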
diff --git a/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj
new file mode 100644
index 000000000..2452863fe
--- /dev/null
+++ b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj
@@ -0,0 +1,28 @@
+
+
+
+ netstandard2.0;net461
+ 5
+ false
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Equinox.Projection.Kafka/Infrastructure.fs b/src/Equinox.Projection.Kafka/Infrastructure.fs
new file mode 100644
index 000000000..6a7e89710
--- /dev/null
+++ b/src/Equinox.Projection.Kafka/Infrastructure.fs
@@ -0,0 +1,31 @@
+namespace Equinox.Projection.Kafka
+
+open System
+open System.Threading.Tasks
+
+[<AutoOpen>]
+module private AsyncHelpers =
+ type Async with
+ static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
+ Async.FromContinuations <| fun (k,ek,_) ->
+ task.ContinueWith (fun (t:Task<'T>) ->
+ if t.IsFaulted then
+ let e = t.Exception
+ if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
+ else ek e
+ elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
+ elif t.IsCompleted then k t.Result
+ else ek(Exception "invalid Task state!"))
+ |> ignore
+
+ static member AwaitTaskCorrect (task : Task) : Async<unit> =
+ Async.FromContinuations <| fun (k,ek,_) ->
+ task.ContinueWith (fun (t:Task) ->
+ if t.IsFaulted then
+ let e = t.Exception
+ if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
+ else ek e
+ elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
+ elif t.IsCompleted then k ()
+ else ek(Exception "invalid Task state!"))
+ |> ignore
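
For context, the usual motivation for the AwaitTaskCorrect pattern (as opposed to the stock Async.AwaitTask) is that failures surface as the original exception rather than wrapped in an AggregateException, which keeps exception handling straightforward; a hypothetical illustration:

    // Hypothetical illustration only (not part of the diff)
    let example (t : System.Threading.Tasks.Task<int>) = async {
        try return! Async.AwaitTaskCorrect t        // a faulted task raises its original exception here
        with :? System.TimeoutException -> return -1 }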
diff --git a/src/Equinox.Projection.Kafka/Kafka.fs b/src/Equinox.Projection.Kafka/Kafka.fs
new file mode 100644
index 000000000..30137bb47
--- /dev/null
+++ b/src/Equinox.Projection.Kafka/Kafka.fs
@@ -0,0 +1,356 @@
+namespace Equinox.Projection.Kafka
+
+open Confluent.Kafka
+open Serilog
+open System
+open System.Collections.Concurrent
+open System.Collections.Generic
+open System.Threading
+open System.Threading.Tasks
+
+module private Config =
+ let validateBrokerUri (u:Uri) =
+ if not u.IsAbsoluteUri then invalidArg "broker" "should be of 'host:port' format"
+ if String.IsNullOrEmpty u.Authority then
+ // handle a corner case in which Uri instances are erroneously putting the hostname in the `scheme` field.
+ if System.Text.RegularExpressions.Regex.IsMatch(string u, "^\S+:[0-9]+$") then string u
+ else invalidArg "broker" "should be of 'host:port' format"
+
+ else u.Authority
+
+type Compression = Uncompressed | GZip | Snappy | LZ4 // as soon as CK provides such an Enum, this can go
+
+/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings
+[]
+type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : Compression, acks : Acks) =
+ member val Conf : ProducerConfig = conf
+ member val Acks = acks
+ member val Broker = broker
+ member val Compression = compression
+ member __.Kvps = Seq.append conf cfgs
+
+ /// Creates a Kafka producer instance with supplied configuration
+ static member Create
+ ( clientId : string, broker : Uri, acks : Acks,
+ /// Message compression. Defaults to Uncompressed/'none'.
+ ?compression,
+ /// Maximum in-flight requests; a value other than 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry. Defaults to 1,000,000.
+ ?maxInFlight,
+ /// Number of retries. Defaults to 60.
+ ?retries,
+ /// Backoff interval. Defaults to 1 second.
+ ?retryBackoff,
+ /// Statistics Interval. Defaults to no stats.
+ ?statisticsInterval,
+ /// Defaults to true.
+ ?socketKeepAlive,
+ /// Defaults to 10 ms.
+ ?linger : TimeSpan,
+ /// Defaults to 'consistent_random'.
+ ?partitioner,
+ /// Misc configuration parameter to be passed to the underlying CK producer.
+ ?custom) =
+ let compression = defaultArg compression Uncompressed
+ let cfgs = seq {
+ yield KeyValuePair("compression.codec", match compression with Uncompressed -> "none" | GZip -> "gzip" | Snappy -> "snappy" | LZ4 -> "lz4")
+ match custom with None -> () | Some miscConfig -> yield! miscConfig }
+ let c =
+ ProducerConfig(
+ ClientId = clientId, BootstrapServers = Config.validateBrokerUri broker,
+ RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000),
+ MessageSendMaxRetries = Nullable (defaultArg retries 60),
+ Acks = Nullable acks,
+ LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10),
+ SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true),
+ Partitioner = Nullable (defaultArg partitioner Partitioner.ConsistentRandom),
+ MaxInFlight = Nullable (defaultArg maxInFlight 1000000),
+ LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+ statisticsInterval |> Option.iter (fun (i : TimeSpan) -> c.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds))
+ KafkaProducerConfig(c, cfgs, broker, compression, acks)
+
+type KafkaProducer private (log: ILogger, producer : Producer<string,string>, topic : string) =
+ member __.Topic = topic
+
+ interface IDisposable with member __.Dispose() = for d in [(*d1;d2;*)producer:>IDisposable] do d.Dispose()
+
+ /// Produces a batch of supplied key/value messages. Results are returned in order of writing.
+ member __.ProduceBatch(keyValueBatch : (string * string)[]) = async {
+ if Array.isEmpty keyValueBatch then return [||] else
+
+ let! ct = Async.CancellationToken
+
+ let tcs = new TaskCompletionSource<DeliveryReport<string,string>[]>()
+ let numMessages = keyValueBatch.Length
+ let results = Array.zeroCreate<DeliveryReport<string,string>> numMessages
+ let numCompleted = ref 0
+
+ use _ = ct.Register(fun _ -> tcs.TrySetCanceled() |> ignore)
+
+ let handler (m : DeliveryReport<string,string>) =
+ if m.Error.IsError then
+ let errorMsg = exn (sprintf "Error on message topic=%s code=%O reason=%s" m.Topic m.Error.Code m.Error.Reason)
+ tcs.TrySetException errorMsg |> ignore
+ else
+ let i = Interlocked.Increment numCompleted
+ results.[i - 1] <- m
+ if i = numMessages then tcs.TrySetResult results |> ignore
+ for key,value in keyValueBatch do
+ producer.BeginProduce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler = handler)
+ producer.Flush(ct)
+ log.Debug("Produced {count}",!numCompleted)
+ return! Async.AwaitTaskCorrect tcs.Task }
+
+ static member Create(log : ILogger, config : KafkaProducerConfig, topic : string) =
+ if String.IsNullOrEmpty topic then nullArg "topic"
+ log.Information("Producing... {broker} / {topic} compression={compression} acks={acks}", config.Broker, topic, config.Compression, config.Acks)
+ let producer =
+ ProducerBuilder(config.Kvps)
+ .SetLogHandler(fun _p m -> log.Information("{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
+ .SetErrorHandler(fun _p e -> log.Error("{reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+ .Build()
+ new KafkaProducer(log, producer, topic)
+
+ type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan }
+
+ module private ConsumerImpl =
+ /// guesstimate approximate message size in bytes
+ let approximateMessageBytes (message : ConsumeResult<string,string>) =
+ let inline len (x:string) = match x with null -> 0 | x -> sizeof<char> * x.Length
+ 16 + len message.Key + len message.Value |> int64
+
+ type BlockingCollection<'T> with
+ member bc.FillBuffer(buffer : 'T[], maxDelay : TimeSpan) : int =
+ let cts = new CancellationTokenSource()
+ do cts.CancelAfter maxDelay
+
+ let n = buffer.Length
+ let mutable i = 0
+ let mutable t = Unchecked.defaultof<'T>
+
+ while i < n && not cts.IsCancellationRequested do
+ if bc.TryTake(&t, 5 (* ms *)) then
+ buffer.[i] <- t ; i <- i + 1
+ while i < n && not cts.IsCancellationRequested && bc.TryTake(&t) do
+ buffer.[i] <- t ; i <- i + 1
+ i
+
+ type PartitionedBlockingCollection<'Key, 'Message when 'Key : equality>(?perPartitionCapacity : int) =
+ let collections = new ConcurrentDictionary<'Key, Lazy<BlockingCollection<'Message>>>()
+ let onPartitionAdded = new Event<'Key * BlockingCollection<'Message>>()
+
+ let createCollection() =
+ match perPartitionCapacity with
+ | None -> new BlockingCollection<'Message>()
+ | Some c -> new BlockingCollection<'Message>(boundedCapacity = c)
+
+ [<CLIEvent>]
+ member __.OnPartitionAdded = onPartitionAdded.Publish
+
+ member __.Add (key : 'Key, message : 'Message) =
+ let factory key = lazy(
+ let coll = createCollection()
+ onPartitionAdded.Trigger(key, coll)
+ coll)
+
+ let buffer = collections.GetOrAdd(key, factory)
+ buffer.Value.Add message
+
+ member __.Revoke(key : 'Key) =
+ match collections.TryRemove key with
+ | true, coll -> Task.Delay(10000).ContinueWith(fun _ -> coll.Value.CompleteAdding()) |> ignore
+ | _ -> ()
+
+ type InFlightMessageCounter(log: ILogger, minInFlightBytes : int64, maxInFlightBytes : int64) =
+ do if minInFlightBytes < 1L then invalidArg "minInFlightBytes" "must be a positive value"
+ if maxInFlightBytes < 1L then invalidArg "maxInFlightBytes" "must be a positive value"
+ if minInFlightBytes > maxInFlightBytes then invalidArg "maxInFlightBytes" "must be greater than minInFlightBytes"
+
+ let mutable inFlightBytes = 0L
+
+ member __.Add(numBytes : int64) = Interlocked.Add(&inFlightBytes, numBytes) |> ignore
+
+ member __.AwaitThreshold() =
+ if inFlightBytes > maxInFlightBytes then
+ log.Warning("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes)
+ while inFlightBytes > minInFlightBytes do Thread.Sleep 5
+ log.Information "Consumer resuming polling"
+
+ let mkBatchedMessageConsumer (log: ILogger) (buf : ConsumerBufferingConfig) (ct : CancellationToken) (consumer : Consumer<string,string>)
+ (partitionedCollection: PartitionedBlockingCollection<TopicPartition, ConsumeResult<string,string>>)
+ (handler : ConsumeResult<string,string>[] -> Async<unit>) = async {
+ let tcs = new TaskCompletionSource<unit>()
+ use cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
+ use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore)
+
+ use _ = consumer
+
+ let counter = new InFlightMessageCounter(log, buf.minInFlightBytes, buf.maxInFlightBytes)
+
+ // starts a tail recursive loop that dequeues batches for a given partition buffer and schedules the user callback
+ let consumePartition (collection : BlockingCollection<ConsumeResult<string,string>>) =
+ let buffer = Array.zeroCreate buf.maxBatchSize
+ let nextBatch () =
+ let count = collection.FillBuffer(buffer, buf.maxBatchDelay)
+ if count <> 0 then log.Debug("Consuming {count}", count)
+ let batch = Array.init count (fun i -> buffer.[i])
+ Array.Clear(buffer, 0, count)
+ batch
+
+ let rec loop () = async {
+ if not collection.IsCompleted then
+ try match nextBatch() with
+ | [||] -> ()
+ | batch ->
+ // run the handler function
+ do! handler batch
+
+ // store completed offsets
+ let tpo =
+ let maxOffset = batch |> Array.maxBy (fun m -> let o = m.Offset in o.Value)
+ let raw = maxOffset.TopicPartitionOffset
+ TopicPartitionOffset(raw.Topic, raw.Partition, Offset(let o = raw.Offset in o.Value + 1L))
+ consumer.StoreOffsets[| tpo |]
+
+ // decrement in-flight message counter
+ let batchSize = batch |> Array.sumBy approximateMessageBytes
+ counter.Add -batchSize
+ with e ->
+ tcs.TrySetException e |> ignore
+ cts.Cancel()
+ do! loop() }
+
+ Async.Start(loop(), cts.Token)
+
+ use _ = partitionedCollection.OnPartitionAdded.Subscribe (fun (_key,buffer) -> consumePartition buffer)
+
+ // run the consumer
+ let ct = cts.Token
+ try while not ct.IsCancellationRequested do
+ counter.AwaitThreshold()
+ try let message = consumer.Consume(cts.Token) // NB don't use the TimeSpan overload unless you want AVEs on 1.0.0-beta2
+ if message <> null then
+ counter.Add(approximateMessageBytes message)
+ partitionedCollection.Add(message.TopicPartition, message)
+ with| :? ConsumeException as e -> log.Warning(e, "Consume exception")
+ | :? System.OperationCanceledException -> log.Warning("Consumer:{name} | Consuming cancelled", consumer.Name)
+ finally
+ consumer.Close()
+
+ // await handler faults or external cancellation
+ do! Async.AwaitTaskCorrect tcs.Task
+ }
+
+/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings
+[]
+type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq<KeyValuePair<string,string>>; topics: string list; buffering: ConsumerBufferingConfig } with
+ member __.Kvps = Seq.append __.conf __.custom
+ /// Builds a Kafka Consumer Config suitable for KafkaConsumer.Start*
+ static member Create
+ ( /// Identify this consumer in logs etc
+ clientId, broker : Uri, topics,
+ /// Consumer group identifier.
+ groupId,
+ /// Default Earliest.
+ ?autoOffsetReset,
+ /// Default 100kB.
+ ?fetchMaxBytes,
+ /// Default 10B.
+ ?fetchMinBytes,
+ ?statisticsInterval,
+ /// Consumed offsets commit interval. Default 10s. (WAS 1s)
+ ?offsetCommitInterval,
+ /// Misc configuration parameter to be passed to the underlying CK consumer.
+ ?custom,
+
+ (* Client side batching *)
+
+ /// Maximum number of messages to group per batch on consumer callbacks. Default 1000.
+ ?maxBatchSize,
+ /// Message batch linger time. Default 500ms.
+ ?maxBatchDelay,
+ /// Minimum total size of consumed messages in-memory for the consumer to attempt to fill. Default 16MiB.
+ ?minInFlightBytes,
+ /// Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB.
+ ?maxInFlightBytes) =
+ let conf =
+ ConsumerConfig(
+ ClientId=clientId, BootstrapServers=Config.validateBrokerUri broker, GroupId=groupId,
+ AutoOffsetReset = Nullable (defaultArg autoOffsetReset AutoOffsetReset.Earliest),
+ FetchMaxBytes = Nullable (defaultArg fetchMaxBytes 100000),
+ MessageMaxBytes = Nullable (defaultArg fetchMaxBytes 100000),
+ FetchMinBytes = Nullable (defaultArg fetchMinBytes 10), // TODO check if sane default
+ EnableAutoCommit = Nullable true,
+ AutoCommitIntervalMs = Nullable (match offsetCommitInterval with Some (i: TimeSpan) -> int i.TotalMilliseconds | None -> 1000*10),
+ EnableAutoOffsetStore = Nullable false,
+ LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+ statisticsInterval |> Option.iter (fun (i : TimeSpan) -> conf.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds))
+ { conf = conf; custom = match custom with None -> Seq.empty | Some c -> List.toSeq c
+ topics = match Seq.toList topics with [] -> invalidArg "topics" "must be non-empty collection" | ts -> ts
+ buffering = {
+ maxBatchSize = defaultArg maxBatchSize 1000
+ maxBatchDelay = defaultArg maxBatchDelay (TimeSpan.FromMilliseconds 500.)
+ minInFlightBytes = defaultArg minInFlightBytes (16L * 1024L * 1024L)
+ maxInFlightBytes = defaultArg maxInFlightBytes (24L * 1024L * 1024L) } }
+
+type KafkaConsumer private (log : ILogger, consumer : Consumer<string,string>, task : Task<unit>, cts : CancellationTokenSource) =
+
+ /// Asynchronously awaits until consumer stops or is faulted
+ member __.AwaitCompletion() = Async.AwaitTaskCorrect task
+ interface IDisposable with member __.Dispose() = __.Stop()
+
+ member __.Status = task.Status
+ member __.Stop() =
+ log.Information("Consuming ... Stopping {name}", consumer.Name)
+ cts.Cancel();
+
+ /// Starts a kafka consumer with provider configuration and batch message handler.
+ /// Batches are grouped by topic partition. Batches belonging to the same topic partition will be scheduled sequentially and monotonically,
+ /// however batches from different partitions can be run concurrently.
+ static member Start (log : ILogger) (config : KafkaConsumerConfig) (partitionHandler : ConsumeResult<string,string>[] -> Async<unit>) =
+ if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic"
+ log.Information("Consuming... {broker} {topics} {groupId}" (*autoOffsetReset={autoOffsetReset}*) + " fetchMaxBytes={fetchMaxB} maxInFlightBytes={maxInFlightB} maxBatchSize={maxBatchB} maxBatchDelay={maxBatchDelay}s",
+ config.conf.BootstrapServers, config.topics, config.conf.GroupId, (*config.conf.AutoOffsetReset.Value,*) config.conf.FetchMaxBytes,
+ config.buffering.maxInFlightBytes, config.buffering.maxBatchSize, (let t = config.buffering.maxBatchDelay in t.TotalSeconds))
+
+ let partitionedCollection = new ConsumerImpl.PartitionedBlockingCollection<TopicPartition, ConsumeResult<string,string>>()
+ let consumer =
+ ConsumerBuilder<_,_>(config.Kvps)
+ .SetLogHandler(fun _c m -> log.Information("consumer_info|{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
+ .SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+ .SetRebalanceHandler(fun _c m ->
+ for topic,partitions in m.Partitions |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> let p = p.Partition in p.Value |]) do
+ if m.IsAssignment then log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
+ else log.Information("Consuming... Revoked {topic:l} {partitions}", topic, partitions)
+ if m.IsRevocation then m.Partitions |> Seq.iter partitionedCollection.Revoke)
+ //let d5 = c.OnPartitionEOF.Subscribe(fun tpo ->
+ // log.Verbose("consumer_partition_eof|topic={topic}|partition={partition}|offset={offset}", tpo.Topic, tpo.Partition, let o = tpo.Offset in o.Value))
+ .SetOffsetsCommittedHandler(fun _c cos ->
+ for t,ps in cos.Offsets |> Seq.groupBy (fun p -> p.Topic) do
+ let o = [for p in ps -> let pp = p.Partition in pp.Value, let o = p.Offset in if o.IsSpecial then box (string o) else box o.Value(*, fmtError p.Error*)]
+ let e = cos.Error
+ if not e.IsError then log.Information("Consuming... Committed {topic} {@offsets}", t, o)
+ else log.Warning("Consuming... Committed {topic} {@offsets} reason={error} code={code} isBrokerError={isBrokerError}", t, o, e.Reason, e.Code, e.IsBrokerError))
+ .Build()
+ consumer.Subscribe config.topics
+ let cts = new CancellationTokenSource()
+ let task = ConsumerImpl.mkBatchedMessageConsumer log config.buffering cts.Token consumer partitionedCollection partitionHandler |> Async.StartAsTask
+ new KafkaConsumer(log, consumer, task, cts)
+
+ /// Starts a Kafka consumer instance that schedules handlers grouped by message key. Additionally accepts a global degreeOfParallelism parameter
+ /// that controls the number of handlers running concurrently across partitions for the given consumer instance.
+ static member StartByKey (log: ILogger) (config : KafkaConsumerConfig) (degreeOfParallelism : int) (keyHandler : ConsumeResult<_,_> [] -> Async<unit>) =
+ let semaphore = new SemaphoreSlim(degreeOfParallelism)
+ let partitionHandler (messages : ConsumeResult<_,_>[]) = async {
+ return!
+ messages
+ |> Seq.groupBy (fun m -> m.Key)
+ |> Seq.map (fun (_,gp) -> async {
+ let! ct = Async.CancellationToken
+ let! _ = semaphore.WaitAsync ct |> Async.AwaitTaskCorrect
+ try do! keyHandler (Seq.toArray gp)
+ finally semaphore.Release() |> ignore })
+ |> Async.Parallel
+ |> Async.Ignore
+ }
+
+ KafkaConsumer.Start log config partitionHandler
\ No newline at end of file
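
To tie the producer and consumer pieces together, a condensed, hypothetical usage sketch (broker, topic and group values are placeholders, and the 10s sleep is arbitrary):

    // Hypothetical end-to-end wiring of KafkaProducer / KafkaConsumer; connection values are placeholders
    open Confluent.Kafka
    open Equinox.Projection.Kafka
    open Serilog
    open System

    let roundtrip (log : ILogger) = async {
        let broker, topic, group = Uri "kafka://localhost:9092", "demo-topic", "demo-group"

        let producerCfg = KafkaProducerConfig.Create("demo", broker, Acks.Leader)
        use producer = KafkaProducer.Create(log, producerCfg, topic)
        let! _ = producer.ProduceBatch [| for i in 1 .. 10 -> string i, sprintf "{\"n\":%d}" i |]

        let consumerCfg = KafkaConsumerConfig.Create("demo", broker, [topic], group)
        use consumer = KafkaConsumer.Start log consumerCfg (fun batch -> async {
            log.Information("Handled batch of {count} messages", batch.Length) })
        do! Async.Sleep 10000   // allow some time for consumption in this toy example
        consumer.Stop()
        do! consumer.AwaitCompletion() }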
diff --git a/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj b/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj
new file mode 100644
index 000000000..55cb4ff0b
--- /dev/null
+++ b/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj
@@ -0,0 +1,29 @@
+
+
+
+ netcoreapp2.1;net461
+ 5
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+
+
+
diff --git a/tests/Equinox.Projection.Integration/FeedValidatorTests.fs b/tests/Equinox.Projection.Integration/FeedValidatorTests.fs
new file mode 100644
index 000000000..c02bb3c69
--- /dev/null
+++ b/tests/Equinox.Projection.Integration/FeedValidatorTests.fs
@@ -0,0 +1,70 @@
+module Equinox.Projection.Tests
+
+open Equinox.Projection.Validation
+open FsCheck.Xunit
+open Swensen.Unquote
+open Xunit
+
+let [<Property>] ``Properties`` state index =
+ let result,state' = StreamState.combine state index
+ match state with
+ | Some (Partial (_, pos)) when pos <= 1 -> ()
+ | Some (Partial (min, pos)) when min > pos || min = 0 -> ()
+ | None ->
+ match state' with
+ | All _ -> result =! New
+ | Partial _ -> result =! Ok
+ | Some (All x) ->
+ match state',result with
+ | All x' , Duplicate -> x' =! x
+ | All x', New -> x' =! x+1
+ | All x', Gap -> x' =! x
+ | x -> failwithf "Unexpected %A" x
+ | Some (Partial (min, pos)) ->
+ match state',result with
+ | All 0,Duplicate when min=0 && index = 0 -> ()
+ | All x,Duplicate when min=1 && pos = x && index = 0 -> ()
+ | Partial (min', pos'), Duplicate -> min' =! max min' index; pos' =! pos
+ | Partial (min', pos'), Ok
+ | Partial (min', pos'), New -> min' =! min; pos' =! index
+ | x -> failwithf "Unexpected %A" x
+
+let [<Fact>] ``Zero on unknown stream is New`` () =
+ let result,state = StreamState.combine None 0
+ New =! result
+ All 0 =! state
+
+let [<Fact>] ``Non-zero on unknown stream is Ok`` () =
+ let result,state = StreamState.combine None 1
+ Ok =! result
+ Partial (1,1) =! state
+
+let [<Fact>] ``valid successor is New`` () =
+ let result,state = StreamState.combine (All 0 |> Some) 1
+ New =! result
+ All 1 =! state
+
+let [<Fact>] ``single immediate repeat is flagged`` () =
+ let result,state = StreamState.combine (All 0 |> Some) 0
+ Duplicate =! result
+ All 0 =! state
+
+let [<Fact>] ``non-immediate repeat is flagged`` () =
+ let result,state = StreamState.combine (All 1 |> Some) 0
+ Duplicate =! result
+ All 1 =! state
+
+let [<Fact>] ``Gap is flagged`` () =
+ let result,state = StreamState.combine (All 1 |> Some) 3
+ Gap =! result
+ All 1 =! state
+
+let [<Fact>] ``Potential gaps are not flagged as such when we're processing a Partial`` () =
+ let result,state = StreamState.combine (Some (Partial (1,1))) 3
+ New =! result
+ Partial (1,3) =! state
+
+let [<Fact>] ``Earlier values widen the min when we're processing a Partial`` () =
+ let result,state = StreamState.combine (Some (Partial (2,3))) 1
+ Duplicate =! result
+ Partial (1,3) =! state
\ No newline at end of file
diff --git a/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj b/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj
new file mode 100644
index 000000000..d0bdc08e4
--- /dev/null
+++ b/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj
@@ -0,0 +1,29 @@
+
+
+
+ netcoreapp2.1;net461
+ 5
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+
+
+
diff --git a/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs
new file mode 100644
index 000000000..96d2d3967
--- /dev/null
+++ b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs
@@ -0,0 +1,327 @@
+namespace Equinox.Projection.Kafka.KafkaIntegration
+
+open Equinox.Projection.Kafka
+open Newtonsoft.Json
+open Serilog
+open Swensen.Unquote
+open System
+open System.Collections.Concurrent
+open System.ComponentModel
+open System.Threading
+open System.Threading.Tasks
+open Xunit
+
+[<AutoOpen>]
+[<EditorBrowsable(EditorBrowsableState.Never)>]
+module Helpers =
+ open Confluent.Kafka
+
+ // Derived from https://github.com/damianh/CapturingLogOutputWithXunit2AndParallelTests
+ // NB VS does not surface these atm, but other test runners / test reports do
+ type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) =
+ let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] {Message}{NewLine}{Exception}", null);
+ let writeSerilogEvent logEvent =
+ use writer = new System.IO.StringWriter()
+ formatter.Format(logEvent, writer);
+ writer |> string |> testOutput.WriteLine
+ writer |> string |> System.Diagnostics.Debug.WriteLine
+ interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent
+
+ let createLogger sink =
+ LoggerConfiguration()
+ .WriteTo.Sink(sink)
+ .WriteTo.Seq("http://localhost:5341")
+ .CreateLogger()
+
+ let getTestBroker() =
+ match Environment.GetEnvironmentVariable "EQUINOX_KAFKA_BROKER" with
+ | x when String.IsNullOrEmpty x -> invalidOp "missing environment variable 'EQUINOX_KAFKA_BROKER'"
+ | x -> Uri x
+
+ let newId () = let g = System.Guid.NewGuid() in g.ToString("N")
+
+ type Async with
+ static member ParallelThrottled degreeOfParallelism jobs = async {
+ let s = new SemaphoreSlim(degreeOfParallelism)
+ return!
+ jobs
+ |> Seq.map (fun j -> async {
+ let! ct = Async.CancellationToken
+ do! s.WaitAsync ct |> Async.AwaitTask
+ try return! j
+ finally s.Release() |> ignore })
+ |> Async.Parallel
+ }
+
+ type KafkaConsumer with
+ member c.StopAfter(delay : TimeSpan) =
+ Task.Delay(delay).ContinueWith(fun (_:Task) -> c.Stop()) |> ignore
+
+ type TestMessage = { producerId : int ; messageId : int }
+ []
+ type ConsumedTestMessage = { consumerId : int ; message : ConsumeResult<string,string> ; payload : TestMessage }
+ type ConsumerCallback = KafkaConsumer -> ConsumedTestMessage [] -> Async<unit>
+
+ let runProducers log (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async {
+ let runProducer (producerId : int) = async {
+ use producer = KafkaProducer.Create(log, KafkaProducerConfig.Create("panther", broker, Acks.Leader, maxInFlight = 10000), topic)
+
+ let! results =
+ [1 .. messagesPerProducer]
+ |> Seq.map (fun msgId ->
+ let key = string msgId
+ let value = JsonConvert.SerializeObject { producerId = producerId ; messageId = msgId }
+ key, value)
+
+ |> Seq.chunkBySize 100
+ |> Seq.map producer.ProduceBatch
+ |> Async.ParallelThrottled 7
+
+ return Array.concat results
+ }
+
+ return! Async.Parallel [for i in 1 .. numProducers -> runProducer i]
+ }
+
+ let runConsumers log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async {
+ let mkConsumer (consumerId : int) = async {
+ let deserialize (msg : ConsumeResult<_,_>) = { consumerId = consumerId ; message = msg ; payload = JsonConvert.DeserializeObject<_> msg.Value }
+
+ // need to pass the consumer instance to the handler callback
+ // do a bit of cyclic dependency fixups
+ let consumerCell = ref None
+ let rec getConsumer() =
+ // avoid potential race conditions by polling
+ match !consumerCell with
+ | None -> Thread.SpinWait 20; getConsumer()
+ | Some c -> c
+
+ let consumer = KafkaConsumer.Start log config (fun batch -> handler (getConsumer()) (Array.map deserialize batch))
+
+ consumerCell := Some consumer
+
+ timeout |> Option.iter consumer.StopAfter
+
+ do! consumer.AwaitCompletion()
+ }
+
+ do! Async.Parallel [for i in 1 .. numConsumers -> mkConsumer i] |> Async.Ignore
+ }
+
+type FactIfBroker() =
+ inherit FactAttribute()
+ override __.Skip = if null <> Environment.GetEnvironmentVariable "EQUINOX_KAFKA_BROKER" then null else "Skipping as no EQUINOX_KAFKA_BROKER supplied"
+
+type T1(testOutputHelper) =
+ let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker ()
+
+ let [<FactIfBroker>] ``ConfluentKafka producer-consumer basic roundtrip`` () = async {
+ let numProducers = 10
+ let numConsumers = 10
+ let messagesPerProducer = 1000
+
+ let topic = newId() // dev kafka topics are created and truncated automatically
+ let groupId = newId()
+
+ let consumedBatches = new ConcurrentBag<ConsumedTestMessage[]>()
+ let consumerCallback (consumer:KafkaConsumer) batch = async {
+ do consumedBatches.Add batch
+ let messageCount = consumedBatches |> Seq.sumBy Array.length
+ // signal cancellation if consumed items reaches expected size
+ if messageCount >= numProducers * messagesPerProducer then
+ consumer.Stop()
+ }
+
+ // Section: run the test
+ let producers = runProducers log broker topic numProducers messagesPerProducer |> Async.Ignore
+
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
+ let consumers = runConsumers log config numConsumers None consumerCallback
+
+ do! [ producers ; consumers ]
+ |> Async.Parallel
+ |> Async.Ignore
+
+ // Section: assertion checks
+ let ``consumed batches should be non-empty`` =
+ consumedBatches
+ |> Seq.forall (not << Array.isEmpty)
+
+ test <@ ``consumed batches should be non-empty`` @> // "consumed batches should all be non-empty")
+
+ let ``batches should be grouped by partition`` =
+ consumedBatches
+ |> Seq.map (fun batch -> batch |> Seq.distinctBy (fun b -> b.message.Partition) |> Seq.length)
+ |> Seq.forall (fun numKeys -> numKeys = 1)
+
+ test <@ ``batches should be grouped by partition`` @> // "batches should be grouped by partition"
+
+ let allMessages =
+ consumedBatches
+ |> Seq.concat
+ |> Seq.toArray
+
+ let ``all message keys should have expected value`` =
+ allMessages |> Array.forall (fun msg -> int msg.message.Key = msg.payload.messageId)
+
+ test <@ ``all message keys should have expected value`` @> // "all message keys should have expected value"
+
+ let ``should have consumed all expected messages`` =
+ allMessages
+ |> Array.groupBy (fun msg -> msg.payload.producerId)
+ |> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId))
+ |> Array.forall (fun gp -> gp.Length = messagesPerProducer)
+
+ test <@ ``should have consumed all expected messages`` @> // "should have consumed all expected messages"
+ }
+
+// separated test type to allow the tests to run in parallel
+type T2(testOutputHelper) =
+ let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker ()
+
+ let [<FactIfBroker>] ``ConfluentKafka consumer should have expected exception semantics`` () = async {
+ let topic = newId() // dev kafka topics are created and truncated automatically
+ let groupId = newId()
+
+ let! _ = runProducers log broker topic 1 10 // populate the topic with a few messages
+
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
+
+ let! r = Async.Catch <| runConsumers log config 1 None (fun _ _ -> raise <|IndexOutOfRangeException())
+ test <@ match r with Choice2Of2 (:? IndexOutOfRangeException) -> true | x -> failwithf "%A" x @>
+ }
+
+ let [<FactIfBroker>] ``Given a topic different consumer group ids should be consuming the same message set`` () = async {
+ let numMessages = 10
+
+ let topic = newId() // dev kafka topics are created and truncated automatically
+
+ let! _ = runProducers log broker topic 1 numMessages // populate the topic with a few messages
+
+ let messageCount = ref 0
+ let groupId1 = newId()
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId1)
+ do! runConsumers log config 1 None
+ (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })
+
+ test <@ numMessages = !messageCount @>
+
+ let messageCount = ref 0
+ let groupId2 = newId()
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId2)
+ do! runConsumers log config 1 None
+ (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })
+
+ test <@ numMessages = !messageCount @>
+ }
+
+ let [<FactIfBroker>] ``Spawning a new consumer with same consumer group id should not receive new messages`` () = async {
+ let numMessages = 10
+ let topic = newId() // dev kafka topics are created and truncated automatically
+ let groupId = newId()
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
+
+ let! _ = runProducers log broker topic 1 numMessages // populate the topic with a few messages
+
+ // expected to read 10 messages from the first consumer
+ let messageCount = ref 0
+ do! runConsumers log config 1 None
+ (fun c b -> async {
+ if Interlocked.Add(messageCount, b.Length) >= numMessages then
+ c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be stored
+
+ test <@ numMessages = !messageCount @>
+
+ // expected to read no messages from the subsequent consumer
+ let messageCount = ref 0
+ do! runConsumers log config 1 (Some (TimeSpan.FromSeconds 10.))
+ (fun c b -> async {
+ if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })
+
+ test <@ 0 = !messageCount @>
+ }
+
+// separated test type to allow the tests to run in parallel
+type T3(testOutputHelper) =
+ let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker ()
+
+ let [<FactIfBroker>] ``Committed offsets should not result in missing messages`` () = async {
+ let numMessages = 10
+ let topic = newId() // dev kafka topics are created and truncated automatically
+ let groupId = newId()
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
+
+ let! _ = runProducers log broker topic 1 numMessages // populate the topic with a few messages
+
+ // expected to read 10 messages from the first consumer
+ let messageCount = ref 0
+ do! runConsumers log config 1 None
+ (fun c b -> async {
+ if Interlocked.Add(messageCount, b.Length) >= numMessages then
+ c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed)
+
+ test <@ numMessages = !messageCount @>
+
+ let! _ = runProducers log broker topic 1 numMessages // produce more messages
+
+ // expected to read 10 messages from the subsequent consumer,
+ // this is to verify there are no off-by-one errors in how offsets are committed
+ let messageCount = ref 0
+ do! runConsumers log config 1 None
+ (fun c b -> async {
+ if Interlocked.Add(messageCount, b.Length) >= numMessages then
+ c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed)
+
+ test <@ numMessages = !messageCount @>
+ }
+
+ let [<FactIfBroker>] ``Consumers should never schedule two batches of the same partition concurrently`` () = async {
+ // writes 2000 messages down a topic with a shuffled partition key
+ // then attempts to consume the topic, while verifying that per-partition batches
+ // are never scheduled for concurrent execution. also checks that batches are
+ // monotonic w.r.t. offsets
+ let numMessages = 2000
+ let maxBatchSize = 5
+ let topic = newId() // dev kafka topics are created and truncated automatically
+ let groupId = newId()
+ let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId, maxBatchSize = maxBatchSize)
+
+ // Produce messages in the topic
+ do! runProducers log broker topic 1 numMessages |> Async.Ignore
+
+ let globalMessageCount = ref 0
+
+ let getPartitionOffset =
+ let state = new ConcurrentDictionary<int, int64 ref>()
+ fun partition -> state.GetOrAdd(partition, fun _ -> ref -1L)
+
+ let getBatchPartitionCount =
+ let state = new ConcurrentDictionary<int, int ref>()
+ fun partition -> state.GetOrAdd(partition, fun _ -> ref 0)
+
+ do! runConsumers log config 1 None
+ (fun c b -> async {
+ let partition = let p = b.[0].message.Partition in p.Value
+
+ // check batch sizes are bounded by maxBatchSize
+ test <@ b.Length <= maxBatchSize @> // "batch sizes should never exceed maxBatchSize")
+
+ // check per-partition handlers are serialized
+ let concurrentBatchCell = getBatchPartitionCount partition
+ let concurrentBatches = Interlocked.Increment concurrentBatchCell
+ test <@ 1 = concurrentBatches @> // "partitions should never schedule more than one batch concurrently")
+
+ // check for message monotonicity
+ let offset = getPartitionOffset partition
+ for msg in b do
+ Assert.True((let o = msg.message.Offset in o.Value) > !offset, "offset for partition should be monotonic")
+ offset := let o = msg.message.Offset in o.Value
+
+ do! Async.Sleep 100
+
+ let _ = Interlocked.Decrement concurrentBatchCell
+
+ if Interlocked.Add(globalMessageCount, b.Length) >= numMessages then c.Stop() })
+
+ test <@ numMessages = !globalMessageCount @>
+ }
\ No newline at end of file
diff --git a/tools/Equinox.Tool/Equinox.Tool.fsproj b/tools/Equinox.Tool/Equinox.Tool.fsproj
index a612d28f7..2817b2fd1 100644
--- a/tools/Equinox.Tool/Equinox.Tool.fsproj
+++ b/tools/Equinox.Tool/Equinox.Tool.fsproj
@@ -8,6 +8,7 @@
false
true
true
+ $(DefineConstants);NET461
Equinox.Tool
eqx
@@ -35,6 +36,9 @@
+
+
+
diff --git a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs
index dbed6d114..6115010f0 100644
--- a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs
+++ b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs
@@ -30,13 +30,23 @@ type Exception with
(System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw ()
Unchecked.defaultof<_>
+#nowarn "21" // re AwaitKeyboardInterrupt
+#nowarn "40" // re AwaitKeyboardInterrupt
type Async with
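+ /// Overload of Async.Sleep accepting a TimeSpan rather than an integral number of milliseconds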
+ static member Sleep(t : TimeSpan) : Async<unit> = Async.Sleep(int t.TotalMilliseconds)
///
/// Raises an exception using Async's continuation mechanism directly.
///
/// Exception to be raised.
static member Raise (exn : #exn) = Async.FromContinuations(fun (_,ec,_) -> ec exn)
+ /// Asynchronously awaits the next keyboard interrupt event
+ static member AwaitKeyboardInterrupt () : Async<unit> =
+ Async.FromContinuations(fun (sc,_,_) ->
+ let isDisposed = ref 0
+ let rec callback _ = Tasks.Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
+ and d : IDisposable = System.Console.CancelKeyPress.Subscribe callback
+ in ())
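+ // Usage sketch: the `project` command in Program.fs relies on this to keep the host alive until the operator hits Ctrl-C, e.g.
+ // async { let! _cfp = startProcessing ()
+ //         do! Async.AwaitKeyboardInterrupt () } |> Async.RunSynchronously
+ // (here `startProcessing` is a hypothetical stand-in for whatever kicks off the background work)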
#if NET461
static member Choice<'T>(workflows : seq<Async<'T option>>) : Async<'T option> = async {
try
diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs
index 620496996..93a3a5ec0 100644
--- a/tools/Equinox.Tool/Program.fs
+++ b/tools/Equinox.Tool/Program.fs
@@ -2,8 +2,14 @@
open Argu
open Domain.Infrastructure
+open Equinox.Cosmos.Projection
+open Equinox.Projection.Codec
+open Equinox.Projection.Kafka
+open Equinox.Projection.Validation
+open Equinox.Store.Infrastructure
open Equinox.Tool.Infrastructure
open FSharp.UMX
+open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing
open Microsoft.Extensions.DependencyInjection
open Samples.Infrastructure.Log
open Samples.Infrastructure.Storage
@@ -12,6 +18,8 @@ open Serilog.Events
open System
open System.Net.Http
open System.Threading
+open System.Collections.Generic
+open System.Diagnostics
[]
type Arguments =
@@ -21,8 +29,8 @@ type Arguments =
| [] LogFile of string
| [] Run of ParseResults
| [] Init of ParseResults
- | [] // this command is not useful unless you have access to the [presently closed-source] Equinox.Cosmos.Projector
- [] InitAux of ParseResults
+ | [] InitAux of ParseResults<InitAuxArguments>
+ | [] Project of ParseResults<ProjectArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
@@ -31,7 +39,8 @@ type Arguments =
| LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)."
| Run _ -> "Run a load test"
| Init _ -> "Initialize store (presently only relevant for `cosmos`, where it creates database+collection+stored proc if not already present)."
- | InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the [presently closed source] Projector)."
+ | InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the Projector)."
+ | Project _ -> "Project from store specified as the last argument, storing state in the specified `aux` Store (see initAux)."
and []InitArguments =
| [] Rus of int
| [] SkipStoredProc
@@ -50,6 +59,38 @@ and []InitAuxArguments =
| Rus _ -> "Specify RU/s level to provision for the Application Collection."
| Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."
| Cosmos _ -> "Cosmos Connection parameters."
+and []ProjectArguments =
+ | [] LeaseId of string
+ | [] Suffix of string
+ | [] ForceStartFromHere
+ | [] ChangeFeedBatchSize of int
+ | [] LagFreqS of float
+ | [] Stats of ParseResults<StatsTarget>
+ | [] Kafka of ParseResults<KafkaTarget>
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | LeaseId _ -> "Projector instance context name."
+ | Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."
+ | ForceStartFromHere -> "(iff `suffix` represents a fresh projection) - force starting from the present Position. Default: Ensure each and every event is projected from the start."
+ | ChangeFeedBatchSize _ -> "Maximum item count to supply to the ChangeFeed API when querying. Default: 1000"
+ | LagFreqS _ -> "Specify frequency to dump lag stats. Default: off"
+
+ | Stats _ -> "Do not emit events, only stats."
+ | Kafka _ -> "Project to Kafka."
+and [] KafkaTarget =
+ | [] Topic of string
+ | [] Broker of string
+ | [] Cosmos of ParseResults<CosmosArguments>
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | Topic _ -> "Specify target topic. Default: Use $env:EQUINOX_KAFKA_TOPIC"
+ | Broker _ -> "Specify target broker. Default: Use $env:EQUINOX_KAFKA_BROKER"
+ | Cosmos _ -> "Cosmos Connection parameters."
+and [] StatsTarget =
+ | [] Cosmos of ParseResults<CosmosArguments>
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | Cosmos _ -> "Cosmos Connection parameters."
and []WebArguments =
| [] Endpoint of string
interface IArgParserTemplate with
@@ -240,10 +281,96 @@ let main argv =
do! Equinox.Cosmos.Store.Sync.Initialization.createDatabaseIfNotExists conn.Client dbName
do! Equinox.Cosmos.Store.Sync.Initialization.createAuxCollectionIfNotExists conn.Client (dbName,collName) rus }
| _ -> failwith "please specify a `cosmos` endpoint"
+ | Project pargs ->
+ let envBackstop msg key =
+ match Environment.GetEnvironmentVariable key with
+ | null -> failwithf "Please provide a %s, either as an argument or via the %s environment variable" msg key
+ | x -> x
+ let broker, topic, storeArgs =
+ match pargs.GetSubCommand() with
+ | Kafka kargs ->
+ let broker = match kargs.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER"
+ let topic = match kargs.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC"
+ Some broker, Some topic, kargs.GetResult KafkaTarget.Cosmos
+ | Stats sargs -> None, None, sargs.GetResult StatsTarget.Cosmos
+ | x -> failwithf "Invalid subcommand %A" x
+ let storeLog = createStoreLog (storeArgs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq
+ let (endpointUri, masterKey), dbName, collName, connectionPolicy = Cosmos.connectionPolicy (log,storeLog) storeArgs
+ pargs.TryGetResult ChangeFeedBatchSize |> Option.iter (fun bs -> log.Information("ChangeFeed BatchSize {batchSize}", bs))
+ pargs.TryGetResult LagFreqS |> Option.iter (fun s -> log.Information("Dumping lag stats at {lagS:n0}s intervals", s))
+ let auxCollName = collName + pargs.GetResult(ProjectArguments.Suffix,"-aux")
+ let leaseId = pargs.GetResult(LeaseId)
+ log.Information("Processing using LeaseId {leaseId} in Aux coll {auxCollName}", leaseId, auxCollName)
+ if pargs.Contains ForceStartFromHere then log.Warning("(If new projection prefix) Skipping projection of all existing events.")
+ let source = { database = dbName; collection = collName }
+ let aux = { database = dbName; collection = auxCollName }
+ let validator = FeedValidator()
+
+ let buildRangeProjector () =
+ let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch
+ let producer, disposeProducer =
+ match broker,topic with
+ | Some b,Some t ->
+ let cfg = KafkaProducerConfig.Create("equinox-tool", Uri b, Confluent.Kafka.Acks.Leader, LZ4)
+ let p = KafkaProducer.Create(log, cfg, t)
+ Some p, (p :> IDisposable).Dispose
+ | _ -> None, id
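+ // If no Kafka target was specified (i.e. the `stats` subcommand), producer is None; batches are still parsed and
+ // validated so the stats get logged, and the checkpoint call below is performed in place of producing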
+ let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
+ sw.Stop() // Stop the clock after CFP hands off to us
+ let validator = BatchValidator(validator)
+ let toKafkaEvent (e: DocumentParser.IEvent) : Equinox.Projection.Codec.RenderedEvent =
+ { s = e.Stream; i = e.Index; c = e.EventType; t = e.TimeStamp; d = e.Data; m = e.Meta }
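+ // RenderedEvent is the wire shape emitted to Kafka: s=stream name (also used as the message key in ProduceBatch below),
+ // i=index within the stream, c=event type, t=timestamp, d=data, m=metadata; it gets serialized via JsonConvert below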
+ let validate (e: DocumentParser.IEvent) =
+ match validator.TryIngest(e.Stream, int e.Index) with
+ | Gap -> None // We cannot emit if we have evidence that this will leave a gap
+ | Duplicate // Just because this is a re-delivery does not mean we can drop it - the state does not get reset if we retraverse a batch
+ | Ok | New -> toKafkaEvent e |> Some
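+ // e.g. (assuming FeedValidator classifies a non-contiguous index as a Gap): having ingested indexes 0 and 1 of a
+ // stream, an arriving index 3 is dropped from the emission, whereas a re-delivered index 1 is a Duplicate and is
+ // still emitted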
+ let pt, events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.choose validate |> Array.ofSeq) |> Stopwatch.Time
+ let! et = async {
+ match producer with
+ | None ->
+ let! et,() = ctx.CheckpointAsync() |> Async.AwaitTaskCorrect |> Stopwatch.Time
+ return et
+ | Some producer ->
+ let es = [| for e in events -> e.s, Newtonsoft.Json.JsonConvert.SerializeObject e |]
+ let! et,_ = producer.ProduceBatch es |> Stopwatch.Time
+ return et }
+
+ if log.IsEnabled LogEventLevel.Debug then log.Debug("Response Headers {0}", let hs = ctx.FeedResponse.ResponseHeaders in [for h in hs -> h, hs.[h]])
+ let r, s = ctx.FeedResponse, validator.Stats
+ log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: c {cats} s {streams} e {events} {p:n3}s; Emit: {e:n1}s",
+ ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000.,
+ s.categories, s.TotalStreams, events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds))
+ if s.gap <> 0 then
+ log.Error("Feed inconsistency: gaps led to dropping of {gap} events in batch", s.gap)
+ for KeyValue(stream,stats) in validator.Enum() do
+ log.Error("Gap in Stream {stream}: dropped {gap} events ({fresh} consistent {ok} ok {dups} dups)", stream, stats.gap, stats.fresh, stats.ok, stats.dup)
+ elif s.dup <> 0 then
+ let streamsWithDupsCount = validator.Enum() |> Seq.filter (function KeyValue(_,{dup = d }) -> d <> 0) |> Seq.length
+ log.Information("Feed at least once delivery: {dups} duplicates encountered across {streams} affected streams", s.dup, streamsWithDupsCount)
+ sw.Restart() // restart the clock as we handoff back to the CFP
+ }
+ ChangeFeedObserver.Create(log, projectBatch, disposeProducer)
+
+ let run = async {
+ let logLag (interval : TimeSpan) remainingWork = async {
+ let logLevel = if remainingWork |> Seq.exists (fun (_r,rw) -> rw <> 0L) then Events.LogEventLevel.Information else Events.LogEventLevel.Debug
+ log.Write(logLevel, "Lags {@rangeLags} <- [Range Id, documents count] ", remainingWork)
+ return! Async.Sleep interval }
+ let maybeLogLag = pargs.TryGetResult LagFreqS |> Option.map (TimeSpan.FromSeconds >> logLag)
+ let! _cfp =
+ ChangeFeedProcessor.Start
+ ( log, endpointUri, masterKey, connectionPolicy, source, aux, buildRangeProjector,
+ leasePrefix = leaseId,
+ forceSkipExistingEvents = pargs.Contains ForceStartFromHere,
+ ?cfBatchSize = pargs.TryGetResult ChangeFeedBatchSize,
+ ?reportLagAndAwaitNextEstimation = maybeLogLag)
+ do! Async.AwaitKeyboardInterrupt() }
+ Async.RunSynchronously run
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
- | _ -> failwith "Please specify a valid subcommand :- init, initAux or run"
+ | _ -> failwith "Please specify a valid subcommand :- init, initAux, project or run"
0
with e ->
printfn "%s" e.Message