forked from lotusdblabs/lotusdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmemtable.go
234 lines (207 loc) · 6.46 KB
/
memtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package lotusdb
import (
"fmt"
"io"
"math"
"os"
"sort"
"sync"
"github.com/bwmarrin/snowflake"
arenaskl "github.com/dgraph-io/badger/v4/skl"
"github.com/dgraph-io/badger/v4/y"
"github.com/rosedblabs/wal"
)
// walFileExt is the segment file extension of a memtable's write ahead log.
// The %d verb is replaced with the unique memtable id, so the wal of the
// memtable with id 1 uses the extension ".SEG.1". On disk the file name has a
// numeric prefix in front of this extension (openAllMemtables parses names
// with "%d"+walFileExt).
const walFileExt = ".SEG.%d"

// initialTableID is the id given to the first memtable when no wal file is
// found in the data directory.
const initialTableID = 1
// memtable buffers freshly written data in memory before it is flushed into
// the index and the value log. The only supported in-memory structure is a
// skip list (github.com/dgraph-io/badger/v4/skl).
//
// Every write goes into the active memtable first. Reads consult the
// memtables before the index and value log, because data in a memtable is
// newer than anything already flushed.
//
// When a memtable reaches its size threshold (MemtableSize in the options),
// it becomes immutable and a new memtable takes its place. A background
// goroutine later flushes the immutable table into the index and vlog, after
// which the table can be deleted.
type memtable struct {
	mu      sync.RWMutex
	wal     *wal.WAL            // write ahead log backing this memtable
	skl     *arenaskl.Skiplist  // in-memory skip list holding the entries
	options memtableOptions     // configuration the memtable was opened with
}

