Skip to content

Commit

Permalink
update logfile open and put with options
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Jan 1, 2022
1 parent c76dd47 commit 66ea2dd
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 50 deletions.
53 changes: 49 additions & 4 deletions cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
)

var (
// ErrColoumnFamilyNil .
ErrColoumnFamilyNil = errors.New("column family name is nil")
)

// ColumnFamily is a namespace of keys and values.
type ColumnFamily struct {
activeMem *memtable.Memtable // Active memtable for writing.
immuMems []*memtable.Memtable // Immutable memtables, waiting to be flushed to disk.
Expand All @@ -31,7 +33,7 @@ func (db *LotusDB) OpenColumnFamily(opts ColumnFamilyOptions) (*ColumnFamily, er
if opts.CfName == "" {
return nil, ErrColoumnFamilyNil
}
// use db path.
// use db path as default column family path.
if opts.DirPath == "" {
opts.DirPath = db.opts.DBPath
}
Expand Down Expand Up @@ -72,20 +74,42 @@ func (cf *ColumnFamily) Close() error {

// Put put to current column family.
func (cf *ColumnFamily) Put(key, value []byte) error {
if err := cf.activeMem.Put(key, value); err != nil {
return cf.PutWithOptions(key, value, nil)
}

// PutWithOptions put to current column family with options.
func (cf *ColumnFamily) PutWithOptions(key, value []byte, opt *WriteOptions) error {
// waiting for enough memtable sapce to write.
// todo

var memOpts memtable.Options
if opt != nil {
memOpts.Sync = opt.Sync
memOpts.DisableWal = opt.DisableWal
memOpts.ExpiredAt = opt.ExpiredAt
}
if err := cf.activeMem.Put(key, value, memOpts); err != nil {
return err
}
return nil
}

// Get get from current column family.
func (cf *ColumnFamily) Get(key []byte) ([]byte, error) {
// get from memtables.
// get from active memtable.
var value []byte
if value = cf.activeMem.Get(key); len(value) != 0 {
return value, nil
}

// get from immutable memtables.
for _, mem := range cf.immuMems {
value := mem.Get(key)
if value != nil {
return value, nil
}
}

// get from bptree.

// get value from value log.
Expand All @@ -95,6 +119,20 @@ func (cf *ColumnFamily) Get(key []byte) ([]byte, error) {

// Delete delete from current column family.
func (cf *ColumnFamily) Delete(key []byte) error {
return cf.DeleteWithOptions(key, nil)
}

// DeleteWithOptions delete from current column family with options.
func (cf *ColumnFamily) DeleteWithOptions(key []byte, opt *WriteOptions) error {
var memOpts memtable.Options
if opt != nil {
memOpts.Sync = opt.Sync
memOpts.DisableWal = opt.DisableWal
memOpts.ExpiredAt = opt.ExpiredAt
}
if err := cf.activeMem.Delete(key, memOpts); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -133,8 +171,15 @@ func (cf *ColumnFamily) openMemtables() error {
ioType = logfile.MMap
}

memOpts := memtable.Options{
Path: cf.opts.WalDir,
Fsize: cf.opts.MemtableSize,
TableTyp: tableType,
IoType: ioType,
}
for i, fid := range fids {
table, err := memtable.OpenMemTable(cf.opts.WalDir, fid, cf.opts.MemtableSize, tableType, ioType)
memOpts.Fid = fid
table, err := memtable.OpenMemTable(memOpts)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions cf_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
package lotusdb

import "testing"

func TestLotusDB_OpenColumnFamily(t *testing.T) {

}
22 changes: 17 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package cmd
// Copyright 2022 FlowerCorp, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import "context"
package main

func main() {

context.Background()
import "fmt"

func main() {
fmt.Println("LotusDB")
}
15 changes: 13 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

var (
// ErrDefaultCfNil .
ErrDefaultCfNil = errors.New("default comumn family is nil")
)

Expand Down Expand Up @@ -47,11 +48,16 @@ func (db *LotusDB) Close() error {

// Put put to default column family.
func (db *LotusDB) Put(key, value []byte) error {
return db.PutWithOptions(key, value, nil)
}

// PutWithOptions put to default column family with options.
func (db *LotusDB) PutWithOptions(key, value []byte, opt *WriteOptions) error {
columnFamily := db.getColumnFamily(DefaultColumnFamilyName)
if columnFamily == nil {
return ErrDefaultCfNil
}
return columnFamily.Put(key, value)
return columnFamily.PutWithOptions(key, value, opt)
}

// Get get from default column family.
Expand All @@ -65,11 +71,16 @@ func (db *LotusDB) Get(key []byte) ([]byte, error) {

// Delete delete from default column family.
func (db *LotusDB) Delete(key []byte) error {
return db.DeleteWithOptions(key, nil)
}

// DeleteWithOptions delete from default column family with options.
func (db *LotusDB) DeleteWithOptions(key []byte, opt *WriteOptions) error {
columnFamily := db.getColumnFamily(DefaultColumnFamilyName)
if columnFamily == nil {
return ErrDefaultCfNil
}
return columnFamily.Delete(key)
return columnFamily.DeleteWithOptions(key, opt)
}

func (db *LotusDB) getColumnFamily(cfName string) *ColumnFamily {
Expand Down
35 changes: 28 additions & 7 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,45 @@
package lotusdb

import (
"bytes"
"fmt"
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestOpen(t *testing.T) {
options := DefaultOptions("/tmp/lotusdb")
//options.CfOpts.WalMMap = true
db, err := Open(options)
assert.Nil(t, err)
defer db.Close()

//err = db.Put([]byte("key-1"), []byte("LotusDB"))
//assert.Nil(t, err)
now := time.Now()
for i := 0; i < 500000; i++ {
err := db.Put(GetKey(i), GetValue())
assert.Nil(t, err)
}
t.Log("writing 50w records, time spent: ", time.Since(now).Milliseconds())
}

const alphabet = "abcdefghijklmnopqrstuvwxyz"

//
//[]byte("key-1"), []byte("val-1")
key := []byte("key-1")
func init() {
rand.Seed(time.Now().Unix())
}

func GetKey(n int) []byte {
return []byte("test_key_" + fmt.Sprintf("%09d", n))
}

v, err := db.Get(key)
t.Log(string(v))
func GetValue() []byte {
var str bytes.Buffer
for i := 0; i < 12; i++ {
str.WriteByte(alphabet[rand.Int()%26])
}
return []byte("test_val-" + strconv.FormatInt(time.Now().UnixNano(), 10) + str.String())
}
1 change: 1 addition & 0 deletions logfile/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (lf *LogFile) Read(offset int64) (*LogEntry, int64, error) {
if crc := getEntryCrc(e, headerBuf[crc32.Size:size]); crc != header.crc32 {
return nil, 0, ErrInvalidCrc
}
lf.WriteAt += entrySize
return e, entrySize, nil
}

Expand Down
93 changes: 72 additions & 21 deletions memtable/mem_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package memtable

import (
"fmt"
"io"

"github.com/flowercorp/lotusdb/logfile"
"io"
)

type TableType int8
Expand All @@ -25,56 +24,104 @@ type (
Memtable struct {
mem IMemtable
wal *logfile.LogFile
opt Options
}

Options struct {
// options for opening a memtable.
Path string
Fid uint32
Fsize int64
TableTyp TableType
IoType logfile.IOType

// options for writing.
Sync bool
DisableWal bool
ExpiredAt int64
}
)

func OpenMemTable(path string, fid uint32, size int64, tableType TableType, ioType logfile.IOType) (*Memtable, error) {
mem := getIMemtable(tableType)
table := &Memtable{mem: mem}
func OpenMemTable(opts Options) (*Memtable, error) {
mem := getIMemtable(opts.TableTyp)
table := &Memtable{mem: mem, opt: opts}

wal, err := logfile.OpenLogFile(path, fid, size*2, logfile.WAL, ioType)
// open wal log file.
wal, err := logfile.OpenLogFile(opts.Path, opts.Fid, opts.Fsize*2, logfile.WAL, opts.IoType)
if err != nil {
return nil, err
}
table.wal = wal

var count = 0

// load entries.
var offset int64 = 0
if wal != nil {
for {
if entry, size, err := wal.Read(offset); err == nil {
offset += size
mem.Put(entry.Key, entry.Value)
} else {
if err == io.EOF {
break
}
return nil, err
for {
if entry, size, err := wal.Read(offset); err == nil {
offset += size
mem.Put(entry.Key, entry.Value)
count++
} else {
if err == io.EOF {
break
}
return nil, err
}
table.wal = wal
}

fmt.Println("写入到 MemTable 的数据量 : ", count)

return table, nil
}

func (mt *Memtable) Put(key []byte, value []byte) error {
func (mt *Memtable) Put(key []byte, value []byte, opts Options) error {
entry := &logfile.LogEntry{
Key: key,
Value: value,
}
if opts.ExpiredAt > 0 {
entry.ExpiredAt = opts.ExpiredAt
}

if mt.wal != nil {
if !opts.DisableWal && mt.wal != nil {
buf, _ := logfile.EncodeEntry(entry)
if err := mt.wal.Write(buf); err != nil {
fmt.Println(err)
return err
}

if opts.Sync {
if err := mt.wal.Sync(); err != nil {
return err
}
}
}

mt.mem.Put(key, value)
return nil
}

func (mt *Memtable) SyncWAL() error {
return mt.wal.Sync()
func (mt *Memtable) Delete(key []byte, opts Options) error {
entry := &logfile.LogEntry{
Key: key,
Type: logfile.TypeDelete,
}

if !opts.DisableWal && mt.wal != nil {
buf, _ := logfile.EncodeEntry(entry)
if err := mt.wal.Write(buf); err != nil {
return err
}

if opts.Sync {
if err := mt.wal.Sync(); err != nil {
return err
}
}
}
mt.mem.Remove(key)
return nil
}

func (mt *Memtable) Get(key []byte) []byte {
Expand All @@ -86,6 +133,10 @@ func (mt *Memtable) Get(key []byte) []byte {
return entry.Value
}

func (mt *Memtable) SyncWAL() error {
return mt.wal.Sync()
}

func (mt *Memtable) IsFull() bool {
return false
}
Expand Down
Loading

0 comments on commit 66ea2dd

Please sign in to comment.