Skip to content

Commit

Permalink
Make batch sizes configurable (#4149)
Browse files Browse the repository at this point in the history
Closes #3708.
  • Loading branch information
kropidlowsky authored Mar 19, 2024
1 parent aea56ea commit 60fcd09
Show file tree
Hide file tree
Showing 25 changed files with 93 additions and 66 deletions.
2 changes: 2 additions & 0 deletions cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ var cli struct {
} `embed:"" prefix:"capped-cleanup-"`

EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."`
BatchSize int `default:"100" help:"Experimental: maximum insertion batch size."`

Telemetry struct {
URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."`
Expand Down Expand Up @@ -412,6 +413,7 @@ func run() {
CappedCleanupInterval: cli.Test.CappedCleanup.Interval,
CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage,
EnableNewAuth: cli.Test.EnableNewAuth,
BatchSize: cli.Test.BatchSize,
},
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions integration/setup/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger, opts *
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: true,
BatchSize: *batchSizeF,
},
}
h, closeBackend, err := registry.NewHandler(handler, handlerOpts)
Expand Down
2 changes: 2 additions & 0 deletions integration/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
mysqlURLF = flag.String("mysql-url", "", "in-process FerretDB: MySQL URL for 'mysql' handler.")
hanaURLF = flag.String("hana-url", "", "in-process FerretDB: Hana URL for 'hana' handler.")

batchSizeF = flag.Int("batch-size", 100, "maximum insertion batch size")

compatURLF = flag.String("compat-url", "", "compat system's (MongoDB) URL for compatibility tests; if empty, they are skipped")

benchDocsF = flag.Int("bench-docs", 1000, "benchmarks: number of documents to generate per iteration")
Expand Down
8 changes: 5 additions & 3 deletions internal/backends/hana/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
URI string
L *zap.Logger
P *state.Provider
BatchSize int
}

// NewBackend creates a new Backend.
Expand All @@ -50,6 +51,7 @@ func NewBackend(params *NewBackendParams) (backends.Backend, error) {
}

hdb := fsql.WrapDB(db, "hana", params.L)
hdb.BatchSize = params.BatchSize

return backends.BackendContract(&backend{
hdb: hdb,
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := postgresql.NewBackend(&postgresql.NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
URI: testutil.TestPostgreSQLURI(t, context.TODO(), ""),
L: l.Named("postgresql"),
P: sp,
BatchSize: 1000,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand All @@ -71,9 +72,10 @@ func testBackends(t *testing.T) map[string]*testBackend {
require.NoError(t, err)

b, err := sqlite.NewBackend(&sqlite.NewBackendParams{
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
URI: testutil.TestSQLiteURI(t, ""),
L: l.Named("sqlite"),
P: sp,
BatchSize: 100,
})
require.NoError(t, err)
t.Cleanup(b.Close)
Expand Down
11 changes: 6 additions & 5 deletions internal/backends/postgresql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")
}

var batch []*types.Document
docs := params.Docs
Expand Down
7 changes: 4 additions & 3 deletions internal/backends/postgresql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func TestDatabaseStats(t *testing.T) {
require.NoError(t, err)

params := NewBackendParams{
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
URI: testutil.TestPostgreSQLURI(t, ctx, ""),
L: testutil.Logger(t),
P: sp,
BatchSize: 1000,
}
b, err := NewBackend(&params)
require.NoError(t, err)
Expand Down
12 changes: 7 additions & 5 deletions internal/backends/postgresql/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*")
//
//nolint:vet // for readability
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the postgresql backend package description for more info).
Expand All @@ -86,15 +87,16 @@ type Registry struct {
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
p: p,
l: l,
BatchSize: batchSize,
}

return r, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(u, testutil.Logger(t), sp)
r, err := NewRegistry(u, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(tc.uri, testutil.Logger(t), sp)
r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
11 changes: 6 additions & 5 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize
if batchSize < 1 {
panic("batch-size should be greater or equal to 1")
}

var batch []*types.Document
docs := params.Docs
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestCappedCollectionInsertAllQueryExplain(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: testutil.TestSQLiteURI(t, ""), L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)
t.Cleanup(b.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestDatabaseStatsFreeStorage(t *testing.T) {
name, params := name, params
t.Run(name, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp})
b, err := NewBackend(&NewBackendParams{URI: uri, L: testutil.Logger(t), P: sp, BatchSize: 100})
require.NoError(t, err)

t.Cleanup(b.Close)
Expand Down
14 changes: 8 additions & 6 deletions internal/backends/sqlite/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
//
// Exported methods are safe for concurrent use. Unexported methods are not.
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the sqlite backend package description for more info).
Expand All @@ -69,16 +70,17 @@ type Registry struct {
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, initDBs, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
colls: map[string]map[string]*Collection{},
p: p,
l: l,
BatchSize: batchSize,
colls: map[string]map[string]*Collection{},
}

for name, db := range initDBs {
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
1 change: 1 addition & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type NewOpts struct {
CappedCleanupInterval time.Duration
CappedCleanupPercentage uint8
EnableNewAuth bool
BatchSize int
}

// New returns a new handler.
Expand Down
9 changes: 3 additions & 6 deletions internal/handler/msg_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,10 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

var done bool
for !done {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 1000
docs := make([]*types.Document, 0, h.BatchSize)
docsIndexes := make([]int, 0, h.BatchSize)

docs := make([]*types.Document, 0, batchSize)
docsIndexes := make([]int, 0, batchSize)

for j := 0; j < batchSize; j++ {
for j := 0; j < h.BatchSize; j++ {
var i int
var d any

Expand Down
8 changes: 5 additions & 3 deletions internal/handler/registry/hana.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ func init() {
opts.Logger.Warn("HANA handler is in alpha. It is not supported yet.")

b, err := hana.NewBackend(&hana.NewBackendParams{
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand All @@ -48,6 +49,7 @@ func init() {
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize,
}

h, err := handler.New(handlerOpts)
Expand Down
1 change: 1 addition & 0 deletions internal/handler/registry/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize,
}

h, err := handler.New(handlerOpts)
Expand Down
Loading

0 comments on commit 60fcd09

Please sign in to comment.