Skip to content

Commit

Permalink
update value log
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Mar 13, 2022
1 parent dc976d4 commit 4018d5a
Showing 5 changed files with 124 additions and 59 deletions.
16 changes: 12 additions & 4 deletions cf.go
Original file line number Diff line number Diff line change
@@ -40,9 +40,10 @@ type ColumnFamily struct {
// Store keys and meta info.
indexer index.Indexer
// When the active memtable is full, send it to the flushChn, see listenAndFlush.
flushChn chan *memtable
opts ColumnFamilyOptions
mu sync.RWMutex
flushChn chan *memtable
flushLock sync.RWMutex // guarantee flush and compaction exclusive.
opts ColumnFamilyOptions
mu sync.RWMutex
// Prevent concurrent db using.
// At least one FileLockGuard(cf/indexer/vlog dirs are all the same).
// And at most three FileLockGuards(cf/indexer/vlog dirs are all different).
@@ -112,7 +113,14 @@ func (db *LotusDB) OpenColumnFamily(opts ColumnFamilyOptions) (*ColumnFamily, er
if opts.ValueLogMmap {
ioType = logfile.MMap
}
valueLog, err := openValueLog(opts.ValueLogDir, opts.ValueLogFileSize, ioType, opts.ValueLogGCRatio)
vlogOpt := vlogOptions{
path: opts.ValueLogDir,
blockSize: opts.ValueLogFileSize,
ioType: ioType,
gcRatio: opts.ValueLogGCRatio,
gcInterval: opts.ValueLogGCInterval,
}
valueLog, err := openValueLog(vlogOpt)
if err != nil {
return nil, err
}
33 changes: 20 additions & 13 deletions flush.go
Original file line number Diff line number Diff line change
@@ -84,19 +84,9 @@ func (cf *ColumnFamily) listenAndFlush() {
nodes = append(nodes, node)
}
}

if _, err := cf.indexer.PutBatch(nodes); err != nil {
logger.Errorf("write to indexer err.%+v", err)
break
}
if err := cf.indexer.DeleteBatch(deletedKeys); err != nil {
logger.Errorf("delete keys in indexer err.%+v", err)
break
}
// must fsync before delete wal.
if err := cf.indexer.Sync(); err != nil {
logger.Errorf("sync indexer err.%+v", err)
break
if err := cf.flushUpdateIndex(nodes, deletedKeys); err != nil {
logger.Errorf("listenAndFlush: update index err.%+v", err)
return
}
// delete wal after flush to indexer.
if err := table.deleteWal(); err != nil {
@@ -118,3 +108,20 @@ func (cf *ColumnFamily) listenAndFlush() {
}
}
}

// flushUpdateIndex applies the result of flushing one memtable to the indexer:
// nodes are written with PutBatch, keys are removed with DeleteBatch, and the
// indexer is then synced. The first error encountered is returned unchanged
// and the remaining steps are skipped.
//
// cf.flushLock is held for the whole call. Value log compaction takes the
// same lock while it rewrites entries, so the two never touch the indexer at
// the same time.
func (cf *ColumnFamily) flushUpdateIndex(nodes []*index.IndexerNode, keys [][]byte) error {
	cf.flushLock.Lock()
	defer cf.flushLock.Unlock()

	// Puts and deletes are each applied as a single batch.
	_, err := cf.indexer.PutBatch(nodes)
	if err != nil {
		return err
	}
	if err = cf.indexer.DeleteBatch(keys); err != nil {
		return err
	}

	// The caller deletes the wal once this returns, so the index must be
	// fsynced first.
	return cf.indexer.Sync()
}
7 changes: 7 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ func DefaultOptions(path string) Options {
ValueLogMmap: false,
ValueThreshold: 0,
ValueLogGCRatio: 0.5,
ValueLogGCInterval: time.Minute * 10,
},
}
}
@@ -44,6 +45,7 @@ func DefaultColumnFamilyOptions(name string) ColumnFamilyOptions {
ValueLogMmap: false,
ValueThreshold: 0,
ValueLogGCRatio: 0.5,
ValueLogGCInterval: time.Minute * 10,
}
}

@@ -116,6 +118,11 @@ type ColumnFamilyOptions struct {
// The recommended ratio is 0.5, half of the file can be compacted.
// Default value is 0.5.
ValueLogGCRatio float64

// ValueLogGCInterval is the interval at which a background goroutine checks the value log and runs gc.
// If you don't want value log files to be compacted, set it to zero.
// Default value is 10 minutes.
ValueLogGCInterval time.Duration
}

