Skip to content

Commit

Permalink
Make updates atomic for SQLite (#3296)
Browse files Browse the repository at this point in the history
Closes #3079.
  • Loading branch information
AlekSi authored Sep 1, 2023
1 parent 2be310a commit de67252
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 76 deletions.
33 changes: 21 additions & 12 deletions internal/backends/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type Collection interface {
Query(context.Context, *QueryParams) (*QueryResult, error)
InsertAll(context.Context, *InsertAllParams) (*InsertAllResult, error)
Update(context.Context, *UpdateParams) (*UpdateResult, error)
UpdateAll(context.Context, *UpdateAllParams) (*UpdateAllResult, error)
DeleteAll(context.Context, *DeleteAllParams) (*DeleteAllResult, error)
Explain(context.Context, *ExplainParams) (*ExplainResult, error)
}
Expand Down Expand Up @@ -91,7 +91,7 @@ type InsertAllParams struct {
// InsertAllResult represents the results of Collection.InsertAll method.
type InsertAllResult struct{}

// InsertAll inserts all or none documents into the collection.
// InsertAll inserts documents into the collection.
//
// The operation should be atomic.
// If some documents cannot be inserted, the operation should be rolled back,
Expand All @@ -115,25 +115,34 @@ func (cc *collectionContract) InsertAll(ctx context.Context, params *InsertAllPa
return res, err
}

// UpdateParams represents the parameters of Collection.Update method.
type UpdateParams struct {
// that should be []*types.Document
// TODO https://github.com/FerretDB/FerretDB/issues/3079
Docs *types.Array
// UpdateAllParams represents the parameters of Collection.Update method.
type UpdateAllParams struct {
Docs []*types.Document
}

// UpdateResult represents the results of Collection.Update method.
type UpdateResult struct {
// UpdateAllResult represents the results of Collection.Update method.
type UpdateAllResult struct {
Updated int32
}

// Update updates documents in collection.
// UpdateAll updates documents in collection.
//
// The operation should be atomic.
// If some documents cannot be updated, the operation should be rolled back,
// and the first encountered error should be returned.
//
// All documents are expected to be valid and include _id fields.
// They will be frozen.
//
// Database or collection may not exist; that's not an error.
func (cc *collectionContract) Update(ctx context.Context, params *UpdateParams) (*UpdateResult, error) {
func (cc *collectionContract) UpdateAll(ctx context.Context, params *UpdateAllParams) (*UpdateAllResult, error) {
defer observability.FuncCall(ctx)()

res, err := cc.c.Update(ctx, params)
for _, doc := range params.Docs {
doc.Freeze()
}

res, err := cc.c.UpdateAll(ctx, params)
checkError(err)

return res, err
Expand Down
6 changes: 4 additions & 2 deletions internal/backends/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
// 2. Backend objects are stateful.
// Database objects are almost stateless but should be Close()'d to avoid connection leaks.
// Collection objects are fully stateless.
// 3. Contexts are per-operation and should not be stored.
// 4. Errors returned by methods could be nil, [*Error], or some other opaque error type.
// 3. The Backend maintains the list of databases and collections.
// It is recommended that it does so by not querying the information_schema or equivalent often.
// 4. Contexts are per-operation and should not be stored.
// 5. Errors returned by methods could be nil, [*Error], or some other opaque error type.
// *Error values can't be wrapped or be present anywhere in the error chain.
// Contracts enforce *Error codes; they are not documented in the code comments
// but are visible in the contract's code (to avoid duplication).
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
panic("not implemented")
}

// Update implements backends.Collection interface.
func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) {
// UpdateAll implements backends.Collection interface.
func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) {
panic("not implemented")
}

Expand Down
57 changes: 25 additions & 32 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/backends/sqlite/metadata"
"github.com/FerretDB/FerretDB/internal/handlers/sjson"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/fsql"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
)
Expand Down Expand Up @@ -118,55 +116,50 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
return new(backends.InsertAllResult), nil
}

// Update implements backends.Collection interface.
func (c *collection) Update(ctx context.Context, params *backends.UpdateParams) (*backends.UpdateResult, error) {
// UpdateAll implements backends.Collection interface.
func (c *collection) UpdateAll(ctx context.Context, params *backends.UpdateAllParams) (*backends.UpdateAllResult, error) {
db := c.r.DatabaseGetExisting(ctx, c.dbName)
if db == nil {
return nil, lazyerrors.Errorf("no database %q", c.dbName)
}

var res backends.UpdateResult
var res backends.UpdateAllResult
meta := c.r.CollectionGet(ctx, c.dbName, c.name)
if meta == nil {
return &res, nil
}

q := fmt.Sprintf(`UPDATE %q SET %s = ? WHERE %s = ?`, meta.TableName, metadata.DefaultColumn, metadata.IDColumn)

iter := params.Docs.Iterator()
defer iter.Close()

for {
_, d, err := iter.Next()
if errors.Is(err, iterator.ErrIteratorDone) {
break
}
err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
for _, doc := range params.Docs {
b, err := sjson.Marshal(doc)
if err != nil {
return lazyerrors.Error(err)
}

if err != nil {
return nil, lazyerrors.Error(err)
}
id, _ := doc.Get("_id")
must.NotBeZero(id)

doc, ok := d.(*types.Document)
if !ok {
panic(fmt.Sprintf("expected document, got %T", d))
}
arg := string(must.NotFail(sjson.MarshalSingleValue(id)))

id, _ := doc.Get("_id")
must.NotBeZero(id)
docArg := string(must.NotFail(sjson.Marshal(doc)))
idArg := string(must.NotFail(sjson.MarshalSingleValue(id)))
r, err := tx.ExecContext(ctx, q, string(b), arg)
if err != nil {
return lazyerrors.Error(err)
}

r, err := db.ExecContext(ctx, q, docArg, idArg)
if err != nil {
return nil, lazyerrors.Error(err)
}
ra, err := r.RowsAffected()
if err != nil {
return lazyerrors.Error(err)
}

ra, err := r.RowsAffected()
if err != nil {
return nil, lazyerrors.Error(err)
res.Updated += int32(ra)
}

res.Updated += int32(ra)
return nil
})
if err != nil {
return nil, lazyerrors.Error(err)
}

return &res, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (db *database) RenameCollection(ctx context.Context, params *backends.Renam

// Stats implements backends.Database interface.
//
// If the database does not exist, it returns *backends.DBStatsResult filled with zeros for all the fields.
// If the database does not exist, it returns *backends.StatsResult filled with zeros for all the fields.
func (db *database) Stats(ctx context.Context, params *backends.StatsParams) (*backends.StatsResult, error) {
stats := new(backends.StatsResult)

Expand Down
19 changes: 7 additions & 12 deletions internal/backends/sqlite/metadata/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Pool struct {
// so no validation is needed.
// One exception is very long full path names for the filesystem,
// but we don't check it.
func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error) {
func openDB(name, uri string, memory bool, l *zap.Logger) (*fsql.DB, error) {
db, err := sql.Open("sqlite", uri)
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -77,8 +77,10 @@ func openDB(name, uri string, singleConn bool, l *zap.Logger) (*fsql.DB, error)
db.SetConnMaxIdleTime(0)
db.SetConnMaxLifetime(0)

// TODO https://github.com/FerretDB/FerretDB/issues/2755
if singleConn {
// Each connection to in-memory database uses its own database.
// See https://www.sqlite.org/inmemorydb.html.
// We don't want that.
if memory {
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
}
Expand Down Expand Up @@ -123,7 +125,7 @@ func New(u string, l *zap.Logger) (*Pool, map[string]*fsql.DB, error) {

p.l.Debug("Opening existing database.", zap.String("name", name), zap.String("uri", uri))

db, err := openDB(name, uri, p.singleConn(), l)
db, err := openDB(name, uri, p.memory(), l)
if err != nil {
p.Close()
return nil, nil, lazyerrors.Error(err)
Expand All @@ -140,13 +142,6 @@ func (p *Pool) memory() bool {
return p.uri.Query().Get("mode") == "memory"
}

// singleConn returns true if pool size must be limited to a single connection.
func (p *Pool) singleConn() bool {
// https://www.sqlite.org/inmemorydb.html
// TODO https://github.com/FerretDB/FerretDB/issues/2755
return p.memory()
}

// databaseName returns database name for given database file path.
func (p *Pool) databaseName(databaseFile string) string {
return strings.TrimSuffix(filepath.Base(databaseFile), filenameExtension)
Expand Down Expand Up @@ -233,7 +228,7 @@ func (p *Pool) GetOrCreate(ctx context.Context, name string) (*fsql.DB, bool, er
}

uri := p.databaseURI(name)
db, err := openDB(name, uri, p.singleConn(), p.l)
db, err := openDB(name, uri, p.memory(), p.l)
if err != nil {
return nil, false, lazyerrors.Errorf("%s: %w", uri, err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/backends/sqlite/metadata/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ func TestDefaults(t *testing.T) {
}
require.NoError(t, rows.Err())
require.NoError(t, rows.Close())
require.Contains(t, options, "THREADSAFE=1")
require.Contains(t, options, "ENABLE_DBSTAT_VTAB")

require.Contains(t, options, "THREADSAFE=1") // for it to work with database/sql
require.Contains(t, options, "ENABLE_DBSTAT_VTAB") // for dbStats/collStats/etc

for q, expected := range map[string]string{
"SELECT sqlite_version()": "3.41.2",
Expand Down
3 changes: 1 addition & 2 deletions internal/backends/sqlite/metadata/pool/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func parseURI(u string) (*url.URL, error) {

values := uri.Query()

// https://www.sqlite.org/inmemorydb.html
// TODO https://github.com/FerretDB/FerretDB/issues/2755
// it is deprecated and interacts weirdly with database/sql.Pool
if values.Get("cache") == "shared" {
return nil, fmt.Errorf(`shared cache is not supported`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
)

// DeleteParams represents parameters for the delete command.
//
//nolint:vet // for readability
type DeleteParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Expand All @@ -37,6 +39,8 @@ type DeleteParams struct {
}

// Delete represents single delete operation parameters.
//
//nolint:vet // for readability
type Delete struct {
Filter *types.Document `ferretdb:"q"`
Limited bool `ferretdb:"limit,zeroOrOneAsBool"`
Expand Down
22 changes: 13 additions & 9 deletions internal/handlers/common/update_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/FerretDB/FerretDB/internal/types"
)

// UpdatesParams represents parameters for the update command.
type UpdatesParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Updates []UpdateParams `ferretdb:"updates"`
// UpdateParams represents parameters for the update command.
//
//nolint:vet // for readability
type UpdateParams struct {
DB string `ferretdb:"$db"`
Collection string `ferretdb:"collection"`
Updates []Update `ferretdb:"updates"`

Comment string `ferretdb:"comment,opt"`

Expand All @@ -37,8 +39,10 @@ type UpdatesParams struct {
LSID any `ferretdb:"lsid,ignored"`
}

// UpdateParams represents a single update operation parameters.
type UpdateParams struct {
// Update represents a single update operation parameters.
//
//nolint:vet // for readability
type Update struct {
// TODO https://github.com/FerretDB/FerretDB/issues/2627
// get comment from query, e.g. db.collection.UpdateOne({"_id":"string", "$comment: "test"},{$set:{"v":"foo""}})
Filter *types.Document `ferretdb:"q,opt"`
Expand All @@ -54,8 +58,8 @@ type UpdateParams struct {
}

// GetUpdateParams returns parameters for update command.
func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdatesParams, error) {
var params UpdatesParams
func GetUpdateParams(document *types.Document, l *zap.Logger) (*UpdateParams, error) {
var params UpdateParams

err := commonparams.ExtractParams(document, "update", &params, l)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions internal/handlers/sqlite/msg_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, lazyerrors.Error(err)
}

// TODO https://github.com/FerretDB/FerretDB/issues/2612
_ = params.Ordered

matched, modified, upserted, err := h.updateDocument(ctx, params)
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -66,7 +69,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

// updateDocument iterate through all documents in collection and update them.
func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesParams) (int32, int32, *types.Array, error) {
func (h *Handler) updateDocument(ctx context.Context, params *common.UpdateParams) (int32, int32, *types.Array, error) {
var matched, modified int32
var upserted types.Array

Expand Down Expand Up @@ -214,7 +217,7 @@ func (h *Handler) updateDocument(ctx context.Context, params *common.UpdatesPara
continue
}

updateRes, err := c.Update(ctx, &backends.UpdateParams{Docs: must.NotFail(types.NewArray(doc))})
updateRes, err := c.UpdateAll(ctx, &backends.UpdateAllParams{Docs: []*types.Document{doc}})
if err != nil {
return 0, 0, nil, lazyerrors.Error(err)
}
Expand Down

0 comments on commit de67252

Please sign in to comment.