Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make updates atomic for SQLite #3296

Merged
merged 6 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions internal/backends/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type Collection interface {
Query(context.Context, *QueryParams) (*QueryResult, error)
InsertAll(context.Context, *InsertAllParams) (*InsertAllResult, error)
Update(context.Context, *UpdateParams) (*UpdateResult, error)
UpdateAll(context.Context, *UpdateAllParams) (*UpdateAllResult, error)
DeleteAll(context.Context, *DeleteAllParams) (*DeleteAllResult, error)
Explain(context.Context, *ExplainParams) (*ExplainResult, error)
}
Expand Down Expand Up @@ -91,7 +91,7 @@ type InsertAllParams struct {
// InsertAllResult represents the results of Collection.InsertAll method.
type InsertAllResult struct{}

// InsertAll inserts all or none documents into the collection.
// InsertAll inserts documents into the collection.
//
// The operation should be atomic.
// If some documents cannot be inserted, the operation should be rolled back,
Expand All @@ -115,25 +115,34 @@ func (cc *collectionContract) InsertAll(ctx context.Context, params *InsertAllPa
return res, err
}

// UpdateParams represents the parameters of Collection.Update method.
type UpdateParams struct {
// that should be []*types.Document
// TODO https://github.com/FerretDB/FerretDB/issues/3079
Docs *types.Array
// UpdateAllParams represents the parameters of Collection.Update method.
type UpdateAllParams struct {
Docs []*types.Document
}

// UpdateResult represents the results of Collection.Update method.
type UpdateResult struct {
// UpdateAllResult represents the results of Collection.Update method.
type UpdateAllResult struct {
Updated int32
}

// Update updates documents in collection.
// UpdateAll updates documents in collection.
//
// The operation should be atomic.
// If some documents cannot be updated, the operation should be rolled back,
// and the first encountered error should be returned.
//
// All documents are expected to be valid and include _id fields.
// They will be frozen.
//
// Database or collection may not exist; that's not an error.
func (cc *collectionContract) Update(ctx context.Context, params *UpdateParams) (*UpdateResult, error) {
func (cc *collectionContract) UpdateAll(ctx context.Context, params *UpdateAllParams) (*UpdateAllResult, error) {
defer observability.FuncCall(ctx)()

res, err := cc.c.Update(ctx, params)
for _, doc := range params.Docs {
doc.Freeze()
}

res, err := cc.c.UpdateAll(ctx, params)
checkError(err)

return res, err
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
// 2. Backend objects are stateful.
// Database objects are almost stateless but should be Close()'d to avoid connection leaks.
// Collection objects are fully stateless.
// 3. Contexts are per-operation and should not be stored.
// 4. Errors returned by methods could be nil, [*Error], or some other opaque error type.
// 3. The Backend maintains the list of databases and collections.
// It is recommended that it does so by not querying the information_schema or equivalent often.
// 4. Contexts are per-operation and should not be stored.
// 5. Errors returned by methods could be nil, [*Error], or some other opaque error type.
// *Error values can't be wrapped or be present anywhere in the error chain.
// Contracts enforce *Error codes; they are not documented in the code comments
// but are visible in the contract's code (to avoid duplication).
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
panic("not implemented")
}

// Update implements backends.Collection interface.
func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) {
// UpdateAll implements backends.Collection interface.
func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) {
panic("not implemented")
}

Expand Down
57 changes: 25 additions & 32 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/backends/sqlite/metadata"
"github.com/FerretDB/FerretDB/internal/handlers/sjson"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/fsql"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
)
Expand Down Expand Up @@ -118,55 +116,50 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
return new(backends.InsertAllResult), nil
}

// Update implements backends.Collection interface.
func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) {
// UpdateAll implements backends.Collection interface.
func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) {
db := c.r.DatabaseGetExisting(ctx, c.dbName)
if db == nil {
return nil, lazyerrors.Errorf("no database %q", c.dbName)
}

var res backends.UpdateResult
var res backends.UpdateAllResult
meta := c.r.CollectionGet(ctx, c.dbName, c.name)
if meta == nil {
return &res, nil
}

q := fmt.Sprintf(`UPDATE %q SET %s = ? WHERE %s = ?`, meta.TableName, metadata.DefaultColumn, metadata.IDColumn)

iter := params.Docs.Iterator()
defer iter.Close()

for {
_, d, err := iter.Next()
if errors.Is(err, iterator.ErrIteratorDone) {
break
}
err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
for _, doc := range params.Docs {
b, err := sjson.Marshal(doc)
if err != nil {
return lazyerrors.Error(err)
}

if err != nil {
return nil, lazyerrors.Error(err)
}
id, _ := doc.Get("_id")
must.NotBeZero(id)

doc, ok := d.(*types.Document)
if !ok {
panic(fmt.Sprintf("expected document, got %T", d))
}
arg := string(must.NotFail(sjson.MarshalSingleValue(id)))

id, _ := doc.Get("_id")
must.NotBeZero(id)
docArg := string(must.NotFail(sjson.Marshal(doc)))
idArg := string(must.NotFail(sjson.MarshalSingleValue(id)))
r, err := tx.ExecContext(ctx, q, string(b), arg)
if err != nil {
return lazyerrors.Error(err)
}

r, err := db.ExecContext(ctx, q, docArg, idArg)
if err != nil {
return nil, lazyerrors.Error(err)
}
ra, err := r.RowsAffected()
if err != nil {
return lazyerrors.Error(err)
}

ra, err := r.RowsAffected()
if err != nil {
return nil, lazyerrors.Error(err)
res.Updated += int32(ra)
}

res.Updated += int32(ra)
return nil
})
if err != nil {
return nil, lazyerrors.Error(err)
}

return &res, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (db *database) RenameCollection(ctx context.Context, params *backends.Renam

// Stats implements backends.Database interface.
//
// If the database does not exist, it returns *backends.DBStatsResult filled with zeros for all the fields.
// If the database does not exist, it returns *backends.StatsResult filled with zeros for all the fields.
func (db *database) Stats(ctx context.Context, params *backends.StatsParams) (*backends.StatsResult, error) {
stats := new(backends.StatsResult)

Expand Down
19 changes: 7 additions & 12 deletions internal/backends/sqlite/metadata/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Pool struct {
// so no validation is needed.
// One exception is very long full path names for the filesystem,
// but we don't check it.
func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error) {
func openDB(name, uri string, memory bool, l *zap.Logger) (*fsql.DB, error) {
db, err := sql.Open("sqlite", uri)
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -77,8 +77,10 @@ func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error)
db.SetConnMaxIdleTime(0)
db.SetConnMaxLifetime(0)

// TODO https://github.com/FerretDB/FerretDB/issues/2755
if singleConn {
// Each connection to in-memory database uses its own database.
// See https://www.sqlite.org/inmemorydb.html.
// We don't want that.
if memory {
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
}
Expand Down Expand Up @@ -123,7 +125,7 @@ func New(u string, l *zap.Logger) (*Pool, map[string]*fsql.DB, error) {

p.l.Debug("Opening existing database.", zap.String("name", name), zap.String("uri", uri))

db, err := openDB(name, uri, p.singleConn(), l)
db, err := openDB(name, uri, p.memory(), l)
if err != nil {
p.Close()
return nil, nil, lazyerrors.Error(err)
Expand All @@ -140,13 +142,6 @@ func (p *Pool) memory() bool {
return p.uri.Query().Get("mode") == "memory"
}

// singleConn returns true if pool size must be limited to a single connection.
func (p *Pool) singleConn() bool {
// https://www.sqlite.org/inmemorydb.html
// TODO https://github.com/FerretDB/FerretDB/issues/2755
return p.memory()
}

// databaseName returns database name for given database file path.
func (p *Pool) databaseName(databaseFile string) string {
return strings.TrimSuffix(filepath.Base(databaseFile), filenameExtension)
Expand Down Expand Up @@ -233,7 +228,7 @@ func (p *Pool) GetOrCreate(ctx context.Context, name string) (*fsql.DB, bool, er
}

uri := p.databaseURI(name)
db, err := openDB(name, uri, p.singleConn(), p.l)
db, err := openDB(name, uri, p.memory(), p.l)
if err != nil {
return nil, false, lazyerrors.Errorf("%s: %w", uri, err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/backends/sqlite/metadata/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ func TestDefaults(t *testing.T) {
}
require.NoError(t, rows.Err())
require.NoError(t, rows.Close())
require.Contains(t, options, "THREADSAFE=1")
require.Contains(t, options, "ENABLE_DBSTAT_VTAB")

require.Contains(t, options, "THREADSAFE=1") // for it to work with database/sql
require.Contains(t, options, "ENABLE_DBSTAT_VTAB") // for dbStats/collStats/etc

for q, expected := range map[string]string{
"SELECT sqlite_version()": "3.41.2",
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/sqlite/metadata/pool/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func parseURI(u string) (*url.URL, error) {

values := uri.Query()

// https://www.sqlite.org/inmemorydb.html
// TODO https://github.com/FerretDB/FerretDB/issues/2755
// it is deprecated and interacts weirdly with database/sql.Pool
if values.Get("cache") == "shared" {
return nil, fmt.Errorf(`shared cache is not supported`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
)

// DeleteParams represents parameters for the delete command.
//
//nolint:vet // for readability
type DeleteParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Expand All @@ -37,6 +39,8 @@ type DeleteParams struct {
}

// Delete represents single delete operation parameters.
//
//nolint:vet // for readability
type Delete struct {
Filter *types.Document `ferretdb:"q"`
Limited bool `ferretdb:"limit,zeroOrOneAsBool"`
Expand Down
22 changes: 13 additions & 9 deletions internal/handlers/common/update_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/FerretDB/FerretDB/internal/types"
)

// UpdatesParams represents parameters for the update command.
type UpdatesParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Updates []UpdateParams `ferretdb:"updates"`
// UpdateParams represents parameters for the update command.
//
//nolint:vet // for readability
type UpdateParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Updates []Update `ferretdb:"updates"`

Comment string `ferretdb:"comment,opt"`

Expand All @@ -37,8 +39,10 @@ type UpdatesParams struct {
LSID any `ferretdb:"lsid,ignored"`
}

// UpdateParams represents a single update operation parameters.
type UpdateParams struct {
// Update represents a single update operation parameters.
//
//nolint:vet // for readability
type Update struct {
// TODO https://github.com/FerretDB/FerretDB/issues/2627
// get comment from query, e.g. db.collection.UpdateOne({"_id":"string", "$comment: "test"},{$set:{"v":"foo""}})
Filter *types.Document `ferretdb:"q,opt"`
Expand All @@ -54,8 +58,8 @@ type UpdateParams struct {
}

// GetUpdateParams returns parameters for update command.
func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdatesParams, error) {
var params UpdatesParams
func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdateParams, error) {
var params UpdateParams

err := commonparams.ExtractParams(document, "update", &params, l)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions internal/handlers/sqlite/msg_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, lazyerrors.Error(err)
}

// TODO https://github.com/FerretDB/FerretDB/issues/2612
_ = params.Ordered

matched, modified, upserted, err := h.updateDocument(ctx, params)
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -66,7 +69,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

// updateDocument iterate through all documents in collection and update them.
func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesParams) (int32, int32, *types.Array, error) {
func (h *Handler) updateDocument(ctx context.Context, params *common.UpdateParams) (int32, int32, *types.Array, error) {
var matched, modified int32
var upserted types.Array

Expand Down Expand Up @@ -214,7 +217,7 @@ func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesPara
continue
}

updateRes, err := c.Update(ctx, &backends.UpdateParams{Docs: must.NotFail(types.NewArray(doc))})
updateRes, err := c.UpdateAll(ctx, &backends.UpdateAllParams{Docs: []*types.Document{doc}})
if err != nil {
return 0, 0, nil, lazyerrors.Error(err)
}
Expand Down