Skip to content

Commit

Permalink
feat:support iterator (lotusdblabs#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
akiozihao authored Apr 13, 2024
1 parent 92e16ee commit 669e9b0
Show file tree
Hide file tree
Showing 11 changed files with 1,180 additions and 11 deletions.
99 changes: 99 additions & 0 deletions bptree.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lotusdb

import (
"bytes"
"context"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -198,3 +199,101 @@ func (bt *BPTree) Sync() error {
}
return nil
}

// bptreeIterator implement IteratorI
type bptreeIterator struct {
key []byte
value []byte
tx *bbolt.Tx
cursor *bbolt.Cursor
options IteratorOptions
}

// NewBptreeIterator
func NewBptreeIterator(tx *bbolt.Tx, options IteratorOptions) *bptreeIterator {
cursor := tx.Bucket(indexBucketName).Cursor()
return &bptreeIterator{
cursor: cursor,
options: options,
tx: tx,
}
}

// Rewind seek the first key in the iterator.
func (bi *bptreeIterator) Rewind() {
if bi.options.Reverse {
bi.key, bi.value = bi.cursor.Last()
if len(bi.options.Prefix) == 0 {
return
}
for bi.key != nil && !bytes.HasPrefix(bi.key, bi.options.Prefix) {
bi.key, bi.value = bi.cursor.Prev()
}
} else {
bi.key, bi.value = bi.cursor.First()
if len(bi.options.Prefix) == 0 {
return
}
for bi.key != nil && !bytes.HasPrefix(bi.key, bi.options.Prefix) {
bi.key, bi.value = bi.cursor.Next()
}
}
}

// Seek move the iterator to the key which is
// greater(less when reverse is true) than or equal to the specified key.
func (bi *bptreeIterator) Seek(key []byte) {
bi.key, bi.value = bi.cursor.Seek(key)
if !bytes.Equal(bi.key, key) && bi.options.Reverse {
bi.key, bi.value = bi.cursor.Prev()
}
if len(bi.options.Prefix) == 0 {
return
}
if !bytes.HasPrefix(bi.Key(), bi.options.Prefix) {
bi.Next()
}
}

// Next moves the iterator to the next key.
func (bi *bptreeIterator) Next() {
if bi.options.Reverse {
bi.key, bi.value = bi.cursor.Prev()
if len(bi.options.Prefix) == 0 {
return
}
// prefix scan
for bi.key != nil && !bytes.HasPrefix(bi.key, bi.options.Prefix) {
bi.key, bi.value = bi.cursor.Prev()
}
} else {
bi.key, bi.value = bi.cursor.Next()
if len(bi.options.Prefix) == 0 {
return
}
// prefix scan
for bi.key != nil && !bytes.HasPrefix(bi.key, bi.options.Prefix) {
bi.key, bi.value = bi.cursor.Next()
}
}
}

// Key get the current key.
func (bi *bptreeIterator) Key() []byte {
return bi.key
}

// Value get the current value.
func (bi *bptreeIterator) Value() any {
return bi.value
}

// Valid returns whether the iterator is exhausted.
func (bi *bptreeIterator) Valid() bool {
return bi.key != nil
}

// Close the iterator.
func (bi *bptreeIterator) Close() error {
return bi.tx.Rollback()
}
175 changes: 175 additions & 0 deletions bptree_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lotusdb

import (
"bytes"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -303,3 +304,177 @@ func testbptreeSync(t *testing.T, partitionNum int) {
err = bt.Sync()
assert.Nil(t, err)
}

func Test_bptreeIterator(t *testing.T) {
options := indexOptions{
indexType: BTree,
dirPath: filepath.Join(os.TempDir(), "bptree-cursorIterator"+strconv.Itoa(1)),
partitionNum: 1,
keyHashFunction: xxhash.Sum64,
}

err := os.MkdirAll(options.dirPath, os.ModePerm)
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(options.dirPath)
}()
bt, err := openBTreeIndex(options)
assert.Nil(t, err)
m := map[string]*wal.ChunkPosition{
"key 0": {SegmentId: 0, BlockNumber: 0, ChunkOffset: 0, ChunkSize: 0},
"key 1": {SegmentId: 1, BlockNumber: 1, ChunkOffset: 1, ChunkSize: 1},
"key 2": {SegmentId: 2, BlockNumber: 2, ChunkOffset: 2, ChunkSize: 2},
}
m2 := map[string]*wal.ChunkPosition{
"abc 0": {SegmentId: 3, BlockNumber: 3, ChunkOffset: 3, ChunkSize: 3},
"abc 1": {SegmentId: 4, BlockNumber: 4, ChunkOffset: 4, ChunkSize: 4},
}
var keyPositions, keyPositions2 []*KeyPosition
keyPositions = append(keyPositions, &KeyPosition{
key: []byte("key 0"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 0, ChunkSize: 0},
}, &KeyPosition{
key: []byte("key 1"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 1, BlockNumber: 1, ChunkOffset: 1, ChunkSize: 1},
}, &KeyPosition{
key: []byte("key 2"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 2, BlockNumber: 2, ChunkOffset: 2, ChunkSize: 2},
},
)

keyPositions2 = append(keyPositions, &KeyPosition{
key: []byte("abc 0"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 3, BlockNumber: 3, ChunkOffset: 3, ChunkSize: 3},
}, &KeyPosition{
key: []byte("key abc"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 4, BlockNumber: 4, ChunkOffset: 4, ChunkSize: 4},
}, &KeyPosition{
key: []byte("abc 1"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 4, BlockNumber: 4, ChunkOffset: 4, ChunkSize: 4},
})

err = bt.PutBatch(keyPositions)
assert.Nil(t, err)

tree := bt.trees[0]
tx, err := tree.Begin(true)
assert.Nil(t, err)
iteratorOptions := IteratorOptions{
Reverse: false,
}

itr := NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
var prev []byte
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == -1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}
err = itr.Close()
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: true,
}
prev = nil

itr = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == 1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}
itr.Seek([]byte("key 4"))
assert.Equal(t, []byte("key 2"), itr.Key())

itr.Seek([]byte("key 2"))
assert.Equal(t, []byte("key 2"), itr.Key())

itr.Seek([]byte("aye 2"))
assert.False(t, itr.Valid())
err = itr.Close()
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: false,
}
prev = nil

itr = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == -1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}

itr.Seek([]byte("key 0"))
assert.Equal(t, []byte("key 0"), itr.Key())
itr.Seek([]byte("key 4"))
assert.False(t, itr.Valid())

itr.Seek([]byte("aye 2"))
assert.Equal(t, []byte("key 0"), itr.Key())
err = itr.Close()
assert.Nil(t, err)

// prefix
err = bt.PutBatch(keyPositions2)
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: false,
Prefix: []byte("not valid"),
}

itr = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
assert.False(t, itr.Valid())
err = itr.Close()
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: false,
Prefix: []byte("abc"),
}

itr = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
assert.True(t, itr.Valid())

for itr.Valid() {
assert.True(t, bytes.HasPrefix(itr.Key(), iteratorOptions.Prefix))
assert.Equal(t, m2[string(itr.Key())].Encode(), itr.Value())
itr.Next()
}
err = itr.Close()
assert.Nil(t, err)

}
Loading

0 comments on commit 669e9b0

Please sign in to comment.