Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize insert performance by batching #3621

Merged
merged 13 commits into from
Oct 31, 2023
11 changes: 11 additions & 0 deletions integration/insert_command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ func TestInsertCommandErrors(t *testing.T) {
Message: "The '_id' value cannot be of type regex",
},
},
"InsertDuplicateID": {
toInsert: []any{
bson.D{{"_id", "foo"}, {"_id", "bar"}},
},
ordered: false,
werr: &mongo.WriteError{
Code: 2,
Message: "can't have multiple _id fields in one document",
},
altMessage: `invalid key: "_id" (duplicate keys are not allowed)`,
},
} {
name, tc := name, tc
t.Run(name, func(t *testing.T) {
Expand Down
26 changes: 24 additions & 2 deletions integration/insert_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,37 @@ func TestInsertCompat(t *testing.T) {
},
ordered: true,
},
"UnorderedOneError": {
"UnorderedTwoErrors": {
insert: []any{
bson.D{{"_id", "1"}},
bson.D{{"_id", "1"}}, // to test duplicate key error
bson.D{{"_id", "1"}},
bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
bson.D{{"_id", "2"}},
},
ordered: false,
},
"OrderedThreeErrors": {
insert: []any{
bson.D{{"_id", "1"}},
bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
bson.D{{"_id", "2"}},
bson.D{{"_id", "1"}},
bson.D{{"_id", "3"}},
bson.D{{"_id", "4"}, {"_id", "4"}},
},
ordered: true,
},
"UnorderedThreeErrors": {
insert: []any{
bson.D{{"_id", "1"}},
bson.D{{"_id", primitive.Regex{Pattern: "^regex$", Options: "i"}}},
bson.D{{"_id", "2"}},
bson.D{{"_id", "1"}},
bson.D{{"_id", "3"}},
bson.D{{"_id", "4"}, {"_id", "4"}},
},
ordered: false,
},
}

testInsertCompat(t, testCases)
Expand Down
33 changes: 11 additions & 22 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,22 @@
}

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
for _, doc := range params.Docs {
var b []byte
b, err = sjson.Marshal(doc)
if err != nil {
return lazyerrors.Error(err)
}
var batch []*types.Document
docs := params.Docs
const batchSize = 100

// use batches: INSERT INTO %s %s VALUES (?), (?), (?), ... up to, say, 100 documents
// TODO https://github.com/FerretDB/FerretDB/issues/3271
q := fmt.Sprintf(
`INSERT INTO %s (%s) VALUES ($1)`,
pgx.Identifier{c.dbName, meta.TableName}.Sanitize(),
metadata.DefaultColumn,
)
for len(docs) > 0 {
i := min(batchSize, len(docs))
batch, docs = docs[:i], docs[i:]

var q string
var args []any
if meta.Capped() {
q = fmt.Sprintf(
`INSERT INTO %s (%s, %s) VALUES ($1, $2)`,
pgx.Identifier{c.dbName, meta.TableName}.Sanitize(),
metadata.RecordIDColumn,
metadata.DefaultColumn,
)
args = append(args, doc.RecordID())

q, args, err = prepareInsertStatement(c.dbName, meta.TableName, meta.Capped(), batch)
if err != nil {
return lazyerrors.Error(err)

Check warning on line 144 in internal/backends/postgresql/collection.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/postgresql/collection.go#L144

Added line #L144 was not covered by tests
}

args = append(args, string(b))
if _, err = tx.Exec(ctx, q, args...); err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation {
Expand Down
65 changes: 65 additions & 0 deletions internal/backends/postgresql/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2021 FerretDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgresql

import (
"fmt"
"strings"

"github.com/jackc/pgx/v5"

"github.com/FerretDB/FerretDB/internal/backends/postgresql/metadata"
"github.com/FerretDB/FerretDB/internal/handlers/sjson"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

// prepareInsertStatement returns a single multi-row INSERT statement and its arguments
// for inserting the given documents into the table identified by schema and tableName.
//
// Each document is marshaled with sjson.Marshal and passed as a string argument;
// it contributes one parenthesized row to the VALUES list.
//
// If capped is true, each row carries two values - the document's record ID
// (doc.RecordID()) followed by the marshaled document - and the column list is
// metadata.RecordIDColumn, metadata.DefaultColumn. Otherwise each row carries only
// the marshaled document and the column list is metadata.DefaultColumn.
//
// The returned arguments are in the same order as the placeholders in the statement.
// If marshaling any document fails, it returns an empty statement, nil arguments,
// and the error wrapped by lazyerrors.Error.
//
// If docs is empty, the VALUES list of the returned statement is empty;
// callers are presumably expected to pass at least one document.
func prepareInsertStatement(schema, tableName string, capped bool, docs []*types.Document) (string, []any, error) {
// placeholder supplies the parameter markers for the statement;
// presumably $1, $2, ... in sequence - confirm against metadata.Placeholder.
var placeholder metadata.Placeholder
var args []any
// rows holds one "(...)" placeholder group per document.
rows := make([]string, len(docs))

for i, doc := range docs {
b, err := sjson.Marshal(doc)
if err != nil {
return "", nil, lazyerrors.Error(err)
}

Check warning on line 41 in internal/backends/postgresql/insert.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/postgresql/insert.go#L40-L41

Added lines #L40 - L41 were not covered by tests

if capped {
// Two placeholders per row: the first is for the record ID, the second is for the document,
// matching the order in which the values are appended to args.
rows[i] = "(" + placeholder.Next() + ", " + placeholder.Next() + ")"
args = append(args, doc.RecordID(), string(b))

continue
}

// Non-capped case: a single placeholder for the document.
rows[i] = "(" + placeholder.Next() + ")"
args = append(args, string(b))
}

// The column list must match the per-row layout chosen above.
columns := metadata.DefaultColumn
if capped {
columns = strings.Join([]string{metadata.RecordIDColumn, metadata.DefaultColumn}, ", ")
}

// The schema-qualified table name goes through pgx.Identifier.Sanitize before interpolation;
// document data is never interpolated and is passed only through args.
return fmt.Sprintf(
`INSERT INTO %s (%s) VALUES %s`,
pgx.Identifier{schema, tableName}.Sanitize(),
columns,
strings.Join(rows, ", "),
), args, nil
}
27 changes: 9 additions & 18 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,19 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
meta := c.r.CollectionGet(ctx, c.dbName, c.name)

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
for _, doc := range params.Docs {
b, err := sjson.Marshal(doc)
var batch []*types.Document
docs := params.Docs
const batchSize = 100

for len(docs) > 0 {
i := min(batchSize, len(docs))
batch, docs = docs[:i], docs[i:]

q, args, err := prepareInsertStatement(meta.TableName, meta.Capped(), batch)
if err != nil {
return lazyerrors.Error(err)
}

// use batches: INSERT INTO %q %s VALUES (?), (?), (?), ... up to, say, 100 documents
// TODO https://github.com/FerretDB/FerretDB/issues/3271
q := fmt.Sprintf(`INSERT INTO %q (%s) VALUES (?)`, meta.TableName, metadata.DefaultColumn)

var args []any
if meta.Capped() {
q = fmt.Sprintf(
`INSERT INTO %q (%s, %s) VALUES (?, ?)`,
meta.TableName,
metadata.RecordIDColumn,
metadata.DefaultColumn,
)
args = append(args, doc.RecordID())
}

args = append(args, string(b))
if _, err = tx.ExecContext(ctx, q, args...); err != nil {
var se *sqlite3.Error
if errors.As(err, &se) && se.Code() == sqlite3lib.SQLITE_CONSTRAINT_UNIQUE {
Expand Down
62 changes: 62 additions & 0 deletions internal/backends/sqlite/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 FerretDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlite

import (
"fmt"
"strings"

"github.com/FerretDB/FerretDB/internal/backends/sqlite/metadata"
"github.com/FerretDB/FerretDB/internal/handlers/sjson"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

// prepareInsertStatement returns a single multi-row INSERT statement and its arguments
// for inserting the given documents into the table named tableName.
//
// Each document is marshaled with sjson.Marshal and passed as a string argument;
// it contributes one parenthesized row of "?" placeholders to the VALUES list.
//
// If capped is true, each row carries two values - the document's record ID
// (doc.RecordID()) followed by the marshaled document - and the column list is
// metadata.RecordIDColumn, metadata.DefaultColumn. Otherwise each row carries only
// the marshaled document and the column list is metadata.DefaultColumn.
//
// The returned arguments are in the same order as the placeholders in the statement.
// If marshaling any document fails, it returns an empty statement, nil arguments,
// and the error wrapped by lazyerrors.Error.
//
// If docs is empty, the VALUES list of the returned statement is empty;
// callers are presumably expected to pass at least one document.
func prepareInsertStatement(tableName string, capped bool, docs []*types.Document) (string, []any, error) {
var args []any
// rows holds one "(...)" placeholder group per document.
rows := make([]string, len(docs))

for i, doc := range docs {
b, err := sjson.Marshal(doc)
if err != nil {
return "", nil, lazyerrors.Error(err)
}

Check warning on line 38 in internal/backends/sqlite/insert.go

View check run for this annotation

Codecov / codecov/patch

internal/backends/sqlite/insert.go#L37-L38

Added lines #L37 - L38 were not covered by tests

if capped {
// Two placeholders per row: record ID first, then the document,
// matching the order in which the values are appended to args.
rows[i] = "(?, ?)"
args = append(args, doc.RecordID(), string(b))

continue
}

// Non-capped case: a single placeholder for the document.
rows[i] = "(?)"
args = append(args, string(b))
}

// The column list must match the per-row layout chosen above.
columns := metadata.DefaultColumn
if capped {
columns = strings.Join([]string{metadata.RecordIDColumn, metadata.DefaultColumn}, ", ")
}

// %q renders tableName as a Go-syntax double-quoted string.
// NOTE(review): that uses Go escaping rules, not SQL identifier quoting; presumably
// table names are restricted so that the two coincide - confirm where names are generated.
return fmt.Sprintf(
`INSERT INTO %q (%s) VALUES %s`,
tableName,
columns,
strings.Join(rows, ", "),
), args, nil
}
Loading
Loading