Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batch sizes configurable #4149

Merged
merged 22 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
kropidlowsky committed Mar 4, 2024
commit b22809393940a14f2375c84609647e26865936e3
11 changes: 6 additions & 5 deletions internal/backends/postgresql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize()

var batch []*types.Document
docs := params.Docs
Expand Down
20 changes: 15 additions & 5 deletions internal/backends/postgresql/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,22 @@ type Registry struct {
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
p: p,
l: l,
batchSize: batchSize,
}

return r, nil
Expand Down Expand Up @@ -1032,6 +1034,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(u, testutil.Logger(t), sp)
r, err := NewRegistry(u, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(tc.uri, testutil.Logger(t), sp)
r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
11 changes: 6 additions & 5 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize()

var batch []*types.Document
docs := params.Docs
Expand Down
22 changes: 16 additions & 6 deletions internal/backends/sqlite/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,23 @@ type Registry struct {
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, initDBs, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
colls: map[string]map[string]*Collection{},
p: p,
l: l,
colls: map[string]map[string]*Collection{},
batchSize: batchSize,
}

for name, db := range initDBs {
Expand Down Expand Up @@ -640,6 +642,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/handler/registry/hana.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize
BatchSize: opts.BatchSize,
}

h, err := handler.New(handlerOpts)
Expand Down
7 changes: 4 additions & 3 deletions internal/handler/registry/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
func init() {
registry["postgresql"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) {
b, err := postgresql.NewBackend(&postgresql.NewBackendParams{
URI: opts.PostgreSQLURL,
L: opts.Logger.Named("postgresql"),
P: opts.StateProvider,
URI: opts.PostgreSQLURL,
L: opts.Logger.Named("postgresql"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand Down
7 changes: 4 additions & 3 deletions internal/handler/registry/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
func init() {
registry["sqlite"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) {
b, err := sqlite.NewBackend(&sqlite.NewBackendParams{
URI: opts.SQLiteURL,
L: opts.Logger.Named("sqlite"),
P: opts.StateProvider,
URI: opts.SQLiteURL,
L: opts.Logger.Named("sqlite"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand Down