Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize insert performance by batching #3621

Merged
merged 13 commits into from
Oct 31, 2023
Prev Previous commit
Next Next commit
simplify code
  • Loading branch information
chilagrow committed Oct 31, 2023
commit e28546246768df977472aa67559947a0b1f388ff
12 changes: 6 additions & 6 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,16 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
batchSize := 100

err = pool.InTransaction(ctx, p, func(tx pgx.Tx) error {
var j int
for i := 0; i < len(params.Docs); i += batchSize {
if j += batchSize; j > len(params.Docs) {
j = len(params.Docs)
}
docs := params.Docs
for len(docs) > 0 {
i := min(batchSize, len(docs))
batch := docs[:i]
docs = docs[i:]

var q string
var args []any

q, args, err = prepareInsertStatement(c.dbName, meta.TableName, meta.CappedSize > 0, params.Docs[i:j])
q, args, err = prepareInsertStatement(c.dbName, meta.TableName, meta.Capped(), batch)
if err != nil {
return lazyerrors.Error(err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ func (c *collection) InsertAll(ctx context.Context, params *backends.InsertAllPa
batchSize := 100

err := db.InTransaction(ctx, func(tx *fsql.Tx) error {
var j int
for i := 0; i < len(params.Docs); i += batchSize {
if j += batchSize; j > len(params.Docs) {
j = len(params.Docs)
}
docs := params.Docs
for len(docs) > 0 {
i := min(batchSize, len(docs))
batch := docs[:i]
docs = docs[i:]

q, args, err := prepareInsertStatement(meta.TableName, meta.Settings.CappedSize > 0, params.Docs[i:j])
q, args, err := prepareInsertStatement(meta.TableName, meta.Capped(), batch)
if err != nil {
return lazyerrors.Error(err)
}
Expand Down
62 changes: 28 additions & 34 deletions internal/handlers/sqlite/msg_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
batchSize := 100
var writeErrors []*writeError

for {
if done {
break
}

for !done {
var docs []*types.Document
var docsIndexes []int32

Expand All @@ -121,44 +117,42 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

// TODO https://github.com/FerretDB/FerretDB/issues/3454
if err = doc.ValidateData(); err != nil {
var ve *types.ValidationError
if err = doc.ValidateData(); err == nil {
docs = append(docs, doc)
docsIndexes = append(docsIndexes, int32(i))

if !errors.As(err, &ve) {
return nil, lazyerrors.Error(err)
}
continue
}

var code commonerrors.ErrorCode
var ve *types.ValidationError
if !errors.As(err, &ve) {
return nil, lazyerrors.Error(err)
}

switch ve.Code() {
case types.ErrValidation, types.ErrIDNotFound:
code = commonerrors.ErrBadValue
case types.ErrWrongIDType:
code = commonerrors.ErrInvalidID
default:
panic(fmt.Sprintf("Unknown error code: %v", ve.Code()))
}
var code commonerrors.ErrorCode
switch ve.Code() {
case types.ErrValidation, types.ErrIDNotFound:
code = commonerrors.ErrBadValue
case types.ErrWrongIDType:
code = commonerrors.ErrInvalidID
default:
panic(fmt.Sprintf("Unknown error code: %v", ve.Code()))
}

writeErrors = append(writeErrors, &writeError{
index: int32(i),
code: code,
errmsg: ve.Error(),
})
writeErrors = append(writeErrors, &writeError{
index: int32(i),
code: code,
errmsg: ve.Error(),
})

if params.Ordered {
break
}
} else {
docs = append(docs, doc)
docsIndexes = append(docsIndexes, int32(i))
if params.Ordered {
break
}
}

_, err = c.InsertAll(ctx, &backends.InsertAllParams{
if _, err = c.InsertAll(ctx, &backends.InsertAllParams{
Docs: docs,
})

if err == nil {
}); err == nil {
inserted += int32(len(docs))

if params.Ordered && len(writeErrors) > 0 {
Expand Down