Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for Seek on iterator and IteratorWithRange on storage #85

Merged
merged 4 commits into from
Feb 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Iterator interface {
Key() string
Value() (interface{}, error)
Release()
Seek(key string) bool
}

type iterator struct {
Expand Down Expand Up @@ -41,3 +42,7 @@ func (i *iterator) Value() (interface{}, error) {
func (i *iterator) Release() {
i.iter.Release()
}

func (i *iterator) Seek(key string) bool {
return i.iter.Seek([]byte(key))
}
11 changes: 11 additions & 0 deletions mock/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func (_mr *_MockStorageRecorder) Iterator() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Iterator")
}

func (_m *MockStorage) IteratorWithRange(start, limit []byte) (storage.Iterator, error) {
ret := _m.ctrl.Call(_m, "Iterator")
ret0, _ := ret[0].(storage.Iterator)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStorageRecorder) IteratorWithRange() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "IteratorWithRange")
}

func (_m *MockStorage) MarkRecovered() error {
ret := _m.ctrl.Call(_m, "MarkRecovered")
ret0, _ := ret[0].(error)
Expand Down
4 changes: 4 additions & 0 deletions storage/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (f *file) Iterator() (Iterator, error) {
return new(NullIter), nil
}

func (f *file) IteratorWithRange(start, limit []byte) (Iterator, error) {
return new(NullIter), nil
}

func (f *file) Open() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (i *iterator) Release() {
i.iter.Release()
i.snap.Release()
}

func (i *iterator) Seek(key []byte) bool {
return i.iter.Seek(key)
}
48 changes: 41 additions & 7 deletions storage/memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package storage

import "fmt"
import (
"bytes"
"fmt"

"github.com/syndtr/goleveldb/leveldb/util"
)

type memiter struct {
current int
Expand Down Expand Up @@ -41,13 +46,19 @@ func (i *memiter) Release() {
i.current = len(i.keys)
}

func (m *memory) Iterator() (Iterator, error) {
keys := make([]string, 0, len(m.storage))
for k := range m.storage {
keys = append(keys, k)
func (i *memiter) Seek(key []byte) bool {
seek := make(map[string][]byte)
keys := []string{}
for k, v := range i.storage {
if bytes.ContainsAny(key, k) {
keys = append(keys, k)
seek[k] = v
}
}

return &memiter{-1, keys, m.storage}, nil
i.current = -1
i.storage = seek
i.keys = keys
return !i.exhausted()
}

type memory struct {
Expand Down Expand Up @@ -87,6 +98,29 @@ func (m *memory) Delete(key string) error {
return nil
}

func (m *memory) Iterator() (Iterator, error) {
keys := make([]string, 0, len(m.storage))
for k := range m.storage {
keys = append(keys, k)
}

return &memiter{-1, keys, m.storage}, nil
}

func (m *memory) IteratorWithRange(start, limit []byte) (Iterator, error) {
keys := []string{} // using slice as keys has an unknown size
if len(limit) == 0 {
limit = util.BytesPrefix(start).Limit
}
for k := range m.storage {
if bytes.Compare([]byte(k), start) > -1 && bytes.Compare([]byte(k), limit) < 1 {
keys = append(keys, k)
}
}

return &memiter{-1, keys, m.storage}, nil
}

func (m *memory) MarkRecovered() error {
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions storage/multi_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,16 @@ func (m *multiIterator) Release() {
m.current = 0
m.iters = []Iterator{&NullIter{}}
}

func (m *multiIterator) Seek(key []byte) bool {
m.current = 0
iters := []Iterator{}
ok := false
for i := range m.iters {
if m.iters[i].Seek(key) {
iters = append(iters, m.iters[i])
ok = true
}
}
return ok
}
15 changes: 14 additions & 1 deletion storage/multi_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ func TestMultiIterator(t *testing.T) {
ensure.DeepEqual(t, expected[string(iter.Key())], string(val))
count++
}

ensure.DeepEqual(t, count, len(expected))

k := []byte("storage-0")
iter = NewMultiIterator(iters)
ensure.True(t, iter.Seek(k), "seek return false should return true")
ensure.True(t, iter.Next(), "Iterator should have a value")
ensure.DeepEqual(t, iter.Key(), k, "key mismatch")

total := 1
for iter.Next() {
_, err := iter.Value()
ensure.Nil(t, err)
total++
}
ensure.DeepEqual(t, total, 3, "not enough element found in iter seek")
}
8 changes: 8 additions & 0 deletions storage/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (n *Null) Iterator() (Iterator, error) {
return new(NullIter), nil
}

// IteratorWithRange returns an Iterator that is immediately exhausted.
func (n *Null) IteratorWithRange(start, limit []byte) (Iterator, error) {
return new(NullIter), nil
}

// Open does nothing and doesn't error.
func (n *Null) Open() error {
return nil
Expand Down Expand Up @@ -86,3 +91,6 @@ func (ni *NullIter) Value() ([]byte, error) {

// Release does nothing.
func (ni *NullIter) Release() {}

// Seek do nothing
func (ni *NullIter) Seek(key []byte) bool { return false }
23 changes: 23 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Iterator interface {
// Release releases the iterator. After release, the iterator is not usable
// anymore.
Release()
// Seek for a key in the iterator
Seek(key []byte) bool
}

// Storage abstracts the interface for a persistent local storage
Expand All @@ -37,6 +39,7 @@ type Storage interface {
SetOffset(value int64) error
GetOffset(defValue int64) (int64, error)
Iterator() (Iterator, error)
IteratorWithRange(start, limit []byte) (Iterator, error)
MarkRecovered() error
Recovered() bool
Open() error
Expand Down Expand Up @@ -89,6 +92,26 @@ func (s *storage) Iterator() (Iterator, error) {
}, nil
}

// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

if limit != nil && len(limit) > 0 {
return &iterator{
iter: s.store.NewIterator(&util.Range{Start: start, Limit: limit}, nil),
snap: snap,
}, nil
}
return &iterator{
iter: s.store.NewIterator(util.BytesPrefix(start), nil),
snap: snap,
}, nil

}

func (s *storage) Has(key string) (bool, error) {
return s.store.Has([]byte(key), nil)
}
Expand Down
7 changes: 7 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func TestMemIter(t *testing.T) {
ensure.True(t, val == nil, "exhausted iterator should return nil value, returned %s", val)

ensure.DeepEqual(t, found, kv, "found doesn't match kv, iterator probably didn't return all values")

k := []byte("key-1")
iter, err = storage.IteratorWithRange(k, nil)

ensure.True(t, iter.Next(), "next should return true after a IteratorWithRange")
ensure.DeepEqual(t, iter.Key(), k, "the first matching key in IteratorWithRange is not corresponding to the value")

}

func TestGetHas(t *testing.T) {
Expand Down
23 changes: 23 additions & 0 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,29 @@ func (v *View) Iterator() (Iterator, error) {
}, nil
}

// IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is build using the range.
func (v *View) IteratorWithRange(start, limit string) (Iterator, error) {
iters := make([]storage.Iterator, 0, len(v.partitions))
for i := range v.partitions {
iter, err := v.partitions[i].st.IteratorWithRange([]byte(start), []byte(limit))
if err != nil {
// release already opened iterators
for i := range iters {
iters[i].Release()
}

return nil, fmt.Errorf("error opening partition iterator: %v", err)
}

iters = append(iters, iter)
}

return &iterator{
iter: storage.NewMultiIterator(iters),
codec: v.opts.tableCodec,
}, nil
}

// Evict removes the given key only from the local cache. In order to delete a
// key from Kafka and other Views, context.Delete should be used on a Processor.
func (v *View) Evict(key string) error {
Expand Down