Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Projection library and eqx project tool-command #87

Merged
merged 31 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit. Hold Shift + click to select a range.
9e51872
Projector initial work
bartelink Jan 16, 2019
36eb52c
Add Event parsing
JingleCEF Jan 17, 2019
d676761
Add event parsing to project command
bartelink Jan 17, 2019
ec4bdbd
Base Kafka wrapper impl
eiriktsarpalis Jan 20, 2019
1aa273d
Apply house style to Kafka infrastructure
bartelink Jan 21, 2019
ed03c6e
Tidy Consumer config
bartelink Jan 21, 2019
ad18876
Align Producer API with consumer
bartelink Jan 21, 2019
a7e5f50
Fix tests
bartelink Jan 21, 2019
e946ed3
Wire up Kafka projection
bartelink Jan 23, 2019
270fa0d
Extract Equinox.Projection.Codec
bartelink Jan 24, 2019
0156eb2
Tidy Kafka wrapper
bartelink Jan 24, 2019
dae9e9f
Flatten/rename RenderedEvent; tidying
bartelink Jan 28, 2019
222527d
Port ConsumerConfig to CK
bartelink Jan 30, 2019
9cc2cb6
use CK ProducerConfig
bartelink Jan 30, 2019
e2cc698
added logging, changed some cancellation logic
michaelliao5 Feb 4, 2019
154e0dd
all tests for kafka pass now
michaelliao5 Feb 5, 2019
26a0028
removing redundant lines
michaelliao5 Feb 5, 2019
2a041ad
minor clean up
michaelliao5 Feb 5, 2019
2485fc3
reinstated the assign/unassign events
michaelliao5 Feb 6, 2019
025b58b
Cleanup Kafka wrapping
bartelink Feb 6, 2019
24fc55a
Inline RunPoll
bartelink Feb 6, 2019
671e878
Tidy CFP wrapper; fix lag monitoring
bartelink Feb 7, 2019
dc90063
Initial impl
bartelink Feb 4, 2019
3f033a9
Add FeedValidator
bartelink Feb 5, 2019
9b307af
Polish logging
bartelink Feb 7, 2019
a3eb21b
Clarify lag message
bartelink Feb 7, 2019
5cb7cb3
Support net461 projection; minor review edits
bartelink Feb 7, 2019
1191f42
Initial port to CK 1.0.0-beta3
bartelink Feb 8, 2019
d45d9e8
Rejig event handling to work with CK 1.0.0-beta3
bartelink Feb 8, 2019
3072fc3
Tidy Producer defaults
bartelink Feb 8, 2019
12ae3ea
Fix Validator test edge case
bartelink Feb 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup Kafka wrapping
  • Loading branch information
bartelink committed Feb 8, 2019
commit 025b58b59cccb4d1c896de1637f3bc319f560e38
7 changes: 1 addition & 6 deletions src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="Confluent.Kafka" Version="1.0.0-beta2" />
<!-- TODO remove special casing
While the general policy is to use 11.0.2 across the board for consistency (11.0.2 being the first version that cleanly supports netstandard2.0),
in this case we're relaxing the constraint for net461 so as to not trigger a set of transitive dependency updates at the present time
should also be able to remove [<Trait("KnownFailOn","Mono")>] in JsonConverterTests when this special casing goes away -->
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Serilog" Version="2.7.1" />
</ItemGroup>

Expand Down
50 changes: 24 additions & 26 deletions src/Equinox.Projection.Kafka/Kafka.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ module Config =

