Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pushdown simplest sorting for aggregate command #2530

Merged
merged 26 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
wip
  • Loading branch information
noisersup committed Apr 24, 2023
commit e13337b90a57a1d46d68f43884adc3f2dd26b55f
1 change: 1 addition & 0 deletions internal/handlers/pg/msg_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
DB: params.DB,
Collection: params.Collection,
Comment: params.Comment,
NativeSort: true,
}

// get comment from query, e.g. db.collection.find({$comment: "test"})
Expand Down
79 changes: 72 additions & 7 deletions internal/handlers/pg/pgdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

"github.com/AlekSi/pointer"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/maps"

Expand All @@ -47,6 +48,7 @@ type QueryParams struct {
Collection string
Comment string
Explain bool
NativeSort bool
}

// Explain returns SQL EXPLAIN results for given query parameters.
Expand Down Expand Up @@ -132,13 +134,14 @@ func QueryDocuments(ctx context.Context, tx pgx.Tx, qp *QueryParams) (types.Docu

// iteratorParams contains parameters for building an iterator.
type iteratorParams struct {
schema string
table string
comment string
explain bool
filter *types.Document
forUpdate bool // if SELECT FOR UPDATE is needed.
unmarshal func(b []byte) (*types.Document, error) // if set, iterator uses unmarshal to convert row to *types.Document.
schema string
table string
comment string
explain bool
nativeSort bool
filter *types.Document
forUpdate bool // if SELECT FOR UPDATE is needed.
unmarshal func(b []byte) (*types.Document, error) // if set, iterator uses unmarshal to convert row to *types.Document.
}

// buildIterator returns an iterator to fetch documents for given iteratorParams.
Expand Down Expand Up @@ -172,6 +175,18 @@ func buildIterator(ctx context.Context, tx pgx.Tx, p *iteratorParams) (types.Doc
query += ` FOR UPDATE`
}

if p.nativeSort {
sort, arg, err := prepareSortClause(p.filter, asc)
if err != nil {
return nil, lazyerrors.Error(err)
}

if sort != "" {
query += sort
args = append(args, arg)
}
}

rows, err := tx.Query(ctx, query, args...)
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -180,6 +195,56 @@ func buildIterator(ctx context.Context, tx pgx.Tx, p *iteratorParams) (types.Doc
return newIterator(ctx, rows, p), nil
}

type order int8

const (
desc order = -1
asc order = 1
)

func prepareSortClause(sqlFilters *types.Document, o order) (string, any, error) {
iter := sqlFilters.Iterator()
defer iter.Close()

var key *string

for {
k, _, err := iter.Next()
if err != nil {
if errors.Is(err, iterator.ErrIteratorDone) {
break
}

return "", nil, lazyerrors.Error(err)
}

if key == nil {
key = pointer.ToString(k)
continue
}

if k != *key {
return "", nil, nil
}
}

if key == nil {
return "", nil, nil
}

var sqlOrder string
switch o {
case desc:
sqlOrder = " DESC"
case asc:
sqlOrder = " ASC"
default:
panic(fmt.Sprint("forbidden order:", o))
}

return " ORDER BY $1" + sqlOrder, key, nil
}

// prepareWhereClause adds WHERE clause with given filters to the query and returns the query and arguments.
func prepareWhereClause(sqlFilters *types.Document) (string, []any, error) {
var filters []string
Expand Down