Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batch sizes configurable #4149

Merged
merged 22 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
kropidlowsky committed Mar 5, 2024
commit 2c51cabf9741b1016ef174df4b18a226ae38eedf
6 changes: 6 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ tasks:
--handler=sqlite
--sqlite-url=file:tmp/sqlite/
--test-records-dir=tmp/records
--test-batch-size=1000

run-mysql:
desc: "Run FerretDB with `mysql` backend"
Expand All @@ -407,6 +408,7 @@ tasks:
--mode=diff-normal
--handler=hana
--test-records-dir=tmp/records
--test-batch-size=100

run-secured:
desc: "Run FerretDB with `postgresql` backend (TLS, auth required)"
Expand All @@ -424,6 +426,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://127.0.0.1:5433/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

run-proxy:
desc: "Run FerretDB in diff-proxy mode"
Expand All @@ -437,6 +440,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://username@127.0.0.1:5432/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

run-sqlite-proxy:
desc: "Run FerretDB with `sqlite` handler in diff-proxy mode"
Expand All @@ -451,6 +455,7 @@ tasks:
--handler=sqlite
--sqlite-url=file:tmp/sqlite/
--test-records-dir=tmp/records
--test-batch-size=1000

run-proxy-secured:
desc: "Run FerretDB in diff-proxy mode (TLS, auth required)"
Expand All @@ -471,6 +476,7 @@ tasks:
--handler=pg
--postgresql-url='postgres://username@127.0.0.1:5433/ferretdb?search_path='
--test-records-dir=tmp/records
--test-batch-size=100

lint:
desc: "Run linters"
Expand Down
2 changes: 1 addition & 1 deletion cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var cli struct {
} `embed:"" prefix:"capped-cleanup-"`

EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication."`
BatchSize int `default:"100" help:"number of maximum size of query parameters"`
BatchSize int `default:"100" help:"number of maximum size of query parameters"`

Telemetry struct {
URL string `default:"https://beacon.ferretdb.com/" help:"Telemetry: reporting URL."`
Expand Down
8 changes: 5 additions & 3 deletions internal/backends/hana/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
URI string
L *zap.Logger
P *state.Provider
BatchSize int
}

// NewBackend creates a new Backend.
Expand All @@ -50,6 +51,7 @@ func NewBackend(params *NewBackendParams) (backends.Backend, error) {
}

hdb := fsql.WrapDB(db, "hana", params.L)
hdb.BatchSize = params.BatchSize

return backends.BackendContract(&backend{
hdb: hdb,
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
batchSize := c.r.BatchSize()
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved

var batch []*types.Document
docs := params.Docs
Expand Down
20 changes: 6 additions & 14 deletions internal/backends/postgresql/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ var specialCharacters = regexp.MustCompile("[^a-z][^a-z0-9_]*")
//
//nolint:vet // for readability
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the postgresql backend package description for more info).
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
Expand All @@ -96,7 +96,7 @@ func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*R
r := &Registry{
p: p,
l: l,
batchSize: batchSize,
BatchSize: batchSize,
}

return r, nil
Expand Down Expand Up @@ -1034,14 +1034,6 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
batchSize := c.r.BatchSize()
batchSize := c.r.BatchSize
kropidlowsky marked this conversation as resolved.
Show resolved Hide resolved

var batch []*types.Document
docs := params.Docs
Expand Down
20 changes: 6 additions & 14 deletions internal/backends/sqlite/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ const (
//
// Exported methods are safe for concurrent use. Unexported methods are not.
type Registry struct {
p *pool.Pool
l *zap.Logger
p *pool.Pool
l *zap.Logger
BatchSize int

// rw protects colls but also acts like a global lock for the whole registry.
// The latter effectively replaces transactions (see the sqlite backend package description for more info).
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
Expand All @@ -79,8 +79,8 @@ func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*R
r := &Registry{
p: p,
l: l,
BatchSize: batchSize,
colls: map[string]map[string]*Collection{},
batchSize: batchSize,
}

for name, db := range initDBs {
Expand Down Expand Up @@ -642,14 +642,6 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
1 change: 0 additions & 1 deletion internal/handler/msg_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

var done bool
for !done {

docs := make([]*types.Document, 0, h.BatchSize)
docsIndexes := make([]int, 0, h.BatchSize)

Expand Down
7 changes: 4 additions & 3 deletions internal/handler/registry/hana.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ func init() {
opts.Logger.Warn("HANA handler is in alpha. It is not supported yet.")

b, err := hana.NewBackend(&hana.NewBackendParams{
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
URI: opts.HANAURL,
L: opts.Logger.Named("hana"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand Down
7 changes: 4 additions & 3 deletions internal/util/fsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import (
type DB struct {
*metricsCollector

sqlDB *sql.DB
l *zap.Logger
token *resource.Token
sqlDB *sql.DB
l *zap.Logger
token *resource.Token
BatchSize int
}

// WrapDB creates a new DB.
Expand Down