Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
kropidlowsky committed Mar 4, 2024
1 parent 1bf9901 commit b228093
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 41 deletions.
11 changes: 6 additions & 5 deletions internal/backends/postgresql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize()

var batch []*types.Document
docs := params.Docs
Expand Down
20 changes: 15 additions & 5 deletions internal/backends/postgresql/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,22 @@ type Registry struct {
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
}

// NewRegistry creates a registry for PostgreSQL databases with a given base URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
p: p,
l: l,
batchSize: batchSize,
}

return r, nil
Expand Down Expand Up @@ -1032,6 +1034,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createDatabase(t *testing.T, ctx context.Context) (*Registry, *pgxpool.Pool
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(u, testutil.Logger(t), sp)
r, err := NewRegistry(u, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestAuth(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(tc.uri, testutil.Logger(t), sp)
r, err := NewRegistry(tc.uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
11 changes: 6 additions & 5 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type backend struct {
//
//nolint:vet // for readability
type NewBackendParams struct {
URI string
L *zap.Logger
P *state.Provider
_ struct{} // prevent unkeyed literals
URI string
L *zap.Logger
P *state.Provider
BatchSize int
_ struct{} // prevent unkeyed literals
}

// NewBackend creates a new Backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
r, err := metadata.NewRegistry(params.URI, params.L, params.P)
r, err := metadata.NewRegistry(params.URI, params.BatchSize, params.L, params.P)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
// TODO https://github.com/FerretDB/FerretDB/issues/3708
const batchSize = 100
batchSize := c.r.BatchSize()

var batch []*types.Document
docs := params.Docs
Expand Down
22 changes: 16 additions & 6 deletions internal/backends/sqlite/metadata/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,23 @@ type Registry struct {
// One global lock should be replaced by more granular locks – one per database or even one per collection.
// But that requires some redesign.
// TODO https://github.com/FerretDB/FerretDB/issues/2755
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
rw sync.RWMutex
colls map[string]map[string]*Collection // database name -> collection name -> collection
batchSize int
}

// NewRegistry creates a registry for SQLite databases in the directory specified by SQLite URI.
func NewRegistry(u string, l *zap.Logger, sp *state.Provider) (*Registry, error) {
func NewRegistry(u string, batchSize int, l *zap.Logger, sp *state.Provider) (*Registry, error) {
p, initDBs, err := pool.New(u, l, sp)
if err != nil {
return nil, err
}

r := &Registry{
p: p,
l: l,
colls: map[string]map[string]*Collection{},
p: p,
l: l,
colls: map[string]map[string]*Collection{},
batchSize: batchSize,
}

for name, db := range initDBs {
Expand Down Expand Up @@ -640,6 +642,14 @@ func (r *Registry) Collect(ch chan<- prometheus.Metric) {
}
}

// BatchSize returns number of maximum size of query parameters.
func (r *Registry) BatchSize() int {
r.rw.RLock()
batchSize := r.batchSize
r.rw.RUnlock()
return batchSize
}

// check interfaces
var (
_ prometheus.Collector = (*Registry)(nil)
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateDropStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestCreateSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
} {
t.Run(testName, func(t *testing.T) {
uri := testutil.TestSQLiteURI(t, "") + params
r, err := NewRegistry(uri, testutil.Logger(t), sp)
r, err := NewRegistry(uri, 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestIndexesCreateDrop(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectionsStats(t *testing.T) {
sp, err := state.NewProvider("")
require.NoError(t, err)

r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), testutil.Logger(t), sp)
r, err := metadata.NewRegistry(testutil.TestSQLiteURI(t, ""), 100, testutil.Logger(t), sp)
require.NoError(t, err)
t.Cleanup(r.Close)

Expand Down
2 changes: 1 addition & 1 deletion internal/handler/registry/hana.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
CappedCleanupPercentage: opts.CappedCleanupPercentage,
CappedCleanupInterval: opts.CappedCleanupInterval,
EnableNewAuth: opts.EnableNewAuth,
BatchSize: opts.BatchSize
BatchSize: opts.BatchSize,
}

h, err := handler.New(handlerOpts)
Expand Down
7 changes: 4 additions & 3 deletions internal/handler/registry/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
func init() {
registry["postgresql"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) {
b, err := postgresql.NewBackend(&postgresql.NewBackendParams{
URI: opts.PostgreSQLURL,
L: opts.Logger.Named("postgresql"),
P: opts.StateProvider,
URI: opts.PostgreSQLURL,
L: opts.Logger.Named("postgresql"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand Down
7 changes: 4 additions & 3 deletions internal/handler/registry/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
func init() {
registry["sqlite"] = func(opts *NewHandlerOpts) (*handler.Handler, CloseBackendFunc, error) {
b, err := sqlite.NewBackend(&sqlite.NewBackendParams{
URI: opts.SQLiteURL,
L: opts.Logger.Named("sqlite"),
P: opts.StateProvider,
URI: opts.SQLiteURL,
L: opts.Logger.Named("sqlite"),
P: opts.StateProvider,
BatchSize: opts.BatchSize,
})
if err != nil {
return nil, nil, err
Expand Down

0 comments on commit b228093

Please sign in to comment.