diff --git a/Directory.Build.props b/Directory.Build.props index 27b579099..24c2ea368 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -22,6 +22,9 @@ $(NoWarn);FS2003;NU5105 + + + 1.1 diff --git a/Equinox.sln b/Equinox.sln index 661569631..553b25ee0 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -58,6 +58,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{8F3E EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Todo", "Todo", "{8CDE1CC3-8619-44DE-8B4D-4102CE476C35}" EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos.Projection", "src\Equinox.Cosmos.Projection\Equinox.Cosmos.Projection.fsproj", "{D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}" +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Kafka", "src\Equinox.Projection.Kafka\Equinox.Projection.Kafka.fsproj", "{F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}" +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Kafka.Integration", "tests\Equinox.Projection.Kafka.Integration\Equinox.Projection.Kafka.Integration.fsproj", "{C8383D84-80F5-46B8-B59B-48B2359E6A37}" +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Codec", "src\Equinox.Projection.Codec\Equinox.Projection.Codec.fsproj", "{65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}" +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Integration", "tests\Equinox.Projection.Integration\Equinox.Projection.Integration.fsproj", "{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -128,6 +138,26 @@ Global {EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Debug|Any CPU.Build.0 = Debug|Any CPU {EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Release|Any CPU.ActiveCfg = Release|Any CPU {EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}.Release|Any CPU.Build.0 = Release|Any CPU + {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D4641679-9C2B-4B4C-ABA9-B2EC1CB1D79D}.Release|Any CPU.Build.0 = Release|Any CPU + {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F76A4BA7-3DA8-4012-843B-9E28A4D2EC86}.Release|Any CPU.Build.0 = Release|Any CPU + {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C8383D84-80F5-46B8-B59B-48B2359E6A37}.Release|Any CPU.Build.0 = Release|Any CPU + {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {65F8FC6A-FF31-43A0-B7D9-4C9CF407DEB3}.Release|Any CPU.Build.0 = Release|Any CPU + {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.ActiveCfg = Release|Any CPU + 
{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/build.proj b/build.proj index e2b9ab63b..aa2ff0bf9 100644 --- a/build.proj +++ b/build.proj @@ -19,8 +19,11 @@ + + + @@ -30,6 +33,8 @@ + + diff --git a/global.json b/global.json index 455cd36d7..21e27cede 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "2.1.402" + "version": "2.1.500" } } \ No newline at end of file diff --git a/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs b/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs new file mode 100644 index 000000000..fdfbf3a9d --- /dev/null +++ b/src/Equinox.Cosmos.Projection/ChangeFeedProcessor.fs @@ -0,0 +1,112 @@ +namespace Equinox.Cosmos.Projection + +open Equinox.Cosmos.Projection +open Equinox.Store.Infrastructure +open Microsoft.Azure.Documents +open Microsoft.Azure.Documents.Client +open Microsoft.Azure.Documents.ChangeFeedProcessor +open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing +open Serilog +open System +open System.Collections.Generic + +type ChangeFeedObserver() = + static member Create(log: ILogger, onChange : IChangeFeedObserverContext -> IReadOnlyList -> Async, ?dispose: unit -> unit) = + { new IChangeFeedObserver with + member __.ProcessChangesAsync(ctx, docs, ct) = (UnitTaskBuilder ct) { + try do! onChange ctx docs + with e -> + log.Warning(e, "Range {partitionKeyRangeId} Handler Threw", ctx.PartitionKeyRangeId) + do! Async.Raise e } + member __.OpenAsync ctx = UnitTaskBuilder() { + log.Information("Range {partitionKeyRangeId} Assigned", ctx.PartitionKeyRangeId) } + member __.CloseAsync (ctx, reason) = UnitTaskBuilder() { + log.Information("Range {partitionKeyRangeId} Revoked {reason}", ctx.PartitionKeyRangeId, reason) } + interface IDisposable with + member __.Dispose() = + match dispose with Some f -> f () | None -> () } + +type ChangeFeedObserverFactory = + static member FromFunction (f : unit -> #IChangeFeedObserver) = + { new IChangeFeedObserverFactory with member __.CreateObserver () = f () :> _ } + +type CosmosCollection = { database: string; collection: string } + +/// Wraps the [Azure CosmosDb ChangeFeedProcessor library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) +type ChangeFeedProcessor = + static member Start + ( log : ILogger, endpoint : Uri, accountKey : string, connectionPolicy : ConnectionPolicy, source : CosmosCollection, + /// The aux, non-partitioned collection holding the partition leases. + // Aux coll should always read from the write region to keep the number of write conflicts to minimum when the sdk + // updates the leases. Since the non-write region(s) might lag behind due to us using non-strong consistency, during + // failover we are likely to reprocess some messages, but that's okay since we are idempotent. + aux : CosmosCollection, + createObserver : unit -> IChangeFeedObserver, + ?leaseOwnerId : string, + /// Identifier to disambiguate multiple independent feed processor positions (akin to a 'consumer group') + ?leasePrefix : string, + /// (NB Only applies if this is the first time this leasePrefix is presented) + /// Specify `true` to request starting of projection from the present write position. + /// Default: false (projecting all events from start beforehand) + ?forceSkipExistingEvents : bool, + /// Limit on items to take in a batch when querying for changes (in addition to 4MB response size limit). 
Default 1000 + /// NB While a larger limit may deliver better throughput, it's important to balance that concern with the risk of throttling + ?cfBatchSize : int, + /// Frequency to check for partitions without a processor. Default 1s + ?leaseAcquireInterval : TimeSpan, + /// Frequency to renew leases held by processors under our control. Default 3s + ?leaseRenewInterval : TimeSpan, + /// Duration to take lease when acquired/renewed. Default 10s + ?leaseTtl : TimeSpan, + /// Delay before re-polling a partition after backlog has been drained + ?feedPollDelay : TimeSpan, + /// Continuously fed per-partition lag information until parent Async completes + /// callback should Async.Sleep until next update is desired + ?reportLagAndAwaitNextEstimation) = async { + + let leaseOwnerId = defaultArg leaseOwnerId (ChangeFeedProcessor.mkLeaseOwnerIdForProcess()) + let cfBatchSize = defaultArg cfBatchSize 1000 + let feedPollDelay = defaultArg feedPollDelay (TimeSpan.FromSeconds 1.) + let leaseAcquireInterval = defaultArg leaseAcquireInterval (TimeSpan.FromSeconds 1.) + let leaseRenewInterval = defaultArg leaseRenewInterval (TimeSpan.FromSeconds 3.) + let leaseTtl = defaultArg leaseTtl (TimeSpan.FromSeconds 10.) + + let inline s (x : TimeSpan) = x.TotalSeconds + log.Information("Processing Lease acquireS={leaseAcquireIntervalS:n0} ttlS={ttlS:n0} renewS={renewS:n0} feedPollDelayS={feedPollDelayS:n0}", + s leaseAcquireInterval, s leaseTtl, s leaseRenewInterval, s feedPollDelay) + + let builder = + let feedProcessorOptions = + ChangeFeedProcessorOptions( + StartFromBeginning = not (defaultArg forceSkipExistingEvents false), + MaxItemCount = Nullable cfBatchSize, + LeaseAcquireInterval = leaseAcquireInterval, LeaseExpirationInterval = leaseTtl, LeaseRenewInterval = leaseRenewInterval, + FeedPollDelay = feedPollDelay) + leasePrefix |> Option.iter (fun lp -> feedProcessorOptions.LeasePrefix <- lp + ":") + let mk d c = DocumentCollectionInfo(Uri = endpoint, DatabaseName = d, CollectionName = c, MasterKey = accountKey, ConnectionPolicy = connectionPolicy) + ChangeFeedProcessorBuilder() + .WithHostName(leaseOwnerId) + .WithFeedCollection(mk source.database source.collection) + .WithLeaseCollection(mk aux.database aux.collection) + .WithProcessorOptions(feedProcessorOptions) + match reportLagAndAwaitNextEstimation with + | None -> () + | Some lagMonitorCallback -> + let! estimator = builder.BuildEstimatorAsync() |> Async.AwaitTaskCorrect + let rec emitLagMetrics () = async { + let! remainingWork = estimator.GetEstimatedRemainingWorkPerPartitionAsync() |> Async.AwaitTaskCorrect + do! lagMonitorCallback <| List.ofSeq (seq { for r in remainingWork -> int (r.PartitionKeyRangeId.Trim[|'"'|]),r.RemainingWork } |> Seq.sortBy fst) + return! emitLagMetrics () } + let! _ = Async.StartChild(emitLagMetrics ()) in () + let! processor = builder.WithObserverFactory(ChangeFeedObserverFactory.FromFunction createObserver).BuildAsync() |> Async.AwaitTaskCorrect + do! processor.StartAsync() |> Async.AwaitTaskCorrect + return processor } + static member private mkLeaseOwnerIdForProcess() = + // If k>1 processes share an owner id, then they will compete for the same partitions. + // In that scenario, redundant processing happens on assigned partitions, but checkpointing will only take effect on one consumer. + // Including the processId should eliminate the possibility that a broken process manager causes a k>1 scenario to happen. 
+ // The only downside is that upon redeploy, lease experation / TTL would have to be observed before a consumer can pick it up. + let processName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + let processId = System.Diagnostics.Process.GetCurrentProcess().Id + let hostName = System.Environment.MachineName + sprintf "%s-%s-%d" hostName processName processId \ No newline at end of file diff --git a/src/Equinox.Cosmos.Projection/DocumentParser.fs b/src/Equinox.Cosmos.Projection/DocumentParser.fs new file mode 100644 index 000000000..21d03981f --- /dev/null +++ b/src/Equinox.Cosmos.Projection/DocumentParser.fs @@ -0,0 +1,33 @@ +namespace Equinox.Cosmos.Projection + +open Microsoft.Azure.Documents +open System + +[] +module DocumentParser = + type Document with + member document.Cast<'T>() = + let tmp = new Document() + tmp.SetPropertyValue("content", document) + tmp.GetPropertyValue<'T>("content") + /// Determines whether this document represents an index page [and hence should not be expected to contain any events] + let isIndex (d : Document) = d.Id = "-1" + type IEvent = + inherit Equinox.Cosmos.Store.IIndexedEvent + abstract member Stream : string + abstract member TimeStamp : DateTimeOffset + /// Infers whether the document represents a valid Event-Batch + let enumEvents (d : Document) = seq { + if not (isIndex d) + && d.GetPropertyValue("p") <> null && d.GetPropertyValue("i") <> null + && d.GetPropertyValue("n") <> null && d.GetPropertyValue("e") <> null then + let batch = d.Cast() + yield! batch.e |> Seq.mapi (fun offset x -> + { new IEvent with + member __.Index = batch.i + int64 offset + member __.IsUnfold = false + member __.EventType = x.c + member __.Data = x.d + member __.Meta = x.m + member __.TimeStamp = x.t + member __.Stream = batch.p } ) } \ No newline at end of file diff --git a/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj b/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj new file mode 100644 index 000000000..4617df044 --- /dev/null +++ b/src/Equinox.Cosmos.Projection/Equinox.Cosmos.Projection.fsproj @@ -0,0 +1,32 @@ + + + + netstandard2.0;net461 + 5 + false + true + true + true + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Equinox.Cosmos.Projection/Infrastructure.fs b/src/Equinox.Cosmos.Projection/Infrastructure.fs new file mode 100644 index 000000000..273ac2ae8 --- /dev/null +++ b/src/Equinox.Cosmos.Projection/Infrastructure.fs @@ -0,0 +1,50 @@ +namespace Equinox.Cosmos.Projection + +open System +open System.Threading +open System.Threading.Tasks + +[] +module Impl = + type Async with + /// Re-raise an exception so that the current stacktrace is preserved + static member Raise(e : #exn) : Async<'T> = Async.FromContinuations (fun (_,ec,_) -> ec e) + static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds) + + /// eagerly cancels the parent workflow without waiting for cooperative cancellation by the child computation. + static member TimeoutAfterEager (timeout : TimeSpan) (workflow : Async<'T>) : Async<'T> = async { + let! ct = Async.CancellationToken + return! 
Async.FromContinuations(fun (sk,ek,ck) -> + let mutable latch = 0 + let inline protect k t = if Interlocked.Increment &latch = 1 then k t + let cts = CancellationTokenSource.CreateLinkedTokenSource(ct) + do cts.CancelAfter timeout + let _ = let t = cts.Token in t.Register(fun () -> protect ek (TimeoutException("async workflow has timed out"))) + Async.StartWithContinuations(workflow, protect sk, protect ek, protect ck, cts.Token)) + } + +[] +type AsyncBuilderAbstract() = + member __.Zero() = async.Zero() + member __.Return t = async.Return t + member __.ReturnFrom t = async.ReturnFrom t + member __.Bind(f,g) = async.Bind(f,g) + member __.Combine(f,g) = async.Combine(f,g) + member __.Delay f = async.Delay f + member __.While(c,b) = async.While(c,b) + member __.For(xs,b) = async.For(xs,b) + member __.Using(d,b) = async.Using(d,b) + member __.TryWith(b,e) = async.TryWith(b,e) + member __.TryFinally(b,f) = async.TryFinally(b,f) + +type TaskBuilder(?ct : CancellationToken) = + inherit AsyncBuilderAbstract() + member __.Run f : Task<'T> = Async.StartAsTask(f, ?cancellationToken = ct) + +type UnitTaskBuilder(?ct : CancellationToken, ?timeout : TimeSpan) = + inherit AsyncBuilderAbstract() + + member __.Run workflow : Task = + let wrapped = match timeout with None -> workflow | Some t -> Async.TimeoutAfterEager t workflow + let task = Async.StartAsTask(wrapped, ?cancellationToken = ct) + task :> _ \ No newline at end of file diff --git a/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj b/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj new file mode 100644 index 000000000..1bd774858 --- /dev/null +++ b/src/Equinox.Projection.Codec/Equinox.Projection.Codec.fsproj @@ -0,0 +1,28 @@ + + + + netstandard2.0;net461 + 5 + false + true + true + true + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Equinox.Projection.Codec/FeedValidator.fs b/src/Equinox.Projection.Codec/FeedValidator.fs new file mode 100644 index 000000000..bdb176125 --- /dev/null +++ b/src/Equinox.Projection.Codec/FeedValidator.fs @@ -0,0 +1,110 @@ +module Equinox.Projection.Validation + +open System.Collections.Generic + +module private Option = + let defaultValue def option = defaultArg option def + +/// Represents the categorisation of an item being ingested +type IngestResult = + /// The item is a correct item at the tail of the known sequence + | New + /// Consistent as far as we know (but this Validator has not seen the head) + | Ok + /// The item represents a duplicate of an earlier item + | Duplicate + /// The item is beyond the tail of the stream and likely represets a gap + | Gap + +/// Represents the present state of a given stream +type StreamState = + /// We've observed the stream from the start + | All of pos: int + /// We've not observed the stream from the start + | Partial of min: int * pos: int + +module StreamState = + let combine (state : StreamState option) index : IngestResult*StreamState = + match state, index with + | None, 0 -> New, All 0 + | None, x -> Ok, Partial (x,x) + | Some (All x), i when i <= x -> Duplicate, All x + | Some (All x), i when i = x + 1 -> New, All i + | Some (All x), _ -> Gap, All x + | Some (Partial (min=1; pos=pos)), 0 -> Duplicate, All pos + | Some (Partial (min=min; pos=x)), i when i <= min -> Duplicate, Partial (i, x) + | Some (Partial (min=min; pos=x)), i when i = x + 1 -> Ok, Partial (min, i) + | Some (Partial (min=min)), i -> New, Partial (min, i) + + +type FeedStats = { complete: int; partial: int } + +/// Maintains the state of 
a set of streams being ingested into a processor for consistency checking purposes +/// - to determine whether an incoming event on a stream should be considered a duplicate and hence not processed +/// - to allow inconsistencies to be logged +type FeedValidator() = + let streamStates = System.Collections.Generic.Dictionary() + + /// Thread safe operation to a) classify b) track change implied by a new message as encountered + member __.Ingest(stream, index) : IngestResult * StreamState = + lock streamStates <| fun () -> + let state = + match streamStates.TryGetValue stream with + | true, state -> Some state + | false, _ -> None + let (res, state') = StreamState.combine state index + streamStates.[stream] <- state' + res, state' + + /// Determine count of streams being tracked + member __.Stats = + lock streamStates <| fun () -> + let raw = streamStates |> Seq.countBy (fun x -> match x.Value with All _ -> true | Partial _ -> false) |> Seq.toList + { complete = raw |> List.tryPick (function (true,c) -> Some c | (false,_) -> None) |> Option.defaultValue 0 + partial = raw |> List.tryPick (function (false,c) -> Some c | (true,_) -> None) |> Option.defaultValue 0 } + +type [] StreamSummary = { mutable fresh : int; mutable ok : int; mutable dup : int; mutable gap : int; mutable complete: bool } + +type BatchStats = { fresh : int; ok : int; dup : int; gap : int; categories : int; streams : BatchStreamStats } with + member s.TotalStreams = let s = s.streams in s.complete + s.incomplete + +and BatchStreamStats = { complete: int; incomplete: int } + +/// Used to establish aggregate stats for a batch of inputs for logging purposes +/// The Ingested inputs are passed to the supplied validator in order to classify them +type BatchValidator(validator : FeedValidator) = + let streams = System.Collections.Generic.Dictionary() + let streamSummary (streamName : string) = + match streams.TryGetValue streamName with + | true, acc -> acc + | false, _ -> let t = { fresh = 0; ok = 0; dup = 0; gap = 0; complete = false } in streams.[streamName] <- t; t + + /// Collate into Feed Validator and Batch stats + member __.TryIngest(stream, index) : IngestResult = + let res, state = validator.Ingest(stream, index) + let streamStats = streamSummary stream + match state with + | All _ -> streamStats.complete <- true + | Partial _ -> streamStats.complete <- false + match res with + | New -> streamStats.fresh <- streamStats.fresh + 1 + | Ok -> streamStats.ok <- streamStats.ok + 1 + | Duplicate -> streamStats.dup <- streamStats.dup + 1 + | Gap -> streamStats.gap <- streamStats.gap + 1 + res + + member __.Enum() : IEnumerable> = upcast streams + + member __.Stats : BatchStats = + let mutable fresh, ok, dup, gap, complete, incomplete = 0, 0, 0, 0, 0, 0 + let cats = HashSet() + for KeyValue(k,v) in streams do + fresh <- fresh + v.fresh + ok <- ok + v.ok + dup <- dup + v.dup + gap <- gap + v.gap + match k.IndexOf('-') with + | -1 -> () + | i -> cats.Add(k.Substring(0, i)) |> ignore + if v.complete then complete <- complete + 1 else incomplete <- incomplete + 1 + { fresh = fresh; ok = ok; dup = dup; gap = gap; categories = cats.Count; streams = { complete = complete; incomplete = incomplete } } \ No newline at end of file diff --git a/src/Equinox.Projection.Codec/RenderedEvent.fs b/src/Equinox.Projection.Codec/RenderedEvent.fs new file mode 100644 index 000000000..57b2a378c --- /dev/null +++ b/src/Equinox.Projection.Codec/RenderedEvent.fs @@ -0,0 +1,29 @@ +namespace Equinox.Projection + +open Newtonsoft.Json +open System + +module 
Codec = + /// Default rendition of an event when being projected to Kafka + type [] RenderedEvent = + { /// Stream Name + s: string + + /// Index within stream + i: int64 + + /// Event Type associated with event data in `d` + c: string + + /// Timestamp of original write + t: DateTimeOffset // ISO 8601 + + /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb + // TOCONSIDER if we don't inline `h`, we need to inline this + [)>] + d: byte[] // required + + /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) + [)>] + [] + m: byte[] } \ No newline at end of file diff --git a/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj new file mode 100644 index 000000000..2452863fe --- /dev/null +++ b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj @@ -0,0 +1,28 @@ + + + + netstandard2.0;net461 + 5 + false + true + true + true + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Equinox.Projection.Kafka/Infrastructure.fs b/src/Equinox.Projection.Kafka/Infrastructure.fs new file mode 100644 index 000000000..6a7e89710 --- /dev/null +++ b/src/Equinox.Projection.Kafka/Infrastructure.fs @@ -0,0 +1,31 @@ +namespace Equinox.Projection.Kafka + +open System +open System.Threading.Tasks + +[] +module private AsyncHelpers = + type Async with + static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> = + Async.FromContinuations <| fun (k,ek,_) -> + task.ContinueWith (fun (t:Task<'T>) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0] + else ek e + elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled.")) + elif t.IsCompleted then k t.Result + else ek(Exception "invalid Task state!")) + |> ignore + + static member AwaitTaskCorrect (task : Task) : Async = + Async.FromContinuations <| fun (k,ek,_) -> + task.ContinueWith (fun (t:Task) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0] + else ek e + elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled.")) + elif t.IsCompleted then k () + else ek(Exception "invalid Task state!")) + |> ignore diff --git a/src/Equinox.Projection.Kafka/Kafka.fs b/src/Equinox.Projection.Kafka/Kafka.fs new file mode 100644 index 000000000..30137bb47 --- /dev/null +++ b/src/Equinox.Projection.Kafka/Kafka.fs @@ -0,0 +1,356 @@ +namespace Equinox.Projection.Kafka + +open Confluent.Kafka +open Serilog +open System +open System.Collections.Concurrent +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +module private Config = + let validateBrokerUri (u:Uri) = + if not u.IsAbsoluteUri then invalidArg "broker" "should be of 'host:port' format" + if String.IsNullOrEmpty u.Authority then + // handle a corner case in which Uri instances are erroneously putting the hostname in the `scheme` field. 
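// (illustrative note, not part of the original change: e.g. Uri "tranquility:9092" parses as an absolute Uri with Scheme = "tranquility" and an empty Authority, which is why we fall back to the host:port regex check below)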
+ if System.Text.RegularExpressions.Regex.IsMatch(string u, "^\S+:[0-9]+$") then string u + else invalidArg "broker" "should be of 'host:port' format" + + else u.Authority + +type Compression = Uncompressed | GZip | Snappy | LZ4 // as soon as CK provides such an Enum, this can go + +/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specfic settings +[] +type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : Compression, acks : Acks) = + member val Conf : ProducerConfig = conf + member val Acks = acks + member val Broker = broker + member val Compression = compression + member __.Kvps = Seq.append conf cfgs + + /// Creates a Kafka producer instance with supplied configuration + static member Create + ( clientId : string, broker : Uri, acks : Acks, + /// Message compression. Defaults to Uncompressed/'none'. + ?compression, + /// Maximum in-flight requests; <> 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry. Defaults to 1. + ?maxInFlight, + /// Number of retries. Defaults to 60. + ?retries, + /// Backoff interval. Defaults to 1 second. + ?retryBackoff, + /// Statistics Interval. Defaults to no stats. + ?statisticsInterval, + /// Defaults to true. + ?socketKeepAlive, + /// Defaults to 10 ms. + ?linger : TimeSpan, + /// Defaults to 'consistent_random'. + ?partitioner, + /// Misc configuration parameter to be passed to the underlying CK producer. + ?custom) = + let compression = defaultArg compression Uncompressed + let cfgs = seq { + yield KeyValuePair("compression.codec", match compression with Uncompressed -> "none" | GZip -> "gzip" | Snappy -> "snappy" | LZ4 -> "lz4") + match custom with None -> () | Some miscConfig -> yield! miscConfig } + let c = + ProducerConfig( + ClientId = clientId, BootstrapServers = Config.validateBrokerUri broker, + RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), + MessageSendMaxRetries = Nullable (defaultArg retries 60), + Acks = Nullable acks, + LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10), + SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), + Partitioner = Nullable (defaultArg partitioner Partitioner.ConsistentRandom), + MaxInFlight = Nullable (defaultArg maxInFlight 1000000), + LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 + statisticsInterval |> Option.iter (fun (i : TimeSpan) -> c.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds)) + KafkaProducerConfig(c, cfgs, broker, compression, acks) + +type KafkaProducer private (log: ILogger, producer : Producer, topic : string) = + member __.Topic = topic + + interface IDisposable with member __.Dispose() = for d in [(*d1;d2;*)producer:>IDisposable] do d.Dispose() + + /// Produces a batch of supplied key/value messages. Results are returned in order of writing. + member __.ProduceBatch(keyValueBatch : (string * string)[]) = async { + if Array.isEmpty keyValueBatch then return [||] else + + let! 
ct = Async.CancellationToken + + let tcs = new TaskCompletionSource[]>() + let numMessages = keyValueBatch.Length + let results = Array.zeroCreate> numMessages + let numCompleted = ref 0 + + use _ = ct.Register(fun _ -> tcs.TrySetCanceled() |> ignore) + + let handler (m : DeliveryReport) = + if m.Error.IsError then + let errorMsg = exn (sprintf "Error on message topic=%s code=%O reason=%s" m.Topic m.Error.Code m.Error.Reason) + tcs.TrySetException errorMsg |> ignore + else + let i = Interlocked.Increment numCompleted + results.[i - 1] <- m + if i = numMessages then tcs.TrySetResult results |> ignore + for key,value in keyValueBatch do + producer.BeginProduce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler = handler) + producer.Flush(ct) + log.Debug("Produced {count}",!numCompleted) + return! Async.AwaitTaskCorrect tcs.Task } + + static member Create(log : ILogger, config : KafkaProducerConfig, topic : string) = + if String.IsNullOrEmpty topic then nullArg "topic" + log.Information("Producing... {broker} / {topic} compression={compression} acks={acks}", config.Broker, topic, config.Compression, config.Acks) + let producer = + ProducerBuilder(config.Kvps) + .SetLogHandler(fun _p m -> log.Information("{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility)) + .SetErrorHandler(fun _p e -> log.Error("{reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)) + .Build() + new KafkaProducer(log, producer, topic) + + type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan } + + module private ConsumerImpl = + /// guesstimate approximate message size in bytes + let approximateMessageBytes (message : ConsumeResult) = + let inline len (x:string) = match x with null -> 0 | x -> sizeof * x.Length + 16 + len message.Key + len message.Value |> int64 + + type BlockingCollection<'T> with + member bc.FillBuffer(buffer : 'T[], maxDelay : TimeSpan) : int = + let cts = new CancellationTokenSource() + do cts.CancelAfter maxDelay + + let n = buffer.Length + let mutable i = 0 + let mutable t = Unchecked.defaultof<'T> + + while i < n && not cts.IsCancellationRequested do + if bc.TryTake(&t, 5 (* ms *)) then + buffer.[i] <- t ; i <- i + 1 + while i < n && not cts.IsCancellationRequested && bc.TryTake(&t) do + buffer.[i] <- t ; i <- i + 1 + i + + type PartitionedBlockingCollection<'Key, 'Message when 'Key : equality>(?perPartitionCapacity : int) = + let collections = new ConcurrentDictionary<'Key, Lazy>>() + let onPartitionAdded = new Event<'Key * BlockingCollection<'Message>>() + + let createCollection() = + match perPartitionCapacity with + | None -> new BlockingCollection<'Message>() + | Some c -> new BlockingCollection<'Message>(boundedCapacity = c) + + [] + member __.OnPartitionAdded = onPartitionAdded.Publish + + member __.Add (key : 'Key, message : 'Message) = + let factory key = lazy( + let coll = createCollection() + onPartitionAdded.Trigger(key, coll) + coll) + + let buffer = collections.GetOrAdd(key, factory) + buffer.Value.Add message + + member __.Revoke(key : 'Key) = + match collections.TryRemove key with + | true, coll -> Task.Delay(10000).ContinueWith(fun _ -> coll.Value.CompleteAdding()) |> ignore + | _ -> () + + type InFlightMessageCounter(log: ILogger, minInFlightBytes : int64, maxInFlightBytes : int64) = + do if minInFlightBytes < 1L then invalidArg "minInFlightBytes" "must be positive value" + if maxInFlightBytes < 1L then invalidArg 
"maxInFlightBytes" "must be positive value" + if minInFlightBytes > maxInFlightBytes then invalidArg "maxInFlightBytes" "must be greater than minInFlightBytes" + + let mutable inFlightBytes = 0L + + member __.Add(numBytes : int64) = Interlocked.Add(&inFlightBytes, numBytes) |> ignore + + member __.AwaitThreshold() = + if inFlightBytes > maxInFlightBytes then + log.Warning("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes) + while inFlightBytes > minInFlightBytes do Thread.Sleep 5 + log.Information "Consumer resuming polling" + + let mkBatchedMessageConsumer (log: ILogger) (buf : ConsumerBufferingConfig) (ct : CancellationToken) (consumer : Consumer) + (partitionedCollection: PartitionedBlockingCollection>) + (handler : ConsumeResult[] -> Async) = async { + let tcs = new TaskCompletionSource() + use cts = CancellationTokenSource.CreateLinkedTokenSource(ct) + use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore) + + use _ = consumer + + let counter = new InFlightMessageCounter(log, buf.minInFlightBytes, buf.maxInFlightBytes) + + // starts a tail recursive loop that dequeues batches for a given partition buffer and schedules the user callback + let consumePartition (collection : BlockingCollection>) = + let buffer = Array.zeroCreate buf.maxBatchSize + let nextBatch () = + let count = collection.FillBuffer(buffer, buf.maxBatchDelay) + if count <> 0 then log.Debug("Consuming {count}", count) + let batch = Array.init count (fun i -> buffer.[i]) + Array.Clear(buffer, 0, count) + batch + + let rec loop () = async { + if not collection.IsCompleted then + try match nextBatch() with + | [||] -> () + | batch -> + // run the handler function + do! handler batch + + // store completed offsets + let tpo = + let maxOffset = batch |> Array.maxBy (fun m -> let o = m.Offset in o.Value) + let raw = maxOffset.TopicPartitionOffset + TopicPartitionOffset(raw.Topic, raw.Partition, Offset(let o = raw.Offset in o.Value + 1L)) + consumer.StoreOffsets[| tpo |] + + // decrement in-flight message counter + let batchSize = batch |> Array.sumBy approximateMessageBytes + counter.Add -batchSize + with e -> + tcs.TrySetException e |> ignore + cts.Cancel() + do! loop() } + + Async.Start(loop(), cts.Token) + + use _ = partitionedCollection.OnPartitionAdded.Subscribe (fun (_key,buffer) -> consumePartition buffer) + + // run the consumer + let ct = cts.Token + try while not ct.IsCancellationRequested do + counter.AwaitThreshold() + try let message = consumer.Consume(cts.Token) // NB don't use TimeSpan overload unless you want AVEs on1.0.0-beta2 + if message <> null then + counter.Add(approximateMessageBytes message) + partitionedCollection.Add(message.TopicPartition, message) + with| :? ConsumeException as e -> log.Warning(e, "Consume exception") + | :? System.OperationCanceledException -> log.Warning("Consumer:{name} | Consuming cancelled", consumer.Name) + finally + consumer.Close() + + // await for handler faults or external cancellation + do! 
Async.AwaitTaskCorrect tcs.Task + } + +/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specfic settings +[] +type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq>; topics: string list; buffering: ConsumerBufferingConfig } with + member __.Kvps = Seq.append __.conf __.custom + /// Builds a Kafka Consumer Config suitable for KafkaConsumer.Start* + static member Create + ( /// Identify this consumer in logs etc + clientId, broker : Uri, topics, + /// Consumer group identifier. + groupId, + /// Default Earliest. + ?autoOffsetReset, + /// Default 100kB. + ?fetchMaxBytes, + /// Default 10B. + ?fetchMinBytes, + ?statisticsInterval, + /// Consumed offsets commit interval. Default 10s. (WAS 1s) + ?offsetCommitInterval, + /// Misc configuration parameter to be passed to the underlying CK consumer. + ?custom, + + (* Client side batching *) + + /// Maximum number of messages to group per batch on consumer callbacks. Default 1000. + ?maxBatchSize, + /// Message batch linger time. Default 500ms. + ?maxBatchDelay, + /// Minimum total size of consumed messages in-memory for the consumer to attempt to fill. Default 16MiB. + ?minInFlightBytes, + /// Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB. + ?maxInFlightBytes) = + let conf = + ConsumerConfig( + ClientId=clientId, BootstrapServers=Config.validateBrokerUri broker, GroupId=groupId, + AutoOffsetReset = Nullable (defaultArg autoOffsetReset AutoOffsetReset.Earliest), + FetchMaxBytes = Nullable (defaultArg fetchMaxBytes 100000), + MessageMaxBytes = Nullable (defaultArg fetchMaxBytes 100000), + FetchMinBytes = Nullable (defaultArg fetchMinBytes 10), // TODO check if sane default + EnableAutoCommit = Nullable true, + AutoCommitIntervalMs = Nullable (match offsetCommitInterval with Some (i: TimeSpan) -> int i.TotalMilliseconds | None -> 1000*10), + EnableAutoOffsetStore = Nullable false, + LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 + statisticsInterval |> Option.iter (fun (i : TimeSpan) -> conf.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds)) + { conf = conf; custom = match custom with None -> Seq.empty | Some c -> List.toSeq c + topics = match Seq.toList topics with [] -> invalidArg "topics" "must be non-empty collection" | ts -> ts + buffering = { + maxBatchSize = defaultArg maxBatchSize 1000 + maxBatchDelay = defaultArg maxBatchDelay (TimeSpan.FromMilliseconds 500.) + minInFlightBytes = defaultArg minInFlightBytes (16L * 1024L * 1024L) + maxInFlightBytes = defaultArg maxInFlightBytes (24L * 1024L * 1024L) } } + +type KafkaConsumer private (log : ILogger, consumer : Consumer, task : Task, cts : CancellationTokenSource) = + + /// Asynchronously awaits until consumer stops or is faulted + member __.AwaitCompletion() = Async.AwaitTaskCorrect task + interface IDisposable with member __.Dispose() = __.Stop() + + member __.Status = task.Status + member __.Stop() = + log.Information("Consuming ... Stopping {name}", consumer.Name) + cts.Cancel(); + + /// Starts a kafka consumer with provider configuration and batch message handler. + /// Batches are grouped by topic partition. Batches belonging to the same topic partition will be scheduled sequentially and monotonically, + /// however batches from different partitions can be run concurrently. 
+ static member Start (log : ILogger) (config : KafkaConsumerConfig) (partitionHandler : ConsumeResult[] -> Async) = + if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic" + log.Information("Consuming... {broker} {topics} {groupId}" (*autoOffsetReset={autoOffsetReset}*) + " fetchMaxBytes={fetchMaxB} maxInFlightBytes={maxInFlightB} maxBatchSize={maxBatchB} maxBatchDelay={maxBatchDelay}s", + config.conf.BootstrapServers, config.topics, config.conf.GroupId, (*config.conf.AutoOffsetReset.Value,*) config.conf.FetchMaxBytes, + config.buffering.maxInFlightBytes, config.buffering.maxBatchSize, (let t = config.buffering.maxBatchDelay in t.TotalSeconds)) + + let partitionedCollection = new ConsumerImpl.PartitionedBlockingCollection>() + let consumer = + ConsumerBuilder<_,_>(config.Kvps) + .SetLogHandler(fun _c m -> log.Information("consumer_info|{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility)) + .SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)) + .SetRebalanceHandler(fun _c m -> + for topic,partitions in m.Partitions |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> let p = p.Partition in p.Value |]) do + if m.IsAssignment then log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions) + else log.Information("Consuming... Revoked {topic:l} {partitions}", topic, partitions) + if m.IsRevocation then m.Partitions |> Seq.iter partitionedCollection.Revoke) + //let d5 = c.OnPartitionEOF.Subscribe(fun tpo -> + // log.Verbose("consumer_partition_eof|topic={topic}|partition={partition}|offset={offset}", tpo.Topic, tpo.Partition, let o = tpo.Offset in o.Value)) + .SetOffsetsCommittedHandler(fun _c cos -> + for t,ps in cos.Offsets |> Seq.groupBy (fun p -> p.Topic) do + let o = [for p in ps -> let pp = p.Partition in pp.Value, let o = p.Offset in if o.IsSpecial then box (string o) else box o.Value(*, fmtError p.Error*)] + let e = cos.Error + if not e.IsError then log.Information("Consuming... Committed {topic} {@offsets}", t, o) + else log.Warning("Consuming... Committed {topic} {@offsets} reason={error} code={code} isBrokerError={isBrokerError}", t, o, e.Reason, e.Code, e.IsBrokerError)) + .Build() + consumer.Subscribe config.topics + let cts = new CancellationTokenSource() + let task = ConsumerImpl.mkBatchedMessageConsumer log config.buffering cts.Token consumer partitionedCollection partitionHandler |> Async.StartAsTask + new KafkaConsumer(log, consumer, task, cts) + + /// Starts a Kafka consumer instance that schedules handlers grouped by message key. Additionally accepts a global degreeOfParallelism parameter + /// that controls the number of handlers running concurrently across partitions for the given consumer instance. + static member StartByKey (log: ILogger) (config : KafkaConsumerConfig) (degreeOfParallelism : int) (keyHandler : ConsumeResult<_,_> [] -> Async) = + let semaphore = new SemaphoreSlim(degreeOfParallelism) + let partitionHandler (messages : ConsumeResult<_,_>[]) = async { + return! + messages + |> Seq.groupBy (fun m -> m.Key) + |> Seq.map (fun (_,gp) -> async { + let! ct = Async.CancellationToken + let! _ = semaphore.WaitAsync ct |> Async.AwaitTaskCorrect + try do! 
keyHandler (Seq.toArray gp) + finally semaphore.Release() |> ignore }) + |> Async.Parallel + |> Async.Ignore + } + + KafkaConsumer.Start log config partitionHandler \ No newline at end of file diff --git a/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj b/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj new file mode 100644 index 000000000..55cb4ff0b --- /dev/null +++ b/tests/Equinox.Projection.Integration/Equinox.Projection.Integration.fsproj @@ -0,0 +1,29 @@ + + + + netcoreapp2.1;net461 + 5 + false + + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + + diff --git a/tests/Equinox.Projection.Integration/FeedValidatorTests.fs b/tests/Equinox.Projection.Integration/FeedValidatorTests.fs new file mode 100644 index 000000000..c02bb3c69 --- /dev/null +++ b/tests/Equinox.Projection.Integration/FeedValidatorTests.fs @@ -0,0 +1,70 @@ +module Equinox.Projection.Tests + +open Equinox.Projection.Validation +open FsCheck.Xunit +open Swensen.Unquote +open Xunit + +let [] ``Properties`` state index = + let result,state' = StreamState.combine state index + match state with + | Some (Partial (_, pos)) when pos <= 1 -> () + | Some (Partial (min, pos)) when min > pos || min = 0 -> () + | None -> + match state' with + | All _ -> result =! New + | Partial _ -> result =! Ok + | Some (All x) -> + match state',result with + | All x' , Duplicate -> x' =! x + | All x', New -> x' =! x+1 + | All x', Gap -> x' =! x + | x -> failwithf "Unexpected %A" x + | Some (Partial (min, pos)) -> + match state',result with + | All 0,Duplicate when min=0 && index = 0 -> () + | All x,Duplicate when min=1 && pos = x && index = 0 -> () + | Partial (min', pos'), Duplicate -> min' =! max min' index; pos' =! pos + | Partial (min', pos'), Ok + | Partial (min', pos'), New -> min' =! min; pos' =! index + | x -> failwithf "Unexpected %A" x + +let [] ``Zero on unknown stream is New`` () = + let result,state = StreamState.combine None 0 + New =! result + All 0 =! state + +let [] ``Non-zero on unknown stream is Ok`` () = + let result,state = StreamState.combine None 1 + Ok =! result + Partial (1,1) =! state + +let [] ``valid successor is New`` () = + let result,state = StreamState.combine (All 0 |> Some) 1 + New =! result + All 1 =! state + +let [] ``single immediate repeat is flagged`` () = + let result,state = StreamState.combine (All 0 |> Some) 0 + Duplicate =! result + All 0 =! state + +let [] ``non-immediate repeat is flagged`` () = + let result,state = StreamState.combine (All 1 |> Some) 0 + Duplicate =! result + All 1 =! state + +let [] ``Gap is flagged`` () = + let result,state = StreamState.combine (All 1 |> Some) 3 + Gap =! result + All 1 =! state + +let [] ``Potential gaps are not flagged as such when we're processing a Partial`` () = + let result,state = StreamState.combine (Some (Partial (1,1))) 3 + New =! result + Partial (1,3) =! state + +let [] ``Earlier values widen the min when we're processing a Partial`` () = + let result,state = StreamState.combine (Some (Partial (2,3))) 1 + Duplicate =! result + Partial (1,3) =! 
state \ No newline at end of file diff --git a/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj b/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj new file mode 100644 index 000000000..d0bdc08e4 --- /dev/null +++ b/tests/Equinox.Projection.Kafka.Integration/Equinox.Projection.Kafka.Integration.fsproj @@ -0,0 +1,29 @@ + + + + netcoreapp2.1;net461 + 5 + false + + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + + diff --git a/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs new file mode 100644 index 000000000..96d2d3967 --- /dev/null +++ b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs @@ -0,0 +1,327 @@ +namespace Equinox.Projection.Kafka.KafkaIntegration + +open Equinox.Projection.Kafka +open Newtonsoft.Json +open Serilog +open Swensen.Unquote +open System +open System.Collections.Concurrent +open System.ComponentModel +open System.Threading +open System.Threading.Tasks +open Xunit + +[] +[] +module Helpers = + open Confluent.Kafka + + // Derived from https://github.com/damianh/CapturingLogOutputWithXunit2AndParallelTests + // NB VS does not surface these atm, but other test runners / test reports do + type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = + let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] {Message}{NewLine}{Exception}", null); + let writeSerilogEvent logEvent = + use writer = new System.IO.StringWriter() + formatter.Format(logEvent, writer); + writer |> string |> testOutput.WriteLine + writer |> string |> System.Diagnostics.Debug.WriteLine + interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + + let createLogger sink = + LoggerConfiguration() + .WriteTo.Sink(sink) + .WriteTo.Seq("http://localhost:5341") + .CreateLogger() + + let getTestBroker() = + match Environment.GetEnvironmentVariable "EQUINOX_KAFKA_BROKER" with + | x when String.IsNullOrEmpty x -> invalidOp "missing environment variable 'EQUINOX_KAFKA_BROKER'" + | x -> Uri x + + let newId () = let g = System.Guid.NewGuid() in g.ToString("N") + + type Async with + static member ParallelThrottled degreeOfParallelism jobs = async { + let s = new SemaphoreSlim(degreeOfParallelism) + return! + jobs + |> Seq.map (fun j -> async { + let! ct = Async.CancellationToken + do! s.WaitAsync ct |> Async.AwaitTask + try return! j + finally s.Release() |> ignore }) + |> Async.Parallel + } + + type KafkaConsumer with + member c.StopAfter(delay : TimeSpan) = + Task.Delay(delay).ContinueWith(fun (_:Task) -> c.Stop()) |> ignore + + type TestMessage = { producerId : int ; messageId : int } + [] + type ConsumedTestMessage = { consumerId : int ; message : ConsumeResult ; payload : TestMessage } + type ConsumerCallback = KafkaConsumer -> ConsumedTestMessage [] -> Async + + let runProducers log (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async { + let runProducer (producerId : int) = async { + use producer = KafkaProducer.Create(log, KafkaProducerConfig.Create("panther", broker, Acks.Leader, maxInFlight = 10000), topic) + + let! results = + [1 .. 
messagesPerProducer] + |> Seq.map (fun msgId -> + let key = string msgId + let value = JsonConvert.SerializeObject { producerId = producerId ; messageId = msgId } + key, value) + + |> Seq.chunkBySize 100 + |> Seq.map producer.ProduceBatch + |> Async.ParallelThrottled 7 + + return Array.concat results + } + + return! Async.Parallel [for i in 1 .. numProducers -> runProducer i] + } + + let runConsumers log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async { + let mkConsumer (consumerId : int) = async { + let deserialize (msg : ConsumeResult<_,_>) = { consumerId = consumerId ; message = msg ; payload = JsonConvert.DeserializeObject<_> msg.Value } + + // need to pass the consumer instance to the handler callback + // do a bit of cyclic dependency fixups + let consumerCell = ref None + let rec getConsumer() = + // avoid potential race conditions by polling + match !consumerCell with + | None -> Thread.SpinWait 20; getConsumer() + | Some c -> c + + let consumer = KafkaConsumer.Start log config (fun batch -> handler (getConsumer()) (Array.map deserialize batch)) + + consumerCell := Some consumer + + timeout |> Option.iter consumer.StopAfter + + do! consumer.AwaitCompletion() + } + + do! Async.Parallel [for i in 1 .. numConsumers -> mkConsumer i] |> Async.Ignore + } + +type FactIfBroker() = + inherit FactAttribute() + override __.Skip = if null <> Environment.GetEnvironmentVariable "EQUINOX_KAFKA_BROKER" then null else "Skipping as no EQUINOX_KAFKA_BROKER supplied" + +type T1(testOutputHelper) = + let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker () + + let [] ``ConfluentKafka producer-consumer basic roundtrip`` () = async { + let numProducers = 10 + let numConsumers = 10 + let messagesPerProducer = 1000 + + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let consumedBatches = new ConcurrentBag() + let consumerCallback (consumer:KafkaConsumer) batch = async { + do consumedBatches.Add batch + let messageCount = consumedBatches |> Seq.sumBy Array.length + // signal cancellation if consumed items reaches expected size + if messageCount >= numProducers * messagesPerProducer then + consumer.Stop() + } + + // Section: run the test + let producers = runProducers log broker topic numProducers messagesPerProducer |> Async.Ignore + + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + let consumers = runConsumers log config numConsumers None consumerCallback + + do! 
[ producers ; consumers ] + |> Async.Parallel + |> Async.Ignore + + // Section: assertion checks + let ``consumed batches should be non-empty`` = + consumedBatches + |> Seq.forall (not << Array.isEmpty) + + test <@ ``consumed batches should be non-empty`` @> // "consumed batches should all be non-empty") + + let ``batches should be grouped by partition`` = + consumedBatches + |> Seq.map (fun batch -> batch |> Seq.distinctBy (fun b -> b.message.Partition) |> Seq.length) + |> Seq.forall (fun numKeys -> numKeys = 1) + + test <@ ``batches should be grouped by partition`` @> // "batches should be grouped by partition" + + let allMessages = + consumedBatches + |> Seq.concat + |> Seq.toArray + + let ``all message keys should have expected value`` = + allMessages |> Array.forall (fun msg -> int msg.message.Key = msg.payload.messageId) + + test <@ ``all message keys should have expected value`` @> // "all message keys should have expected value" + + let ``should have consumed all expected messages`` = + allMessages + |> Array.groupBy (fun msg -> msg.payload.producerId) + |> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId)) + |> Array.forall (fun gp -> gp.Length = messagesPerProducer) + + test <@ ``should have consumed all expected messages`` @> // "should have consumed all expected messages" + } + +// separated test type to allow the tests to run in parallel +type T2(testOutputHelper) = + let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker () + + let [] ``ConfluentKafka consumer should have expected exception semantics`` () = async { + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let! _ = runProducers log broker topic 1 10 // populate the topic with a few messages + + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + + let! r = Async.Catch <| runConsumers log config 1 None (fun _ _ -> raise <|IndexOutOfRangeException()) + test <@ match r with Choice2Of2 (:? IndexOutOfRangeException) -> true | x -> failwithf "%A" x @> + } + + let [] ``Given a topic different consumer group ids should be consuming the same message set`` () = async { + let numMessages = 10 + + let topic = newId() // dev kafka topics are created and truncated automatically + + let! _ = runProducers log broker topic 1 numMessages // populate the topic with a few messages + + let messageCount = ref 0 + let groupId1 = newId() + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId1) + do! runConsumers log config 1 None + (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) + + test <@ numMessages = !messageCount @> + + let messageCount = ref 0 + let groupId2 = newId() + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId2) + do! runConsumers log config 1 None + (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) + + test <@ numMessages = !messageCount @> + } + + let [] ``Spawning a new consumer with same consumer group id should not receive new messages`` () = async { + let numMessages = 10 + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + + let! 
_ = runProducers log broker topic 1 numMessages // populate the topic with a few messages + + // expected to read 10 messages from the first consumer + let messageCount = ref 0 + do! runConsumers log config 1 None + (fun c b -> async { + if Interlocked.Add(messageCount, b.Length) >= numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be stored + + test <@ numMessages = !messageCount @> + + // expected to read no messages from the subsequent consumer + let messageCount = ref 0 + do! runConsumers log config 1 (Some (TimeSpan.FromSeconds 10.)) + (fun c b -> async { + if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) + + test <@ 0 = !messageCount @> + } + +// separated test type to allow the tests to run in parallel +type T3(testOutputHelper) = + let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker () + + let [] ``Commited offsets should not result in missing messages`` () = async { + let numMessages = 10 + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + + let! _ = runProducers log broker topic 1 numMessages // populate the topic with a few messages + + // expected to read 10 messages from the first consumer + let messageCount = ref 0 + do! runConsumers log config 1 None + (fun c b -> async { + if Interlocked.Add(messageCount, b.Length) >= numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed) + + test <@ numMessages = !messageCount @> + + let! _ = runProducers log broker topic 1 numMessages // produce more messages + + // expected to read 10 messages from the subsequent consumer, + // this is to verify there are no off-by-one errors in how offsets are committed + let messageCount = ref 0 + do! runConsumers log config 1 None + (fun c b -> async { + if Interlocked.Add(messageCount, b.Length) >= numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed) + + test <@ numMessages = !messageCount @> + } + + let [] ``Consumers should never schedule two batches of the same partition concurrently`` () = async { + // writes 2000 messages down a topic with a shuffled partition key + // then attempts to consume the topic, while verifying that per-partition batches + // are never scheduled for concurrent execution. also checks that batches are + // monotonic w.r.t. offsets + let numMessages = 2000 + let maxBatchSize = 5 + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId, maxBatchSize = maxBatchSize) + + // Produce messages in the topic + do! runProducers log broker topic 1 numMessages |> Async.Ignore + + let globalMessageCount = ref 0 + + let getPartitionOffset = + let state = new ConcurrentDictionary() + fun partition -> state.GetOrAdd(partition, fun _ -> ref -1L) + + let getBatchPartitionCount = + let state = new ConcurrentDictionary() + fun partition -> state.GetOrAdd(partition, fun _ -> ref 0) + + do! 
runConsumers log config 1 None + (fun c b -> async { + let partition = let p = b.[0].message.Partition in p.Value + + // check batch sizes are bounded by maxBatchSize + test <@ b.Length <= maxBatchSize @> // "batch sizes should never exceed maxBatchSize") + + // check per-partition handlers are serialized + let concurrentBatchCell = getBatchPartitionCount partition + let concurrentBatches = Interlocked.Increment concurrentBatchCell + test <@ 1 = concurrentBatches @> // "partitions should never schedule more than one batch concurrently") + + // check for message monotonicity + let offset = getPartitionOffset partition + for msg in b do + Assert.True((let o = msg.message.Offset in o.Value) > !offset, "offset for partition should be monotonic") + offset := let o = msg.message.Offset in o.Value + + do! Async.Sleep 100 + + let _ = Interlocked.Decrement concurrentBatchCell + + if Interlocked.Add(globalMessageCount, b.Length) >= numMessages then c.Stop() }) + + test <@ numMessages = !globalMessageCount @> + } \ No newline at end of file diff --git a/tools/Equinox.Tool/Equinox.Tool.fsproj b/tools/Equinox.Tool/Equinox.Tool.fsproj index a612d28f7..2817b2fd1 100644 --- a/tools/Equinox.Tool/Equinox.Tool.fsproj +++ b/tools/Equinox.Tool/Equinox.Tool.fsproj @@ -8,6 +8,7 @@ false true true + $(DefineConstants);NET461 Equinox.Tool eqx @@ -35,6 +36,9 @@ + + + diff --git a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs index dbed6d114..6115010f0 100644 --- a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs +++ b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs @@ -30,13 +30,23 @@ type Exception with (System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw () Unchecked.defaultof<_> +#nowarn "21" // re AwaitKeyboardInterrupt +#nowarn "40" // re AwaitKeyboardInterrupt type Async with + static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds) /// /// Raises an exception using Async's continuation mechanism directly. /// /// Exception to be raised. 
     static member Raise (exn : #exn) = Async.FromContinuations(fun (_,ec,_) -> ec exn)
+    /// Asynchronously awaits the next keyboard interrupt event
+    static member AwaitKeyboardInterrupt () : Async =
+        Async.FromContinuations(fun (sc,_,_) ->
+            let isDisposed = ref 0
+            let rec callback _ = Tasks.Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
+            and d : IDisposable = System.Console.CancelKeyPress.Subscribe callback
+            in ())
 #if NET461
     static member Choice<'T>(workflows : seq>) : Async<'T option> = async {
         try
diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs
index 620496996..93a3a5ec0 100644
--- a/tools/Equinox.Tool/Program.fs
+++ b/tools/Equinox.Tool/Program.fs
@@ -2,8 +2,14 @@
 open Argu
 open Domain.Infrastructure
+open Equinox.Cosmos.Projection
+open Equinox.Projection.Codec
+open Equinox.Projection.Kafka
+open Equinox.Projection.Validation
+open Equinox.Store.Infrastructure
 open Equinox.Tool.Infrastructure
 open FSharp.UMX
+open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing
 open Microsoft.Extensions.DependencyInjection
 open Samples.Infrastructure.Log
 open Samples.Infrastructure.Storage
@@ -12,6 +18,8 @@
 open Serilog.Events
 open System
 open System.Net.Http
 open System.Threading
+open System.Collections.Generic
+open System.Diagnostics
 []
 type Arguments =
@@ -21,8 +29,8 @@
     | [] LogFile of string
     | [] Run of ParseResults
    | [] Init of ParseResults
-    | [] // this command is not useful unless you have access to the [presently closed-source] Equinox.Cosmos.Projector
-      [] InitAux of ParseResults
+    | [] InitAux of ParseResults
+    | [] Project of ParseResults
     interface IArgParserTemplate with
         member a.Usage = a |> function
             | Verbose -> "Include low level logging regarding specific test runs."
             | VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
             | LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)."
             | Run _ -> "Run a load test"
             | Init _ -> "Initialize store (presently only relevant for `cosmos`, where it creates database+collection+stored proc if not already present)."
-            | InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the [presently closed source] Projector)."
+            | InitAux _ -> "Initialize auxiliary store (presently only relevant for `cosmos`, when you intend to run the Projector)."
+            | Project _ -> "Project from store specified as the last argument, storing state in the specified `aux` Store (see initAux)."
 and []InitArguments =
     | [] Rus of int
     | [] SkipStoredProc
     | [] Cosmos of ParseResults
     interface IArgParserTemplate with
         member a.Usage = a |> function
             | Rus _ -> "Specify RU/s level to provision for the Application Collection."
             | SkipStoredProc -> "Inhibit creation of stored procedure in cited Collection."
             | Cosmos _ -> "Cosmos Connection parameters."
 and []InitAuxArguments =
     | [] Rus of int
     | [] Suffix of string
     | [] Cosmos of ParseResults
     interface IArgParserTemplate with
         member a.Usage = a |> function
             | Rus _ -> "Specify RU/s level to provision for the Application Collection."
             | Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."
             | Cosmos _ -> "Cosmos Connection parameters."
+and []ProjectArguments =
+    | [] LeaseId of string
+    | [] Suffix of string
+    | [] ForceStartFromHere
+    | [] ChangeFeedBatchSize of int
+    | [] LagFreqS of float
+    | [] Stats of ParseResults
+    | [] Kafka of ParseResults
+    interface IArgParserTemplate with
+        member a.Usage = a |> function
+            | LeaseId _ -> "Projector instance context name."
+            | Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."
+            | ForceStartFromHere _ -> "(iff `suffix` represents a fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
+            | ChangeFeedBatchSize _ -> "Maximum item count to supply to Changefeed Api when querying. 
Default: 1000" + | LagFreqS _ -> "Specify frequency to dump lag stats. Default: off" + + | Stats _ -> "Do not emit events, only stats." + | Kafka _ -> "Project to Kafka." +and [] KafkaTarget = + | [] Topic of string + | [] Broker of string + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | Topic _ -> "Specify target topic. Default: Use $env:EQUINOX_KAFKA_TOPIC" + | Broker _ -> "Specify target broker. Default: Use $env:EQUINOX_KAFKA_BROKER" + | Cosmos _ -> "Cosmos Connection parameters." +and [] StatsTarget = + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | Cosmos _ -> "Cosmos Connection parameters." and []WebArguments = | [] Endpoint of string interface IArgParserTemplate with @@ -240,10 +281,96 @@ let main argv = do! Equinox.Cosmos.Store.Sync.Initialization.createDatabaseIfNotExists conn.Client dbName do! Equinox.Cosmos.Store.Sync.Initialization.createAuxCollectionIfNotExists conn.Client (dbName,collName) rus } | _ -> failwith "please specify a `cosmos` endpoint" + | Project pargs -> + let envBackstop msg key = + match Environment.GetEnvironmentVariable key with + | null -> failwithf "Please provide a %s, either as an argment or via the %s environment variable" msg key + | x -> x + let broker, topic, storeArgs = + match pargs.GetSubCommand() with + | Kafka kargs -> + let broker = match kargs.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER" + let topic = match kargs.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC" + Some broker, Some topic,kargs.GetResult KafkaTarget.Cosmos + | Stats sargs -> None, None, sargs.GetResult StatsTarget.Cosmos + | x -> failwithf "Invalid subcommand %A" x + let storeLog = createStoreLog (storeArgs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq + let (endpointUri, masterKey), dbName, collName, connectionPolicy = Cosmos.connectionPolicy (log,storeLog) storeArgs + pargs.TryGetResult ChangeFeedBatchSize |> Option.iter (fun bs -> log.Information("ChangeFeed BatchSize {batchSize}", bs)) + pargs.TryGetResult LagFreqS |> Option.iter (fun s -> log.Information("Dumping lag stats at {lagS:n0}s intervals", s)) + let auxCollName = collName + pargs.GetResult(ProjectArguments.Suffix,"-aux") + let leaseId = pargs.GetResult(LeaseId) + log.Information("Processing using LeaseId {leaseId} in Aux coll {auxCollName}", leaseId, auxCollName) + if pargs.Contains ForceStartFromHere then log.Warning("(If new projection prefix) Skipping projection of all existing events.") + let source = { database = dbName; collection = collName } + let aux = { database = dbName; collection = auxCollName } + let validator = FeedValidator() + + let buildRangeProjector () = + let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch + let producer, disposeProducer = + match broker,topic with + | Some b,Some t -> + let cfg = KafkaProducerConfig.Create("equinox-tool", Uri b, Confluent.Kafka.Acks.Leader, LZ4) + let p = KafkaProducer.Create(log, cfg, t) + Some p, (p :> IDisposable).Dispose + | _ -> None, id + let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { + sw.Stop() // Stop the clock after CFP hands off to us + let validator = BatchValidator(validator) + let toKafkaEvent (e: DocumentParser.IEvent) : Equinox.Projection.Codec.RenderedEvent = + { s = e.Stream; i = e.Index; c = e.EventType; t = e.TimeStamp; d = e.Data; m = e.Meta } + let validate 
(e: DocumentParser.IEvent) =
+                    match validator.TryIngest(e.Stream, int e.Index) with
+                    | Gap -> None // We cannot emit if we have evidence that this will leave a gap
+                    | Duplicate // Just because this is a re-delivery does not mean we can drop it - the state does not get reset if we retraverse a batch
+                    | Ok | New -> toKafkaEvent e |> Some
+                let pt, events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.choose validate |> Array.ofSeq) |> Stopwatch.Time
+                let! et = async {
+                    match producer with
+                    | None ->
+                        let! et,() = ctx.CheckpointAsync() |> Async.AwaitTaskCorrect |> Stopwatch.Time
+                        return et
+                    | Some producer ->
+                        let es = [| for e in events -> e.s, Newtonsoft.Json.JsonConvert.SerializeObject e |]
+                        let! et,_ = producer.ProduceBatch es |> Stopwatch.Time
+                        return et }
+
+                if log.IsEnabled LogEventLevel.Debug then log.Debug("Response Headers {0}", let hs = ctx.FeedResponse.ResponseHeaders in [for h in hs -> h, hs.[h]])
+                let r, s = ctx.FeedResponse, validator.Stats
+                log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: c {cats} s {streams} e {events} {p:n3}s; Emit: {e:n1}s",
+                    ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000.,
+                    s.categories, s.TotalStreams, events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds))
+                if s.gap <> 0 then
+                    log.Error("Feed inconsistency: gaps led to dropping of {gap} events in batch", s.gap)
+                    for KeyValue(stream,stats) in validator.Enum() do
+                        log.Error("Gap in Stream {stream}: dropped {gap} events ({fresh} consistent {ok} ok {dups} dups)", stream, stats.gap, stats.fresh, stats.ok, stats.dup)
+                elif s.dup <> 0 then
+                    let streamsWithDupsCount = validator.Enum() |> Seq.filter (function KeyValue(_,{dup = d }) -> d <> 0) |> Seq.length
+                    log.Information("Feed at least once delivery: {dups} duplicates encountered across {streams} affected streams", s.dup, streamsWithDupsCount)
+                sw.Restart() // restart the clock as we handoff back to the CFP
+            }
+            ChangeFeedObserver.Create(log, projectBatch, disposeProducer)
+
+        let run = async {
+            let logLag (interval : TimeSpan) remainingWork = async {
+                let logLevel = if remainingWork |> Seq.exists (fun (_r,rw) -> rw <> 0L) then Events.LogEventLevel.Information else Events.LogEventLevel.Debug
+                log.Write(logLevel, "Lags {@rangeLags} <- [Range Id, documents count] ", remainingWork)
+                return! Async.Sleep interval }
+            let maybeLogLag = pargs.TryGetResult LagFreqS |> Option.map (TimeSpan.FromSeconds >> logLag)
+            let! _cfp =
+                ChangeFeedProcessor.Start
+                    ( log, endpointUri, masterKey, connectionPolicy, source, aux, buildRangeProjector,
+                      leasePrefix = leaseId,
+                      forceSkipExistingEvents = pargs.Contains ForceStartFromHere,
+                      ?cfBatchSize = pargs.TryGetResult ChangeFeedBatchSize,
+                      ?reportLagAndAwaitNextEstimation = maybeLogLag)
+            do! Async.AwaitKeyboardInterrupt() }
+        Async.RunSynchronously run
     | Run rargs ->
         let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
         LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
-    | _ -> failwith "Please specify a valid subcommand :- init, initAux or run"
+    | _ -> failwith "Please specify a valid subcommand :- init, initAux, project or run"
         0
     with e -> printfn "%s" e.Message
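
Reviewer note (not part of the diff): the observer wiring above can be exercised in isolation. The following is a minimal, hedged F# sketch of a "do-nothing" projector that only logs batch sizes; it reuses the ChangeFeedObserver.Create / ChangeFeedProcessor.Start surface introduced by this PR, and assumes `log`, `endpointUri`, `masterKey`, `connectionPolicy`, `source` and `aux` are built exactly as in the `Project` handler above. The lease prefix name is invented for illustration.

    // Hedged sketch only: counts documents per range instead of emitting them, then blocks until Ctrl-C
    let runCountingProjector () = async {
        let createObserver () =
            ChangeFeedObserver.Create(log, fun ctx docs -> async {
                // no emission; just surface what the ChangeFeedProcessor handed to us
                log.Information("Range {range} received {count} documents", ctx.PartitionKeyRangeId, docs.Count) })
        let! _cfp =
            ChangeFeedProcessor.Start
                ( log, endpointUri, masterKey, connectionPolicy, source, aux, createObserver,
                  leasePrefix = "sample-counter")
        do! Async.AwaitKeyboardInterrupt() }

Such a stub can be handy for validating lease management and lag reporting before switching the observer over to the Kafka-emitting projectBatch implementation.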
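The Kafka emission path can likewise be smoke-tested on its own. This is a speculative sketch using the same KafkaProducerConfig / KafkaProducer calls the projector uses above; it assumes the same opens as Program.fs (so `LZ4` resolves), that KafkaProducer is disposable as implied by the `(p :> IDisposable).Dispose` usage, and that ProduceBatch accepts (key, value) string pairs as in projectBatch. The client id, stream name and payload are invented for illustration.

    // Hedged sketch only: push one hand-rolled (key, json) pair at the target topic
    let produceSample (log : Serilog.ILogger) (broker : Uri) (topic : string) = async {
        let cfg = KafkaProducerConfig.Create("equinox-tool-sample", broker, Confluent.Kafka.Acks.Leader, LZ4)
        use producer = KafkaProducer.Create(log, cfg, topic)
        // keyed by stream name, mirroring how projectBatch keys each RenderedEvent by e.s
        let! _ = producer.ProduceBatch [| "SampleStream-1", """{"sample":true}""" |]
        return () }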