diff --git a/integration/insert_command_test.go b/integration/insert_command_test.go
index d57910f6c534..c07f3d0a3ff9 100644
--- a/integration/insert_command_test.go
+++ b/integration/insert_command_test.go
@@ -109,6 +109,17 @@ func TestInsertCommandErrors(t *testing.T) {
 				Message: "The '_id' value cannot be of type regex",
 			},
 		},
+		"InsertDuplicateID": {
+			toInsert: []any{
+				bson.D{{"_id", "foo"}, {"_id", "bar"}},
+			},
+			ordered: false,
+			werr: &mongo.WriteError{
+				Code:    2,
+				Message: "can't have multiple _id fields in one document",
+			},
+			altMessage: `invalid key: "_id" (duplicate keys are not allowed)`,
+		},
 	} {
 		name, tc := name, tc
 		t.Run(name, func(t *testing.T) {
diff --git a/integration/insert_compat_test.go b/integration/insert_compat_test.go
index 87f06fff69ae..d71e7aa3be10 100644
--- a/integration/insert_compat_test.go
+++ b/integration/insert_compat_test.go
@@ -200,15 +200,37 @@ func TestInsertCompat(t *testing.T) {
 			},
 			ordered: true,
 		},
-		"UnorderedOneError": {
+		"UnorderedTwoErrors": {
 			insert: []any{
 				bson.D{{"_id", "1"}},
-				bson.D{{"_id", "1"}}, // to test duplicate key error
+				bson.D{{"_id", "1"}},
 				bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
 				bson.D{{"_id", "2"}},
 			},
 			ordered: false,
 		},
+		"OrderedThreeErrors": {
+			insert: []any{
+				bson.D{{"_id", "1"}},
+				bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
+				bson.D{{"_id", "2"}},
+				bson.D{{"_id", "1"}},
+				bson.D{{"_id", "3"}},
+				bson.D{{"_id", "4"}, {"_id", "4"}},
+			},
+			ordered: true,
+		},
+		"UnorderedThreeErrors": {
+			insert: []any{
+				bson.D{{"_id", "1"}},
+				bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
+				bson.D{{"_id", "2"}},
+				bson.D{{"_id", "1"}},
+				bson.D{{"_id", "3"}},
+				bson.D{{"_id", "4"}, {"_id", "4"}},
+			},
+			ordered: false,
+		},
 	}
 
 	testInsertCompat(t, testCases)
diff --git a/internal/backends/postgresql/collection.go b/internal/backends/postgresql/collection.go
index 5a4ec0ae5f9e..1b90353cac9b 100644
--- a/internal/backends/postgresql/collection.go
+++ b/internal/backends/postgresql/collection.go
@@ -128,33 +128,22 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
 	}
 
 	err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
-		for _, doc := range params.Docs {
-			var b []byte
-			b, err = sjson.Marshal(doc)
-			if err != nil {
-				return lazyerrors.Error(err)
-			}
+		var batch []*types.Document
+		docs := params.Docs
+		const batchSize = 100
 
-			// use batches: INSERT INTO %s %s VALUES (?), (?), (?), ... up to, say, 100 documents
-			// TODO https://github.com/FerretDB/FerretDB/issues/3271
-			q := fmt.Sprintf(
-				`INSERT INTO %s (%s) VALUES ($1)`,
-				pgx.Identifier{c.dbName, meta.TableName}.Sanitize(),
-				metadata.DefaultColumn,
-			)
+		for len(docs) > 0 {
+			i := min(batchSize, len(docs))
+			batch, docs = docs[:i], docs[i:]
 
+			var q string
 			var args []any
-			if meta.Capped() {
-				q = fmt.Sprintf(
-					`INSERT INTO %s (%s, %s) VALUES ($1, $2)`,
-					pgx.Identifier{c.dbName, meta.TableName}.Sanitize(),
-					metadata.RecordIDColumn,
-					metadata.DefaultColumn,
-				)
-				args = append(args, doc.RecordID())
+
+			q, args, err = prepareInsertStatement(c.dbName, meta.TableName, meta.Capped(), batch)
+			if err != nil {
+				return lazyerrors.Error(err)
 			}
 
-			args = append(args, string(b))
 			if _, err = tx.Exec(ctx, q, args...); err != nil {
 				var pgErr *pgconn.PgError
 				if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation {
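Illustration only (not part of the patch): the loop above splits params.Docs into batches of at most batchSize documents, using the min builtin introduced in Go 1.21. A minimal, self-contained sketch of that slicing pattern:

	// batch_sketch.go (hypothetical example, for illustration only)
	package main

	import "fmt"

	func main() {
		const batchSize = 100

		docs := make([]int, 250) // stand-in for params.Docs
		var batch []int

		for len(docs) > 0 {
			i := min(batchSize, len(docs)) // min is a Go 1.21 builtin
			batch, docs = docs[:i], docs[i:]

			// each batch becomes a single multi-row INSERT
			fmt.Println(len(batch)) // prints 100, 100, 50
		}
	}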
diff --git a/internal/backends/postgresql/insert.go b/internal/backends/postgresql/insert.go
new file mode 100644
index 000000000000..02d45ac9ddcf
--- /dev/null
+++ b/internal/backends/postgresql/insert.go
@@ -0,0 +1,65 @@
+// Copyright 2021 FerretDB Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgresql
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/jackc/pgx/v5"
+
+	"github.com/FerretDB/FerretDB/internal/backends/postgresql/metadata"
+	"github.com/FerretDB/FerretDB/internal/handlers/sjson"
+	"github.com/FerretDB/FerretDB/internal/types"
+	"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
+)
+
+// prepareInsertStatement returns a statement and arguments for inserting the given documents.
+//
+// If capped is true, it returns a statement and arguments for inserting record IDs and documents.
+func prepareInsertStatement(schema, tableName string, capped bool, docs []*types.Document) (string, []any, error) {
+	var placeholder metadata.Placeholder
+	var args []any
+	rows := make([]string, len(docs))
+
+	for i, doc := range docs {
+		b, err := sjson.Marshal(doc)
+		if err != nil {
+			return "", nil, lazyerrors.Error(err)
+		}
+
+		if capped {
+			rows[i] = "(" + placeholder.Next() + ", " + placeholder.Next() + ")"
+			args = append(args, doc.RecordID(), string(b))
+
+			continue
+		}
+
+		rows[i] = "(" + placeholder.Next() + ")"
+		args = append(args, string(b))
+	}
+
+	columns := metadata.DefaultColumn
+	if capped {
+		columns = strings.Join([]string{metadata.RecordIDColumn, metadata.DefaultColumn}, ", ")
+	}
+
+	return fmt.Sprintf(
+		`INSERT INTO %s (%s) VALUES %s`,
+		pgx.Identifier{schema, tableName}.Sanitize(),
+		columns,
+		strings.Join(rows, ", "),
+	), args, nil
+}
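Illustration only (not part of the patch): a sketch of the statement the new PostgreSQL helper is expected to build, assuming a hypothetical schema "db" and table "table"; the actual column names come from the metadata package constants, and error handling is omitted:

	docs := []*types.Document{
		must.NotFail(types.NewDocument("_id", "a")),
		must.NotFail(types.NewDocument("_id", "b")),
		must.NotFail(types.NewDocument("_id", "c")),
	}

	q, args, _ := prepareInsertStatement("db", "table", false, docs)
	// q    is of the form: INSERT INTO "db"."table" (<metadata.DefaultColumn>) VALUES ($1), ($2), ($3)
	// args holds the three sjson-marshaled documents;
	// with capped == true each row becomes ($1, $2), ($3, $4), ($5, $6),
	// with the record ID as the first argument of each pair.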
diff --git a/internal/backends/sqlite/collection.go b/internal/backends/sqlite/collection.go
index dc2c63132fef..c802c3d5d4f3 100644
--- a/internal/backends/sqlite/collection.go
+++ b/internal/backends/sqlite/collection.go
@@ -112,28 +112,19 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
 	meta := c.r.CollectionGet(ctx, c.dbName, c.name)
 
 	err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
-		for _, doc := range params.Docs {
-			b, err := sjson.Marshal(doc)
+		var batch []*types.Document
+		docs := params.Docs
+		const batchSize = 100
+
+		for len(docs) > 0 {
+			i := min(batchSize, len(docs))
+			batch, docs = docs[:i], docs[i:]
+
+			q, args, err := prepareInsertStatement(meta.TableName, meta.Capped(), batch)
 			if err != nil {
 				return lazyerrors.Error(err)
 			}
 
-			// use batches: INSERT INTO %q %s VALUES (?), (?), (?), ... up to, say, 100 documents
-			// TODO https://github.com/FerretDB/FerretDB/issues/3271
-			q := fmt.Sprintf(`INSERT INTO %q (%s) VALUES (?)`, meta.TableName, metadata.DefaultColumn)
-
-			var args []any
-			if meta.Capped() {
-				q = fmt.Sprintf(
-					`INSERT INTO %q (%s, %s) VALUES (?, ?)`,
-					meta.TableName,
-					metadata.RecordIDColumn,
-					metadata.DefaultColumn,
-				)
-				args = append(args, doc.RecordID())
-			}
-
-			args = append(args, string(b))
 			if _, err = tx.ExecContext(ctx, q, args...); err != nil {
 				var se *sqlite3.Error
 				if errors.As(err, &se) && se.Code() == sqlite3lib.SQLITE_CONSTRAINT_UNIQUE {
diff --git a/internal/backends/sqlite/insert.go b/internal/backends/sqlite/insert.go
new file mode 100644
index 000000000000..3a5402aca96c
--- /dev/null
+++ b/internal/backends/sqlite/insert.go
@@ -0,0 +1,62 @@
+// Copyright 2021 FerretDB Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/FerretDB/FerretDB/internal/backends/sqlite/metadata"
+	"github.com/FerretDB/FerretDB/internal/handlers/sjson"
+	"github.com/FerretDB/FerretDB/internal/types"
+	"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
+)
+
+// prepareInsertStatement returns a statement and arguments for inserting the given documents.
+//
+// If capped is true, it returns a statement and arguments for inserting record IDs and documents.
+func prepareInsertStatement(tableName string, capped bool, docs []*types.Document) (string, []any, error) {
+	var args []any
+	rows := make([]string, len(docs))
+
+	for i, doc := range docs {
+		b, err := sjson.Marshal(doc)
+		if err != nil {
+			return "", nil, lazyerrors.Error(err)
+		}
+
+		if capped {
+			rows[i] = "(?, ?)"
+			args = append(args, doc.RecordID(), string(b))
+
+			continue
+		}
+
+		rows[i] = "(?)"
+		args = append(args, string(b))
+	}
+
+	columns := metadata.DefaultColumn
+	if capped {
+		columns = strings.Join([]string{metadata.RecordIDColumn, metadata.DefaultColumn}, ", ")
+	}
+
+	return fmt.Sprintf(
+		`INSERT INTO %q (%s) VALUES %s`,
+		tableName,
+		columns,
+		strings.Join(rows, ", "),
+	), args, nil
+}
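Illustration only (not part of the patch): the SQLite helper mirrors the PostgreSQL one but quotes the table name with %q and uses positional ? placeholders instead of numbered $N ones. With the same hypothetical three-document batch as in the previous sketch:

	q, args, _ := prepareInsertStatement("collectionTable", false, docs)
	// q    is of the form: INSERT INTO "collectionTable" (<metadata.DefaultColumn>) VALUES (?), (?), (?)
	// args holds the three sjson-marshaled documents;
	// with capped == true each row becomes (?, ?) and args gains a record ID per document.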
diff --git a/internal/handlers/sqlite/msg_insert.go b/internal/handlers/sqlite/msg_insert.go
index b6ce54679e90..140e4deea50f 100644
--- a/internal/handlers/sqlite/msg_insert.go
+++ b/internal/handlers/sqlite/msg_insert.go
@@ -15,9 +15,11 @@ package sqlite
 
 import (
+	"cmp"
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 
 	"github.com/FerretDB/FerretDB/internal/backends"
 	"github.com/FerretDB/FerretDB/internal/handlers/common"
@@ -86,34 +88,48 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
 	defer docsIter.Close()
 
 	var inserted int32
-	writeErrors := types.MakeArray(0)
+	var writeErrors []*writeError
 
-	for {
-		i, d, err := docsIter.Next()
-		if errors.Is(err, iterator.ErrIteratorDone) {
-			break
-		}
+	var done bool
+	for !done {
+		const batchSize = 1000
+		docs := make([]*types.Document, 0, batchSize)
+		docsIndexes := make([]int32, 0, batchSize)
 
-		if err != nil {
-			return nil, lazyerrors.Error(err)
-		}
+		for j := 0; j < batchSize; j++ {
+			var i int
+			var d any
 
-		doc := d.(*types.Document)
+			i, d, err = docsIter.Next()
+			if errors.Is(err, iterator.ErrIteratorDone) {
+				done = true
+				break
+			}
 
-		if !doc.Has("_id") {
-			doc.Set("_id", types.NewObjectID())
-		}
+			if err != nil {
+				return nil, lazyerrors.Error(err)
+			}
 
-		// TODO https://github.com/FerretDB/FerretDB/issues/3454
-		if err = doc.ValidateData(); err != nil {
-			var ve *types.ValidationError
+			doc := d.(*types.Document)
+
+			if !doc.Has("_id") {
+				doc.Set("_id", types.NewObjectID())
+			}
+
+			// TODO https://github.com/FerretDB/FerretDB/issues/3454
+			if err = doc.ValidateData(); err == nil {
+				docs = append(docs, doc)
+				docsIndexes = append(docsIndexes, int32(i))
+
+				continue
+			}
 
+			var ve *types.ValidationError
 			if !errors.As(err, &ve) {
 				return nil, lazyerrors.Error(err)
 			}
 
 			var code commonerrors.ErrorCode
-
 			switch ve.Code() {
 			case types.ErrValidation, types.ErrIDNotFound:
 				code = commonerrors.ErrBadValue
@@ -123,54 +139,68 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
 				panic(fmt.Sprintf("Unknown error code: %v", ve.Code()))
 			}
 
-			we := &writeError{
+			writeErrors = append(writeErrors, &writeError{
 				index:  int32(i),
 				code:   code,
 				errmsg: ve.Error(),
-			}
-			writeErrors.Append(we.Document())
+			})
 
 			if params.Ordered {
 				break
 			}
+		}
+
+		if _, err = c.InsertAll(ctx, &backends.InsertAllParams{Docs: docs}); err == nil {
+			inserted += int32(len(docs))
+
+			if params.Ordered && len(writeErrors) > 0 {
+				break
+			}
 
 			continue
 		}
 
-		// use bigger batches on a happy path, downgrade to one-document batches on error
-		// TODO https://github.com/FerretDB/FerretDB/issues/3271
-
-		_, err = c.InsertAll(ctx, &backends.InsertAllParams{
-			Docs: []*types.Document{doc},
-		})
-		if err != nil {
-			if backends.ErrorCodeIs(err, backends.ErrorCodeInsertDuplicateID) {
-				we := &writeError{
-					index:  int32(i),
-					code:   commonerrors.ErrDuplicateKeyInsert,
-					errmsg: fmt.Sprintf(`E11000 duplicate key error collection: %s.%s`, params.DB, params.Collection),
-				}
-				writeErrors.Append(we.Document())
-
-				if params.Ordered {
-					break
-				}
+		// insert documents one by one if the batch insert fails
+		for j, doc := range docs {
+			if _, err = c.InsertAll(ctx, &backends.InsertAllParams{
+				Docs: []*types.Document{doc},
+			}); err == nil {
+				inserted++
 
 				continue
 			}
 
-			return nil, lazyerrors.Error(err)
-		}
+			if !backends.ErrorCodeIs(err, backends.ErrorCodeInsertDuplicateID) {
+				return nil, lazyerrors.Error(err)
+			}
 
-		inserted++
+			writeErrors = append(writeErrors, &writeError{
+				index:  docsIndexes[j],
+				code:   commonerrors.ErrDuplicateKeyInsert,
+				errmsg: fmt.Sprintf(`E11000 duplicate key error collection: %s.%s`, params.DB, params.Collection),
+			})
+
+			if params.Ordered {
+				break
+			}
+		}
 	}
 
 	res := must.NotFail(types.NewDocument(
 		"n", inserted,
 	))
 
-	if writeErrors.Len() > 0 {
-		res.Set("writeErrors", writeErrors)
+	if len(writeErrors) > 0 {
+		slices.SortFunc(writeErrors, func(a, b *writeError) int {
+			return cmp.Compare(a.index, b.index)
+		})
+
+		array := types.MakeArray(len(writeErrors))
+		for _, we := range writeErrors {
+			array.Append(we.Document())
+		}
+
+		res.Set("writeErrors", array)
 	}
 
 	res.Set("ok", float64(1))
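Illustration only (not part of the patch): with batching, write errors are no longer produced strictly in document order; validation errors are recorded while a batch is being filled, while duplicate-key errors are recorded later during the per-document fallback. The final sort restores ascending document order before the response is built. A minimal sketch of that step, with hypothetical indexes:

	writeErrors := []*writeError{
		{index: 3, code: commonerrors.ErrDuplicateKeyInsert},
		{index: 1, code: commonerrors.ErrBadValue},
	}

	slices.SortFunc(writeErrors, func(a, b *writeError) int {
		return cmp.Compare(a.index, b.index)
	})
	// writeErrors is now ordered by index (1, then 3), so the writeErrors array in the
	// response lists failures by the position of the document in the original insert.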