From 7e1b49c5ba87d846e0a059d2b2c64831240f59a9 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Sat, 23 Nov 2024 16:26:48 +0100 Subject: [PATCH] feat(bulk): add parallel processing --- docs/api/README.md | 1 + openapi.yaml | 6 ++++ openapi/v2.yaml | 6 ++++ pkg/client/.speakeasy/gen.lock | 6 ++-- pkg/client/.speakeasy/gen.yaml | 2 +- .../models/operations/v2createbulkrequest.md | 1 + pkg/client/docs/sdks/v2/README.md | 1 + pkg/client/formance.go | 4 +-- pkg/client/models/operations/v2createbulk.go | 11 +++++- test/e2e/api_bulk_test.go | 34 ++++++++++++++++--- tools/generator/go.mod | 1 + 11 files changed, 61 insertions(+), 12 deletions(-) diff --git a/docs/api/README.md b/docs/api/README.md index f8e344946..a4ad477c0 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -500,6 +500,7 @@ Accept: application/json |ledger|path|string|true|Name of the ledger.| |continueOnFailure|query|boolean|false|Continue on failure| |atomic|query|boolean|false|Make bulk atomic| +|parallel|query|boolean|false|Process bulk elements in parallel| |body|body|[V2Bulk](#schemav2bulk)|false|none| > Example responses diff --git a/openapi.yaml b/openapi.yaml index 9f8baf13b..45e46e777 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -1460,6 +1460,12 @@ paths: schema: type: boolean example: true + - name: parallel + in: query + description: Process bulk elements in parallel + schema: + type: boolean + example: true requestBody: content: application/json: diff --git a/openapi/v2.yaml b/openapi/v2.yaml index 13eb82791..c78ffe775 100644 --- a/openapi/v2.yaml +++ b/openapi/v2.yaml @@ -291,6 +291,12 @@ paths: schema: type: boolean example: true + - name: parallel + in: query + description: Process bulk elements in parallel + schema: + type: boolean + example: true requestBody: content: application/json: diff --git a/pkg/client/.speakeasy/gen.lock b/pkg/client/.speakeasy/gen.lock index 983ac76d8..198238b13 100644 --- a/pkg/client/.speakeasy/gen.lock +++ b/pkg/client/.speakeasy/gen.lock @@ -1,12 +1,12 @@ lockVersion: 2.0.0 id: a9ac79e1-e429-4ee3-96c4-ec973f19bec3 management: - docChecksum: 4a4a3929b808f3192cbb2f02351bc186 + docChecksum: 2624238aba49e6a33f19ef1d62f0b568 docVersion: v1 speakeasyVersion: 1.351.0 generationVersion: 2.384.1 - releaseVersion: 0.4.33 - configChecksum: 22f33d29f62599fa892d20fe5d13f4cc + releaseVersion: 0.4.34 + configChecksum: 44b98e4f6380b040c4360085974a2b3f features: go: additionalDependencies: 0.1.0 diff --git a/pkg/client/.speakeasy/gen.yaml b/pkg/client/.speakeasy/gen.yaml index 117836507..c3cb8f2b1 100644 --- a/pkg/client/.speakeasy/gen.yaml +++ b/pkg/client/.speakeasy/gen.yaml @@ -15,7 +15,7 @@ generation: auth: oAuth2ClientCredentialsEnabled: true go: - version: 0.4.33 + version: 0.4.34 additionalDependencies: {} allowUnknownFieldsInWeakUnions: false clientServerStatusCodesAsErrors: true diff --git a/pkg/client/docs/models/operations/v2createbulkrequest.md b/pkg/client/docs/models/operations/v2createbulkrequest.md index 078c42518..46c8f599a 100644 --- a/pkg/client/docs/models/operations/v2createbulkrequest.md +++ b/pkg/client/docs/models/operations/v2createbulkrequest.md @@ -8,4 +8,5 @@ | `Ledger` | *string* | :heavy_check_mark: | Name of the ledger. | ledger001 | | `ContinueOnFailure` | **bool* | :heavy_minus_sign: | Continue on failure | true | | `Atomic` | **bool* | :heavy_minus_sign: | Make bulk atomic | true | +| `Parallel` | **bool* | :heavy_minus_sign: | Process bulk elements in parallel | true | | `RequestBody` | [][components.V2BulkElement](../../models/components/v2bulkelement.md) | :heavy_minus_sign: | N/A | | \ No newline at end of file diff --git a/pkg/client/docs/sdks/v2/README.md b/pkg/client/docs/sdks/v2/README.md index 38dd2752e..3788cf758 100644 --- a/pkg/client/docs/sdks/v2/README.md +++ b/pkg/client/docs/sdks/v2/README.md @@ -398,6 +398,7 @@ func main() { Ledger: "ledger001", ContinueOnFailure: client.Bool(true), Atomic: client.Bool(true), + Parallel: client.Bool(true), RequestBody: []components.V2BulkElement{ components.CreateV2BulkElementV2BulkElementCreateTransaction( components.V2BulkElementCreateTransaction{ diff --git a/pkg/client/formance.go b/pkg/client/formance.go index f6e207f06..4aa7b84a6 100644 --- a/pkg/client/formance.go +++ b/pkg/client/formance.go @@ -143,9 +143,9 @@ func New(opts ...SDKOption) *Formance { sdkConfiguration: sdkConfiguration{ Language: "go", OpenAPIDocVersion: "v1", - SDKVersion: "0.4.33", + SDKVersion: "0.4.34", GenVersion: "2.384.1", - UserAgent: "speakeasy-sdk/go 0.4.33 2.384.1 v1 github.com/formancehq/ledger/pkg/client", + UserAgent: "speakeasy-sdk/go 0.4.34 2.384.1 v1 github.com/formancehq/ledger/pkg/client", Hooks: hooks.New(), }, } diff --git a/pkg/client/models/operations/v2createbulk.go b/pkg/client/models/operations/v2createbulk.go index eb3819184..876185e0c 100644 --- a/pkg/client/models/operations/v2createbulk.go +++ b/pkg/client/models/operations/v2createbulk.go @@ -12,7 +12,9 @@ type V2CreateBulkRequest struct { // Continue on failure ContinueOnFailure *bool `queryParam:"style=form,explode=true,name=continueOnFailure"` // Make bulk atomic - Atomic *bool `queryParam:"style=form,explode=true,name=atomic"` + Atomic *bool `queryParam:"style=form,explode=true,name=atomic"` + // Process bulk elements in parallel + Parallel *bool `queryParam:"style=form,explode=true,name=parallel"` RequestBody []components.V2BulkElement `request:"mediaType=application/json"` } @@ -37,6 +39,13 @@ func (o *V2CreateBulkRequest) GetAtomic() *bool { return o.Atomic } +func (o *V2CreateBulkRequest) GetParallel() *bool { + if o == nil { + return nil + } + return o.Parallel +} + func (o *V2CreateBulkRequest) GetRequestBody() []components.V2BulkElement { if o == nil { return nil diff --git a/test/e2e/api_bulk_test.go b/test/e2e/api_bulk_test.go index fd568860b..24e3a6fb1 100644 --- a/test/e2e/api_bulk_test.go +++ b/test/e2e/api_bulk_test.go @@ -38,7 +38,7 @@ var _ = Context("Ledger engine tests", func() { ctx = logging.TestingContext() events chan *nats.Msg bulkResponse []components.V2BulkElementResult - bulkMaxSize = 5 + bulkMaxSize = 100 ) testServer := NewTestServer(func() Configuration { @@ -60,10 +60,10 @@ var _ = Context("Ledger engine tests", func() { }) When("creating a bulk on a ledger", func() { var ( - now = time.Now().Round(time.Microsecond).UTC() - items []components.V2BulkElement - err error - atomic bool + now = time.Now().Round(time.Microsecond).UTC() + items []components.V2BulkElement + err error + atomic, parallel bool ) BeforeEach(func() { items = []components.V2BulkElement{ @@ -106,6 +106,7 @@ var _ = Context("Ledger engine tests", func() { JustBeforeEach(func() { bulkResponse, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{ Atomic: pointer.For(atomic), + Parallel: pointer.For(parallel), RequestBody: items, Ledger: "default", }) @@ -176,6 +177,29 @@ var _ = Context("Ledger engine tests", func() { Expect(err).To(HaveErrorCode(string(components.V2ErrorsEnumBulkSizeExceeded))) }) }) + Context("with parallel", func() { + BeforeEach(func() { + parallel = true + items = make([]components.V2BulkElement, 0) + for i := 0; i < bulkMaxSize; i++ { + items = append(items, components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ + Data: &components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD/2", + Destination: "bank", + Source: "world", + }}, + Timestamp: &now, + }, + })) + } + }) + It("should be ok", func() { + Expect(err).To(BeNil()) + }) + }) }) When("creating a bulk with an error on a ledger", func() { var ( diff --git a/tools/generator/go.mod b/tools/generator/go.mod index abd649151..5467b81c9 100644 --- a/tools/generator/go.mod +++ b/tools/generator/go.mod @@ -20,6 +20,7 @@ require ( require ( dario.cat/mergo v1.0.1 // indirect github.com/ThreeDotsLabs/watermill v1.4.1 // indirect + github.com/alitto/pond v1.9.2 // indirect github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect