Skip to content

Commit

Permalink
fix vlog compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Mar 20, 2022
1 parent be45d89 commit f2fc1c1
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 103 deletions.
9 changes: 6 additions & 3 deletions cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/flower-corp/lotusdb/index"
"github.com/flower-corp/lotusdb/logfile"
Expand Down Expand Up @@ -194,10 +195,9 @@ func (cf *ColumnFamily) Get(key []byte) ([]byte, error) {
indexMeta, err := cf.indexer.Get(key)
if err != nil {
return nil, err
} else if indexMeta == nil {
}
if indexMeta == nil {
return nil, nil
} else if len(indexMeta.Value) != 0 {
return indexMeta.Value, nil
}

// get value from value log.
Expand All @@ -206,6 +206,9 @@ func (cf *ColumnFamily) Get(key []byte) ([]byte, error) {
if err != nil {
return nil, err
}
if ent.ExpiredAt != 0 && ent.ExpiredAt <= time.Now().Unix() {
return nil, nil
}
if len(ent.Value) != 0 {
return ent.Value, nil
}
Expand Down
66 changes: 40 additions & 26 deletions flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,47 @@ func (cf *ColumnFamily) listenAndFlush() {
case table := <-cf.flushChn:
var nodes []*index.IndexerNode
var deletedKeys [][]byte
iter := table.sklIter
// iterate and write data to bptree and value log(if any).
for table.sklIter.SeekToFirst(); iter.Valid(); iter.Next() {
node := &index.IndexerNode{Key: iter.Key()}
mv := decodeMemValue(iter.Value())
key := iter.Key()

// delete invalid keys from indexer.
if mv.typ == byte(logfile.TypeDelete) || (mv.expiredAt != 0 && mv.expiredAt <= time.Now().Unix()) {
deletedKeys = append(deletedKeys, key)
} else {
if len(mv.value) >= cf.opts.ValueThreshold {
valuePos, esize, err := cf.vlog.Write(&logfile.LogEntry{Key: key, Value: mv.value})
if err != nil {
logger.Errorf("write to value log err.%+v", err)
return
}
node.Meta = &index.IndexerMeta{
Fid: valuePos.Fid,
Offset: valuePos.Offset,
EntrySize: esize,
}
iterateTable := func() error {
table.Lock()
defer table.Unlock()
iter := table.sklIter
// iterate and write data to bptree and value log(if any).
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
key := iter.Key()
node := &index.IndexerNode{Key: key}
mv := decodeMemValue(iter.Value())

// delete invalid keys from indexer.
if mv.typ == byte(logfile.TypeDelete) || (mv.expiredAt != 0 && mv.expiredAt <= time.Now().Unix()) {
deletedKeys = append(deletedKeys, key)
} else {
node.Meta = &index.IndexerMeta{Value: mv.value}
if len(mv.value) >= cf.opts.ValueThreshold {
valuePos, esize, err := cf.vlog.Write(&logfile.LogEntry{
Key: key,
Value: mv.value,
ExpiredAt: mv.expiredAt,
})
if err != nil {
return err
}
node.Meta = &index.IndexerMeta{
Fid: valuePos.Fid,
Offset: valuePos.Offset,
EntrySize: esize,
}
} else {
node.Meta = &index.IndexerMeta{Value: mv.value}
}
nodes = append(nodes, node)
}
nodes = append(nodes, node)
}
return nil
}

if err := iterateTable(); err != nil {
logger.Errorf("listenAndFlush: handle iterate table err.%+v", err)
return
}
if err := cf.flushUpdateIndex(nodes, deletedKeys); err != nil {
logger.Errorf("listenAndFlush: update index err.%+v", err)
Expand Down Expand Up @@ -111,12 +125,12 @@ func (cf *ColumnFamily) flushUpdateIndex(nodes []*index.IndexerNode, keys [][]by
cf.flushLock.Lock()
defer cf.flushLock.Unlock()
// must put and delete in batch.
putOpts := index.PutOptions{SendDiscard: true}
if _, err := cf.indexer.PutBatch(nodes, putOpts); err != nil {
writeOpts := index.WriteOptions{SendDiscard: true}
if _, err := cf.indexer.PutBatch(nodes, writeOpts); err != nil {
return err
}
if len(keys) > 0 {
if err := cf.indexer.DeleteBatch(keys); err != nil {
if err := cf.indexer.DeleteBatch(keys, writeOpts); err != nil {
return err
}
}
Expand Down
10 changes: 6 additions & 4 deletions index/bptree.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (b *BPTree) Put(key, value []byte) (err error) {
// The offset marks the transaction write position of the current batch.
// If this function fails during execution, we can write again from the offset position.
// If offset == len(kv) - 1 , all writes are successful.
func (b *BPTree) PutBatch(nodes []*IndexerNode, opts PutOptions) (offset int, err error) {
func (b *BPTree) PutBatch(nodes []*IndexerNode, opts WriteOptions) (offset int, err error) {
batchLoopNum := len(nodes) / b.opts.BatchSize
if len(nodes)%b.opts.BatchSize > 0 {
batchLoopNum++
Expand Down Expand Up @@ -162,7 +162,7 @@ func (b *BPTree) PutBatch(nodes []*IndexerNode, opts PutOptions) (offset int, er
}

// DeleteBatch delete data in batch.
func (b *BPTree) DeleteBatch(keys [][]byte) error {
func (b *BPTree) DeleteBatch(keys [][]byte, opts WriteOptions) error {
batchLoopNum := len(keys) / b.opts.BatchSize
if len(keys)%b.opts.BatchSize > 0 {
batchLoopNum++
Expand All @@ -185,14 +185,16 @@ func (b *BPTree) DeleteBatch(keys [][]byte) error {
if oldVal, err := bucket.Delete(keys[itemIdx]); err != nil {
_ = tx.Rollback()
return err
} else if len(oldVal) > 0 {
} else if len(oldVal) > 0 && opts.SendDiscard {
oldValues = append(oldValues, oldVal)
}
}
if err := tx.Commit(); err != nil {
return err
}
b.sendDiscard(oldValues)
if opts.SendDiscard {
b.sendDiscard(oldValues)
}
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions index/bptree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestBPTree_PutBatch(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := tt.fields.tree
gotOffset, err := b.PutBatch(tt.args.nodes, PutOptions{})
gotOffset, err := b.PutBatch(tt.args.nodes, WriteOptions{})
if (err != nil) != tt.wantErr {
t.Errorf("PutBatch() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestBPTree_DeleteBatch(t *testing.T) {
Meta: &IndexerMeta{Value: GetValue16B()},
})
}
_, err = tree.PutBatch(nodes, PutOptions{})
_, err = tree.PutBatch(nodes, WriteOptions{})
assert.Nil(t, err)

getKeys := func(nums int) [][]byte {
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestBPTree_DeleteBatch(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := tt.fields.tree
if err := b.DeleteBatch(tt.args.keys); (err != nil) != tt.wantErr {
if err := b.DeleteBatch(tt.args.keys, WriteOptions{}); (err != nil) != tt.wantErr {
t.Errorf("DeleteBatch() error = %v, wantErr %v", err, tt.wantErr)
}

Expand Down
8 changes: 4 additions & 4 deletions index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ type IndexerMeta struct {
EntrySize int
}

// PutOptions options for put or put batch.
type PutOptions struct {
// WriteOptions holds the options for batch updates (put batch and delete batch).
type WriteOptions struct {
SendDiscard bool
}

// Indexer index data are stored in indexer.
type Indexer interface {
Put(key []byte, value []byte) (err error)

PutBatch(kv []*IndexerNode, opts PutOptions) (offset int, err error)
PutBatch(kv []*IndexerNode, opts WriteOptions) (offset int, err error)

Get(key []byte) (meta *IndexerMeta, err error)

Delete(key []byte) error

DeleteBatch(keys [][]byte) error
DeleteBatch(keys [][]byte, opts WriteOptions) error

Sync() error

Expand Down
4 changes: 4 additions & 0 deletions memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/flower-corp/lotusdb/logfile"
"github.com/flower-corp/lotusdb/logger"
"io"
"sync"
"sync/atomic"
"time"
)
Expand All @@ -19,6 +20,7 @@ type (
// Once a memtable is full(memtable has its threshold, see MemtableSize in options), it becomes immutable and replaced by a new memtable.
// A background goroutine will flush the content of memtable into indexer or vlog, after which the memtable can be deleted.
memtable struct {
sync.RWMutex
sklIter *arenaskl.Iterator
skl *arenaskl.Skiplist
wal *logfile.LogFile
Expand Down Expand Up @@ -144,6 +146,8 @@ func (mt *memtable) put(key []byte, value []byte, deleted bool, opts WriteOption
// get value from memtable.
// if the specified key is marked as deleted or expired, a true bool value is returned.
func (mt *memtable) get(key []byte) (bool, []byte) {
mt.Lock()
defer mt.Unlock()
if found := mt.sklIter.Seek(key); !found {
return false, nil
}
Expand Down
49 changes: 39 additions & 10 deletions vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,23 @@ func (vlog *valueLog) Read(fid uint32, offset int64) (*logfile.LogEntry, error)
// Write new VLogEntry to value log file.
// If the active log file is full, it will be closed and a new active file will be created to replace it.
func (vlog *valueLog) Write(ent *logfile.LogEntry) (*valuePos, int, error) {
vlog.Lock()
defer vlog.Unlock()
buf, eSize := logfile.EncodeEntry(ent)
// if the active file would reach the threshold, close it and open a new one.
if vlog.activeLogFile.WriteAt+int64(eSize) >= vlog.opt.blockSize {
vlog.Lock()
if err := vlog.Sync(); err != nil {
return nil, 0, err
}
vlog.logFiles[vlog.activeLogFile.Fid] = vlog.activeLogFile

logFile, err := vlog.createLogFile()
if err != nil {
vlog.Unlock()
return nil, 0, err
}
vlog.activeLogFile = logFile
vlog.Unlock()
}

err := vlog.activeLogFile.Write(buf)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -253,6 +253,12 @@ func (vlog *valueLog) getActiveFid() uint32 {
return fid
}

// getLogFile looks up the log file stored under fid in vlog.logFiles.
// The lookup is performed while holding the vlog lock; the zero value of the
// map's element type is returned when no entry exists for fid.
func (vlog *valueLog) getLogFile(fid uint32) *logfile.LogFile {
	vlog.Lock()
	lf := vlog.logFiles[fid]
	vlog.Unlock()
	return lf
}

func (vlog *valueLog) handleCompaction() {
if vlog.opt.gcInterval <= 0 {
return
Expand Down Expand Up @@ -282,6 +288,7 @@ func (vlog *valueLog) compact() error {

var offset int64
var validEntries []*logfile.LogEntry
ts := time.Now().Unix()
for {
entry, sz, err := file.ReadLogEntry(offset)
if err != nil {
Expand Down Expand Up @@ -309,8 +316,14 @@ func (vlog *valueLog) compact() error {
}

var nodes []*index.IndexerNode
var deletedKeys [][]byte
// rewrite valid log entries.
for _, e := range validEntries {
if e.ExpiredAt != 0 && e.ExpiredAt <= ts {
deletedKeys = append(deletedKeys, e.Key)
continue
}

valuePos, esize, err := vlog.Write(e)
if err != nil {
return err
Expand All @@ -324,10 +337,15 @@ func (vlog *valueLog) compact() error {
},
})
}
putOpts := index.PutOptions{SendDiscard: false}
if _, err := vlog.cf.indexer.PutBatch(nodes, putOpts); err != nil {
writeOpts := index.WriteOptions{SendDiscard: false}
if _, err := vlog.cf.indexer.PutBatch(nodes, writeOpts); err != nil {
return err
}
if len(deletedKeys) > 0 {
if err := vlog.cf.indexer.DeleteBatch(deletedKeys, writeOpts); err != nil {
return err
}
}
if err := vlog.cf.indexer.Sync(); err != nil {
return err
}
Expand All @@ -342,18 +360,29 @@ func (vlog *valueLog) compact() error {
}

for _, fid := range ccl {
file, err := logfile.OpenLogFile(opt.path, fid, opt.blockSize, logfile.ValueLog, opt.ioType)
if err != nil {
return err
lf := vlog.getLogFile(fid)
if lf == nil {
file, err := logfile.OpenLogFile(opt.path, fid, opt.blockSize, logfile.ValueLog, opt.ioType)
if err != nil {
return err
}
lf = file
}
if err = rewrite(file); err != nil {

if err = rewrite(lf); err != nil {
logger.Warnf("compact rewrite err: %+v", err)
return err
}
// clear discard state.
vlog.discard.clear(fid)

// delete older vlog file.
if err = file.Delete(); err != nil {
vlog.Lock()
if _, ok := vlog.logFiles[fid]; ok {
delete(vlog.logFiles, fid)
}
vlog.Unlock()
if err = lf.Delete(); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit f2fc1c1

Please sign in to comment.