Skip to content

Commit

Permalink
Implement pushdown for aggregate (#3607)
Browse files Browse the repository at this point in the history
  • Loading branch information
noisersup authored Oct 23, 2023
1 parent 05a2ee3 commit 0a526c0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
10 changes: 1 addition & 9 deletions integration/aggregate_documents_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ func TestAggregateCompatOptions(t *testing.T) {
}

func TestAggregateCompatStages(t *testing.T) {
setup.SkipForPostgreSQL(t, "https://github.com/FerretDB/FerretDB/issues/3520")

t.Parallel()

testCases := map[string]aggregateStagesCompatTestCase{
Expand Down Expand Up @@ -904,8 +902,6 @@ func TestAggregateCompatGroupCount(t *testing.T) {
}

func TestAggregateCompatLimit(t *testing.T) {
setup.SkipForPostgreSQL(t, "https://github.com/FerretDB/FerretDB/issues/3520")

t.Parallel()

testCases := map[string]aggregateStagesCompatTestCase{
Expand Down Expand Up @@ -1247,8 +1243,6 @@ func TestAggregateCompatGroupSum(t *testing.T) {
}

func TestAggregateCompatMatch(t *testing.T) {
setup.SkipForPostgreSQL(t, "https://github.com/FerretDB/FerretDB/issues/3520")

t.Parallel()

// TODO https://github.com/FerretDB/FerretDB/issues/2291
Expand All @@ -1257,7 +1251,7 @@ func TestAggregateCompatMatch(t *testing.T) {
testCases := map[string]aggregateStagesCompatTestCase{
"ID": {
pipeline: bson.A{bson.D{{"$match", bson.D{{"_id", "string"}}}}},
resultPushdown: pgPushdown,
resultPushdown: allPushdown,
},
"Int": {
pipeline: bson.A{
Expand Down Expand Up @@ -1510,8 +1504,6 @@ func TestAggregateCompatUnwind(t *testing.T) {
}

func TestAggregateCompatSkip(t *testing.T) {
setup.SkipForPostgreSQL(t, "https://github.com/FerretDB/FerretDB/issues/3520")

t.Parallel()

testCases := map[string]aggregateStagesCompatTestCase{
Expand Down
4 changes: 2 additions & 2 deletions internal/backends/postgresql/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ func (c *collection) Explain(ctx context.Context, params *backends.ExplainParams

res.QueryPushdown = where != ""

q += where

if params.Sort != nil {
var sort string
var sortArgs []any
Expand All @@ -330,8 +332,6 @@ func (c *collection) Explain(ctx context.Context, params *backends.ExplainParams
res.SortPushdown = sort != ""
}

q += where

if params.Limit != 0 {
q += fmt.Sprintf(` LIMIT %s`, placeholder.Next())
args = append(args, params.Limit)
Expand Down
34 changes: 30 additions & 4 deletions internal/handlers/sqlite/msg_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,34 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
var iter iterator.Interface[struct{}, *types.Document]

if len(collStatsDocuments) == len(stagesDocuments) {
// TODO https://github.com/FerretDB/FerretDB/issues/3235
// TODO https://github.com/FerretDB/FerretDB/issues/3181
iter, err = processStagesDocuments(ctx, closer, &stagesDocumentsParams{c, stagesDocuments})
filter, sort := aggregations.GetPushdownQuery(aggregationStages)

// only documents stages or no stages - fetch documents from the DB and apply stages to them
qp := new(backends.QueryParams)

if !h.DisableFilterPushdown {
qp.Filter = filter
}

// Skip sorting if there are more than one sort parameters
if h.EnableSortPushdown && sort.Len() == 1 {
var order types.SortType

k := sort.Keys()[0]
v := sort.Values()[0]

order, err = common.GetSortType(k, v)
if err != nil {
return nil, err
}

qp.Sort = &backends.SortField{
Key: k,
Descending: order == types.Descending,
}
}

iter, err = processStagesDocuments(ctx, closer, &stagesDocumentsParams{c, qp, stagesDocuments})
} else {
// TODO https://github.com/FerretDB/FerretDB/issues/2423
statistics := stages.GetStatistics(collStatsDocuments)
Expand Down Expand Up @@ -323,12 +348,13 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
// stagesDocumentsParams contains the parameters for processStagesDocuments.
type stagesDocumentsParams struct {
c backends.Collection
qp *backends.QueryParams
stages []aggregations.Stage
}

// processStagesDocuments retrieves the documents from the database and then processes them through the stages.
func processStagesDocuments(ctx context.Context, closer *iterator.MultiCloser, p *stagesDocumentsParams) (types.DocumentsIterator, error) { //nolint:lll // for readability
queryRes, err := p.c.Query(ctx, nil)
queryRes, err := p.c.Query(ctx, p.qp)
if err != nil {
closer.Close()
return nil, lazyerrors.Error(err)
Expand Down
18 changes: 15 additions & 3 deletions internal/handlers/sqlite/msg_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/FerretDB/FerretDB/build/version"
"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/common/aggregations"
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
Expand Down Expand Up @@ -78,9 +79,12 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, lazyerrors.Error(err)
}

var qp backends.ExplainParams
if !h.DisableFilterPushdown {
qp.Filter = params.Filter
qp := backends.ExplainParams{
Filter: params.Filter,
}

if params.Aggregate {
qp.Filter, params.Sort = aggregations.GetPushdownQuery(params.StagesDocs)
}

// Skip sorting if there are more than one sort parameters
Expand Down Expand Up @@ -110,6 +114,14 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
qp.Limit = params.Limit
}

if h.DisableFilterPushdown {
qp.Filter = nil
}

if !h.EnableSortPushdown {
qp.Sort = nil
}

res, err := coll.Explain(ctx, &qp)
if err != nil {
return nil, lazyerrors.Error(err)
Expand Down

0 comments on commit 0a526c0

Please sign in to comment.