From 6ed1bf1a03aba7b87c566a42b88e650c315a826d Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Tue, 29 Sep 2020 15:55:28 +0100
Subject: [PATCH 1/5] Extend sync log (#241)

---
 CHANGELOG.md                 |  3 +++
 src/Equinox.Cosmos/Cosmos.fs | 49 ++++++++++++++++--------------------
 2 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f90a04cbb..f83929669 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Added
 ### Changed
+
+- `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
+
 ### Removed
 ### Fixed
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index 84a7d5e1a..d80a3b626 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -1,4 +1,4 @@
-namespace Equinox.Cosmos.Store
+namespace Equinox.Cosmos.Store

 open Equinox.Core
 open FsCodec
@@ -265,7 +265,7 @@ module Log =
         let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value)))
         log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt })
     let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length
-    let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes+metaBytes
+    let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80
     let (|BatchLen|) = Seq.sumBy (|EventLen|)

 /// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and
@@ -494,32 +494,27 @@ function sync(req, expIndex, expEtag) {
     let private logged (container,stream) (exp : Exp, req: Tip) (log : ILogger) : Async<Result> = async {
-        let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug
-        let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log
+        let! t, (ru, result) = run (container,stream) (exp, req) |> Stopwatch.Time
         let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length
-        let log = log |> Log.prop "bytes" bytes
-        let writeLog =
-            log |> Log.prop "stream" stream
-                |> Log.prop "count" req.e.Length |> Log.prop "ucount" req.u.Length
-                |> match exp with
-                    | Exp.Etag et ->     Log.prop "expectedEtag" et
-                    | Exp.Version ev ->  Log.prop "expectedVersion" ev
-                    | Exp.Any ->         Log.prop "expectedVersion" -1
-        let! t, (ru,result) = run (container,stream) (exp, req) |> Stopwatch.Time
-        let resultLog =
-            let mkMetric ru : Log.Measurement = { stream = stream; interval = t; bytes = bytes; count = count; ru = ru }
-            let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", Seq.truncate 5 (seq { for x in req.e -> x.c }))
-            match result with
-            | Result.Written pos ->
-                log |> Log.event (Log.SyncSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos
-            | Result.ConflictUnknown pos ->
-                logConflict ()
-                log |> Log.event (Log.SyncConflict (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos |> Log.prop "conflict" true
-            | Result.Conflict (pos, xs) ->
-                logConflict ()
-                let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log
-                log |> Log.event (Log.SyncResync(mkMetric ru)) |> Log.prop "conflict" true
-        resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
+        let log =
+            let inline mkMetric ru : Log.Measurement = { stream = stream; interval = t; bytes = bytes; count = count; ru = ru }
+            let inline propConflict log = log |> Log.prop "conflict" true |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in req.e -> x.c }))
+            let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug
+            (if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log)
+            |> match exp with
+                | Exp.Etag et ->        Log.prop "expectedEtag" et
+                | Exp.Version ev ->     Log.prop "expectedVersion" ev
+                | Exp.Any ->            Log.prop "expectedVersion" -1
+            |> match result with
+                | Result.Written pos ->
+                    Log.prop "nextExpectedVersion" pos >> Log.event (Log.SyncSuccess (mkMetric ru))
+                | Result.ConflictUnknown pos ->
+                    Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru))
+                | Result.Conflict (pos, xs) ->
+                    (if verbose then Log.propData "conflicts" xs else id)
+                    >> Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncResync(mkMetric ru))
+        log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}",
+            "Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp)
         return result }

     let batch (log : ILogger) retryPolicy containerStream batch: Async<Result> =
         let call = logged containerStream batch
         Log.withLoggedRetries retryPolicy "writeAttempt" call log
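[Editor's note] Patch 1 above collapses the separate Sync/Conflict messages into a single `log.Information` call over a logger assembled by composing `ILogger -> ILogger` enrichment functions with `>>`. A minimal self-contained sketch of that composition pattern, using plain Serilog with the console sink; `prop` here is a stand-in for the `Log.prop` helper in the diff, and the stream/property values are purely illustrative:

```fsharp
open Serilog

// Stand-in for the Log.prop helper above: attach a context property, yielding a new ILogger
let prop (name : string) (value : 'a) (log : ILogger) : ILogger = log.ForContext(name, value)

let baseLog : ILogger = LoggerConfiguration().WriteTo.Console().CreateLogger() :> ILogger
let conflict = true
let log =
    baseLog
    |> prop "stream" "Favorites-alice"
    // Branches select a composed enricher, just as the match over `result` does above
    |> (if conflict then prop "conflict" true >> prop "eventTypes" [| "Added" |] else id)
log.Information("EqxCosmos {action:l} {count}+{ucount}", "Sync", 2, 1)
```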
From e3c45d81f4729a2613cb9cc53be39c6abbfc89dc Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Tue, 29 Sep 2020 16:14:38 +0100
Subject: [PATCH 2/5] Cherry-pick V3 stored proc enhancements (#242)

---
 CHANGELOG.md                          |  2 +-
 src/Equinox.Cosmos/Cosmos.fs          | 91 +++++++++++--------
 .../CosmosCoreIntegration.fs          | 18 ++--
 .../CosmosIntegration.fs              |  6 +-
 .../Equinox.Cosmos.Integration.fsproj |  2 +
 5 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f83929669..b4d368034 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Changed

 - `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
+- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242)

 ### Removed
 ### Fixed
@@ -27,7 +28,6 @@ The `Unreleased` section name is replaced by the expected version of next releas

 - `Cosmos`: Tweaked log messages

-
 ## [2.3.0-rc1] - 2020-08-31
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index d80a3b626..b387ad766 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -404,23 +404,27 @@ module Sync =
     // NB don't nest in a private module, or serialization will fail miserably ;)
     [] type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
-    let [<Literal>] private sprocName = "EquinoxRollingUnfolds3"  // NB need to rename/number for any breaking change
+    let [<Literal>] private sprocName = "EquinoxRollingUnfolds4"  // NB need to rename/number for any breaking change
     let [<Literal>] private sprocBody = """
-// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
-// 1 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
-// 2a Verify no current Tip; if so - incoming req.e and defines the `n`ext position / unfolds
-// 2b If we already have a tip, move position forward, replace unfolds
-// 3 insert a new document containing the events as part of the same batch of work
-// 3a in some cases, there are only changes to the `u`nfolds and no `e`vents, in which case no write should happen
+// Manages the merging of the supplied Request Batch into the stream

+// 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)

+// High level end-states:

+// 1a if there is a Tip, but are only changes to the `u`nfolds (and no `e`vents) -> update Tip only
+// 1b if there is a Tip, but incoming request includes an event -> generate a batch document + create empty Tip

+// 2a if stream empty, but incoming request includes an event -> generate a batch document + create empty Tip
+// 2b if no current Tip, and no events being written -> the incoming `req` becomes the Tip batch

 function sync(req, expIndex, expEtag) {
     if (!req) throw new Error("Missing req argument");
-    const collection = getContext().getCollection();
-    const collectionLink = collection.getSelfLink();
+    const collectionLink = __.getSelfLink();
     const response = getContext().getResponse();

-    // Locate the Tip (-1) batch for this stream (which may not exist)
-    const tipDocId = collection.getAltLink() + "/docs/" + req.id;
-    const isAccepted = collection.readDocument(tipDocId, {}, function (err, current) {
+    const tipDocId = __.getAltLink() + "/docs/" + req.id;
+    const isAccepted = __.readDocument(tipDocId, {}, function (err, current) {
         // Verify we dont have a conflicting write
         if (expIndex === -1) {
             // For Any mode, we always do an append operation
@@ -429,7 +433,10 @@ function sync(req, expIndex, expEtag) {
             // If there is no Tip page, the writer has no possible reason for writing at an index other than zero, and an etag exp must be fulfilled
             response.setBody({ etag: null, n: 0, conflicts: [] });
         } else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) {
-            // if we're working based on etags, the `u`nfolds very likely to bear relevant info as state-bearing unfolds
+            // Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle;
+            // yielding [] triggers the client to go loading the events itself
+
+            // if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds
             // if there are no `u`nfolds, we need to be careful not to yield `conflicts: null`, as that signals a successful write (see below)
             response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [] });
         } 
else { @@ -438,34 +445,46 @@ function sync(req, expIndex, expEtag) { }); if (!isAccepted) throw new Error("readDocument not Accepted"); - function executeUpsert(current) { + function executeUpsert(tip) { function callback(err, doc) { if (err) throw err; response.setBody({ etag: doc._etag, n: doc.n, conflicts: null }); } - var tip; - if (!current) { - tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u }; - const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback); - if (!tipAccepted) throw new Error("Unable to create Tip."); - } else { - // TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches - const n = current.n + req.e.length; - tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u }; + if (tip) { + Array.prototype.push.apply(tip.e, req.e); + tip.n = tip.i + tip.e.length; + // If there are events, calve them to their own batch (this behavior is to simplify CFP consumer impl) + if (tip.e.length > 0) { + const batch = { id: tip.i.toString(), p: tip.p, i: tip.i, n: tip.n, e: tip.e } + const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true }); + if (!batchAccepted) throw new Error("Unable to remove Tip markings."); + + tip.i = tip.n; + tip.e = []; + } - // as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place - const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback); - if (!tipAccepted) throw new Error("Unable to replace Tip."); - } - // if there's only a state update involved, we don't do an event-batch write (if we did, they'd trigger uniqueness violations) - if (req.e.length) { - // For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to - // a) guarantee an item per write (multiple consecutive updates can be 'squashed') - // b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item) - const i = tip.n - req.e.length; - const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e }; - const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true }); - if (!batchAccepted) throw new Error("Unable to insert Batch."); + // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches + + // Replace all the unfolds // TODO: should remove only unfolds being superseded + tip.u = req.u; + // As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place + const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback); + if (!isAccepted) throw new Error("Unable to replace Tip batch."); + } else { + // NOTE we write the batch first (more consistent RU cost than writing tip first) + if (req.e.length > 0) { + const batch = { id: "0", p: req.p, i: 0, n: req.e.length, e: req.e }; + const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true }); + if (!batchAccepted) throw new Error("Unable to create Batch 0."); + + req.i = batch.n; + req.e = []; + } else { + req.i = 0; + } + req.n = req.i + req.e.length; + const isAccepted = __.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback); + if 
(!isAccepted) throw new Error("Unable to create Tip batch."); } } }""" diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 877f29541..dbc3d2b6d 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -46,7 +46,7 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName index <| TestEvents.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 32 // 31.22 // WAS 10 + verifyRequestChargesMax 33 // 32.27 // WAS 10 // Clear the counters capture.Clear() @@ -54,7 +54,7 @@ type Tests(testOutputHelper) = test <@ AppendResult.Ok 6L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> // We didnt request small batches or splitting so it's not dramatically more expensive to write N events - verifyRequestChargesMax 39 // 38.21 // was 11 + verifyRequestChargesMax 39 // 38.74 // was 11 } // It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms @@ -121,7 +121,7 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Append] = capture.ExternalCalls @> pos <- pos + int64 appendBatchSize pos =! res - verifyRequestChargesMax 42 // 41.12 // 46 // 44.07 observed + verifyRequestChargesMax 46 // 45.16 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -134,7 +134,7 @@ type Tests(testOutputHelper) = pos <- pos + 42L pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 48 // 47.02 // WAS 20 + verifyRequestChargesMax 46 // 45.42 // 47.02 // WAS 20 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -149,12 +149,12 @@ type Tests(testOutputHelper) = let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount)) test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 465 // 463.01 observed + verifyRequestChargesMax 149 // 148.11 // 463.01 observed capture.Clear() let! pos = ctx.Sync(stream,?position=None) test <@ [EqxAct.Tip] = capture.ExternalCalls @> - verifyRequestChargesMax 45 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded) + verifyRequestChargesMax 5 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded) capture.Clear() let! _pos = ctx.Sync(stream,pos) @@ -174,7 +174,7 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Resync] = capture.ExternalCalls @> // The response aligns with a normal conflict in that it passes the entire set of conflicting events () test <@ AppendResult.Conflict (0L,[||]) = res @> - verifyRequestChargesMax 7 // 6.6 // WAS 5 + verifyRequestChargesMax 6 // 5.5 // WAS 5 capture.Clear() // Now write at the correct position @@ -182,7 +182,7 @@ type Tests(testOutputHelper) = let! 
res = Events.append ctx streamName 0L expected test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 32 // 30.42 WAS 11 // 10.33 + verifyRequestChargesMax 33 // 32.05 WAS 11 // 10.33 capture.Clear() // Try overwriting it (a competing consumer would see the same) @@ -200,7 +200,7 @@ type Tests(testOutputHelper) = #else test <@ [EqxAct.Conflict] = capture.ExternalCalls @> #endif - verifyRequestChargesMax 7 // 6.64 + verifyRequestChargesMax 6 // 5.63 // 6.64 capture.Clear() } diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 4cc15eaa2..dd750df1f 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -93,7 +93,7 @@ type Tests(testOutputHelper) = // Extra roundtrip required after maxItemsPerRequest is exceeded let expectedBatchesOfItems = max 1 ((i-1) / maxItemsPerRequest) test <@ i = i && List.replicate expectedBatchesOfItems EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 61 // 57.09 [5.24 + 54.78] // 5.5 observed for read + verifyRequestChargesMax 60 // 59.27 [3.28; 55.99] // 5.5 observed for read capture.Clear() // Validate basic operation; Key side effect: Log entries will be emitted to `capture` @@ -103,7 +103,7 @@ type Tests(testOutputHelper) = let expectedResponses = transactions/maxItemsPerRequest + 1 test <@ List.replicate expectedResponses EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 11 // 10.01 + verifyRequestChargesMax 8 // 7.74 // 10.01 } [] @@ -375,4 +375,4 @@ type Tests(testOutputHelper) = capture.Clear() do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1 test <@ [EqxAct.Append] = capture.ExternalCalls @> - } \ No newline at end of file + } diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj index ce2a6041f..180a475d2 100644 --- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj +++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj @@ -26,6 +26,8 @@ + + From 03b00fbea59e5c71c1502a18b952d5425f747a56 Mon Sep 17 00:00:00 2001 From: OmnipotentOwl <1769881+OmnipotentOwl@users.noreply.github.com> Date: Tue, 29 Sep 2020 18:38:34 -0400 Subject: [PATCH 3/5] Update `eqx init` to support Serverless re #244 as in master #245 * Add configuration for init tool to support Cosmos DB Serverless accounts. * Update inline help text to include default value if not provided. 
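[Editor's note] A hedged sketch of the three `Provisioning` modes this patch ends up supporting. The type lives at `Equinox.Cosmos.Store.Sync.Initialization` per the alias in the tool diff below; the RU/s figures are illustrative only:

```fsharp
open Equinox.Cosmos.Store.Sync.Initialization

let containerLevel = Provisioning.Container 1000 // dedicated throughput on the container
let databaseLevel  = Provisioning.Database 400   // shared database-level throughput (400 RU/s is the new default)
let serverless     = Provisioning.Serverless     // Serverless account: RU/s cannot be specified, only consumed
```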
---
 CHANGELOG.md                  |  3 +++
 src/Equinox.Cosmos/Cosmos.fs  | 10 ++++----
 tools/Equinox.Tool/Program.fs | 47 ++++++++++++++++++++++++-----------
 3 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4d368034..81d3e253b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ## [Unreleased]

 ### Added
+
+- `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
+
 ### Changed

 - `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index b387ad766..f1494e698 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -548,7 +548,7 @@ function sync(req, expIndex, expEtag) {
     module Initialization =
         open System.Linq

-        type [<RequireQualifiedAccess>] Provisioning = Container of rus: int | Database of rus: int
+        type [<RequireQualifiedAccess>] Provisioning = Container of rus: int | Database of rus: int | Serverless
         let adjustOffer (c:Client.DocumentClient) resourceLink rus = async {
             let offer = c.CreateOfferQuery().Where(fun r -> r.ResourceLink = resourceLink).AsEnumerable().Single()
             let! _ = c.ReplaceOfferAsync(OfferV2(offer,rus)) |> Async.AwaitTaskCorrect in () }
@@ -561,7 +561,7 @@ function sync(req, expIndex, expEtag) {
             | Provisioning.Database rus ->
                 let! db = createDatabaseIfNotExists client dName (Some rus)
                 return! adjustOffer client db.Resource.SelfLink rus
-            | Provisioning.Container _ ->
+            | Provisioning.Container _ | Provisioning.Serverless ->
                 let! _ = createDatabaseIfNotExists client dName None in () }
         let private createContainerIfNotExists (client:Client.DocumentClient) dName (def: DocumentCollection) maybeRus =
             let dbUri = Client.UriFactory.CreateDatabaseUri dName
             client.CreateDocumentCollectionIfNotExistsAsync(dbUri, def, opts) |> Async.AwaitTaskCorrect
         let private createOrProvisionContainer (client: Client.DocumentClient) (dName, def: DocumentCollection) mode = async {
             match mode with
-            | Provisioning.Database _ ->
-                let! _ = createContainerIfNotExists client dName def None in ()
             | Provisioning.Container rus ->
                 let! container = createContainerIfNotExists client dName def (Some rus)
-                return! adjustOffer client container.Resource.SelfLink rus }
+                return! adjustOffer client container.Resource.SelfLink rus
+            | Provisioning.Database _ | Provisioning.Serverless ->
+                let! _ = createContainerIfNotExists client dName def None in () }
         let private createStoredProcIfNotExists (c:Container) (name, body): Async<float> = async {
            try let! 
r = c.Client.CreateStoredProcedureAsync(c.CollectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect return r.RequestCharge diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index c7933ff18..4b2138cfc 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -12,6 +12,8 @@ open System open System.Net.Http open System.Threading +type Provisioning = Equinox.Cosmos.Store.Sync.Initialization.Provisioning + let [] appName = "equinox-tool" [] @@ -20,11 +22,11 @@ type Arguments = | [] VerboseConsole | [] LocalSeq | [] LogFile of string - | [] Run of ParseResults - | [] Init of ParseResults - | [] Config of ParseResults - | [] Stats of ParseResults - | [] Dump of ParseResults + | [] Run of ParseResults + | [] Init of ParseResults + | [] Config of ParseResults + | [] Stats of ParseResults + | [] Dump of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -33,20 +35,28 @@ type Arguments = | LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)." | Run _ -> "Run a load test" | Init _ -> "Initialize Store/Container (supports `cosmos` stores; also handles RU/s provisioning adjustment)." - | Config _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)." + | Config _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)." | Stats _ -> "inspect store to determine numbers of streams/documents/events (supports `cosmos` stores)." | Dump _ -> "Load and show events in a specified stream (supports all stores)." and []InitArguments = - | [] Rus of int - | [] Shared + | [] Rus of int + | [] Mode of CosmosModeType | [] SkipStoredProc | [] Cosmos of ParseResults interface IArgParserTemplate with member a.Usage = a |> function - | Rus _ -> "Specify RU/s level to provision for the Container." - | Shared -> "Use Database-level RU allocations (Default: Use Container-level allocation)." + | Rus _ -> "Specify RU/s level to provision for the Container (Default: 400 RU/s)." + | Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)." | SkipStoredProc -> "Inhibit creation of stored procedure in specified Container." | Cosmos _ -> "Cosmos Connection parameters." 
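[Editor's note] The "(Default: 400 RU/s)" help text above is honored by the `CosmosInitInfo` mapping that follows, via Argu's `GetResult(case, defaultValue)` overload. A self-contained sketch of that fallback behavior, using an illustrative one-case argument type rather than the tool's real one:

```fsharp
open Argu

type DemoArgs =
    | Rus of int
    interface IArgParserTemplate with
        member a.Usage = match a with Rus _ -> "Specify RU/s level to provision (Default: 400)."

let parser = ArgumentParser.Create<DemoArgs>()
// No --rus supplied on the command line, so GetResult falls back to the default: 400
let rus = parser.Parse([||]).GetResult(Rus, 400)
```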
+and CosmosInitInfo(args : ParseResults<InitArguments>) =
+    member __.ProvisioningMode =
+        match args.GetResult(Mode, CosmosModeType.Container) with
+        | CosmosModeType.Container -> Provisioning.Container (args.GetResult(Rus, 400))
+        | CosmosModeType.Db -> Provisioning.Database (args.GetResult(Rus, 400))
+        | CosmosModeType.Serverless ->
+            if args.Contains Rus then raise (Storage.MissingArg "Cannot specify RU/s in Serverless mode")
+            Provisioning.Serverless
 and []ConfigArguments =
     | [] MsSql of ParseResults
     | [] MySql of ParseResults
@@ -204,6 +214,7 @@ and TestInfo(args: ParseResults<TestArguments>) =
         | SaveForLater -> Tests.SaveForLater
         | Todo -> Tests.Todo (args.GetResult(Size,100))
 and Test = Favorite | SaveForLater | Todo
+and CosmosModeType = Container | Db | Serverless

 let createStoreLog verbose verboseConsole maybeSeqEndpoint =
     let c = LoggerConfiguration().Destructure.FSharpTypes()
@@ -313,11 +324,19 @@ module CosmosInit =
     let containerAndOrDb (log: ILogger, verboseConsole, maybeSeq) (iargs: ParseResults<InitArguments>) = async {
         match iargs.TryGetSubCommand() with
         | Some (InitArguments.Cosmos sargs) ->
-            let rus, skipStoredProc = iargs.GetResult(InitArguments.Rus), iargs.Contains InitArguments.SkipStoredProc
-            let mode = if iargs.Contains InitArguments.Shared then Provisioning.Database rus else Provisioning.Container rus
-            let modeStr, rus = match mode with Provisioning.Container rus -> "Container",rus | Provisioning.Database rus -> "Database",rus
+            let skipStoredProc = iargs.Contains InitArguments.SkipStoredProc
             let! _storeLog,conn,dName,cName = conn (log,verboseConsole,maybeSeq) sargs
-            log.Information("Provisioning `Equinox.Cosmos` Store at {mode:l} level for {rus:n0} RU/s", modeStr, rus)
+            let mode = (CosmosInitInfo iargs).ProvisioningMode
+            match mode with
+            | Provisioning.Container ru ->
+                let modeStr = "Container"
+                log.Information("Provisioning `Equinox.Cosmos` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru)
+            | Provisioning.Database ru ->
+                let modeStr = "Database"
+                log.Information("Provisioning `Equinox.Cosmos` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru)
+            | Provisioning.Serverless ->
+                let modeStr = "Serverless"
+                log.Information("Provisioning `Equinox.Cosmos` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr)
             return! init log conn.Client (dName,cName) mode skipStoredProc
         | _ -> failwith "please specify a `cosmos` endpoint" }
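[Editor's note] Patch 4 below distinguishes two wire shapes for an unfold body: a Base64 JSON string holding Deflated bytes, or a raw (uncompressed) JSON object. A standalone sketch of the Deflate + Base64 leg, mirroring the `Compress`/`inflate` pair in the converter diff; it uses only `System.IO.Compression`, and the sample payload is illustrative:

```fsharp
open System
open System.IO
open System.IO.Compression

let compress (input : byte[]) : string =
    use output = new MemoryStream()
    use compressor = new DeflateStream(output, CompressionLevel.Optimal)
    compressor.Write(input, 0, input.Length)
    compressor.Close() // flush the deflate stream before harvesting the buffer
    Convert.ToBase64String(output.ToArray())

let inflate (str : string) : byte[] =
    use input = new MemoryStream(Convert.FromBase64String str)
    use decompressor = new DeflateStream(input, CompressionMode.Decompress)
    use output = new MemoryStream()
    decompressor.CopyTo output
    output.ToArray()

let original = Text.Encoding.UTF8.GetBytes """{"embed":"some state"}"""
printfn "%b" (inflate (compress original) = original) // true: the encoding round-trips
```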
From 9edc3308e5ca50e5b203201df41846900cf020e7 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 2 Oct 2020 16:10:31 +0100
Subject: [PATCH 4/5] Cosmos: Enable uncompressed unfolds (#249)

---
 CHANGELOG.md                 |  1 +
 src/Equinox.Cosmos/Cosmos.fs | 80 ++++++++++++-------
 .../CosmosCoreIntegration.fs |  4 +-
 .../JsonConverterTests.fs    | 50 ++++++------
 4 files changed, 78 insertions(+), 57 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 81d3e253b..86be9a72f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Added

 - `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
+- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach)

 ### Changed

diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index f1494e698..f0e6d82d5 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -88,45 +88,48 @@ type Unfold =
         /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
         c: string // required

-        /// Event body - Json -> UTF-8 -> Deflate -> Base64
-        [<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
+        /// UTF-8 JSON OR Event body - Json -> UTF-8 -> Deflate -> Base64
+        [<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
         d: byte[] // required

         /// Optional metadata, same encoding as `d` (can be null; not written if missing)
-        [<JsonConverter(typeof<Base64DeflateUtf8JsonConverter>)>]
+        [<JsonConverter(typeof<Base64MaybeDeflateUtf8JsonConverter>)>]
         [<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
         m: byte[] } // optional

-/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
-/// Only applied to snapshots in the Tip
-and Base64DeflateUtf8JsonConverter() =
+/// Transparently encodes/decodes fields that can optionally be compressed by
+/// 1. Writing outgoing values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter
+/// 2a. Decoding incoming JSON String values by Decompressing it to a UTF-8 JSON array representation
+/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter
+and Base64MaybeDeflateUtf8JsonConverter() =
     inherit JsonConverter()
-    let pickle (input : byte[]) : string =
-        if input = null then null else
-
-        use output = new MemoryStream()
-        use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
-        compressor.Write(input,0,input.Length)
-        compressor.Close()
-        System.Convert.ToBase64String(output.ToArray())
-    let unpickle str : byte[] =
-        if str = null then null else
-
+    let inflate str : byte[] =
         let compressedBytes = System.Convert.FromBase64String str
         use input = new MemoryStream(compressedBytes)
         use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
         use output = new MemoryStream()
         decompressor.CopyTo(output)
         output.ToArray()
+    static member Compress(input : byte[]) : byte[] =
+        if input = null || input.Length = 0 then null else
+
+        use output = new System.IO.MemoryStream()
+        use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
+        compressor.Write(input,0,input.Length)
+        compressor.Close()
+        String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"")
+        |> System.Text.Encoding.UTF8.GetBytes
     override __.CanConvert(objectType) =
         typeof<byte[]>.Equals(objectType)
     override __.ReadJson(reader, _, _, serializer) =
-        //(   if reader.TokenType = JsonToken.Null then null else
-        serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
+        match reader.TokenType with
+        | JsonToken.Null -> null
+        | JsonToken.String -> serializer.Deserialize(reader, typedefof<string>) :?> string |> inflate |> box
+        | _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box
     override __.WriteJson(writer, value, serializer) =
-        let pickled = value |> unbox |> pickle
-        serializer.Serialize(writer, pickled)
+        let array = value :?> byte[]
+        if array = null || array.Length = 0 then serializer.Serialize(writer, null)
+        else System.Text.Encoding.UTF8.GetString array |> writer.WriteRawValue

 /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
 /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
@@ -539,12 +542,21 @@ function sync(req, expIndex, expEtag) {
     let batch (log : ILogger) retryPolicy containerStream batch: Async<Result> =
         let call = logged containerStream batch
         Log.withLoggedRetries retryPolicy "writeAttempt" call log
+
+    let private mkEvent (e : IEventData<_>) =
+        { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId }
     let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
         { p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
-          e = [| for e in events -> { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } |]
-          u = Array.ofSeq unfolds }
-    let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
-        unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta; t = DateTimeOffset.UtcNow } : Unfold)
+          e = Array.map mkEvent events; u = Array.ofSeq unfolds }
+    let mkUnfold compressor baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
+        unfolds
+        |> Seq.mapi (fun offset x ->
+                { i = baseIndex + int64 offset
+                  c = x.EventType
+                  d = compressor x.Data
+                  m = compressor x.Meta
+                  t = DateTimeOffset.UtcNow
+                } : Unfold)

     module Initialization =
         open System.Linq
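[Editor's note] A short sketch of how the new `compressor` parameter of `mkUnfold` is meant to be threaded: compress (the default) or pass the bytes through untouched. This mirrors the `Category.Sync` change later in this diff; `projectionsEncoded` stands for any encoded unfolds:

```fsharp
let mkUnfolds compressUnfolds baseIndex (projectionsEncoded : FsCodec.IEventData<byte[]> seq) =
    // As in Category.Sync below: Compress (the default) or `id` to leave bodies as raw JSON
    let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id
    Sync.mkUnfold compressor baseIndex projectionsEncoded
```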
@@ -1026,7 +1038,7 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven
         match res with
         | LoadFromTokenResult.Unchanged -> return current
         | LoadFromTokenResult.Found (token', events') -> return token', fold state events' }
-    member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log, context): Async<SyncResult<'state>> = async {
+    member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds): Async<SyncResult<'state>> = async {
         let state' = fold state (Seq.ofList events)
         let encode e = codec.Encode(context,e)
         let exp,events,eventsEncoded,projectionsEncoded =
@@ -1037,7 +1049,8 @@ type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEven
                 let events', unfolds = transmute events state'
                 Sync.Exp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds
         let baseIndex = pos.index + int64 (List.length events)
-        let projections = Sync.mkUnfold baseIndex projectionsEncoded
+        let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded
         let batch = Sync.mkBatch stream eventsEncoded projections
         let! res = gateway.Sync log (container,stream) (exp,batch)
         match res with
@@ -1081,6 +1094,7 @@ type private Folder<'event, 'state, 'context>
     (   category: Category<'event, 'state, 'context>, fold: 'state -> 'event seq -> 'state, initial: 'state, isOrigin: 'event -> bool,
         mapUnfolds: Choice<unit,('event list -> 'state -> 'event seq),('event list -> 'state -> 'event list * 'event list)>,
+        compressUnfolds,
         ?readCache) =
     let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
     let batched log containerStream = category.Load inspectUnfolds containerStream fold initial isOrigin log
@@ -1095,8 +1109,7 @@ type private Folder<'event, 'state, 'context>
             | Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log }
     member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
-        let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log, context)
-        match res with
+        match! category.Sync((streamToken, state), events, mapUnfolds, fold, isOrigin, log, context, compressUnfolds) with
         | SyncResult.Conflict resync -> return SyncResult.Conflict resync
         | SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }
@@ -1182,7 +1195,12 @@ type AccessStrategy<'event,'state> =
     ///
     | Custom of isOrigin: ('event -> bool) * transmute: ('event list -> 'state -> 'event list*'event list)

-type Resolver<'event, 'state, 'context>(context : Context, codec, fold, initial, caching, access) =
+type Resolver<'event, 'state, 'context>
+    (   context : Context, codec, fold, initial, caching, access,
+        /// Compress Unfolds in Tip. Default: true. 
+ /// NOTE when set to false, requires Equinox.Cosmos Version >= 2.3.0 to be able to read + ?compressUnfolds) = + let compressUnfolds = defaultArg compressUnfolds true let readCacheOption = match caching with | CachingStrategy.NoCaching -> None @@ -1196,7 +1214,7 @@ type Resolver<'event, 'state, 'context>(context : Context, codec, fold, initial, | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), Choice3Of3 (fun _ state -> [],[toSnapshot state]) | AccessStrategy.Custom (isOrigin,transmute) -> isOrigin, Choice3Of3 transmute let cosmosCat = Category<'event, 'state, 'context>(context.Gateway, codec) - let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption) + let folder = Folder<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, mapUnfolds, compressUnfolds, ?readCache = readCacheOption) let category : ICategory<_, _, Container*string, 'context> = match caching with | CachingStrategy.NoCaching -> folder :> _ diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index dbc3d2b6d..00177d3d5 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -54,7 +54,7 @@ type Tests(testOutputHelper) = test <@ AppendResult.Ok 6L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> // We didnt request small batches or splitting so it's not dramatically more expensive to write N events - verifyRequestChargesMax 39 // 38.74 // was 11 + verifyRequestChargesMax 41 // 40.68 // was 11 } // It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms @@ -149,7 +149,7 @@ type Tests(testOutputHelper) = let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount)) test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 149 // 148.11 // 463.01 observed + verifyRequestChargesMax 448 // 447.5 // 463.01 observed capture.Clear() let! 
pos = ctx.Sync(stream,?position=None)

diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
index fc283b40c..0d911e261 100644
--- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
+++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
@@ -18,36 +18,38 @@ let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault()
 type Base64ZipUtf8Tests() =
     let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings)

-    [<Fact>]
-    let ``serializes, achieving compression`` () =
-        let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
+    let ser eventType data =
         let e : Store.Unfold =
             { i = 42L
-              c = encoded.EventType
-              d = encoded.Data
+              c = eventType
+              d = data
               m = null
               t = DateTimeOffset.MinValue }
-        let res = JsonConvert.SerializeObject e
-        test <@ res.Contains("\"d\":\"") && res.Length < 128 @>
+        JsonConvert.SerializeObject e
+
+    [<Fact>]
+    let ``serializes, achieving expected compression`` () =
+        let encoded = eventCodec.Encode(None,A { embed = String('x',5000) })
+        let res = ser encoded.EventType (Store.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data)
+        test <@ res.Contains("\"d\":\"") && res.Length < 138 @>

     [<Property>]
-    let roundtrips value =
-        let hasNulls =
-            match value with
-            | A x | B x when obj.ReferenceEquals(null, x) -> true
-            | A { embed = x } | B { embed = x } -> obj.ReferenceEquals(null, x)
-        if hasNulls then () else
-
-        let encoded = eventCodec.Encode(None,value)
-        let e : Store.Unfold =
-            { i = 42L
-              c = encoded.EventType
-              d = encoded.Data
-              m = null
-              t = DateTimeOffset.MinValue }
-        let ser = JsonConvert.SerializeObject(e)
-        test <@ ser.Contains("\"d\":\"") @>
+    let roundtrips compress value =
+        let encoded = eventCodec.Encode(None, value)
+        let maybeCompressor = if compress then Store.Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let actualData = maybeCompressor encoded.Data
+        let ser = ser encoded.EventType actualData
+        test <@ if compress then ser.Contains("\"d\":\"")
+                else ser.Contains("\"d\":{") @>
         let des = JsonConvert.DeserializeObject<Store.Unfold>(ser)
         let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d)
         let decoded = eventCodec.TryDecode d |> Option.get
-        test <@ value = decoded @>
\ No newline at end of file
+        test <@ value = decoded @>
+
+    [<Property>]
+    let handlesNulls compress =
+        let maybeCompressor = if compress then Store.Base64MaybeDeflateUtf8JsonConverter.Compress else id
+        let maybeCompressed = maybeCompressor null
+        let ser = ser "AnEventType" maybeCompressed
+        let des = JsonConvert.DeserializeObject<Store.Unfold>(ser)
+        test <@ null = des.d @>

From 33e9afd03db33af5b872679f8ddd3d616f8e402c Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 2 Oct 2020 16:14:22 +0100
Subject: [PATCH 5/5] Release 2.3.0-rc2

---
 CHANGELOG.md | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 86be9a72f..a5865992c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,28 +9,22 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ## [Unreleased]

 ### Added
-
-- `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl)
-- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach)
-
 ### Changed
-
-- `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict 
message [#241](https://github.com/jet/equinox/pull/241) -- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242) - ### Removed ### Fixed - -## [2.3.0-rc1] - 2020-08-31 + +## [2.3.0-rc2] - 2020-10-02 ### Added -- `Cosmos`: `Prune` API to delete events from the head of a stream [#233](https://github.com/jet/equinox/pull/233) +- `Cosmos`: Support Serverless Account Mode in `eqx init`; default RU/s to 400 if unspecified [#244](https://github.com/jet/equinox/pull/244) :pray: [@OmnipotentOwl](https://github.com/OmnipotentOwl) +- `Cosmos`: Added ability to turn off compression of Unfolds [#249](https://github.com/jet/equinox/pull/249) :pray: [@ylibrach](https://github.com/ylibrach) ### Changed -- `Cosmos`: Tweaked log messages +- `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241) +- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242) ## [2.3.0-rc1] - 2020-08-31 @@ -432,7 +426,8 @@ The `Unreleased` section name is replaced by the expected version of next releas (For information pertaining to earlier releases, see release notes in https://github.com/jet/equinox/releases and/or can someone please add it!) -[Unreleased]: https://github.com/jet/equinox/compare/2.3.0-rc1...HEAD +[Unreleased]: https://github.com/jet/equinox/compare/2.3.0-rc2...HEAD +[2.3.0-rc2]: https://github.com/jet/equinox/compare/2.3.0-rc1...2.3.0-rc2 [2.3.0-rc1]: https://github.com/jet/equinox/compare/2.2.0...2.3.0-rc1 [2.2.0]: https://github.com/jet/equinox/compare/2.1.0...2.2.0 [2.1.0]: https://github.com/jet/equinox/compare/2.0.2...2.1.0
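[Editor's note] For reference, a hedged sketch of consuming the headline 2.3.0-rc2 addition: opting out of unfold compression via the new optional `Resolver` argument. The `context`/`codec`/`fold`/`initial`/`toSnapshot` parameters are placeholders for a standard `Equinox.Cosmos` wiring, not a prescribed setup:

```fsharp
open Equinox.Cosmos

let mkResolver (context : Context) codec fold initial toSnapshot =
    Resolver(context, codec, fold, initial,
             CachingStrategy.NoCaching,
             AccessStrategy.RollingState toSnapshot,
             // New in 2.3.0-rc2: persist unfolds as raw JSON; note that readers must
             // then also be on Equinox.Cosmos >= 2.3.0 to be able to load them
             compressUnfolds = false)
```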