Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition ranges, covering indexes, smarter iterators #1116

Merged
merged 13 commits into from
Dec 16, 2020
Prev Previous commit
Next Next commit
index optimizations and fixes
  • Loading branch information
Brian Hendriks committed Dec 15, 2020
commit 30b3485dba51ae1f60da7aaf2507bc49f20af046
29 changes: 19 additions & 10 deletions go/libraries/doltcore/sqle/dolt_map_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ type KVToSqlRowConverter struct {
rowSize int
}

func NewKVToSqlRowConverterForCols(cols []schema.Column) *KVToSqlRowConverter {
tagToSqlColIdx := make(map[uint64]int)
for i, col := range cols {
tagToSqlColIdx[col.Tag] = i
}

return &KVToSqlRowConverter{
tagToSqlColIdx: tagToSqlColIdx,
cols: cols,
rowSize: len(cols),
}
}

func (conv *KVToSqlRowConverter) ConvertKVToSqlRow(k, v types.Value) (sql.Row, error) {
keyTup := k.(types.Tuple)
bheni marked this conversation as resolved.
Show resolved Hide resolved
var valTup types.Tuple
Expand Down Expand Up @@ -53,6 +66,10 @@ func (conv *KVToSqlRowConverter) processTuple(cols []interface{}, filled int, tu
return 0, err
}

if tag == nil {
break
}

if sqlColIdx, ok := conv.tagToSqlColIdx[uint64(tag.(types.Uint))]; !ok {
err = tupItr.Skip()

Expand Down Expand Up @@ -83,7 +100,7 @@ type KVGetFunc func(ctx context.Context) (types.Value, types.Value, error)

type DoltMapIter struct {
kvGet KVGetFunc
conv KVToSqlRowConverter
conv *KVToSqlRowConverter
}

func NewDoltMapIterFromNomsMapItr(mapItr types.MapIterator, cols []schema.Column) *DoltMapIter {
Expand All @@ -103,17 +120,9 @@ func NewDoltMapIterFromNomsMapItr(mapItr types.MapIterator, cols []schema.Column
}

func NewDoltMapIter(keyValGet KVGetFunc, cols []schema.Column) *DoltMapIter {
tagToSqlColIdx := make(map[uint64]int)
for i, col := range cols {
tagToSqlColIdx[col.Tag] = i
}

return &DoltMapIter{
kvGet: keyValGet,
conv: KVToSqlRowConverter{
tagToSqlColIdx: tagToSqlColIdx,
cols: cols,
},
conv: NewKVToSqlRowConverterForCols(cols),
}
}

Expand Down
14 changes: 14 additions & 0 deletions go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ func init() {
sqle.MinRowsPerPartition = 2
}

func limitTestQueriesTo(queries ...string) {
querySet := set.NewStrSet(queries)

var broken []enginetest.QueryTest
for _, t := range enginetest.QueryTests {
if querySet.Contains(t.Query) {
broken = append(broken, t)
}
}

enginetest.QueryTests = broken
}

func TestQueries(t *testing.T) {
// limitTestQueriesTo(...) // whitelist queries you want run.
enginetest.TestQueries(t, newDoltHarness(t))
}

Expand Down
5 changes: 2 additions & 3 deletions go/libraries/doltcore/sqle/index_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package sqle

import (
"context"
"errors"
"fmt"

"github.com/dolthub/go-mysql-server/sql"
Expand Down Expand Up @@ -140,7 +139,7 @@ func (il *doltIndexLookup) indexCoversCols(cols []string) bool {
return false
}

idxCols := il.idx.IndexSchema().GetAllCols()
idxCols := il.idx.IndexSchema().GetPKCols()
covers := true
for _, colName := range cols {
if _, ok := idxCols.GetByName(colName); !ok {
Expand All @@ -164,7 +163,7 @@ func (il *doltIndexLookup) RowIterForRanges(ctx *sql.Context, ranges []lookup.Ra
if covers {
return NewCoveringIndexRowIterAdapter(ctx, il.idx, nrr, columns), nil
} else {
return nil, errors.New("fix me") //NewIndexLookupRowIterAdapter(ctx, il.idx, idxItr), nil
return NewIndexLookupRowIterAdapter(ctx, il.idx, nrr), nil
}
}

Expand Down
76 changes: 60 additions & 16 deletions go/libraries/doltcore/sqle/index_row_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,37 @@ import (

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/utils/async"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/types"
)

type indexLookupRowIterAdapter struct {
idx DoltIndex
keyIter IndexLookupKeyIterator
keyIter nomsKeyIter
pkTags *set.Uint64Set
conv *KVToSqlRowConverter
ctx *sql.Context
rowChan chan sql.Row
err error
buffer []sql.Row
}

type keyPos struct {
key row.TaggedValues
key types.Tuple
position int
}

// NewIndexLookupRowIterAdapter returns a new indexLookupRowIterAdapter.
func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter IndexLookupKeyIterator) *indexLookupRowIterAdapter {
func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsKeyIter) *indexLookupRowIterAdapter {
pkTags := set.NewUint64Set(idx.Schema().GetPKCols().Tags)
conv := NewKVToSqlRowConverterForCols(idx.Schema().GetAllCols().GetColumns())
iter := &indexLookupRowIterAdapter{
idx: idx,
keyIter: keyIter,
conv: conv,
pkTags: pkTags,
ctx: ctx,
rowChan: make(chan sql.Row, runtime.NumCPU()*10),
buffer: make([]sql.Row, runtime.NumCPU()*5),
Expand Down Expand Up @@ -82,13 +86,13 @@ func (i *indexLookupRowIterAdapter) queueRows() {
shouldBreak := false
pos := 0
for ; pos < len(i.buffer); pos++ {
var indexKey row.TaggedValues
indexKey, err = i.keyIter.NextKey(i.ctx)
var indexKey types.Value
indexKey, err = i.keyIter.ReadKey(i.ctx)
if err != nil {
break
}
exec.Execute(keyPos{
key: indexKey,
key: indexKey.(types.Tuple),
position: pos,
})
}
Expand Down Expand Up @@ -121,13 +125,54 @@ func (i *indexLookupRowIterAdapter) queueRows() {
}
}

func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple) (types.Value, error){
tplItr, err := indexKey.Iterator()

if err != nil {
return nil, err
}

var resVals []types.Value
for {
_, tagVal, err := tplItr.Next()

if err != nil {
return nil, err
}

if tagVal == nil {
break
}

tag := uint64(tagVal.(types.Uint))

if i.pkTags.Contains(tag) {
_, valVal, err := tplItr.Next()

if err != nil {
return nil, err
}

resVals = append(resVals, tagVal, valVal)
} else {
err := tplItr.Skip()

if err != nil {
return nil, err
}
}
}

return types.NewTuple(nbf, resVals...)
}

// processKey is called within queueRows and processes each key, sending the resulting row to the row channel.
func (i *indexLookupRowIterAdapter) processKey(_ context.Context, valInt interface{}) error {
val := valInt.(keyPos)

tableData := i.idx.TableData()
pkTuple := val.key.NomsTupleForPKCols(tableData.Format(), i.idx.Schema().GetPKCols())
pkTupleVal, err := pkTuple.Value(i.ctx)
pkTupleVal, err := i.indexKeyToTableKey(tableData.Format(), val.key)

if err != nil {
return err
}
Expand All @@ -136,19 +181,16 @@ func (i *indexLookupRowIterAdapter) processKey(_ context.Context, valInt interfa
if err != nil {
return err
}

if fieldsVal == nil {
return nil
}

r, err := row.FromNoms(i.idx.Schema(), pkTupleVal.(types.Tuple), fieldsVal.(types.Tuple))
sqlRow, err := i.conv.ConvertKVToSqlRow(pkTupleVal, fieldsVal)
if err != nil {
return err
}

sqlRow, err := sqlutil.DoltRowToSqlRow(r, i.idx.Schema())
if err != nil {
return err
}
i.buffer[val.position] = sqlRow
return nil
}
Expand All @@ -164,13 +206,15 @@ type coveringIndexRowIterAdapter struct {
}

func NewCoveringIndexRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsKeyIter, resultCols []string) *coveringIndexRowIterAdapter {
idxCols := idx.IndexSchema().GetPKCols()
sch := idx.Schema()
cols := sch.GetAllCols().GetColumns()
tagToSqlColIdx := make(map[uint64]int)

resultColSet := set.NewStrSet(resultCols)
for i, col := range cols {
if resultColSet.Contains(col.Name) {
_, partOfIdxKey := idxCols.GetByName(col.Name)
if partOfIdxKey && resultColSet.Contains(col.Name) {
tagToSqlColIdx[col.Tag] = i
}
}
Expand Down
6 changes: 3 additions & 3 deletions go/store/store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package store
bheni marked this conversation as resolved.
Show resolved Hide resolved

import (
/*import (
bheni marked this conversation as resolved.
Show resolved Hide resolved
"context"
"math/rand"
"os"
Expand All @@ -23,7 +23,7 @@ const (

func poe(err error) {
if err != nil {
poe(err)
panic(err)
}
}

Expand Down Expand Up @@ -168,4 +168,4 @@ func BenchmarkSimulatedCoveringIndex(b *testing.B) {
}
}
}
}
}*/