Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support _id aggregation operators for $group stage #3096

Merged
merged 19 commits into from
Jul 26, 2023
18 changes: 14 additions & 4 deletions integration/aggregate_documents_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,25 +618,35 @@ func TestAggregateCompatGroup(t *testing.T) {
},
"IDType": {
pipeline: bson.A{bson.D{{"$group", bson.D{
{"_id", bson.D{{"$type", "_id"}}},
{"_id", bson.D{{"$type", "$v"}}},
{"count", bson.D{{"$count", bson.D{}}}},
}}}},
skip: "https://github.com/FerretDB/FerretDB/issues/2679",
},
"IDSum": {
pipeline: bson.A{
bson.D{{"$sort", bson.D{{"_id", 1}}}},
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", "$v"}}}}}},
bson.D{{"$sort", bson.D{{"_id", 1}}}},
},
skip: "https://github.com/FerretDB/FerretDB/issues/2694",
},
"IDSumNonExistentField": {
pipeline: bson.A{
bson.D{{"$sort", bson.D{{"_id", 1}}}},
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", "$non-existent"}}}}}},
bson.D{{"$sort", bson.D{{"_id", 1}}}},
},
skip: "https://github.com/FerretDB/FerretDB/issues/2694",
},
"IDSumInvalid": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", "$"}}}}}},
},
resultType: emptyResult,
},
"IDSumRecursiveInvalid": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", bson.D{{"$sum", "$"}}}}}}}},
},
resultType: emptyResult,
},
}

