Skip to content

Commit

Permalink
Add eqx dump cosmos (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Nov 8, 2019
1 parent b961123 commit 4473adf
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 61 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Cosmos`: Exposed a `Connector.CreateClient` for interop with V2 ChangeFeedProcessor and `Propulsion.Cosmos` [#171](https://github.com/jet/equinox/pull/171)
- `Cosmos`: Added `eqx stats` command to count streams/docs/events in a CosmosDb Container re [#127](https://github.com/jet/equinox/issues/127) [#176](https://github.com/jet/equinox/pull/176)
- `MemoryStore`: Supports custom Codec logic (can use `FsCodec.Box.Codec` as default) [#173](https://github.com/jet/equinox/pull/173)
- `eqx dump [store]`: Show event data from store [#177](https://github.com/jet/equinox/pull/177)

### Changed

Expand All @@ -27,6 +28,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Fixed

- `Cosmos`: fixed accidentally swapped `count` and `bytes` metrics field values

<a name="2.0.0"></a>
<a name="2.0.0-rc7"></a>
## [2.0.0-rc7] - 2019-10-17
Expand Down
22 changes: 10 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
7. Use `propulsion` tool to Run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic
```powershell
```powershell
$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
# `-v` for verbose logging
# `-v` for verbose logging
# `projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
Expand Down Expand Up @@ -310,34 +310,32 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
<a name="sqlstreamstore"></a>
9. Use [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore)
The SqlStreamStore consists of:
- being able to supply `ms`, `my`, `pg` flag to `eqx run`, e.g. `eqx run -t cart -f 50 -d 5 -C -U ms -c "sqlserverconnectionstring" -s schema`
- being able to supply `ms`, `my`, `pg` flag to `eqx dump`, e.g. `eqx dump -JC -S "Favoritesab25cc9f24464d39939000aeb37ea11a" ms -c "sqlserverconnectionstring" -s schema`
- being able to supply `ms`, `my`, `pg` flag to Web sample, e.g. `dotnet run -p samples/Web/ -- my -c "mysqlconnectionstring"`
- being able to supply `ms`, `my`, `pg` flag to new `eqx config` command e.g. `eqx config pg -c "postgresconnectionstring" -u p "usercredentialsNotToBeLogged" -s schema`
```powershell
cd ~/code/equinox
# set up the DB/schema
& dotnet run -f netcoreapp2.1 -p tools/Equinox.Tool -- config pg -c "connectionstring" -p "u=un;p=password" -s "schema"
dotnet run -f netcoreapp2.1 -p tools/Equinox.Tool -- config pg -c "connectionstring" -p "u=un;p=password" -s "schema"
# run a benchmark
& dotnet run -c Release -f netcoreapp2.1 -p tools/Equinox.Tool -- run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"
dotnet run -c Release -f netcoreapp2.1 -p tools/Equinox.Tool -- run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"
# run the webserver, -A to autocreate schema on connection
& dotnet run -p samples/Web/ -- my -c "mysqlconnectionstring" -A
#############################
# TODO - NOTE NOT YET RELEASED
##############################
dotnet run -p samples/Web/ -- my -c "mysqlconnectionstring" -A
# set up the DB/schema
& eqx config pg -c "connectionstring" -p "u=un;p=password" -s "schema"
eqx config pg -c "connectionstring" -p "u=un;p=password" -s "schema"
# run a benchmark
& eqx run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"
eqx run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"
eqx dump -J -S "SavedForLater-ab25cc9f24464d39939000aeb37ea11a" pg # show stored JSON (Guid shown in eqx run output)
```
### BENCHMARKS
Expand Down
15 changes: 11 additions & 4 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ module internal Tip =
let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async {
let log = log |> Log.prop "stream" stream
let! t, (ru, res : ReadResult<Tip>) = get (container,stream) maybePos |> Stopwatch.Time
let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
let log bytes count (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
match res with
| ReadResult.NotModified ->
(log 0 0 Log.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
Expand Down Expand Up @@ -809,9 +809,16 @@ type BatchingPolicy

type Gateway(conn : Connection, batching : BatchingPolicy) =
let (|FromUnfold|_|) (tryDecode: #IEventData<_> -> 'event option) (isOrigin: 'event -> bool) (xs:#IEventData<_>[]) : Option<'event[]> =
match Array.tryFindIndexBack (tryDecode >> Option.exists isOrigin) xs with
let items = ResizeArray()
let isOrigin' e =
match tryDecode e with
| None -> false
| Some e ->
items.Insert(0,e)
isOrigin e
match Array.tryFindIndexBack isOrigin' xs with
| None -> None
| Some index -> xs |> Seq.skip index |> Seq.choose tryDecode |> Array.ofSeq |> Some
| Some _ -> items.ToArray() |> Some
member __.Client = conn.Client
member __.LoadBackwardsStopping log (container, stream) (tryDecode,isOrigin): Async<StreamToken * 'event[]> = async {
let! pos, events = Query.walk log (container,stream) conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward None (tryDecode,isOrigin)
Expand Down Expand Up @@ -1014,7 +1021,7 @@ type Resolver<'event, 'state, 'context>(context : Context, codec, fold, initial,
| Some (AccessStrategy.Unfolded (isOrigin, unfold)) -> isOrigin, Choice2Of3 (fun _ state -> unfold state)
| Some (AccessStrategy.Snapshot (isValid,generate)) -> isValid, Choice2Of3 (fun _ state -> generate state |> Seq.singleton)
| Some (AccessStrategy.AnyKnownEventType) -> (fun _ -> true), Choice2Of3 (fun events _ -> Seq.last events |> Seq.singleton)
| Some (AccessStrategy.RollingUnfolds (isOrigin,transmute)) -> isOrigin, Choice3Of3 transmute
| Some (AccessStrategy.RollingUnfolds (isOrigin,transmute)) -> isOrigin, Choice3Of3 transmute
let cosmosCat = Category<'event, 'state, 'context>(context.Gateway, codec)
let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption)
let category : ICategory<_, _, Container*string, 'context> =
Expand Down
Loading

0 comments on commit 4473adf

Please sign in to comment.