forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcompaction_block_selector.go
183 lines (149 loc) · 5.67 KB
/
compaction_block_selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package tempodb
import (
"fmt"
"sort"
"time"
"github.com/grafana/tempo/tempodb/backend"
)
// CompactionBlockSelector is an interface for different algorithms to pick
// suitable blocks for compaction.
type CompactionBlockSelector interface {
	// BlocksToCompact returns the next set of blocks that should be compacted
	// together, plus a string associated with that set. For the time-window
	// implementation in this file the string is the hash of the first chosen
	// block (used for sharding ownership), and (nil, "") is returned once no
	// further candidates remain.
	BlocksToCompact() ([]*backend.BlockMeta, string)
}
const (
	// activeWindowDuration is how far back from "now" the active window
	// reaches. newTimeWindowBlockSelector uses it to locate the cut-over
	// window between active and inactive blocks.
	activeWindowDuration = 24 * time.Hour

	// defaultMinInputBlocks and defaultMaxInputBlocks are not referenced in
	// this part of the file; presumably they are the fallback bounds on how
	// many blocks go into one compaction — TODO confirm against callers.
	defaultMinInputBlocks = 2
	defaultMaxInputBlocks = 4
)
/*************************** Time Window Block Selector **************************/
// timeWindowBlockSelector is a CompactionBlockSelector that buckets blocks
// into fixed-size time windows and hands out contiguous runs of compatible
// blocks from the sorted candidate list.
//
// Sharding will be based on time slot, not level, since each compactor works
// on two levels. Levels will be needed for id-range isolation.
//
// The timeWindowBlockSelector can be used ONLY ONCE PER TIMESLOT: entries are
// consumed by BlocksToCompact, so it needs to be reinitialized with the
// updated blocklist.
type timeWindowBlockSelector struct {
	MinInputBlocks       int           // minimum number of blocks a selection must contain to be returned
	MaxInputBlocks       int           // maximum number of blocks in a single selection
	MaxCompactionRange   time.Duration // size of the time window - say 6 hours
	MaxCompactionObjects int           // upper bound on the summed TotalObjects of the selected blocks
	MaxBlockBytes        uint64        // upper bound on the summed Size of the selected blocks (maximum block size, estimate)

	// entries holds the candidate blocks, sorted by group and then by order.
	// BlocksToCompact removes entries from the front as it examines them.
	entries []timeWindowBlockEntry
}
// timeWindowBlockEntry pairs a block with the precomputed string keys that
// newTimeWindowBlockSelector sorts by and BlocksToCompact matches on.
type timeWindowBlockEntry struct {
	meta  *backend.BlockMeta
	group string // Blocks in the same group will be compacted together. Sort order also determines group priority.
	order string // Individual block priority within the group.
	hash  string // Hash string used for sharding ownership, preserves backwards compatibility. Returned by BlocksToCompact for the first chosen block.
}
// Compile-time check that *timeWindowBlockSelector satisfies CompactionBlockSelector.
var _ CompactionBlockSelector = (*timeWindowBlockSelector)(nil)
// newTimeWindowBlockSelector builds a time-window selector over blocklist.
//
// Every block is assigned to a window derived from its end time and
// maxCompactionRange, then tagged with three strings:
//   - group: blocks sharing a group may be compacted together; groups are
//     visited in ascending string order ("A-..." groups before "B-...").
//   - order: priority of the block inside its group.
//   - hash:  the sharding-ownership key later returned by BlocksToCompact.
//
// Blocks whose window is exactly the cut-over window (the window containing
// now - activeWindowDuration) are left out entirely. The remaining entries are
// stably sorted by group and then by order.
func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, maxBlockBytes uint64, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
	selector := &timeWindowBlockSelector{
		MinInputBlocks:       minInputBlocks,
		MaxInputBlocks:       maxInputBlocks,
		MaxCompactionRange:   maxCompactionRange,
		MaxCompactionObjects: maxCompactionObjects,
		MaxBlockBytes:        maxBlockBytes,
	}

	now := time.Now()
	currWindow := selector.windowForTime(now)
	cutoverWindow := selector.windowForTime(now.Add(-activeWindowDuration))

	for _, meta := range blocklist {
		window := selector.windowForBlock(meta)

		// Blocks in the cut-over window are never compacted: while a block
		// moves from the last active window to the first inactive one, two
		// compactors could otherwise claim it at the same time.
		if window == cutoverWindow {
			continue
		}

		// Number of windows between the current one and the block's window;
		// rendered as fixed-width hex so more recent windows sort first.
		age := currWindow - window

		var entry timeWindowBlockEntry
		switch {
		case window > cutoverWindow:
			// Inside the active window: group by compaction level and window,
			// lowest level and most recent window first. Within a group the
			// smallest blocks come first; the version suffix keeps blocks of
			// the same version next to each other (update after parquet).
			// NOTE(review): the level is formatted with %v and compared as a
			// string, so multi-digit levels would order lexicographically —
			// confirm levels stay single-digit.
			entry = timeWindowBlockEntry{
				meta:  meta,
				group: fmt.Sprintf("A-%v-%016X", meta.CompactionLevel, age),
				order: fmt.Sprintf("%016X-%v", meta.TotalObjects, meta.Version),
				hash:  fmt.Sprintf("%v-%v-%v", meta.TenantID, meta.CompactionLevel, window),
			}
		default:
			// Outside the active window: group by window only, most recent
			// first. Within a group prefer the lowest compaction level, then
			// the smallest blocks, again keeping versions together.
			entry = timeWindowBlockEntry{
				meta:  meta,
				group: fmt.Sprintf("B-%016X", age),
				order: fmt.Sprintf("%v-%016X-%v", meta.CompactionLevel, meta.TotalObjects, meta.Version),
				hash:  fmt.Sprintf("%v-%v", meta.TenantID, window),
			}
		}

		selector.entries = append(selector.entries, entry)
	}

	// Primary key: group. Secondary key: order.
	sort.SliceStable(selector.entries, func(a, b int) bool {
		left, right := &selector.entries[a], &selector.entries[b]
		if left.group != right.group {
			return left.group < right.group
		}
		return left.order < right.order
	})

	return selector
}
// BlocksToCompact returns the next run of blocks to compact together with the
// sharding hash of the first block in that run. It returns (nil, "") once no
// further candidates remain.
//
// A run ("stripe") is a contiguous slice of the sorted entries in which every
// block shares the group, data encoding and version of the stripe's first
// block, and which stays within MaxInputBlocks, MaxCompactionObjects and
// MaxBlockBytes. Entries are removed from the selector as they are examined,
// so successive calls walk forward through the list and never revisit a block.
func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string) {
	for len(twbs.entries) > 0 {
		var chosen []timeWindowBlockEntry

		// Try each entry in turn as the start of a stripe and extend the
		// stripe one block at a time while it stays homogeneous and within
		// limits. Stop at the first start position that yields a stripe.
		i := 0
		for ; i < len(twbs.entries); i++ {
			first := twbs.entries[i]
			for j := i + 1; j < len(twbs.entries); j++ {
				next := twbs.entries[j]
				stripe := twbs.entries[i : j+1]

				if first.group != next.group ||
					first.meta.DataEncoding != next.meta.DataEncoding ||
					first.meta.Version != next.meta.Version || // update after parquet: only compact blocks of the same version
					len(stripe) > twbs.MaxInputBlocks ||
					totalObjects(stripe) > twbs.MaxCompactionObjects ||
					totalSize(stripe) > twbs.MaxBlockBytes {
					break
				}
				chosen = stripe
			}

			if len(chosen) > 0 {
				// Found a stripe of blocks
				break
			}
		}

		// Remove entries that were checked so they are not considered again.
		twbs.entries = twbs.entries[i+len(chosen):]

		// Did we find enough blocks? The explicit empty check matters when
		// MinInputBlocks is zero or negative: without it an empty stripe would
		// pass the threshold and chosen[0] below would panic.
		if len(chosen) == 0 || len(chosen) < twbs.MinInputBlocks {
			continue
		}

		compactBlocks := make([]*backend.BlockMeta, 0, len(chosen))
		for _, e := range chosen {
			compactBlocks = append(compactBlocks, e.meta)
		}
		return compactBlocks, chosen[0].hash
	}

	return nil, ""
}
// totalObjects returns the sum of meta.TotalObjects over all entries.
// An empty or nil slice yields 0.
func totalObjects(entries []timeWindowBlockEntry) int {
	sum := 0
	for i := range entries {
		sum += entries[i].meta.TotalObjects
	}
	return sum
}
// totalSize returns the sum of meta.Size over all entries.
// An empty or nil slice yields 0.
func totalSize(entries []timeWindowBlockEntry) uint64 {
	var total uint64
	for i := range entries {
		total += entries[i].meta.Size
	}
	return total
}
// windowForBlock returns the time-window index a block belongs to, which is
// the window containing the block's end time.
func (twbs *timeWindowBlockSelector) windowForBlock(meta *backend.BlockMeta) int64 {
	blockEnd := meta.EndTime
	return twbs.windowForTime(blockEnd)
}
// windowForTime returns the index of the time window containing t: the Unix
// timestamp of t in seconds, integer-divided by MaxCompactionRange expressed
// in whole seconds.
func (twbs *timeWindowBlockSelector) windowForTime(t time.Time) int64 {
	windowSeconds := int64(twbs.MaxCompactionRange / time.Second)
	return t.Unix() / windowSeconds
}