diff --git a/internal/backends/collection.go b/internal/backends/collection.go index 97fc706cb8fd..31f679729757 100644 --- a/internal/backends/collection.go +++ b/internal/backends/collection.go @@ -34,7 +34,7 @@ import ( type Collection interface { Query(context.Context, *QueryParams) (*QueryResult, error) InsertAll(context.Context, *InsertAllParams) (*InsertAllResult, error) - Update(context.Context, *UpdateParams) (*UpdateResult, error) + UpdateAll(context.Context, *UpdateAllParams) (*UpdateAllResult, error) DeleteAll(context.Context, *DeleteAllParams) (*DeleteAllResult, error) Explain(context.Context, *ExplainParams) (*ExplainResult, error) } @@ -91,7 +91,7 @@ type InsertAllParams struct { // InsertAllResult represents the results of Collection.InsertAll method. type InsertAllResult struct{} -// InsertAll inserts all or none documents into the collection. +// InsertAll inserts documents into the collection. // // The operation should be atomic. // If some documents cannot be inserted, the operation should be rolled back, @@ -115,25 +115,34 @@ func (cc *collectionContract) InsertAll(ctx context.Context, params *InsertAllPa return res, err } -// UpdateParams represents the parameters of Collection.Update method. -type UpdateParams struct { - // that should be []*types.Document - // TODO https://github.com/FerretDB/FerretDB/issues/3079 - Docs *types.Array +// UpdateAllParams represents the parameters of Collection.Update method. +type UpdateAllParams struct { + Docs []*types.Document } -// UpdateResult represents the results of Collection.Update method. -type UpdateResult struct { +// UpdateAllResult represents the results of Collection.Update method. +type UpdateAllResult struct { Updated int32 } -// Update updates documents in collection. +// UpdateAll updates documents in collection. +// +// The operation should be atomic. +// If some documents cannot be updated, the operation should be rolled back, +// and the first encountered error should be returned. +// +// All documents are expected to be valid and include _id fields. +// They will be frozen. // // Database or collection may not exist; that's not an error. -func (cc *collectionContract) Update(ctx context.Context, params *UpdateParams) (*UpdateResult, error) { +func (cc *collectionContract) UpdateAll(ctx context.Context, params *UpdateAllParams) (*UpdateAllResult, error) { defer observability.FuncCall(ctx)() - res, err := cc.c.Update(ctx, params) + for _, doc := range params.Docs { + doc.Freeze() + } + + res, err := cc.c.UpdateAll(ctx, params) checkError(err) return res, err diff --git a/internal/backends/doc.go b/internal/backends/doc.go index f0e51d61f18b..8b3995fb01ed 100644 --- a/internal/backends/doc.go +++ b/internal/backends/doc.go @@ -27,8 +27,10 @@ // 2. Backend objects are stateful. // Database objects are almost stateless but should be Close()'d to avoid connection leaks. // Collection objects are fully stateless. -// 3. Contexts are per-operation and should not be stored. -// 4. Errors returned by methods could be nil, [*Error], or some other opaque error type. +// 3. The Backend maintains the list of databases and collections. +// It is recommended that it does so by not querying the information_schema or equivalent often. +// 4. Contexts are per-operation and should not be stored. +// 5. Errors returned by methods could be nil, [*Error], or some other opaque error type. // *Error values can't be wrapped or be present anywhere in the error chain. // Contracts enforce *Error codes; they are not documented in the code comments // but are visible in the contract's code (to avoid duplication). diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go index dcea8e09ecf0..8164d617977b 100644 --- a/internal/backends/postgresql/collection.go +++ b/internal/backends/postgresql/collection.go @@ -44,8 +44,8 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa panic("not implemented") } -// Update implements backends.Collection interface. -func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) { +// UpdateAll implements backends.Collection interface. +func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) { panic("not implemented") } diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go index 4c615a1c35f8..590d8b041309 100644 --- a/internal/backends/sqlite/collection.go +++ b/internal/backends/sqlite/collection.go @@ -26,9 +26,7 @@ import ( "github.com/FerretDB/FerretDB/internal/backends" "github.com/FerretDB/FerretDB/internal/backends/sqlite/metadata" "github.com/FerretDB/FerretDB/internal/handlers/sjson" - "github.com/FerretDB/FerretDB/internal/types" "github.com/FerretDB/FerretDB/internal/util/fsql" - "github.com/FerretDB/FerretDB/internal/util/iterator" "github.com/FerretDB/FerretDB/internal/util/lazyerrors" "github.com/FerretDB/FerretDB/internal/util/must" ) @@ -118,14 +116,14 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa return new(backends.InsertAllResult), nil } -// Update implements backends.Collection interface. -func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) { +// UpdateAll implements backends.Collection interface. +func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) { db := c.r.DatabaseGetExisting(ctx, c.dbName) if db == nil { return nil, lazyerrors.Errorf("no database %q", c.dbName) } - var res backends.UpdateResult + var res backends.UpdateAllResult meta := c.r.CollectionGet(ctx, c.dbName, c.name) if meta == nil { return &res, nil @@ -133,40 +131,35 @@ func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) q := fmt.Sprintf(`UPDATE %q SET %s = ? WHERE %s = ?`, meta.TableName, metadata.DefaultColumn, metadata.IDColumn) - iter := params.Docs.Iterator() - defer iter.Close() - - for { - _, d, err := iter.Next() - if errors.Is(err, iterator.ErrIteratorDone) { - break - } + err := db.InTransaction(ctx, func(tx *fsql.Tx) error { + for _, doc := range params.Docs { + b, err := sjson.Marshal(doc) + if err != nil { + return lazyerrors.Error(err) + } - if err != nil { - return nil, lazyerrors.Error(err) - } + id, _ := doc.Get("_id") + must.NotBeZero(id) - doc, ok := d.(*types.Document) - if !ok { - panic(fmt.Sprintf("expected document, got %T", d)) - } + arg := string(must.NotFail(sjson.MarshalSingleValue(id))) - id, _ := doc.Get("_id") - must.NotBeZero(id) - docArg := string(must.NotFail(sjson.Marshal(doc))) - idArg := string(must.NotFail(sjson.MarshalSingleValue(id))) + r, err := tx.ExecContext(ctx, q, string(b), arg) + if err != nil { + return lazyerrors.Error(err) + } - r, err := db.ExecContext(ctx, q, docArg, idArg) - if err != nil { - return nil, lazyerrors.Error(err) - } + ra, err := r.RowsAffected() + if err != nil { + return lazyerrors.Error(err) + } - ra, err := r.RowsAffected() - if err != nil { - return nil, lazyerrors.Error(err) + res.Updated += int32(ra) } - res.Updated += int32(ra) + return nil + }) + if err != nil { + return nil, lazyerrors.Error(err) } return &res, nil diff --git a/internal/backends/sqlite/database.go b/internal/backends/sqlite/database.go index 93f913e927c4..a3430321bfa1 100644 --- a/internal/backends/sqlite/database.go +++ b/internal/backends/sqlite/database.go @@ -105,7 +105,7 @@ func (db *database) RenameCollection(ctx context.Context, params *backends.Renam // Stats implements backends.Database interface. // -// If the database does not exist, it returns *backends.DBStatsResult filled with zeros for all the fields. +// If the database does not exist, it returns *backends.StatsResult filled with zeros for all the fields. func (db *database) Stats(ctx context.Context, params *backends.StatsParams) (*backends.StatsResult, error) { stats := new(backends.StatsResult) diff --git a/internal/backends/sqlite/metadata/pool/pool.go b/internal/backends/sqlite/metadata/pool/pool.go index e6ece43f3192..3296ac31cb28 100644 --- a/internal/backends/sqlite/metadata/pool/pool.go +++ b/internal/backends/sqlite/metadata/pool/pool.go @@ -68,7 +68,7 @@ type Pool struct { // so no validation is needed. // One exception is very long full path names for the filesystem, // but we don't check it. -func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error) { +func openDB(name, uri string, memory bool, l *zap.Logger) (*fsql.DB, error) { db, err := sql.Open("sqlite", uri) if err != nil { return nil, lazyerrors.Error(err) @@ -77,8 +77,10 @@ func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error) db.SetConnMaxIdleTime(0) db.SetConnMaxLifetime(0) - // TODO https://github.com/FerretDB/FerretDB/issues/2755 - if singleConn { + // Each connection to in-memory database uses its own database. + // See https://www.sqlite.org/inmemorydb.html. + // We don't want that. + if memory { db.SetMaxIdleConns(1) db.SetMaxOpenConns(1) } @@ -123,7 +125,7 @@ func New(u string, l *zap.Logger) (*Pool, map[string]*fsql.DB, error) { p.l.Debug("Opening existing database.", zap.String("name", name), zap.String("uri", uri)) - db, err := openDB(name, uri, p.singleConn(), l) + db, err := openDB(name, uri, p.memory(), l) if err != nil { p.Close() return nil, nil, lazyerrors.Error(err) @@ -140,13 +142,6 @@ func (p *Pool) memory() bool { return p.uri.Query().Get("mode") == "memory" } -// singleConn returns true if pool size must be limited to a single connection. -func (p *Pool) singleConn() bool { - // https://www.sqlite.org/inmemorydb.html - // TODO https://github.com/FerretDB/FerretDB/issues/2755 - return p.memory() -} - // databaseName returns database name for given database file path. func (p *Pool) databaseName(databaseFile string) string { return strings.TrimSuffix(filepath.Base(databaseFile), filenameExtension) @@ -233,7 +228,7 @@ func (p *Pool) GetOrCreate(ctx context.Context, name string) (*fsql.DB, bool, er } uri := p.databaseURI(name) - db, err := openDB(name, uri, p.singleConn(), p.l) + db, err := openDB(name, uri, p.memory(), p.l) if err != nil { return nil, false, lazyerrors.Errorf("%s: %w", uri, err) } diff --git a/internal/backends/sqlite/metadata/pool/pool_test.go b/internal/backends/sqlite/metadata/pool/pool_test.go index ac1520f76ef6..ec004d46e762 100644 --- a/internal/backends/sqlite/metadata/pool/pool_test.go +++ b/internal/backends/sqlite/metadata/pool/pool_test.go @@ -163,8 +163,9 @@ func TestDefaults(t *testing.T) { } require.NoError(t, rows.Err()) require.NoError(t, rows.Close()) - require.Contains(t, options, "THREADSAFE=1") - require.Contains(t, options, "ENABLE_DBSTAT_VTAB") + + require.Contains(t, options, "THREADSAFE=1") // for it to work with database/sql + require.Contains(t, options, "ENABLE_DBSTAT_VTAB") // for dbStats/collStats/etc for q, expected := range map[string]string{ "SELECT sqlite_version()": "3.41.2", diff --git a/internal/backends/sqlite/metadata/pool/uri.go b/internal/backends/sqlite/metadata/pool/uri.go index 1754435fd9db..643a591cf76d 100644 --- a/internal/backends/sqlite/metadata/pool/uri.go +++ b/internal/backends/sqlite/metadata/pool/uri.go @@ -55,8 +55,7 @@ func parseURI(u string) (*url.URL, error) { values := uri.Query() - // https://www.sqlite.org/inmemorydb.html - // TODO https://github.com/FerretDB/FerretDB/issues/2755 + // it is deprecated and interacts weirdly with database/sql.Pool if values.Get("cache") == "shared" { return nil, fmt.Errorf(`shared cache is not supported`) } diff --git a/internal/handlers/common/delete.go b/internal/handlers/common/delete_params.go similarity index 96% rename from internal/handlers/common/delete.go rename to internal/handlers/common/delete_params.go index 9f6dc8fe9925..e62b364c70a5 100644 --- a/internal/handlers/common/delete.go +++ b/internal/handlers/common/delete_params.go @@ -22,6 +22,8 @@ import ( ) // DeleteParams represents parameters for the delete command. +// +//nolint:vet // for readability type DeleteParams struct { DB string `ferretdb:"$db"` Collection string `ferretdb:"collection"` @@ -37,6 +39,8 @@ type DeleteParams struct { } // Delete represents single delete operation parameters. +// +//nolint:vet // for readability type Delete struct { Filter *types.Document `ferretdb:"q"` Limited bool `ferretdb:"limit,zeroOrOneAsBool"` diff --git a/internal/handlers/common/update_params.go b/internal/handlers/common/update_params.go index b320c5a5f681..1d21cd489761 100644 --- a/internal/handlers/common/update_params.go +++ b/internal/handlers/common/update_params.go @@ -21,11 +21,13 @@ import ( "github.com/FerretDB/FerretDB/internal/types" ) -// UpdatesParams represents parameters for the update command. -type UpdatesParams struct { - DB string `ferretdb:"$db"` - Collection string `ferretdb:"collection"` - Updates []UpdateParams `ferretdb:"updates"` +// UpdateParams represents parameters for the update command. +// +//nolint:vet // for readability +type UpdateParams struct { + DB string `ferretdb:"$db"` + Collection string `ferretdb:"collection"` + Updates []Update `ferretdb:"updates"` Comment string `ferretdb:"comment,opt"` @@ -37,8 +39,10 @@ type UpdatesParams struct { LSID any `ferretdb:"lsid,ignored"` } -// UpdateParams represents a single update operation parameters. -type UpdateParams struct { +// Update represents a single update operation parameters. +// +//nolint:vet // for readability +type Update struct { // TODO https://github.com/FerretDB/FerretDB/issues/2627 // get comment from query, e.g. db.collection.UpdateOne({"_id":"string", "$comment: "test"},{$set:{"v":"foo""}}) Filter *types.Document `ferretdb:"q,opt"` @@ -54,8 +58,8 @@ type UpdateParams struct { } // GetUpdateParams returns parameters for update command. -func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdatesParams, error) { - var params UpdatesParams +func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdateParams, error) { + var params UpdateParams err := commonparams.ExtractParams(document, "update", ¶ms, l) if err != nil { diff --git a/internal/handlers/sqlite/msg_update.go b/internal/handlers/sqlite/msg_update.go index 3b7228c81f59..76bd9422f5e2 100644 --- a/internal/handlers/sqlite/msg_update.go +++ b/internal/handlers/sqlite/msg_update.go @@ -41,6 +41,9 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, return nil, lazyerrors.Error(err) } + // TODO https://github.com/FerretDB/FerretDB/issues/2612 + _ = params.Ordered + matched, modified, upserted, err := h.updateDocument(ctx, params) if err != nil { return nil, lazyerrors.Error(err) @@ -66,7 +69,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, } // updateDocument iterate through all documents in collection and update them. -func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesParams) (int32, int32, *types.Array, error) { +func (h *Handler) updateDocument(ctx context.Context, params *common.UpdateParams) (int32, int32, *types.Array, error) { var matched, modified int32 var upserted types.Array @@ -214,7 +217,7 @@ func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesPara continue } - updateRes, err := c.Update(ctx, &backends.UpdateParams{Docs: must.NotFail(types.NewArray(doc))}) + updateRes, err := c.UpdateAll(ctx, &backends.UpdateAllParams{Docs: []*types.Document{doc}}) if err != nil { return 0, 0, nil, lazyerrors.Error(err) }