Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pushdown simplest sorting for aggregate command #2530

Merged
merged 26 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/aggregate_documents_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,7 @@ func TestAggregateCompatSkip(t *testing.T) {
bson.D{{"$match", bson.D{{"v", "foo"}}}},
bson.D{{"$skip", int32(1)}},
},
resultPushdown: true, // $match after $sort can be pushed down
},
"BeforeMatch": {
pipeline: bson.A{
Expand Down
57 changes: 44 additions & 13 deletions internal/handlers/common/aggregations/stages/pushdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,56 @@ import (
"github.com/FerretDB/FerretDB/internal/util/must"
)

// GetPushdownQuery gets pushdown query for aggregation.
// When the first aggregation stage is $match, $match query is
// used for pushdown, otherwise nil is return.
func GetPushdownQuery(stagesDocs []any) *types.Document {
// GetPushdownQuery gets pushdown query ($match and $sort) for aggregation.
//
// If the first two stages are either $match, $sort, or a combination of them, we can push them down.
// In this case, we return the first match and sort statements to pushdown.
// If $match stage is not present, match is returned as nil.
// If $sort stage is not present, sort is returned as nil.
func GetPushdownQuery(stagesDocs []any) (match *types.Document, sort *types.Document) {
if len(stagesDocs) == 0 {
return nil
return
}

firstStageDoc := stagesDocs[0]
firstStage, isDoc := firstStageDoc.(*types.Document)
stagesToPushdown := []any{stagesDocs[0]}

if !isDoc || !firstStage.Has("$match") {
return nil
if len(stagesDocs) > 1 {
stagesToPushdown = append(stagesToPushdown, stagesDocs[1])
}

matchQuery := must.NotFail(firstStage.Get("$match"))
if query, isDoc := matchQuery.(*types.Document); isDoc {
return query
for _, s := range stagesToPushdown {
stage, isDoc := s.(*types.Document)

if !isDoc {
return nil, nil
}

switch {
case stage.Has("$match"):
matchQuery := must.NotFail(stage.Get("$match"))
query, isDoc := matchQuery.(*types.Document)

if !isDoc || match != nil {
continue
}

match = query

case stage.Has("$sort"):
sortQuery := must.NotFail(stage.Get("$sort"))
query, isDoc := sortQuery.(*types.Document)

if !isDoc || sort != nil {
continue
}

sort = query

default:
// not $match nor $sort, we shouldn't continue pushdown
return
}
}

return nil
return
}
10 changes: 9 additions & 1 deletion internal/handlers/pg/msg_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,19 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
// If stagesStats contains the same stages as stagesDocuments, we apply aggregation to documents fetched from the DB.
// If stagesStats contains more stages than stagesDocuments, we apply aggregation to statistics fetched from the DB.
if len(stagesStats) == len(stagesDocuments) {
filter, sort := stages.GetPushdownQuery(aggregationStages)
// only documents stages or no stages - fetch documents from the DB and apply stages to them
qp := pgdb.QueryParams{
DB: db,
Collection: collection,
Filter: stages.GetPushdownQuery(aggregationStages),
}

if !h.DisableFilterPushdown {
qp.Filter = filter
}

if h.EnableSortPushdown {
qp.Sort = sort
}

resDocs, err = processStagesDocuments(ctx, &stagesDocumentsParams{dbPool, &qp, stagesDocuments})
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}
}

qp.Filter = stages.GetPushdownQuery(stagesDocs)
qp.Filter, qp.Sort = stages.GetPushdownQuery(stagesDocs)
}

if h.DisableFilterPushdown {
Expand Down
3 changes: 1 addition & 2 deletions internal/handlers/tigris/msg_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,9 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
qp := tigrisdb.QueryParams{
DB: db,
Collection: collection,
Filter: stages.GetPushdownQuery(aggregationStages),
}

qp.Filter = stages.GetPushdownQuery(aggregationStages)
qp.Filter, _ = stages.GetPushdownQuery(aggregationStages)

if resDocs, err = processStagesDocuments(ctx, &stagesDocumentsParams{
dbPool, &qp, stagesDocuments,
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/tigris/msg_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}
}

filter = stages.GetPushdownQuery(stagesDocs)
filter, _ = stages.GetPushdownQuery(stagesDocs)
}

if h.DisableFilterPushdown {
Expand Down