Skip to content

Commit

Permalink
Support findAndModify
Browse files Browse the repository at this point in the history
Closes #548
  • Loading branch information
Dmitry authored May 23, 2022
1 parent 638e107 commit d80126f
Show file tree
Hide file tree
Showing 8 changed files with 1,071 additions and 95 deletions.
675 changes: 675 additions & 0 deletions integration/findandmodify_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/handlers/common/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
ErrNamespaceNotFound = ErrorCode(26) // NamespaceNotFound
ErrNamespaceExists = ErrorCode(48) // NamespaceExists
ErrCommandNotFound = ErrorCode(59) // CommandNotFound
ErrInvalidNamespace = ErrorCode(73) // InvalidNamespace
ErrNotImplemented = ErrorCode(238) // NotImplemented
ErrSortBadValue = ErrorCode(15974) // Location15974
ErrInvalidArg = ErrorCode(28667) // Location28667
Expand Down
20 changes: 11 additions & 9 deletions internal/handlers/common/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 32 additions & 2 deletions internal/handlers/common/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,43 @@ func GetOptionalParam[T types.Type](doc *types.Document, key string, defaultValu

res, ok := v.(T)
if !ok {
msg := fmt.Sprintf("parameter %q has type %T (expected %T)", key, v, defaultValue)
return defaultValue, NewErrorMsg(ErrBadValue, msg)
msg := fmt.Sprintf(
`BSON field '%s' is the wrong type '%s', expected type '%s'`,
key, AliasFromType(v), AliasFromType(defaultValue),
)
return defaultValue, NewErrorMsg(ErrTypeMismatch, msg)
}

return res, nil
}

// GetBoolOptionalParam returns doc's bool value for key, or protocol error for invalid parameter.
// Non-zero value for double, long and int return true.
func GetBoolOptionalParam(doc *types.Document, key string) (bool, error) {
v, err := doc.Get(key)
if err != nil {
return false, nil
}

switch v := v.(type) {
case float64:
return v != 0, nil
case bool:
return v, nil
case int32:
return v != 0, nil
case int64:
return v != 0, nil
default:
msg := fmt.Sprintf(
`BSON field '%s' is the wrong type '%s', expected types '[bool, long, int, decimal, double]'`,
key,
AliasFromType(v),
)
return false, NewErrorMsg(ErrTypeMismatch, msg)
}
}

// AssertType asserts value's type, returning protocol error for unexpected types.
//
// If a custom error is needed, use a normal Go type assertion instead:
Expand Down
42 changes: 26 additions & 16 deletions internal/handlers/pg/msg_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strings"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"

"github.com/FerretDB/FerretDB/internal/fjson"
Expand Down Expand Up @@ -113,23 +114,9 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
continue
}

var p pgdb.Placeholder
placeholders := make([]string, len(resDocs))
ids := make([]any, len(resDocs))
for i, doc := range resDocs {
placeholders[i] = p.Next()
id := must.NotFail(doc.Get("_id"))
ids[i] = must.NotFail(fjson.Marshal(id))
}

sql := fmt.Sprintf(
"DELETE FROM %s WHERE _jsonb->'_id' IN (%s)",
pgx.Identifier{sp.db, sp.collection}.Sanitize(), strings.Join(placeholders, ", "),
)
tag, err := h.pgPool.Exec(ctx, sql, ids...)
tag, err := h.delete(ctx, sp, resDocs)
if err != nil {
// TODO check error code
return nil, common.NewError(common.ErrNamespaceNotFound, fmt.Errorf("delete: ns not found: %w", err))
return nil, err
}

deleted += int32(tag.RowsAffected())
Expand All @@ -148,3 +135,26 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

return &reply, nil
}

// delete prepares and executes actual DELETE request to Postgres.
func (h *Handler) delete(ctx context.Context, sp sqlParam, resDocs []*types.Document) (pgconn.CommandTag, error) {
var p pgdb.Placeholder
placeholders := make([]string, len(resDocs))
ids := make([]any, len(resDocs))
for i, doc := range resDocs {
placeholders[i] = p.Next()
id := must.NotFail(doc.Get("_id"))
ids[i] = must.NotFail(fjson.Marshal(id))
}

sql := fmt.Sprintf(
"DELETE FROM %s WHERE _jsonb->'_id' IN (%s)",
pgx.Identifier{sp.db, sp.collection}.Sanitize(), strings.Join(placeholders, ", "),
)
tag, err := h.pgPool.Exec(ctx, sql, ids...)
if err != nil {
// TODO check error code
return nil, common.NewError(common.ErrNamespaceNotFound, fmt.Errorf("delete: ns not found: %w", err))
}
return tag, nil
}
Loading

0 comments on commit d80126f

Please sign in to comment.