Expand Down
52 changes: 51 additions & 1 deletion integration/aggregate_documents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestAggregateGroupErrors(t *testing.T) {
altMessage string // optional, alternative error message for FerretDB, ignored if empty
skip string // optional, skip test with a specified reason
}{
"StageGroupUnaryOperatorSum": {
"GroupUnaryOperatorSum": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"sum", bson.D{{"$sum", bson.A{}}}}}}},
},
Expand All @@ -105,6 +105,56 @@ func TestAggregateGroupErrors(t *testing.T) {
},
altMessage: "The $sum accumulator is a unary operator",
},
"GroupTypeEmpty": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"v", bson.D{}}}}},
},
err: &mongo.CommandError{
Code: 40234,
Name: "Location40234",
Message: "The field 'v' must be an accumulator object",
},
},
"GroupTwoOperators": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$type", int32(42)}, {"$op", int32(42)}}}}}},
},
err: &mongo.CommandError{
Code: 15983,
Name: "Location15983",
Message: "An object representing an expression must have exactly one field: { $type: 42, $op: 42 }",
},
},
"GroupTypeInvalidLen": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$type", bson.A{"foo", "bar"}}}}}}},
},
err: &mongo.CommandError{
Code: 16020,
Name: "Location16020",
Message: "Expression $type takes exactly 1 arguments. 2 were passed in.",
},
},
"GroupNonExistentOperator": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$non-existent", "foo"}}}}}},
},
err: &mongo.CommandError{
Code: 168,
Name: "InvalidPipelineOperator",
Message: "Unrecognized expression '$non-existent'",
},
},
"GroupRecursiveNonExistentOperator": {
pipeline: bson.A{
bson.D{{"$group", bson.D{{"_id", bson.D{{"$type", bson.D{{"$non-existent", "foo"}}}}}}}},
},
err: &mongo.CommandError{
Code: 168,
Name: "InvalidPipelineOperator",
Message: "Unrecognized expression '$non-existent'",
},
},
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
} {
name, tc := name, tc
t.Run(name, func(t *testing.T) {
Expand Down
237 changes: 180 additions & 57 deletions internal/handlers/common/aggregations/stages/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations/operators"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations/operators/accumulators"
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/types"
Expand Down Expand Up @@ -82,6 +84,11 @@ func newGroup(stage *types.Document) (aggregations.Stage, error) {

if field == "_id" {
if doc, ok := v.(*types.Document); ok {
if operators.IsOperator(doc) {
groupKey = v
continue
}

if err = validateExpression("$group", doc); err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,73 +172,98 @@ func (g *group) Process(ctx context.Context, iter types.DocumentsIterator, close

// groupDocuments groups documents by group expression.
func (g *group) groupDocuments(ctx context.Context, in []*types.Document) ([]groupedDocuments, error) {
groupKey, ok := g.groupExpression.(string)
if !ok {
// non-string key aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: g.groupExpression,
documents: in,
}}, nil
}

expression, err := aggregations.NewExpression(groupKey)
if err != nil {
var exprErr *aggregations.ExpressionError
if !errors.As(err, &exprErr) {
return nil, lazyerrors.Error(err)
switch groupKey := g.groupExpression.(type) {
case *types.Document:
op, err := operators.NewOperator(groupKey)
if err != nil {
return nil, processOperatorError(err)
}

switch exprErr.Code() {
case aggregations.ErrNotExpression:
// constant value aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: groupKey,
documents: in,
}}, nil
case aggregations.ErrEmptyFieldPath:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
// TODO
commonerrors.ErrGroupInvalidFieldPath,
"'$' by itself is not a valid Expression",
"$group (stage)",
)
case aggregations.ErrInvalidExpression:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
fmt.Sprintf("'%s' starts with an invalid character for a user variable name", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
case aggregations.ErrEmptyVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"empty variable names are not allowed",
"$group (stage)",
)
// TODO https://github.com/FerretDB/FerretDB/issues/2275
case aggregations.ErrUndefinedVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrGroupUndefinedVariable,
fmt.Sprintf("Use of undefined variable: %s", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
default:
panic(fmt.Sprintf("unhandled field path error %s", exprErr.Error()))
var group groupMap

for _, doc := range in {
val, err := op.Process(doc)
if err != nil {
return nil, processOperatorError(err)
}

group.addOrAppend(val, doc)
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
}
}

var group groupMap
return group.docs, nil

for _, doc := range in {
val, err := expression.Evaluate(doc)
case *types.Array, float64, types.Binary, types.ObjectID, bool, time.Time, types.NullType,
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
types.Regex, int32, types.Timestamp, int64:
// non-string or document key aggregates values of all `in` documents into one aggregated document.

case string:
expression, err := aggregations.NewExpression(groupKey)
if err != nil {
// $group treats non-existent fields as nulls
val = types.Null
var exprErr *aggregations.ExpressionError
if !errors.As(err, &exprErr) {
return nil, lazyerrors.Error(err)
}

switch exprErr.Code() {
case aggregations.ErrNotExpression:
// constant value aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: groupKey,
documents: in,
}}, nil
case aggregations.ErrEmptyFieldPath:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
// TODO
commonerrors.ErrGroupInvalidFieldPath,
"'$' by itself is not a valid Expression",
"$group (stage)",
)
case aggregations.ErrInvalidExpression:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
fmt.Sprintf("'%s' starts with an invalid character for a user variable name", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
case aggregations.ErrEmptyVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"empty variable names are not allowed",
"$group (stage)",
)
// TODO https://github.com/FerretDB/FerretDB/issues/2275
case aggregations.ErrUndefinedVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrGroupUndefinedVariable,
fmt.Sprintf("Use of undefined variable: %s", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
default:
panic(fmt.Sprintf("unhandled field path error %s", exprErr.Error()))
}
}

group.addOrAppend(val, doc)
var group groupMap

for _, doc := range in {
val, err := expression.Evaluate(doc)
if err != nil {
// $group treats non-existent fields as nulls
val = types.Null
}

group.addOrAppend(val, doc)
}

return group.docs, nil

default:
panic(fmt.Sprintf("unexpected type %[1]T (%#[1]v)", groupKey))
}

return group.docs, nil
return []groupedDocuments{{
groupID: g.groupExpression,
documents: in,
}}, nil
}

// groupedDocuments contains group key and the documents for that group.
Expand Down Expand Up @@ -265,6 +297,97 @@ func (m *groupMap) addOrAppend(groupKey any, docs ...*types.Document) {
})
}

// processOperatorError takes internal error related to operator evaluation and
// returns proper CommandError that can be returned by $project aggregation stage.
//
// Command error codes:
// - ErrEmptySubProject when operator value is empty.
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
// - ErrFieldPathInvalidName when FieldPath is invalid.
// - ErrNotImplemented when the operator or expression is not implemented yet.
// - ErrOperatorWrongLenOfArgs when the operator has an invalid number of arguments.
// - ErrInvalidPipelineOperator when the operator does not exist.
// - ErrFailedToParse when operator has invalid variable expression.
// - ErrGroupInvalidFieldPath when operator has empty path expression.
func processOperatorError(err error) error {
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
return nil
}
chilagrow marked this conversation as resolved.
Show resolved Hide resolved

var opErr operators.OperatorError
var exErr *aggregations.ExpressionError

switch {
case errors.As(err, &opErr):
switch opErr.Code() {
case operators.ErrTooManyFields:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFieldPathInvalidName,
"Invalid $group :: caused by :: FieldPath field names may not start with '$'."+
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
" Consider using $getField or $setField.",
"$group (stage)",
)
case operators.ErrNotImplemented:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrNotImplemented,
"Invalid $group :: caused by :: "+opErr.Error(),
"$group (stage)",
)
case operators.ErrArgsInvalidLen:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrOperatorWrongLenOfArgs,
"Invalid $group :: caused by :: "+opErr.Error(),
"$group (stage)",
)
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
case operators.ErrInvalidExpression:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrAggregateInvalidExpression,
"Invalid $group :: caused by :: "+opErr.Error(),
"$group (stage)",
)
case operators.ErrInvalidNestedExpression:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrInvalidPipelineOperator,
"Invalid $group :: caused by :: "+opErr.Error(),
"$group (stage)",
)
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
}

case errors.As(err, &exErr):
switch exErr.Code() {
case aggregations.ErrNotExpression:
// handled by upstream and this should not be reachable for existing expression implementation
fallthrough
case aggregations.ErrInvalidExpression:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"Invalid $group :: caused by :: '$' starts with an invalid character for a user variable name",
"$group (stage)",
)
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
case aggregations.ErrEmptyFieldPath:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrGroupInvalidFieldPath,
"Invalid $group :: caused by :: '$' by itself is not a valid FieldPath",
"$group (stage)",
)
case aggregations.ErrUndefinedVariable:
// TODO https://github.com/FerretDB/FerretDB/issues/2275
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrNotImplemented,
"Aggregation expression variables are not implemented yet",
"$group (stage)",
)
case aggregations.ErrEmptyVariable:
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"Invalid $group :: caused by :: empty variable names are not allowed",
"$group (stage)",
)
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
}
}

return lazyerrors.Error(err)
}

// check interfaces
var (
_ aggregations.Stage = (*group)(nil)
Expand Down