Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support _id aggregation operators for $group stage #3096

Merged
merged 19 commits into from
Jul 26, 2023
6 changes: 2 additions & 4 deletions integration/aggregate_documents_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,25 +618,23 @@ func TestAggregateCompatGroup(t *testing.T) {
},
"IDType": {
pipeline: bson.A{bson.D{{"$group", bson.D{
{"_id", bson.D{{"$type", "_id"}}},
{"_id", bson.D{{"$type", "$v"}}},
{"count", bson.D{{"$count", bson.D{}}}},
}}}},
skip: "https://github.com/FerretDB/FerretDB/issues/2679",
},
"IDSum": {
pipeline: bson.A{
bson.D{{"$sort", bson.D{{"_id", 1}}}},
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", "$v"}}}}}},
bson.D{{"$sort", bson.D{{"_id", 1}}}},
},
skip: "https://github.com/FerretDB/FerretDB/issues/2694",
},
"IDSumNonExistentField": {
pipeline: bson.A{
bson.D{{"$sort", bson.D{{"_id", 1}}}},
bson.D{{"$group", bson.D{{"_id", bson.D{{"$sum", "$non-existent"}}}}}},
bson.D{{"$sort", bson.D{{"_id", 1}}}},
},
skip: "https://github.com/FerretDB/FerretDB/issues/2694",
},
}

Expand Down
147 changes: 91 additions & 56 deletions internal/handlers/common/aggregations/stages/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations/operators"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations/operators/accumulators"
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/types"
Expand Down Expand Up @@ -82,6 +84,10 @@ func newGroup(stage *types.Document) (aggregations.Stage, error) {

if field == "_id" {
if doc, ok := v.(*types.Document); ok {
if operators.IsOperator(doc) {
continue
}

if err = validateExpression("$group", doc); err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,73 +171,102 @@ func (g *group) Process(ctx context.Context, iter types.DocumentsIterator, close

// groupDocuments groups documents by group expression.
func (g *group) groupDocuments(ctx context.Context, in []*types.Document) ([]groupedDocuments, error) {
groupKey, ok := g.groupExpression.(string)
if !ok {
// non-string key aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: g.groupExpression,
documents: in,
}}, nil
}
switch groupKey := g.groupExpression.(type) {
case *types.Document:
if !operators.IsOperator(groupKey) {
break
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
}

expression, err := aggregations.NewExpression(groupKey)
if err != nil {
var exprErr *aggregations.ExpressionError
if !errors.As(err, &exprErr) {
return nil, lazyerrors.Error(err)
op, err := operators.NewOperator(groupKey)
if err != nil {
return nil, err
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
}

switch exprErr.Code() {
case aggregations.ErrNotExpression:
// constant value aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: groupKey,
documents: in,
}}, nil
case aggregations.ErrEmptyFieldPath:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
// TODO
commonerrors.ErrGroupInvalidFieldPath,
"'$' by itself is not a valid Expression",
"$group (stage)",
)
case aggregations.ErrInvalidExpression:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
fmt.Sprintf("'%s' starts with an invalid character for a user variable name", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
case aggregations.ErrEmptyVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"empty variable names are not allowed",
"$group (stage)",
)
// TODO https://github.com/FerretDB/FerretDB/issues/2275
case aggregations.ErrUndefinedVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrGroupUndefinedVariable,
fmt.Sprintf("Use of undefined variable: %s", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
default:
panic(fmt.Sprintf("unhandled field path error %s", exprErr.Error()))
var group groupMap

for _, doc := range in {
val, err := op.Process(doc)
if err != nil {
return nil, err
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
}

group.addOrAppend(val, doc)
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
}
}

var group groupMap
return group.docs, nil

for _, doc := range in {
val, err := expression.Evaluate(doc)
case *types.Array, float64, types.Binary, types.ObjectID, bool, time.Time, types.NullType,
chilagrow marked this conversation as resolved.
Show resolved Hide resolved
types.Regex, int32, types.Timestamp, int64:
// A key that is neither a string nor an operator document is treated as a constant:
// all `in` documents are aggregated into a single group by the return after this switch.

case string:
expression, err := aggregations.NewExpression(groupKey)
if err != nil {
// $group treats non-existent fields as nulls
val = types.Null
var exprErr *aggregations.ExpressionError
if !errors.As(err, &exprErr) {
return nil, lazyerrors.Error(err)
}

switch exprErr.Code() {
case aggregations.ErrNotExpression:
// constant value aggregates values of all `in` documents into one aggregated document.
return []groupedDocuments{{
groupID: groupKey,
documents: in,
}}, nil
case aggregations.ErrEmptyFieldPath:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
// TODO
commonerrors.ErrGroupInvalidFieldPath,
"'$' by itself is not a valid Expression",
"$group (stage)",
)
case aggregations.ErrInvalidExpression:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
fmt.Sprintf("'%s' starts with an invalid character for a user variable name", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
case aggregations.ErrEmptyVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrFailedToParse,
"empty variable names are not allowed",
"$group (stage)",
)
// TODO https://github.com/FerretDB/FerretDB/issues/2275
case aggregations.ErrUndefinedVariable:
return nil, commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrGroupUndefinedVariable,
fmt.Sprintf("Use of undefined variable: %s", types.FormatAnyValue(groupKey)),
"$group (stage)",
)
default:
panic(fmt.Sprintf("unhandled field path error %s", exprErr.Error()))
}
}

group.addOrAppend(val, doc)
var group groupMap

for _, doc := range in {
val, err := expression.Evaluate(doc)
if err != nil {
// $group treats non-existent fields as nulls
val = types.Null
}

group.addOrAppend(val, doc)
}

return group.docs, nil

default:
panic(fmt.Sprintf("unexpected type %[1]T (%#[1]v)", groupKey))
}

return group.docs, nil
return []groupedDocuments{{
groupID: g.groupExpression,
documents: in,
}}, nil
}

// groupedDocuments contains group key and the documents for that group.
Expand Down