Add eqx dump cosmos #177

Merged
merged 12 commits on Nov 8, 2019
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Cosmos`: Exposed a `Connector.CreateClient` for interop with V2 ChangeFeedProcessor and `Propulsion.Cosmos` [#171](https://github.com/jet/equinox/pull/171)
- `Cosmos`: Added `eqx stats` command to count streams/docs/events in a CosmosDb Container re [#127](https://github.com/jet/equinox/issues/127) [#176](https://github.com/jet/equinox/pull/176)
- `MemoryStore`: Supports custom Codec logic (can use `FsCodec.Box.Codec` as default) [#173](https://github.com/jet/equinox/pull/173)
- `eqx dump [store]`: Show event data from store [#177](https://github.com/jet/equinox/pull/177)

### Changed

@@ -27,6 +28,8 @@ The `Unreleased` section name is replaced by the expected version of next release

### Fixed

- `Cosmos`: Fixed accidentally swapped `count` and `bytes` metrics field values

<a name="2.0.0"></a>
<a name="2.0.0-rc7"></a>
## [2.0.0-rc7] - 2019-10-17
22 changes: 10 additions & 12 deletions README.md
@@ -273,9 +273,9 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sourced

7. Use `propulsion` tool to Run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic

```powershell
$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
# `-v` for verbose logging
# `projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
@@ -310,34 +310,32 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sourced

<a name="sqlstreamstore"></a>
9. Use [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore)

The SqlStreamStore support consists of:

- being able to supply an `ms`, `my`, or `pg` flag to `eqx run`, e.g. `eqx run -t cart -f 50 -d 5 -C -U ms -c "sqlserverconnectionstring" -s schema`
- being able to supply an `ms`, `my`, or `pg` flag to `eqx dump`, e.g. `eqx dump -JC -S "Favoritesab25cc9f24464d39939000aeb37ea11a" ms -c "sqlserverconnectionstring" -s schema`
- being able to supply an `ms`, `my`, or `pg` flag to the Web sample, e.g. `dotnet run -p samples/Web/ -- my -c "mysqlconnectionstring"`
- being able to supply an `ms`, `my`, or `pg` flag to the new `eqx config` command, e.g. `eqx config pg -c "postgresconnectionstring" -u p "usercredentialsNotToBeLogged" -s schema`

```powershell
cd ~/code/equinox

# set up the DB/schema
dotnet run -f netcoreapp2.1 -p tools/Equinox.Tool -- config pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run a benchmark
dotnet run -c Release -f netcoreapp2.1 -p tools/Equinox.Tool -- run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run the webserver, -A to autocreate schema on connection
dotnet run -p samples/Web/ -- my -c "mysqlconnectionstring" -A

# set up the DB/schema
eqx config pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run a benchmark
eqx run -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"
eqx dump -J -S "SavedForLater-ab25cc9f24464d39939000aeb37ea11a" pg # show stored JSON (Guid shown in eqx run output)
```

### BENCHMARKS
15 changes: 11 additions & 4 deletions src/Equinox.Cosmos/Cosmos.fs
@@ -583,7 +583,7 @@ module internal Tip =
let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async {
let log = log |> Log.prop "stream" stream
let! t, (ru, res : ReadResult<Tip>) = get (container,stream) maybePos |> Stopwatch.Time
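// NB parameter order is bytes, then count: it mirrors the `bytes = bytes; count = count` field assignments (the two were previously swapped, per the CHANGELOG fix)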
let log bytes count (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
match res with
| ReadResult.NotModified ->
(log 0 0 Log.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
@@ -809,9 +809,16 @@ type BatchingPolicy

type Gateway(conn : Connection, batching : BatchingPolicy) =
let (|FromUnfold|_|) (tryDecode: #IEventData<_> -> 'event option) (isOrigin: 'event -> bool) (xs:#IEventData<_>[]) : Option<'event[]> =
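// Scan the events backwards for the origin event, accumulating each successfully decoded event along the way
// (Insert at index 0 keeps the collected events in forward order, and each event is decoded only once)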
let items = ResizeArray()
let isOrigin' e =
match tryDecode e with
| None -> false
| Some e ->
items.Insert(0,e)
isOrigin e
match Array.tryFindIndexBack isOrigin' xs with
| None -> None
| Some _ -> items.ToArray() |> Some
member __.Client = conn.Client
member __.LoadBackwardsStopping log (container, stream) (tryDecode,isOrigin): Async<StreamToken * 'event[]> = async {
let! pos, events = Query.walk log (container,stream) conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward None (tryDecode,isOrigin)
Expand Down Expand Up @@ -1014,7 +1021,7 @@ type Resolver<'event, 'state, 'context>(context : Context, codec, fold, initial,
| Some (AccessStrategy.Unfolded (isOrigin, unfold)) -> isOrigin, Choice2Of3 (fun _ state -> unfold state)
| Some (AccessStrategy.Snapshot (isValid,generate)) -> isValid, Choice2Of3 (fun _ state -> generate state |> Seq.singleton)
| Some (AccessStrategy.AnyKnownEventType) -> (fun _ -> true), Choice2Of3 (fun events _ -> Seq.last events |> Seq.singleton)
| Some (AccessStrategy.RollingUnfolds (isOrigin,transmute)) -> isOrigin, Choice3Of3 transmute
let cosmosCat = Category<'event, 'state, 'context>(context.Gateway, codec)
let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption)
let category : ICategory<_, _, Container*string, 'context> =