epoch_tracker.go (forked from influxdata/influxdb)

package tsdb

import (
	"sync"
)

// TODO(jeff): using a mutex is easiest, but there may be a way to do
// this with atomics only, and in a way such that writes are minimally
// blocked.

// epochTracker keeps track of epochs for write and delete operations
// allowing a delete to block until all previous writes have completed.
type epochTracker struct {
	mu      sync.Mutex
	epoch   uint64 // current epoch
	largest uint64 // largest delete possible
	writes  int64  // pending writes

	// pending deletes waiting on writes
	deletes map[uint64]*epochDeleteState
}

// newEpochTracker constructs an epochTracker.
func newEpochTracker() *epochTracker {
	return &epochTracker{
		deletes: make(map[uint64]*epochDeleteState),
	}
}

// epochDeleteState keeps track of the state for a pending delete.
type epochDeleteState struct {
	cond    *sync.Cond
	guard   *guard
	pending int64
}

// done signals that an earlier write has finished.
func (e *epochDeleteState) done() {
	e.cond.L.Lock()
	e.pending--
	if e.pending == 0 {
		e.cond.Broadcast()
	}
	e.cond.L.Unlock()
}

// Wait blocks until all earlier writes have finished.
func (e *epochDeleteState) Wait() {
	e.cond.L.Lock()
	for e.pending > 0 {
		e.cond.Wait()
	}
	e.cond.L.Unlock()
}

// next bumps the epoch and returns it. Callers must hold e.mu: the counter
// has no synchronization of its own.
func (e *epochTracker) next() uint64 {
	e.epoch++
	return e.epoch
}

// StartWrite should be called before a write is going to start, and after
// it has checked for guards.
func (e *epochTracker) StartWrite() ([]*guard, uint64) {
	e.mu.Lock()
	gen := e.next()
	e.writes++

	if len(e.deletes) == 0 {
		e.mu.Unlock()
		return nil, gen
	}

	guards := make([]*guard, 0, len(e.deletes))
	for _, state := range e.deletes {
		guards = append(guards, state.guard)
	}

	e.mu.Unlock()
	return guards, gen
}

// EndWrite should be called when the write ends for any reason.
func (e *epochTracker) EndWrite(gen uint64) {
	e.mu.Lock()
	if gen <= e.largest {
		// TODO(jeff): at the cost of making waitDelete more
		// complicated, we can keep a sorted slice which would
		// allow this to exit early rather than go over the
		// whole map.
		for dgen, state := range e.deletes {
			if gen > dgen {
				continue
			}
			state.done()
		}
	}
	e.writes--
	e.mu.Unlock()
}
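
// Note: StartWrite and WaitDelete draw generations from the same epoch
// counter, so a write's gen and a delete's gen are totally ordered. A write
// with gen <= largest therefore started before at least one pending delete
// was registered, and the loop above signals done() to exactly the deletes
// whose gen is greater than the write's, i.e. the deletes that counted this
// write as pending.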

// epochWaiter is a type that can be waited on for prior writes to finish.
type epochWaiter struct {
	gen     uint64
	guard   *guard
	state   *epochDeleteState
	tracker *epochTracker
}

// Wait blocks until all writes prior to the creation of the waiter finish.
func (e epochWaiter) Wait() {
	if e.state == nil || e.tracker == nil {
		return
	}
	e.state.Wait()
}

// Done marks the delete as completed, removing its guard.
func (e epochWaiter) Done() {
	e.tracker.mu.Lock()
	delete(e.tracker.deletes, e.gen)
	e.tracker.mu.Unlock()
	e.guard.Done()
}

// WaitDelete should be called after any delete guards have been installed.
// The returned epochWaiter will not be affected by any future writes.
func (e *epochTracker) WaitDelete(guard *guard) epochWaiter {
	e.mu.Lock()
	state := &epochDeleteState{
		pending: e.writes,
		cond:    sync.NewCond(new(sync.Mutex)),
		guard:   guard,
	}

	// record our pending delete
	gen := e.next()
	e.largest = gen
	e.deletes[gen] = state
	e.mu.Unlock()

	return epochWaiter{
		gen:     gen,
		guard:   guard,
		state:   state,
		tracker: e,
	}
}
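
// exampleEpochUsage is an illustrative sketch (not part of the upstream
// file) showing how the write and delete paths coordinate through an
// epochTracker. The *guard value g is assumed to be constructed and
// installed elsewhere in this package (see guard.go); it is taken as a
// parameter here rather than invented.
func exampleEpochUsage(g *guard) {
	tracker := newEpochTracker()

	// Write path: register the write, perform it, then signal completion.
	// The returned guards belong to any pending deletes; a real write would
	// check them before touching the data they cover.
	guards, gen := tracker.StartWrite()
	_ = guards
	// ... perform the write ...
	tracker.EndWrite(gen)

	// Delete path: record the pending delete, wait for all writes that
	// started earlier, perform the delete, then release the guard.
	waiter := tracker.WaitDelete(g)
	waiter.Wait() // returns immediately here since the only write has ended
	// ... perform the delete ...
	waiter.Done()
}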