// memtableOptions holds the settings used to open a single memtable.
type memtableOptions struct {
	dirPath         string // directory containing the wal file
	tableId         uint32 // unique memtable id, also used to derive the wal file extension
	memSize         uint32 // size threshold at which the memtable counts as full
	walBytesPerSync uint32 // BytesPerSync setting passed to the wal
	walSync         bool   // whether the wal syncs after every write
	walBlockCache   uint32 // block cache size for the wal
}
// openAllMemtables opens every memtable whose wal file is present in
// options.DirPath and returns them sorted by ascending table id.
//
// A wal is tied to exactly one memtable. Its file name is a numeric segment
// prefix followed by walFileExt (for example the memtable with id 1 owns a
// file ending in ".SEG.1"), so the table id is parsed out of the file name.
// Entries that are directories or do not match that pattern are ignored. If
// no wal file is found, a single memtable with initialTableID is opened.
//
// Each table id is opened at most once. If any memtable fails to open, the
// ones opened before it are closed and the error is returned, so no wal file
// handles are leaked.
func openAllMemtables(options Options) ([]*memtable, error) {
	entries, err := os.ReadDir(options.DirPath)
	if err != nil {
		return nil, err
	}

	// collect the ids of all memtables that have a wal file on disk
	var tableIDs []int
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		var segmentID, tableID int
		if _, err := fmt.Sscanf(entry.Name(), "%d"+walFileExt, &segmentID, &tableID); err != nil {
			continue // not a memtable wal file
		}
		tableIDs = append(tableIDs, tableID)
	}
	if len(tableIDs) == 0 {
		tableIDs = append(tableIDs, initialTableID)
	}
	sort.Ints(tableIDs)

	// Drop duplicate ids (e.g. leftover segment files of the same table) so the
	// same wal is never opened by two memtables.
	uniqueIDs := tableIDs[:0]
	for _, id := range tableIDs {
		if len(uniqueIDs) == 0 || uniqueIDs[len(uniqueIDs)-1] != id {
			uniqueIDs = append(uniqueIDs, id)
		}
	}

	tables := make([]*memtable, 0, len(uniqueIDs))
	for _, tableID := range uniqueIDs {
		table, err := openMemtable(memtableOptions{
			dirPath:         options.DirPath,
			tableId:         uint32(tableID),
			memSize:         options.MemtableSize,
			walSync:         options.Sync,
			walBytesPerSync: options.BytesPerSync,
			walBlockCache:   options.BlockCache,
		})
		if err != nil {
			// Release the wal files of the tables opened so far. Their close
			// errors are ignored because the open error is the one to report.
			for _, opened := range tables {
				_ = opened.close()
			}
			return nil, err
		}
		tables = append(tables, table)
	}
	return tables, nil
}
// openMemtable opens the memtable described by options.
//
// A memtable is backed by a wal (write ahead log) file. Opening it means
// opening that wal (extension fmt.Sprintf(walFileExt, options.tableId)) and
// replaying every entry into a fresh skip list.
//
// Replay is batch-based. Records are buffered by their BatchId and inserted
// only when the LogRecordBatchFinished record carrying that batch id is read.
// Records of a batch whose finish record is missing from the wal are never
// applied, so a partially written batch is discarded.
//
// If replaying the wal fails, the wal is closed before the error is returned,
// so the file handle is not leaked.
func openMemtable(options memtableOptions) (*memtable, error) {
	// The arena is sized at 1.5x memSize. NOTE(review): presumably this leaves
	// headroom for the batch that pushes the table past memSize, since isFull
	// only compares against memSize. Confirm the exact rationale.
	skl := arenaskl.NewSkiplist(int64(float64(options.memSize) * 1.5))
	table := &memtable{options: options, skl: skl}

	walFile, err := wal.Open(wal.Options{
		DirPath:        options.dirPath,
		SegmentSize:    math.MaxInt, // no limit, guarantee that a wal file only contains one segment file
		SegmentFileExt: fmt.Sprintf(walFileExt, options.tableId),
		BlockCache:     options.walBlockCache,
		Sync:           options.walSync,
		BytesPerSync:   options.walBytesPerSync,
	})
	if err != nil {
		return nil, err
	}
	table.wal = walFile

	// fail closes the already opened wal before reporting a replay error. The
	// close error is ignored because the replay error is the one to surface.
	fail := func(err error) (*memtable, error) {
		_ = walFile.Close()
		return nil, err
	}

	// records of batches whose LogRecordBatchFinished marker has not been seen yet
	pendingBatches := make(map[uint64][]*LogRecord)
	reader := walFile.NewReader()
	for {
		chunk, _, err := reader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fail(err)
		}

		record := decodeLogRecord(chunk)
		if record.Type != LogRecordBatchFinished {
			pendingBatches[record.BatchId] = append(pendingBatches[record.BatchId], record)
			continue
		}

		// the finish marker stores the batch id in its key
		batchID, err := snowflake.ParseBytes(record.Key)
		if err != nil {
			return fail(err)
		}
		for _, batchRecord := range pendingBatches[uint64(batchID)] {
			table.skl.Put(y.KeyWithTs(batchRecord.Key, 0),
				y.ValueStruct{Value: batchRecord.Value, Meta: batchRecord.Type})
		}
		delete(pendingBatches, uint64(batchID))
	}

	// open and read wal file successfully, return the memtable
	return table, nil
}
// putBatch writes a batch of entries to the memtable.
//
// Unless options.DisableWal is set, the wal is written first:
//   - every record is tagged with batchId (its BatchId field is overwritten);
//   - the records are queued as pending writes, followed by a
//     LogRecordBatchFinished record whose key is the batch id;
//   - everything is written with a single WriteAll call.
//
// openMemtable only replays records whose batch has a finish record, which
// keeps the batch atomic across restarts. The wal is also synced explicitly
// when options.Sync is set and the wal is not already configured to sync on
// every write.
//
// After that, the records are inserted into the skip list while holding mt.mu.
//
// options may be nil. It is then treated like the zero WriteOptions: wal
// enabled and no forced sync.
func (mt *memtable) putBatch(pendingWrites map[string]*LogRecord,
	batchId snowflake.ID, options *WriteOptions) error {
	// Read the options once. This avoids dereferencing a nil options later.
	disableWal, forceSync := false, false
	if options != nil {
		disableWal, forceSync = options.DisableWal, options.Sync
	}

	// write to wal first to ensure durability and atomicity
	if !disableWal {
		for _, record := range pendingWrites {
			record.BatchId = uint64(batchId)
			mt.wal.PendingWrites(encodeLogRecord(record))
		}
		// marker that tells the wal replay the batch is complete
		mt.wal.PendingWrites(encodeLogRecord(&LogRecord{
			Key:  batchId.Bytes(),
			Type: LogRecordBatchFinished,
		}))
		if _, err := mt.wal.WriteAll(); err != nil {
			return err
		}
		// an explicit sync is only needed if the wal does not already sync each write
		if forceSync && !mt.options.walSync {
			if err := mt.wal.Sync(); err != nil {
				return err
			}
		}
	}

	// The deferred unlock also releases mt.mu if skl.Put panics.
	mt.mu.Lock()
	defer mt.mu.Unlock()
	for key, record := range pendingWrites {
		mt.skl.Put(y.KeyWithTs([]byte(key), 0), y.ValueStruct{Value: record.Value, Meta: record.Type})
	}
	return nil
}
// get looks key up in the skip list.
//
// It returns (deleted, value). deleted is true when the stored entry's Meta is
// LogRecordDeleted, and value is the stored value bytes.
// NOTE(review): for a key that is not present, the result depends on what
// skl.Get returns for a miss (presumably a zero ValueStruct, i.e. false, nil).
// Confirm before relying on it.
func (mt *memtable) get(key []byte) (bool, []byte) {
	// entries are always stored with timestamp 0, so look them up the same way
	lookupKey := y.KeyWithTs(key, 0)

	mt.mu.RLock()
	defer mt.mu.RUnlock()

	entry := mt.skl.Get(lookupKey)
	return entry.Meta == LogRecordDeleted, entry.Value
}
// isFull reports whether the skip list's memory usage has reached the
// configured memSize threshold.
func (mt *memtable) isFull() bool {
	threshold := int64(mt.options.memSize)
	return threshold <= mt.skl.MemSize()
}
// deleteWAl deletes the memtable's wal files through wal.Delete.
// It is a no-op that returns nil when the memtable has no wal.
func (mt *memtable) deleteWAl() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Delete()
}
// close closes the memtable's wal. It returns nil when there is no wal.
func (mt *memtable) close() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Close()
}
// sync flushes the memtable's wal to stable storage via wal.Sync.
// It returns nil when there is no wal.
func (mt *memtable) sync() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Sync()
}