// NB we deliberately wrap all types exposed by Confluent.Kafka to avoid needing to reference librdkafka everywhere
[<NoComparison>]
type KafkaDeliveredMessage internal (message : DeliveryReportResult<string,string>) =
type DeliveryReport internal (message : DeliveryReportResult<string,string>) =
member internal __.UnderlyingMessage = message
static member TryWrap (msg : DeliveryReportResult<string,string>) =
if msg.Error.IsError then
let errorMsg = sprintf "Error on message topic=%s code=%O reason=%s" msg.Topic msg.Error.Code msg.Error.Reason
Choice2Of2 (Exception errorMsg)
else
Choice1Of2 (KafkaDeliveredMessage msg)
static member WrapOrThrow msg = match KafkaDeliveredMessage.TryWrap msg with Choice2Of2 e -> raise e | Choice1Of2 m -> m
Choice1Of2 (DeliveryReport msg)
static member WrapOrThrow msg = match DeliveryReport.TryWrap msg with Choice2Of2 e -> raise e | Choice1Of2 m -> m
member __.Topic = message.Topic
member __.Partition : int = let p = message.Partition in p.Value
member __.Offset : int64 = let o = message.Offset in o.Value
Expand All @@ -42,7 +42,7 @@ type KafkaDeliveredMessage internal (message : DeliveryReportResult<string,strin

// NB we deliberately wrap all types exposed by Confluent.Kafka to avoid needing to reference librdkafka everywhere
[<NoComparison>]
type KafkaConsumerMessage internal (message : ConsumeResult<string,string>) =
type KafkaMessage internal (message : ConsumeResult<string,string>) =
member internal __.UnderlyingMessage = message
member __.Topic = message.Topic
member __.Partition : int = let p = message.Partition in p.Value
Expand Down Expand Up @@ -124,15 +124,15 @@ type KafkaProducer private (log: ILogger, producer : Producer<string, string>, t

let! ct = Async.CancellationToken

let tcs = new TaskCompletionSource<KafkaDeliveredMessage[]>()
let tcs = new TaskCompletionSource<DeliveryReport[]>()
let numMessages = keyValueBatch.Length
let results = Array.zeroCreate<KafkaDeliveredMessage> numMessages
let results = Array.zeroCreate<DeliveryReport> numMessages
let numCompleted = ref 0

use _ = ct.Register(fun _ -> tcs.TrySetCanceled() |> ignore)

let handler (m : DeliveryReportResult<string,string>) =
match KafkaDeliveredMessage.TryWrap m with
match DeliveryReport.TryWrap m with
| Choice2Of2 e ->
tcs.TrySetException e |> ignore
| Choice1Of2 m ->
Expand Down Expand Up @@ -161,7 +161,7 @@ type KafkaProducer private (log: ILogger, producer : Producer<string, string>, t
let inline len (x:string) = match x with null -> 0 | x -> sizeof<char> * x.Length
16 + len message.Key + len message.Value |> int64

let getBatchOffset (batch : KafkaConsumerMessage[]) : TopicPartitionOffset =
let getBatchOffset (batch : KafkaMessage[]) : TopicPartitionOffset =
let maxOffset = batch |> Array.maxBy (fun m -> m.Offset)
maxOffset.UnderlyingMessage.TopicPartitionOffset

Expand Down Expand Up @@ -231,17 +231,15 @@ type KafkaProducer private (log: ILogger, producer : Producer<string, string>, t
member c.StoreOffset(tpo : TopicPartitionOffset) : unit =
c.StoreOffsets[| TopicPartitionOffset(tpo.Topic, tpo.Partition, Offset(let o = tpo.Offset in o.Value + 1L)) |]

member c.RunPoll(log : ILogger, counter : InFlightMessageCounter, cancellationToken: CancellationToken , onMessage: ConsumeResult<'Key,'Value> -> unit) =
while not cancellationToken.IsCancellationRequested do
counter.AwaitThreshold()
try let message = c.Consume(cancellationToken) // NB don't use TimeSpan overload unless you want GPFs on1.0.0-beta2
if message <> null then
onMessage message
with
| :? ConsumeException as e -> log.Warning(e, "Consume exception")
| :? System.OperationCanceledException -> log.Warning("Consumer:{name} | Consuming cancelled", c.Name)

c.Close()
member internal c.RunPoll(log : ILogger, counter : InFlightMessageCounter, cancellationToken: CancellationToken , onMessage: ConsumeResult<'Key,'Value> -> unit) =
try while not cancellationToken.IsCancellationRequested do
counter.AwaitThreshold()
try let message = c.Consume(cancellationToken) // NB don't use TimeSpan overload unless you want AVEs on1.0.0-beta2
if message <> null then
onMessage message
with| :? ConsumeException as e -> log.Warning(e, "Consume exception")
| :? System.OperationCanceledException -> log.Warning("Consumer:{name} | Consuming cancelled", c.Name)
finally c.Close()

member c.WithLogging(log: ILogger) =
let d1 = c.OnLog.Subscribe(fun m ->
Expand Down Expand Up @@ -277,7 +275,7 @@ type KafkaProducer private (log: ILogger, producer : Producer<string, string>, t
consumer

let mkBatchedMessageConsumer (log: ILogger) (minInFlightBytes, maxInFlightBytes, maxBatchSize, maxBatchDelay)
(ct : CancellationToken) (consumer : Consumer<string, string>) (handler : KafkaConsumerMessage[] -> Async<unit>) = async {
(ct : CancellationToken) (consumer : Consumer<string, string>) (handler : KafkaMessage[] -> Async<unit>) = async {
let tcs = new TaskCompletionSource<unit>()
use cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore)
Expand All @@ -294,7 +292,7 @@ type KafkaProducer private (log: ILogger, producer : Producer<string, string>, t
let nextBatch () =
let count = collection.FillBuffer(buffer, maxBatchDelay)
if count <> 0 then log.Information("Consuming {count}", count)
let batch = Array.init count (fun i -> KafkaConsumerMessage(buffer.[i]))
let batch = Array.init count (fun i -> KafkaMessage(buffer.[i]))
Array.Clear(buffer, 0, count)
batch

Expand Down Expand Up @@ -395,16 +393,16 @@ type KafkaConsumer private (log : ILogger, consumer : Consumer<string, string>,
member __.Status = task.Status
/// Asynchronously awaits until consumer stops or is faulted
member __.AwaitConsumer() = Async.AwaitTaskCorrect task
member c.Stop() =
member __.Stop() =
log.Information("consumer:{name} stopping", consumer.Name)
cts.Cancel();

//interface IDisposable with member __.Dispose() = __.Stop()
interface IDisposable with member __.Dispose() = __.Stop()

/// Starts a kafka consumer with provider configuration and batch message handler.
/// Batches are grouped by topic partition. Batches belonging to the same topic partition will be scheduled sequentially and monotonically,
/// however batches from different partitions can be run concurrently.
static member Start (log : ILogger) (config : KafkaConsumerConfig) (partitionHandler : KafkaConsumerMessage[] -> Async<unit>) =
static member Start (log : ILogger) (config : KafkaConsumerConfig) (partitionHandler : KafkaMessage[] -> Async<unit>) =
if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic"
log.Information("Starting Kafka consumer on broker={broker} topics={topics} groupId={groupId} " (*autoOffsetReset={autoOffsetReset}*) + "fetchMaxBytes={fetchMaxB} maxInFlightBytes={maxInFlightB} maxBatchSize={maxBatchB} maxBatchDelay={maxBatchDelay}",
config.conf.BootstrapServers, config.topics, config.conf.GroupId, (*config.conf.AutoOffsetReset.Value,*) config.conf.FetchMaxBytes,
Expand All @@ -418,9 +416,9 @@ type KafkaConsumer private (log : ILogger, consumer : Consumer<string, string>,

/// Starts a Kafka consumer instance that schedules handlers grouped by message key. Additionally accepts a global degreeOfParallelism parameter
/// that controls the number of handlers running concurrently across partitions for the given consumer instance.
static member StartByKey (log: ILogger) (config : KafkaConsumerConfig) (degreeOfParallelism : int) (keyHandler : KafkaConsumerMessage [] -> Async<unit>) =
static member StartByKey (log: ILogger) (config : KafkaConsumerConfig) (degreeOfParallelism : int) (keyHandler : KafkaMessage [] -> Async<unit>) =
let semaphore = new SemaphoreSlim(degreeOfParallelism)
let partitionHandler (messages : KafkaConsumerMessage[]) = async {
let partitionHandler (messages : KafkaMessage[]) = async {
return!
messages
|> Seq.groupBy (fun m -> m.Key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ module Helpers =

type TestMessage = { producerId : int ; messageId : int }
[<NoComparison; NoEquality>]
type ConsumedTestMessage = { consumerId : int ; message : KafkaConsumerMessage ; payload : TestMessage }
type ConsumedTestMessage = { consumerId : int ; message : KafkaMessage ; payload : TestMessage }
type ConsumerCallback = KafkaConsumer -> ConsumedTestMessage [] -> Async<unit>

let runProducers log (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async {
Expand All @@ -84,7 +84,7 @@ module Helpers =

let runConsumers log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async {
let mkConsumer (consumerId : int) = async {
let deserialize (msg : KafkaConsumerMessage) = { consumerId = consumerId ; message = msg ; payload = JsonConvert.DeserializeObject<_> msg.Value }
let deserialize (msg : KafkaMessage) = { consumerId = consumerId ; message = msg ; payload = JsonConvert.DeserializeObject<_> msg.Value }

// need to pass the consumer instance to the handler callback
// do a bit of cyclic dependency fixups
Expand Down Expand Up @@ -137,7 +137,7 @@ type T1(testOutputHelper) =
let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
let consumers = runConsumers log config numConsumers None consumerCallback

do! [ producers; consumers ]
do! [ producers ; consumers ]
|> Async.Parallel
|> Async.Ignore

Expand Down