Skip to content

Commit

Permalink
feat(bulk): add parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 25, 2024
1 parent dc778de commit 7e1b49c
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ Accept: application/json
|ledger|path|string|true|Name of the ledger.|
|continueOnFailure|query|boolean|false|Continue on failure|
|atomic|query|boolean|false|Make bulk atomic|
|parallel|query|boolean|false|Process bulk elements in parallel|
|body|body|[V2Bulk](#schemav2bulk)|false|none|

> Example responses
Expand Down
6 changes: 6 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,12 @@ paths:
schema:
type: boolean
example: true
- name: parallel
in: query
description: Process bulk elements in parallel
schema:
type: boolean
example: true
requestBody:
content:
application/json:
Expand Down
6 changes: 6 additions & 0 deletions openapi/v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ paths:
schema:
type: boolean
example: true
- name: parallel
in: query
description: Process bulk elements in parallel
schema:
type: boolean
example: true
requestBody:
content:
application/json:
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/.speakeasy/gen.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
lockVersion: 2.0.0
id: a9ac79e1-e429-4ee3-96c4-ec973f19bec3
management:
docChecksum: 4a4a3929b808f3192cbb2f02351bc186
docChecksum: 2624238aba49e6a33f19ef1d62f0b568
docVersion: v1
speakeasyVersion: 1.351.0
generationVersion: 2.384.1
releaseVersion: 0.4.33
configChecksum: 22f33d29f62599fa892d20fe5d13f4cc
releaseVersion: 0.4.34
configChecksum: 44b98e4f6380b040c4360085974a2b3f
features:
go:
additionalDependencies: 0.1.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/.speakeasy/gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ generation:
auth:
oAuth2ClientCredentialsEnabled: true
go:
version: 0.4.33
version: 0.4.34
additionalDependencies: {}
allowUnknownFieldsInWeakUnions: false
clientServerStatusCodesAsErrors: true
Expand Down
1 change: 1 addition & 0 deletions pkg/client/docs/models/operations/v2createbulkrequest.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
| `Ledger` | *string* | :heavy_check_mark: | Name of the ledger. | ledger001 |
| `ContinueOnFailure` | **bool* | :heavy_minus_sign: | Continue on failure | true |
| `Atomic` | **bool* | :heavy_minus_sign: | Make bulk atomic | true |
| `Parallel` | **bool* | :heavy_minus_sign: | Process bulk elements in parallel | true |
| `RequestBody` | [][components.V2BulkElement](../../models/components/v2bulkelement.md) | :heavy_minus_sign: | N/A | |
1 change: 1 addition & 0 deletions pkg/client/docs/sdks/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func main() {
Ledger: "ledger001",
ContinueOnFailure: client.Bool(true),
Atomic: client.Bool(true),
Parallel: client.Bool(true),
RequestBody: []components.V2BulkElement{
components.CreateV2BulkElementV2BulkElementCreateTransaction(
components.V2BulkElementCreateTransaction{
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/formance.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func New(opts ...SDKOption) *Formance {
sdkConfiguration: sdkConfiguration{
Language: "go",
OpenAPIDocVersion: "v1",
SDKVersion: "0.4.33",
SDKVersion: "0.4.34",
GenVersion: "2.384.1",
UserAgent: "speakeasy-sdk/go 0.4.33 2.384.1 v1 github.com/formancehq/ledger/pkg/client",
UserAgent: "speakeasy-sdk/go 0.4.34 2.384.1 v1 github.com/formancehq/ledger/pkg/client",
Hooks: hooks.New(),
},
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/client/models/operations/v2createbulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ type V2CreateBulkRequest struct {
// Continue on failure
ContinueOnFailure *bool `queryParam:"style=form,explode=true,name=continueOnFailure"`
// Make bulk atomic
Atomic *bool `queryParam:"style=form,explode=true,name=atomic"`
Atomic *bool `queryParam:"style=form,explode=true,name=atomic"`
// Process bulk elements in parallel
Parallel *bool `queryParam:"style=form,explode=true,name=parallel"`
RequestBody []components.V2BulkElement `request:"mediaType=application/json"`
}

Expand All @@ -37,6 +39,13 @@ func (o *V2CreateBulkRequest) GetAtomic() *bool {
return o.Atomic
}

func (o *V2CreateBulkRequest) GetParallel() *bool {
if o == nil {
return nil
}
return o.Parallel
}

func (o *V2CreateBulkRequest) GetRequestBody() []components.V2BulkElement {
if o == nil {
return nil
Expand Down
34 changes: 29 additions & 5 deletions test/e2e/api_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ = Context("Ledger engine tests", func() {
ctx = logging.TestingContext()
events chan *nats.Msg
bulkResponse []components.V2BulkElementResult
bulkMaxSize = 5
bulkMaxSize = 100
)

testServer := NewTestServer(func() Configuration {
Expand All @@ -60,10 +60,10 @@ var _ = Context("Ledger engine tests", func() {
})
When("creating a bulk on a ledger", func() {
var (
now = time.Now().Round(time.Microsecond).UTC()
items []components.V2BulkElement
err error
atomic bool
now = time.Now().Round(time.Microsecond).UTC()
items []components.V2BulkElement
err error
atomic, parallel bool
)
BeforeEach(func() {
items = []components.V2BulkElement{
Expand Down Expand Up @@ -106,6 +106,7 @@ var _ = Context("Ledger engine tests", func() {
JustBeforeEach(func() {
bulkResponse, err = CreateBulk(ctx, testServer.GetValue(), operations.V2CreateBulkRequest{
Atomic: pointer.For(atomic),
Parallel: pointer.For(parallel),
RequestBody: items,
Ledger: "default",
})
Expand Down Expand Up @@ -176,6 +177,29 @@ var _ = Context("Ledger engine tests", func() {
Expect(err).To(HaveErrorCode(string(components.V2ErrorsEnumBulkSizeExceeded)))
})
})
Context("with parallel", func() {
BeforeEach(func() {
parallel = true
items = make([]components.V2BulkElement, 0)
for i := 0; i < bulkMaxSize; i++ {
items = append(items, components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{
Data: &components.V2PostTransaction{
Metadata: map[string]string{},
Postings: []components.V2Posting{{
Amount: big.NewInt(100),
Asset: "USD/2",
Destination: "bank",
Source: "world",
}},
Timestamp: &now,
},
}))
}
})
It("should be ok", func() {
Expect(err).To(BeNil())
})
})
})
When("creating a bulk with an error on a ledger", func() {
var (
Expand Down
1 change: 1 addition & 0 deletions tools/generator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
require (
dario.cat/mergo v1.0.1 // indirect
github.com/ThreeDotsLabs/watermill v1.4.1 // indirect
github.com/alitto/pond v1.9.2 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
Expand Down

0 comments on commit 7e1b49c

Please sign in to comment.