Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batch sizes configurable #4149

Merged
merged 22 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ var cli struct {
} `embed:"" prefix:"capped-cleanup-"`

EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."`
BatchSize int `default:"100" help:"Experimental: maximum insertion batch size."`

Telemetry struct {
URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."`
Expand Down Expand Up @@ -412,6 +413,7 @@ func run() {
CappedCleanupInterval: cli.Test.CappedCleanup.Interval,
CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage,
EnableNewAuth: cli.Test.EnableNewAuth,
BatchSize: cli.Test.BatchSize,
},
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions integration/setup/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger, opts *
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: true,
BatchSize: *batchSizeF,
},
}
h, closeBackend, err := registry.NewHandler(handler, handlerOpts)
Expand Down
2 changes: 2 additions & 0 deletions integration/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.")
hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.")

batchSizeF = flag.Int("batch-size", 100, "maximum insertion batch size")

compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped")

benchDocsF = flag.Int("bench-docs", 1000, "benchmarks: number of documents to generate per iteration")
Expand Down
8 changes: 5 additions & 3 deletions internal/backends/hana/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
URI string
L *zap.Logger
P *state.Provider
BatchSize int
}

// NewBackend creates a new Backend.
Expand All @@ -50,6 +51,7 @@
}

hdb := fsql.WrapDB(db, "hana", params.L)
hdb.BatchSize = params.BatchSize

Check warning on line 54 in internal/backends/hana/backend.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/hana/backend.go#L54

Added line #L54 was not covered by tests

return backends.BackendContract(&backend{
hdb: hdb,
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := postgresql.NewBackend(&postgresql.NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
BatchSize: 1000,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand All @@ -71,9 +72,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := sqlite.NewBackend(&sqlite.NewBackendParams{
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
BatchSize: 100,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand Down
11 changes: 6 additions & 5 deletions internal/backends/postgresql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")

Check warning on line 139 in internal/backends/postgresql/collection.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/postgresql/collection.go#L139

Added line #L139 was not covered by tests
}

var batch []*types.Document
docs := params.Docs
Expand Down
7 changes: 4 additions & 3 deletions internal/backends/postgresql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func TestDatabaseStats(t *testing.T) {
require.NoError(t, err)

params := NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
BatchSize: 1000,
}
b, err := NewBackend(&params)
require.NoError(t, err)
Expand Down
12 changes: 7 additions & 5 deletions internal/backends/postgresql/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*")
//
//nolint:vet // for readability
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the postgresql backend package description for more info).
Expand All @@ -86,15 +87,16 @@ type Registry struct {
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
p: p,
l: l,
BatchSize: batchSize,
}

return r, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(u, testutil.Logger(t), sp)
r, err := NewRegistry(u, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(tc.uri, testutil.Logger(t), sp)
r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
11 changes: 6 additions & 5 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")

Check warning on line 114 in internal/backends/sqlite/collection.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/sqlite/collection.go#L114

Added line #L114 was not covered by tests
}

var batch []*types.Document
docs := params.Docs
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestCappedCollectionInsertAllQueryExplain(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)
t.Cleanup(b.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestDatabaseStatsFreeStorage(t *testing.T) {
name, params := name, params
t.Run(name, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)

t.Cleanup(b.Close)
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/sqlite/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
//
// Exported methods are safe for concurrent use. Unexported methods are not.
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the sqlite backend package description for more info).
Expand All @@ -69,16 +70,17 @@ type Registry struct {
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, initDBs, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
colls: map[string]map[string]*Collection{},
p: p,
l: l,
BatchSize: batchSize,
colls: map[string]map[string]*Collection{},
}

for name, db := range initDBs {
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
1 change: 1 addition & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type NewOpts struct {
CappedCleanupInterval time.Duration
CappedCleanupPercentage uint8
EnableNewAuth bool
BatchSize int
}

// New returns a new handler.
Expand Down
9 changes: 3 additions & 6 deletions internal/handler/msg_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,10 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

var done bool
for !done {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 1000
docs := make([]*types.Document, 0, h.BatchSize)
docsIndexes := make([]int, 0, h.BatchSize)

docs := make([]*types.Document, 0, batchSize)
docsIndexes := make([]int, 0, batchSize)

for j := 0; j < batchSize; j++ {
for j := 0; j < h.BatchSize; j++ {
var i int
var d any

Expand Down
8 changes: 5 additions & 3 deletions internal/handler/registry/hana.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
opts.Logger.Warn("HANA handler is in alpha. It is not supported yet.")

b, err := hana.NewBackend(&hana.NewBackendParams{
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,

Check warning on line 33 in internal/handler/registry/hana.go

View check run for this annotation

Codecov / codecov/patch

internal/handler/registry/hana.go#L30-L33

Added lines #L30 - L33 were not covered by tests
})
if err != nil {
return nil, nil, err
Expand All @@ -48,6 +49,7 @@
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize,

Check warning on line 52 in internal/handler/registry/hana.go

View check run for this annotation

Codecov / codecov/patch

internal/handler/registry/hana.go#L52

Added line #L52 was not covered by tests
}

h, err := handler.New(handlerOpts)
Expand Down
1 change: 1 addition & 0 deletions internal/handler/registry/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize,

Check warning on line 48 in internal/handler/registry/mysql.go

View check run for this annotation

Codecov / codecov/patch

internal/handler/registry/mysql.go#L48

Added line #L48 was not covered by tests
}

h, err := handler.New(handlerOpts)
Expand Down
Loading
Loading