Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition ranges, covering indexes, smarter iterators #1116

Merged
merged 13 commits into from
Dec 16, 2020
Prev Previous commit
Next Next commit
test new map iterator for indexes
  • Loading branch information
Brian Hendriks committed Dec 12, 2020
commit 7bef098c2120f4057056a8e7b90c2aa828fa00bb
128 changes: 128 additions & 0 deletions go/libraries/doltcore/sqle/dolt_map_iter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package sqle
bheni marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"io"

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)

type KVToSqlRowConverter struct {
tagToSqlColIdx map[uint64]int
cols []schema.Column
rowSize int
}

func (conv *KVToSqlRowConverter) ConvertKVToSqlRow(k, v types.Value) (sql.Row, error) {
keyTup := k.(types.Tuple)
bheni marked this conversation as resolved.
Show resolved Hide resolved
var valTup types.Tuple
if !types.IsNull(v) {
valTup = v.(types.Tuple)
}

cols := make([]interface{}, conv.rowSize)
filled, err := conv.processTuple(cols, 0, keyTup)
if err != nil {
return nil, err
}

if !valTup.Empty() {
filled, err = conv.processTuple(cols, filled, valTup)
if err != nil {
return nil, err
}
}

return cols, err
}

func (conv *KVToSqlRowConverter) processTuple(cols []interface{}, filled int, tup types.Tuple) (int, error) {
tupItr, err := tup.Iterator()

if err != nil {
return 0, err
}

for filled < len(conv.tagToSqlColIdx) {
bheni marked this conversation as resolved.
Show resolved Hide resolved
_, tag, err := tupItr.Next()

if err != nil {
return 0, err
}

if sqlColIdx, ok := conv.tagToSqlColIdx[uint64(tag.(types.Uint))]; !ok {
err = tupItr.Skip()

if err != nil {
return 0, err
}
} else {
_, val, err := tupItr.Next()

if err != nil {
return 0, err
}

cols[sqlColIdx], err = conv.cols[sqlColIdx].TypeInfo.ConvertNomsValueToValue(val)

if err != nil {
return 0, err
}

filled++
}
}

return filled, nil
}

type KVGetFunc func(ctx context.Context) (types.Value, types.Value, error)

type DoltMapIter struct {
kvGet KVGetFunc
conv KVToSqlRowConverter
}

func NewDoltMapIterFromNomsMapItr(mapItr types.MapIterator, cols []schema.Column) *DoltMapIter {
getFunc := func(ctx context.Context) (types.Value, types.Value, error) {
k, v, err := mapItr.Next(ctx)

if err != nil {
return nil, nil, err
} else if k == nil {
return nil, nil, io.EOF
}

return k, v, nil
}

return NewDoltMapIter(getFunc, cols)
}

func NewDoltMapIter(keyValGet KVGetFunc, cols []schema.Column) *DoltMapIter {
tagToSqlColIdx := make(map[uint64]int)
for i, col := range cols {
tagToSqlColIdx[col.Tag] = i
}

return &DoltMapIter{
kvGet: keyValGet,
conv: KVToSqlRowConverter{
tagToSqlColIdx: tagToSqlColIdx,
cols: cols,
},
}
}

func (dmi *DoltMapIter) Next(ctx context.Context) (sql.Row, error) {
k, v, err := dmi.kvGet(ctx)

if err != nil {
return nil, err
}

return dmi.conv.ConvertKVToSqlRow(k, v)
}
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package enginetest

import (
"github.com/dolthub/dolt/go/libraries/utils/set"
"testing"

"github.com/dolthub/go-mysql-server/enginetest"
"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/utils/set"
)

