forked from lotusdblabs/lotusdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvlog.go
169 lines (145 loc) · 4.5 KB
/
vlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package lotusdb
import (
"context"
"fmt"
"github.com/rosedblabs/wal"
"golang.org/x/sync/errgroup"
)
const (
	// valueLogFileExt is the segment-file extension for value log wal files;
	// the %d placeholder is filled with the partition index (see openValueLog).
	valueLogFileExt = ".VLOG.%d"
	// tempValueLogFileExt is the extension for temporary value log segment
	// files — presumably used while rewriting valid entries (see
	// compactBatchCount); TODO confirm against the compaction code.
	tempValueLogFileExt = ".VLOG.%d.temp"
)
// valueLog value log is named after the concept in Wisckey paper
// https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf
type valueLog struct {
	// walFiles holds one wal file per partition; a key's partition index
	// (see getKeyPartition) selects which wal file stores its record.
	walFiles []*wal.WAL
	// options is the configuration the value log was opened with.
	options valueLogOptions
}
// valueLogOptions holds the configuration used to open a value log.
type valueLogOptions struct {
	// dirPath specifies the directory path where the WAL segment files will be stored.
	dirPath string

	// segmentSize specifies the maximum size of each segment file in bytes.
	segmentSize int64

	// blockCache specifies the size of the block cache in number of bytes.
	// A block cache is used to store recently accessed data blocks, improving read performance.
	// If BlockCache is set to 0, no block cache will be used.
	blockCache uint32

	// partitionNum is the number of partitions the value log is split into
	// for concurrent writing and reading; one wal file is opened per partition.
	partitionNum uint32

	// hashKeyFunction is the hash function used for sharding keys across partitions.
	hashKeyFunction func([]byte) uint64

	// compactBatchCount: writing validEntries to disk after reading the
	// specified number of entries.
	compactBatchCount int
}
// open wal files for value log, it will open several wal files for concurrent writing and reading
// the number of wal files is specified by the partitionNum.
func openValueLog(options valueLogOptions) (*valueLog, error) {
var walFiles []*wal.WAL
for i := 0; i < int(options.partitionNum); i++ {
vLogWal, err := wal.Open(wal.Options{
DirPath: options.dirPath,
SegmentSize: options.segmentSize,
SegmentFileExt: fmt.Sprintf(valueLogFileExt, i),
BlockCache: options.blockCache,
Sync: false, // we will sync manually
BytesPerSync: 0, // the same as Sync
})
if err != nil {
return nil, err
}
walFiles = append(walFiles, vLogWal)
}
return &valueLog{walFiles: walFiles, options: options}, nil
}
// read loads the value log record stored at the given position: it reads the
// raw bytes from the wal file of pos.partition and decodes them into a
// ValueLogRecord.
func (vlog *valueLog) read(pos *KeyPosition) (*ValueLogRecord, error) {
	data, err := vlog.walFiles[pos.partition].Read(pos.position)
	if err != nil {
		return nil, err
	}
	record := decodeValueLogRecord(data)
	return record, nil
}
// write the value log record to the value log, it will be separated to several partitions
// and write to the corresponding partition concurrently.
func (vlog *valueLog) writeBatch(records []*ValueLogRecord) ([]*KeyPosition, error) {
// group the records by partition
partitionRecords := make([][]*ValueLogRecord, vlog.options.partitionNum)
for _, record := range records {
p := vlog.getKeyPartition(record.key)
partitionRecords[p] = append(partitionRecords[p], record)
}
// channel for receiving the positions of the records after writing to the value log
posChan := make(chan []*KeyPosition, vlog.options.partitionNum)
g, ctx := errgroup.WithContext(context.Background())
for i := 0; i < int(vlog.options.partitionNum); i++ {
if len(partitionRecords[i]) == 0 {
continue
}
part := i
g.Go(func() error {
var err error
defer func() {
if err != nil {
vlog.walFiles[part].ClearPendingWrites()
}
}()
var keyPositions []*KeyPosition
writeIdx := 0
for _, record := range partitionRecords[part] {
select {
case <-ctx.Done():
err = ctx.Err()
return err
default:
vlog.walFiles[part].PendingWrites(encodeValueLogRecord(record))
}
}
positions, err := vlog.walFiles[part].WriteAll()
if err != nil {
return err
}
for i, pos := range positions {
keyPositions = append(keyPositions, &KeyPosition{
key: partitionRecords[part][writeIdx+i].key,
partition: uint32(part),
position: pos,
})
}
posChan <- keyPositions
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
close(posChan)
// nwo we get the positions of the records, we can return them to the caller
var keyPositions []*KeyPosition
for i := 0; i < int(vlog.options.partitionNum); i++ {
pos := <-posChan
keyPositions = append(keyPositions, pos...)
}
return keyPositions, nil
}
// sync flushes every partition's wal file to stable storage, stopping at the
// first error encountered.
func (vlog *valueLog) sync() error {
	for i := range vlog.walFiles {
		if err := vlog.walFiles[i].Sync(); err != nil {
			return err
		}
	}
	return nil
}
// close closes every partition's wal file, stopping at the first error
// encountered.
func (vlog *valueLog) close() error {
	for i := range vlog.walFiles {
		if err := vlog.walFiles[i].Close(); err != nil {
			return err
		}
	}
	return nil
}
// getKeyPartition maps a key to its partition index by hashing the key with
// the configured hash function and reducing modulo the partition count.
func (vlog *valueLog) getKeyPartition(key []byte) int {
	hash := vlog.options.hashKeyFunction(key)
	return int(hash % uint64(vlog.options.partitionNum))
}