diff --git a/internal/handlers/pg/msg_findandmodify.go b/internal/handlers/pg/msg_findandmodify.go index a63af3470d21..9ff1cad87379 100644 --- a/internal/handlers/pg/msg_findandmodify.go +++ b/internal/handlers/pg/msg_findandmodify.go @@ -141,6 +141,7 @@ func (h *Handler) MsgFindAndModify(ctx context.Context, msg *wire.OpMsg) (*wire. } } + // TODO https://github.com/FerretDB/FerretDB/issues/2612 if _, err = updateDocument(ctx, tx, &qp, upsert); err != nil { return lazyerrors.Error(err) } @@ -184,6 +185,7 @@ func (h *Handler) MsgFindAndModify(ctx context.Context, msg *wire.OpMsg) (*wire. return nil } + // TODO https://github.com/FerretDB/FerretDB/issues/2612 if _, err = deleteDocuments(ctx, dbPool, &qp, resDocs); err != nil { return err } @@ -218,13 +220,14 @@ func upsertDocuments(ctx context.Context, dbPool *pgdb.Pool, tx pgx.Tx, docs []* switch res.Operation { case common.UpsertOperationInsert: + // TODO https://github.com/FerretDB/FerretDB/issues/2612 if err = insertDocument(ctx, tx, query, res.Upsert); err != nil { return nil, lazyerrors.Error(err) } return res, nil case common.UpsertOperationUpdate: - + // TODO https://github.com/FerretDB/FerretDB/issues/2612 _, err = updateDocument(ctx, tx, query, res.Upsert) if err != nil { return nil, lazyerrors.Error(err) diff --git a/internal/handlers/pg/msg_insert.go b/internal/handlers/pg/msg_insert.go index 01fc8f940f4d..a26f4c4999b1 100644 --- a/internal/handlers/pg/msg_insert.go +++ b/internal/handlers/pg/msg_insert.go @@ -83,7 +83,7 @@ func insertMany(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.QueryParams, do var inserted int32 var insErrors commonerrors.WriteErrors - // + // attempt to insert all documents in a single transaction err := dbPool.InTransaction(ctx, func(tx pgx.Tx) error { for i := 0; i < docs.Len(); i++ { doc := must.NotFail(docs.Get(i)) @@ -101,7 +101,7 @@ func insertMany(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.QueryParams, do for i := 0; i < docs.Len(); i++ { doc := must.NotFail(docs.Get(i)) - err := insertDocumentFallback(ctx, dbPool, qp, doc) + err := insertDocumentSeparately(ctx, dbPool, qp, doc) var we *commonerrors.WriteErrors @@ -126,7 +126,7 @@ func insertMany(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.QueryParams, do return int32(docs.Len()), &insErrors } -// insertDocument prepares and executes actual INSERT request to Postgres. +// insertDocument prepares and executes actual INSERT request to Postgres in provided transaction. func insertDocument(ctx context.Context, tx pgx.Tx, qp *pgdb.QueryParams, doc any) error { d, ok := doc.(*types.Document) if !ok { @@ -136,19 +136,6 @@ func insertDocument(ctx context.Context, tx pgx.Tx, qp *pgdb.QueryParams, doc an ) } - return pgdb.InsertDocument(ctx, tx, qp.DB, qp.Collection, d) -} - -// insertDocument prepares and executes actual INSERT request to Postgres. -func insertDocumentFallback(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.QueryParams, doc any) error { - d, ok := doc.(*types.Document) - if !ok { - return commonerrors.NewCommandErrorMsg( - commonerrors.ErrBadValue, - fmt.Sprintf("document has invalid type %s", commonparams.AliasFromType(doc)), - ) - } - toInsert := d if !toInsert.Has("_id") { @@ -158,9 +145,7 @@ func insertDocumentFallback(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.Que toInsert.Set("_id", types.NewObjectID()) } - err := dbPool.InTransactionRetry(ctx, func(tx pgx.Tx) error { - return pgdb.InsertDocument(ctx, tx, qp.DB, qp.Collection, toInsert) - }) + err := pgdb.InsertDocument(ctx, tx, qp.DB, qp.Collection, toInsert) switch { case err == nil: @@ -186,3 +171,12 @@ func insertDocumentFallback(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.Que return commonerrors.CheckError(err) } } + +// insertDocumentSeparately prepares and executes actual INSERT request to Postgres in separate transaction. +// +// It should be used in places where we don't want to rollback previous inserted documents on error. +func insertDocumentSeparately(ctx context.Context, dbPool *pgdb.Pool, qp *pgdb.QueryParams, doc any) error { + return dbPool.InTransactionRetry(ctx, func(tx pgx.Tx) error { + return insertDocument(ctx, tx, qp, doc) + }) +} diff --git a/internal/handlers/pg/msg_update.go b/internal/handlers/pg/msg_update.go index 478784818417..8dcfde319ee7 100644 --- a/internal/handlers/pg/msg_update.go +++ b/internal/handlers/pg/msg_update.go @@ -97,6 +97,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, "_id", must.NotFail(doc.Get("_id")), ))) + // TODO https://github.com/FerretDB/FerretDB/issues/2612 if err = insertDocument(ctx, tx, &qp, doc); err != nil { return err } @@ -121,6 +122,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, continue } + // TODO https://github.com/FerretDB/FerretDB/issues/2612 rowsChanged, err := updateDocument(ctx, tx, &qp, doc) if err != nil { return err