func init() {
Expand Down
24 changes: 8 additions & 16 deletions go/libraries/doltcore/sqle/index_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
package sqle

import (
"context"
"errors"
"fmt"

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/lookup"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)

type IndexLookupKeyIterator interface {
Expand Down Expand Up @@ -156,26 +158,16 @@ func (il *doltIndexLookup) RowIterForRanges(ctx *sql.Context, ranges []lookup.Ra
readRanges[i] = lookupRange.ToReadRange()
}

idxItr := &doltIndexKeyIter{indexMapIter: noms.NewNomsRangeReader(il.idx.IndexSchema(), il.idx.IndexRowData(), readRanges)}
nrr := noms.NewNomsRangeReader(il.idx.IndexSchema(), il.idx.IndexRowData(), readRanges)

covers := il.indexCoversCols(columns)
if covers {
return NewCoveringIndexRowIterAdapter(ctx, il.idx, idxItr), nil
return NewCoveringIndexRowIterAdapter(ctx, il.idx, nrr, columns), nil
} else {
return NewIndexLookupRowIterAdapter(ctx, il.idx, idxItr), nil
return nil, errors.New("fix me") //NewIndexLookupRowIterAdapter(ctx, il.idx, idxItr), nil
}
}

type doltIndexKeyIter struct {
indexMapIter table.TableReadCloser
}

var _ IndexLookupKeyIterator = (*doltIndexKeyIter)(nil)

func (iter *doltIndexKeyIter) NextKey(ctx *sql.Context) (row.TaggedValues, error) {
indexRow, err := iter.indexMapIter.ReadRow(ctx)
if err != nil {
return nil, err
}
return row.GetTaggedVals(indexRow)
type nomsKeyIter interface {
ReadKey(ctx context.Context) (types.Value, error)
}
59 changes: 29 additions & 30 deletions go/libraries/doltcore/sqle/index_row_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ package sqle

import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"io"
"runtime"

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/utils/async"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/types"

"github.com/dolthub/go-mysql-server/sql"
)

type indexLookupRowIterAdapter struct {
Expand Down Expand Up @@ -154,54 +155,52 @@ func (i *indexLookupRowIterAdapter) processKey(_ context.Context, valInt interfa

type coveringIndexRowIterAdapter struct {
idx DoltIndex
keyIter IndexLookupKeyIterator
keyIter nomsKeyIter
conv KVToSqlRowConverter
ctx *sql.Context
pkCols *schema.ColCollection
pkCols *schema.ColCollection
nonPKCols *schema.ColCollection
nbf *types.NomsBinFormat
}

func NewCoveringIndexRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter IndexLookupKeyIterator) *coveringIndexRowIterAdapter {
func NewCoveringIndexRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsKeyIter, resultCols []string) *coveringIndexRowIterAdapter {
sch := idx.Schema()
cols := sch.GetAllCols().GetColumns()
tagToSqlColIdx := make(map[uint64]int)

resultColSet := set.NewStrSet(resultCols)
for i, col := range cols {
if resultColSet.Contains(col.Name) {
tagToSqlColIdx[col.Tag] = i
}
}

return &coveringIndexRowIterAdapter{
idx: idx,
keyIter: keyIter,
ctx: ctx,
pkCols: sch.GetPKCols(),
conv: KVToSqlRowConverter{
tagToSqlColIdx: tagToSqlColIdx,
cols: cols,
rowSize: len(cols),
},
ctx: ctx,
pkCols: sch.GetPKCols(),
nonPKCols: sch.GetNonPKCols(),
nbf: idx.TableData().Format(),
nbf: idx.TableData().Format(),
}
}

// Next returns the next row from the iterator.
func (ci *coveringIndexRowIterAdapter) Next() (sql.Row, error) {
taggedVals, err := ci.keyIter.NextKey(ci.ctx)
key, err := ci.keyIter.ReadKey(ci.ctx)

if err != nil {
return nil, err
}

pk, err := taggedVals.NomsTupleForPKCols(ci.nbf, ci.pkCols).Value(ci.ctx)

if err != nil {
return nil, err
}

nonPK, err := taggedVals.NomsTupleForNonPKCols(ci.nbf, ci.nonPKCols).Value(ci.ctx)

if err != nil {
return nil, err
}

r, err := row.FromNoms(ci.idx.Schema(), pk.(types.Tuple), nonPK.(types.Tuple))

if err != nil {
return nil, err
}

return sqlutil.DoltRowToSqlRow(r, ci.idx.Schema())
return ci.conv.ConvertKVToSqlRow(key, nil)
}

func (ci *coveringIndexRowIterAdapter) Close() error {
return nil
}
}
16 changes: 8 additions & 8 deletions go/libraries/doltcore/sqle/indexed_dolt_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package sqle
import (
"encoding/binary"
"errors"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/lookup"
"github.com/dolthub/go-mysql-server/sql"
"io"
"sync"

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/sqle/lookup"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
)

Expand Down Expand Up @@ -64,10 +65,9 @@ func (idt *IndexedDoltTable) PartitionRows(ctx *sql.Context, _ sql.Partition) (s
return idt.indexLookup.RowIter(ctx)
}


type rangePartition struct {
partitionRange lookup.Range
keyBytes []byte
keyBytes []byte
}

func (rp rangePartition) Key() []byte {
Expand All @@ -76,15 +76,15 @@ func (rp rangePartition) Key() []byte {

type rangePartitionIter struct {
ranges []lookup.Range
curr int
mu *sync.Mutex
curr int
mu *sync.Mutex
}

func NewRangePartitionIter(ranges []lookup.Range) *rangePartitionIter {
return &rangePartitionIter{
ranges: ranges,
curr: 0,
mu: &sync.Mutex{},
curr: 0,
mu: &sync.Mutex{},
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/sqlutil/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,4 @@ func GetColNamesFromSqlSchema(sqlSch sql.Schema) []string {
}

return colNames
}
}
Loading