Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Projection library and eqx project tool-command #87

Merged
merged 31 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9e51872
Projector initial work
bartelink Jan 16, 2019
36eb52c
Add Event parsing
JingleCEF Jan 17, 2019
d676761
Add event parsing to project command
bartelink Jan 17, 2019
ec4bdbd
Base Kaka wrapper impl
eiriktsarpalis Jan 20, 2019
1aa273d
Apply house style to Kafka infrastructure
bartelink Jan 21, 2019
ed03c6e
Tidy Consumer config
bartelink Jan 21, 2019
ad18876
Align Producer API with consumer
bartelink Jan 21, 2019
a7e5f50
Fix tests
bartelink Jan 21, 2019
e946ed3
Wire up Kafka projection
bartelink Jan 23, 2019
270fa0d
Extract Equinox.Projection.Codec
bartelink Jan 24, 2019
0156eb2
Tidy Kafka wrapper
bartelink Jan 24, 2019
dae9e9f
Flatten/rename RenderedEvent; tidying
bartelink Jan 28, 2019
222527d
Port ConsumerConfig to CK
bartelink Jan 30, 2019
9cc2cb6
use CK ProducerConfig
bartelink Jan 30, 2019
e2cc698
added logging, changed some cancellation logic
michaelliao5 Feb 4, 2019
154e0dd
all tests for kafka pass now
michaelliao5 Feb 5, 2019
26a0028
removing redundant lines
michaelliao5 Feb 5, 2019
2a041ad
minor clean up
michaelliao5 Feb 5, 2019
2485fc3
reinstated the assign/unassign events
michaelliao5 Feb 6, 2019
025b58b
Cleanup Kafka wrapping
bartelink Feb 6, 2019
24fc55a
Inline RunPoll
bartelink Feb 6, 2019
671e878
Tidy CFP wrapper; fix lag monitoring
bartelink Feb 7, 2019
dc90063
Initial impl
bartelink Feb 4, 2019
3f033a9
Add FeedValidator
bartelink Feb 5, 2019
9b307af
Polish logging
bartelink Feb 7, 2019
a3eb21b
Clarify lag message
bartelink Feb 7, 2019
5cb7cb3
Support net461 projection; minor review edits
bartelink Feb 7, 2019
1191f42
Initial port to CK 1.0.0-beta3
bartelink Feb 8, 2019
d45d9e8
Rejig event handling to work with CK 1.0.0-beta3
bartelink Feb 8, 2019
3072fc3
Tidy Producer defaults
bartelink Feb 8, 2019
12ae3ea
Fix Validator test edge case
bartelink Feb 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tidy Kafka wrapper
  • Loading branch information
bartelink committed Feb 8, 2019
commit 0156eb20696930e8fa895954f1856e7dfedcc6e8
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Kafka.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
522 changes: 2 additions & 520 deletions src/Equinox.Projection.Kafka/Infrastructure.fs

Large diffs are not rendered by default.

523 changes: 523 additions & 0 deletions src/Equinox.Projection.Kafka/Kafka.fs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Tests.fs" />
<Compile Include="KafkaIntegration.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Equinox.Projection.Kafka.Infrastructure.Tests
namespace Equinox.Projection.Kafka.KafkaIntegration

open Equinox.Projection.Kafka.Infrastructure
open Equinox.Projection.Kafka
open Newtonsoft.Json
open Serilog
open Swensen.Unquote
Expand Down
15 changes: 14 additions & 1 deletion tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open Domain.Infrastructure
#if !NET461
open Equinox.Cosmos.Projection
open Equinox.Projection.Codec
open Equinox.Projection.Kafka.Infrastructure
open Equinox.Projection.Kafka
#endif
open Equinox.Store.Infrastructure
open Equinox.Tool.Infrastructure
Expand Down Expand Up @@ -340,9 +340,22 @@ let main argv =
let es = [| for e in events -> e.h.s, Newtonsoft.Json.JsonConvert.SerializeObject e |]
let! et,_ = producer.ProduceBatch es |> Stopwatch.Time
return et }

log.Information("Range {rangeId} Fetch: {requestCharge:n0}RU {count} docs {l:n1}s; Parse: {events} events {p:n3}s; Emit: {e:n1}s",
ctx.PartitionKeyRangeId, ctx.FeedResponse.RequestCharge, docs.Count,float sw.ElapsedMilliseconds / 1000.,
events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds))
if log.IsEnabled LogEventLevel.Debug then
let f = ctx.FeedResponse
let rendered =
[ "cq", box f.CollectionQuota; "csq",box f.CollectionSizeQuota; "cu",box f.CollectionUsage
"csu",box f.CollectionSizeUsage
"dq",box f.DatabaseQuota; "du",box f.DatabaseUsage
"mrq",box f.MaxResourceQuota; "crqu", box f.CurrentResourceQuotaUsage
"pq",box f.PermissionQuota; "pu",box f.PermissionUsage
"_con",box f.ResponseContinuation
"_st",box f.SessionToken ]
let rhs = let hs = ctx.FeedResponse.ResponseHeaders in [for h in hs -> h, hs.[h]]
log.Debug("FeedResponse {0}, Headers {1}", rendered, rhs)
sw.Restart() // restart the clock as we handoff back to the CFP
}
IChangeFeedObserver.Create(log, projectBatch, disposeProducer)
Expand Down