// WriteOptions set optional params for PutWithOptions and DeleteWithOptions.
93 changes: 63 additions & 30 deletions vlog.go
Original file line number Diff line number Diff line change
@@ -5,13 +5,18 @@ import (
"fmt"
"github.com/flower-corp/lotusdb/index"
"github.com/flower-corp/lotusdb/logfile"
"github.com/flower-corp/lotusdb/logger"
"io"
"io/ioutil"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)

var (
@@ -29,7 +34,7 @@ type (
// Values will be stored in value log if its size exceed ValueThreshold in options.
valueLog struct {
sync.RWMutex
opt options
opt vlogOptions
activeLogFile *logfile.LogFile // current active log file for writing.
logFiles map[uint32]*logfile.LogFile // all log files. Must hold the mutex before modify it.
cf *ColumnFamily
@@ -42,23 +47,18 @@ type (
Offset int64
}

options struct {
path string
blockSize int64
ioType logfile.IOType
gcRatio float64
vlogOptions struct {
path string
blockSize int64
ioType logfile.IOType
gcRatio float64
gcInterval time.Duration
}
)

// openValueLog create a new value log file.
func openValueLog(path string, blockSize int64, ioType logfile.IOType, gcRatio float64) (*valueLog, error) {
opt := options{
path: path,
blockSize: blockSize,
ioType: ioType,
gcRatio: gcRatio,
}
fileInfos, err := ioutil.ReadDir(path)
func openValueLog(opt vlogOptions) (*valueLog, error) {
fileInfos, err := ioutil.ReadDir(opt.path)
if err != nil {
return nil, err
}
@@ -84,12 +84,12 @@ func openValueLog(path string, blockSize int64, ioType logfile.IOType, gcRatio f
}

// open discard file.
discard, err := newDiscard(path, vlogDiscardName)
discard, err := newDiscard(opt.path, vlogDiscardName)
if err != nil {
return nil, err
}
// open active log file only.
logFile, err := logfile.OpenLogFile(path, fids[len(fids)-1], opt.blockSize, logfile.ValueLog, opt.ioType)
logFile, err := logfile.OpenLogFile(opt.path, fids[len(fids)-1], opt.blockSize, logfile.ValueLog, opt.ioType)
if err != nil {
return nil, err
}
@@ -108,6 +108,7 @@ func openValueLog(path string, blockSize int64, ioType logfile.IOType, gcRatio f
if err := vlog.setLogFileState(); err != nil {
return nil, err
}
go vlog.handleGC()
return vlog, nil
}

@@ -237,22 +238,34 @@ func (vlog *valueLog) setLogFileState() error {
return nil
}

func (vlog *valueLog) compact() error {
opt := vlog.opt
func (vlog *valueLog) handleGC() {
if vlog.opt.gcInterval <= 0 {
return
}

quitSig := make(chan os.Signal, 1)
signal.Notify(quitSig, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
ticker := time.NewTicker(vlog.opt.gcInterval)
defer ticker.Stop()
for {
fid, ratio, err := vlog.discard.maxDiscardFid()
if err != nil {
return err
}
if ratio < vlog.opt.gcRatio {
break
}
file, err := logfile.OpenLogFile(opt.path, fid, opt.blockSize, logfile.ValueLog, opt.ioType)
if err != nil {
return err
select {
case <-ticker.C:
if err := vlog.compact(); err != nil {
logger.Errorf("value log compact err: %+v", err)
}
case <-quitSig:
return
}
}
}

func (vlog *valueLog) compact() error {
rewrite := func(file *logfile.LogFile) error {
vlog.cf.flushLock.Lock()
defer vlog.cf.flushLock.Unlock()
var offset int64
var validEntries []*logfile.LogEntry

for {
entry, sz, err := file.ReadLogEntry(offset)
if err != nil {
@@ -271,7 +284,7 @@ func (vlog *valueLog) compact() error {
if len(indexMeta.Value) != 0 {
continue
}
if indexMeta.Fid == fid && indexMeta.Offset == eoff {
if indexMeta.Fid == file.Fid && indexMeta.Offset == eoff {
validEntries = append(validEntries, entry)
}
}
@@ -288,7 +301,27 @@ func (vlog *valueLog) compact() error {
Meta: &index.IndexerMeta{Fid: valuePos.Fid, Offset: valuePos.Offset},
})
}
if _, err = vlog.cf.indexer.PutBatch(nodes); err != nil {
if _, err := vlog.cf.indexer.PutBatch(nodes); err != nil {
return err
}
return nil
}

opt := vlog.opt
for {
fid, ratio, err := vlog.discard.maxDiscardFid()
if err != nil {
return err
}
if ratio < vlog.opt.gcRatio {
break
}
file, err := logfile.OpenLogFile(opt.path, fid, opt.blockSize, logfile.ValueLog, opt.ioType)
if err != nil {
return err
}
if err = rewrite(file); err != nil {
logger.Warnf("compact rewrite err: %+v", err)
return err
}
// delete older vlog file.
34 changes: 22 additions & 12 deletions vlog_test.go
Original file line number Diff line number Diff line change
@@ -29,14 +29,14 @@ func TestOpenValueLog(t *testing.T) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 180, logfile.FileIO, 0.5)
vlog, err := openValueLogForTest(path, 180, logfile.FileIO, 0.5)
assert.Nil(t, err)

_, err = vlog.Write(&logfile.LogEntry{Key: GetKey(923), Value: GetValue128B()})
assert.Nil(t, err)

// open again, the old active log file is close to full, so we will create a new active log file.
vlog1, err := openValueLog(path, 180, logfile.FileIO, 0.5)
vlog1, err := openValueLogForTest(path, 180, logfile.FileIO, 0.5)
assert.Nil(t, err)
assert.NotNil(t, vlog1)
})
@@ -80,7 +80,7 @@ func testOpenValueLog(t *testing.T, ioType logfile.IOType) {
assert.Nil(t, err)
}
}
got, err := openValueLog(tt.args.path, tt.args.blockSize, tt.args.ioType, 0.5)
got, err := openValueLogForTest(tt.args.path, tt.args.blockSize, tt.args.ioType, 0.5)
if (err != nil) != tt.wantErr {
t.Errorf("openValueLog() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -111,7 +111,7 @@ func testValueLogWrite(t *testing.T, ioType logfile.IOType) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 1024<<20, ioType, 0.5)
vlog, err := openValueLogForTest(path, 1024<<20, ioType, 0.5)
assert.Nil(t, err)

type fields struct {
@@ -176,7 +176,7 @@ func TestValueLog_WriteAfterReopen(t *testing.T) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 100, logfile.FileIO, 0.5)
vlog, err := openValueLogForTest(path, 100, logfile.FileIO, 0.5)
assert.Nil(t, err)

tests := []*logfile.LogEntry{
@@ -196,7 +196,7 @@ func TestValueLog_WriteAfterReopen(t *testing.T) {
assert.Nil(t, err)

// reopen it.
vlog, err = openValueLog(path, 100, logfile.MMap, 0.5)
vlog, err = openValueLogForTest(path, 100, logfile.MMap, 0.5)
assert.Nil(t, err)
pos2, err := vlog.Write(tests[1])
assert.Nil(t, err)
@@ -230,7 +230,7 @@ func testValueLogWriteUntilNewActiveFileOpen(t *testing.T, ioType logfile.IOType
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 10<<20, ioType, 0.5)
vlog, err := openValueLogForTest(path, 10<<20, ioType, 0.5)
assert.Nil(t, err)

writeCount := 100000
@@ -275,7 +275,7 @@ func testValueLogRead(t *testing.T, ioType logfile.IOType) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 10<<20, ioType, 0.5)
vlog, err := openValueLogForTest(path, 10<<20, ioType, 0.5)
assert.Nil(t, err)

type data struct {
@@ -356,7 +356,7 @@ func TestValueLog_ReadFromArchivedFile(t *testing.T) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 10<<20, logfile.FileIO, 0.5)
vlog, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
assert.Nil(t, err)

writeCount := 100000
@@ -368,7 +368,7 @@ func TestValueLog_ReadFromArchivedFile(t *testing.T) {
err = vlog.Close()
assert.Nil(t, err)

vlog1, err := openValueLog(path, 10<<20, logfile.FileIO, 0.5)
vlog1, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
assert.Nil(t, err)
e, err := vlog1.Read(0, 0)
assert.Nil(t, err)
@@ -385,7 +385,7 @@ func TestValueLog_Sync(t *testing.T) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 10<<20, logfile.FileIO, 0.5)
vlog, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
assert.Nil(t, err)

err = vlog.Sync()
@@ -401,9 +401,19 @@ func TestValueLog_Close(t *testing.T) {
defer func() {
_ = os.RemoveAll(path)
}()
vlog, err := openValueLog(path, 10<<20, logfile.MMap, 0.5)
vlog, err := openValueLogForTest(path, 10<<20, logfile.MMap, 0.5)
assert.Nil(t, err)

err = vlog.Close()
assert.Nil(t, err)
}

// openValueLogForTest is a test helper that packs its positional arguments
// into a vlogOptions and opens the value log with it.
//
// gcInterval is not set and therefore stays at its zero value.
func openValueLogForTest(path string, blockSize int64, ioType logfile.IOType, gcRatio float64) (*valueLog, error) {
	return openValueLog(vlogOptions{
		path:      path,
		blockSize: blockSize,
		ioType:    ioType,
		gcRatio:   gcRatio,
	})
}

0 comments on commit 4018d5a

Please sign in to comment.