Skip to content

Commit

Permalink
major kvstore interface and impl overhaul
Browse files Browse the repository at this point in the history
clarified the interface contract
  • Loading branch information
mschoch committed Sep 23, 2015
1 parent 5897997 commit 900f1b4
Show file tree
Hide file tree
Showing 68 changed files with 2,610 additions and 3,318 deletions.
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ import (
_ "github.com/blevesearch/bleve/index/store/boltdb"
_ "github.com/blevesearch/bleve/index/store/goleveldb"
_ "github.com/blevesearch/bleve/index/store/gtreap"
_ "github.com/blevesearch/bleve/index/store/inmem"

// index types
_ "github.com/blevesearch/bleve/index/upside_down"
Expand Down
4 changes: 4 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/blevesearch/bleve/document"
)

var ErrorUnknownStorageType = fmt.Errorf("unknown storage type")

type Index interface {
Open() error
Close() error
Expand Down Expand Up @@ -45,6 +47,8 @@ type IndexReader interface {
DocIDReader(start, end string) (DocIDReader, error)

FieldDict(field string) (FieldDict, error)

// FieldDictRange is currently defined to include the start and end terms
FieldDictRange(field string, startTerm []byte, endTerm []byte) (FieldDict, error)
FieldDictPrefix(field string, termPrefix []byte) (FieldDict, error)

Expand Down
56 changes: 20 additions & 36 deletions index/store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,39 @@ type op struct {
}

type EmulatedBatch struct {
w KVWriter
ops []*op
merge *EmulatedMerge
Ops []*op
Merger *EmulatedMerge
}

func NewEmulatedBatch(w KVWriter, mo MergeOperator) *EmulatedBatch {
func NewEmulatedBatch(mo MergeOperator) *EmulatedBatch {
return &EmulatedBatch{
w: w,
ops: make([]*op, 0, 1000),
merge: NewEmulatedMerge(mo),
Ops: make([]*op, 0, 1000),
Merger: NewEmulatedMerge(mo),
}
}

func (b *EmulatedBatch) Set(key, val []byte) {
b.ops = append(b.ops, &op{key, val})
ck := make([]byte, len(key))
copy(ck, key)
cv := make([]byte, len(val))
copy(cv, val)
b.Ops = append(b.Ops, &op{ck, cv})
}

func (b *EmulatedBatch) Delete(key []byte) {
b.ops = append(b.ops, &op{key, nil})
ck := make([]byte, len(key))
copy(ck, key)
b.Ops = append(b.Ops, &op{ck, nil})
}

func (b *EmulatedBatch) Merge(key, val []byte) {
b.merge.Merge(key, val)
ck := make([]byte, len(key))
copy(ck, key)
cv := make([]byte, len(val))
copy(cv, val)
b.Merger.Merge(key, val)
}

func (b *EmulatedBatch) Execute() error {
// first process merges
err := b.merge.Execute(b.w)
if err != nil {
return err
}

// now apply all the ops
for _, op := range b.ops {
if op.V != nil {
err := b.w.Set(op.K, op.V)
if err != nil {
return err
}
} else {
err := b.w.Delete(op.K)
if err != nil {
return err
}
}
}
return nil
}

func (b *EmulatedBatch) Close() error {
return nil
func (b *EmulatedBatch) Reset() {
b.Ops = b.Ops[:0]
}
17 changes: 13 additions & 4 deletions index/store/boltdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,40 @@
package boltdb

import (
"bytes"

"github.com/boltdb/bolt"
)

type Iterator struct {
store *Store
tx *bolt.Tx
cursor *bolt.Cursor
prefix []byte
start []byte
end []byte
valid bool
key []byte
val []byte
}

func (i *Iterator) SeekFirst() {
i.key, i.val = i.cursor.First()
func (i *Iterator) updateValid() {
i.valid = (i.key != nil)
if i.valid && i.prefix != nil {
i.valid = bytes.HasPrefix(i.key, i.prefix)
} else if i.end != nil {
i.valid = bytes.Compare(i.key, i.end) < 0
}
}

func (i *Iterator) Seek(k []byte) {
i.key, i.val = i.cursor.Seek(k)
i.valid = (i.key != nil)
i.updateValid()
}

func (i *Iterator) Next() {
i.key, i.val = i.cursor.Next()
i.valid = (i.key != nil)
i.updateValid()
}

func (i *Iterator) Current() ([]byte, []byte, bool) {
Expand Down
32 changes: 25 additions & 7 deletions index/store/boltdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,44 @@ type Reader struct {
tx *bolt.Tx
}

func (r *Reader) BytesSafeAfterClose() bool {
return false
}

func (r *Reader) Get(key []byte) ([]byte, error) {
rv := r.tx.Bucket([]byte(r.store.bucket)).Get(key)
var rv []byte
v := r.tx.Bucket([]byte(r.store.bucket)).Get(key)
if v != nil {
rv = make([]byte, len(v))
copy(rv, v)
}
return rv, nil
}

func (r *Reader) Iterator(key []byte) store.KVIterator {
func (r *Reader) PrefixIterator(prefix []byte) store.KVIterator {
b := r.tx.Bucket([]byte(r.store.bucket))
cursor := b.Cursor()

rv := &Iterator{
store: r.store,
tx: r.tx,
cursor: cursor,
prefix: prefix,
}

rv.Seek(prefix)
return rv
}

func (r *Reader) RangeIterator(start, end []byte) store.KVIterator {
b := r.tx.Bucket([]byte(r.store.bucket))
cursor := b.Cursor()

rv := &Iterator{
store: r.store,
tx: r.tx,
cursor: cursor,
start: start,
end: end,
}

rv.Seek(key)
rv.Seek(start)
return rv
}

Expand Down
70 changes: 22 additions & 48 deletions index/store/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package boltdb

import (
"fmt"
"sync"

"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
Expand All @@ -24,40 +23,41 @@ type Store struct {
path string
bucket string
db *bolt.DB
writer sync.Mutex
mo store.MergeOperator
}

func New(path string, bucket string) *Store {
rv := Store{
path: path,
bucket: bucket,
func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}
return &rv
}

func (bs *Store) Open() error {
bucket, ok := config["bucket"].(string)
if !ok {
bucket = "bleve"
}

var err error
bs.db, err = bolt.Open(bs.path, 0600, nil)
db, err := bolt.Open(path, 0600, nil)
if err != nil {
return err
return nil, err
}

err = bs.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(bs.bucket))
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(bucket))

return err
})
if err != nil {
return err
return nil, err
}

return nil
}

func (bs *Store) SetMergeOperator(mo store.MergeOperator) {
bs.mo = mo
rv := Store{
path: path,
bucket: bucket,
db: db,
mo: mo,
}
return &rv, nil
}

func (bs *Store) Close() error {
Expand All @@ -76,37 +76,11 @@ func (bs *Store) Reader() (store.KVReader, error) {
}

func (bs *Store) Writer() (store.KVWriter, error) {
bs.writer.Lock()
tx, err := bs.db.Begin(true)
if err != nil {
bs.writer.Unlock()
return nil, err
}
reader := &Reader{
store: bs,
tx: tx,
}
return &Writer{
store: bs,
tx: tx,
reader: reader,
store: bs,
}, nil
}

func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}

bucket, ok := config["bucket"].(string)
if !ok {
bucket = "bleve"
}

return New(path, bucket), nil
}

func init() {
registry.RegisterKVStore(Name, StoreConstructor)
registry.RegisterKVStore(Name, New)
}
Loading

0 comments on commit 900f1b4

Please sign in to comment.