This repository has been archived by the owner on Oct 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathindex.go
184 lines (162 loc) · 4.53 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package state
import (
"sync"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types"
)
const (
// MaxIndexLag is the buffer capacity of the IndexHub's indexTaskEvents
// channel, i.e. how many fully indexed heights may be queued before a send
// on that channel blocks.
MaxIndexLag = 100
)
// IndexHeightKey is the stateDB key under which SetIndexedHeight stores, and
// GetIndexedHeight reads, the encoded indexed height.
var IndexHeightKey = []byte("IndexHeightKey")
// IndexService is the interface a service must implement to be registered
// with an IndexHub through RegisterIndexSvc.
type IndexService interface {
// SetOnIndex installs the callback the hub wants to be notified with.
// The hub passes its CountDownAt method here; presumably the service calls
// it with the height it has just finished indexing - confirm against the
// implementations.
SetOnIndex(callback func(int64))
}
// IndexHub counts per-height completion reports from the registered index
// services, persists the height once every service has reported it, and on
// start replays blocks whose indexing had not been recorded as finished.
type IndexHub struct {
cmn.BaseService
// mtx is held by RegisterIndexSvc, CountDownAt and GetHeight while they
// access numIdxSvc, indexTaskCounter and expectHeight.
mtx sync.Mutex
// stateHeight is the initial height handed to NewIndexHub; recoverIndex
// replays blocks up to and including this height.
stateHeight int64
// expectHeight is the next height whose indexing is expected to complete.
expectHeight int64
// numIdxSvc is the number of index services registered via RegisterIndexSvc.
numIdxSvc int
// indexTaskCounter maps a height to the number of registered services that
// have not yet reported that height through CountDownAt.
indexTaskCounter map[int64]int
// indexTaskEvents carries heights that every service has reported; it is
// buffered to MaxIndexLag entries and drained by commitIndexRoutine.
indexTaskEvents chan int64
// stateDB holds the persisted indexed height (under IndexHeightKey) and is
// the source of ABCI responses during recovery.
stateDB dbm.DB
// blockStore supplies the blocks replayed by recoverIndex.
blockStore BlockStore
// eventBus is the publisher handed to fireEvents during recovery.
eventBus types.BlockEventPublisher
// metrics defaults to NopMetrics() and can be replaced with
// IndexHubWithMetrics; its IndexHeight is set after each committed height.
metrics *Metrics
}
// NewIndexHub builds an IndexHub whose state height is initialHeight.
//
// The next expected height is derived from the indexed height persisted in
// stateDB: one past it when a value is stored, otherwise one past
// initialHeight (in which case recoverIndex has nothing to replay). Options
// are applied afterwards, and the embedded BaseService is created last.
// The returned hub is not started.
func NewIndexHub(initialHeight int64, stateDB dbm.DB, blockStore BlockStore, eventBus types.BlockEventPublisher, options ...IndexHubOption) *IndexHub {
	hub := &IndexHub{
		stateHeight:      initialHeight,
		indexTaskCounter: make(map[int64]int),
		indexTaskEvents:  make(chan int64, MaxIndexLag),
		stateDB:          stateDB,
		blockStore:       blockStore,
		eventBus:         eventBus,
		metrics:          NopMetrics(),
	}

	switch persisted := hub.GetIndexedHeight(); {
	case persisted < 0:
		// Nothing was ever persisted, so there is nothing to recover.
		hub.expectHeight = hub.stateHeight + 1
	default:
		hub.expectHeight = persisted + 1
	}

	for i := range options {
		options[i](hub)
	}

	hub.BaseService = *cmn.NewBaseService(nil, "indexHub", hub)
	return hub
}
// IndexHubOption customizes an IndexHub under construction. NewIndexHub
// applies each option after the expected height has been computed and before
// the embedded BaseService is created.
type IndexHubOption func(*IndexHub)
// IndexHubWithMetrics returns an option that replaces the hub's metrics
// (NopMetrics() by default) with the given metrics.
func IndexHubWithMetrics(metrics *Metrics) IndexHubOption {
	apply := func(hub *IndexHub) {
		hub.metrics = metrics
	}
	return apply
}
// OnStart launches commitIndexRoutine in its own goroutine and then runs
// recoverIndex synchronously on the calling goroutine. It always returns nil.
func (ih *IndexHub) OnStart() error {
// start listen routine before recovering.
// The consumer of indexTaskEvents must already be running: recovery fires
// block events, which presumably lead the index services to call
// CountDownAt and enqueue finished heights - confirm against the services.
go ih.commitIndexRoutine()
ih.recoverIndex()
return nil
}
// SetExpectedHeight overrides the next height whose indexing is expected to
// complete.
//
// The write is done under ih.mtx because CountDownAt and GetHeight read and
// update expectHeight under the same mutex; an unguarded write here would
// race with them.
func (ih *IndexHub) SetExpectedHeight(height int64) {
	ih.mtx.Lock()
	defer ih.mtx.Unlock()
	ih.expectHeight = height
}
// recoverIndex replays every height from expectHeight through stateHeight
// (inclusive) by re-firing that block's events on the event bus.
//
// For each height:
//   - a missing block is logged and the height is skipped;
//   - if the stored ABCI responses cannot be loaded, fresh responses are built
//     from the block with empty BeginBlock/EndBlock results;
//   - if the validator updates cannot be converted, the events are fired with
//     nil validator updates.
//
// Failures are logged together with the height and the underlying error and
// never abort the loop.
func (ih *IndexHub) recoverIndex() {
	for h := ih.expectHeight; h <= ih.stateHeight; h++ {
		ih.Logger.Info("try to recover index", "height", h)

		block := ih.blockStore.LoadBlock(h)
		if block == nil {
			ih.Logger.Error("index skip since the the block is missing", "height", h)
			continue
		}

		abciResponses, err := LoadABCIResponses(ih.stateDB, h)
		if err != nil {
			ih.Logger.Error("failed to load ABCIResponse, will use default", "height", h, "err", err)
			abciResponses = NewABCIResponses(block)
			abciResponses.EndBlock = &abci.ResponseEndBlock{}
			abciResponses.BeginBlock = &abci.ResponseBeginBlock{}
		}

		validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciResponses.EndBlock.ValidatorUpdates)
		if err != nil {
			ih.Logger.Error("failed to load validatorUpdates, will use nil by default", "height", h, "err", err)
			// Make the fallback promised by the log message explicit instead
			// of relying on what the converter returned alongside the error.
			validatorUpdates = nil
		}

		fireEvents(ih.Logger, ih.eventBus, block, abciResponses, validatorUpdates)
	}
}
// commitIndexRoutine drains indexTaskEvents until the service quits. For each
// height received it logs the completion, persists the height through
// SetIndexedHeight and publishes it to the IndexHeight metric. Heights still
// queued when Quit fires are not processed.
func (ih *IndexHub) commitIndexRoutine() {
	for {
		select {
		case finished := <-ih.indexTaskEvents:
			ih.Logger.Info("finish index", "height", finished)
			ih.SetIndexedHeight(finished)
			ih.metrics.IndexHeight.Set(float64(finished))

		case <-ih.Quit():
			return
		}
	}
}
// RegisterIndexSvc wires idx to this hub by installing CountDownAt as its
// on-index callback and counts it among the registered services.
//
// It must be called before the hub is started; it panics if the hub is
// already running. The hub's mutex is held for the whole call.
func (ih *IndexHub) RegisterIndexSvc(idx IndexService) {
	ih.mtx.Lock()
	defer ih.mtx.Unlock()

	if running := ih.IsRunning(); running {
		panic("can't RegisterIndexSvc when IndexHub is running")
	}

	onIndexed := ih.CountDownAt
	idx.SetOnIndex(onIndexed)
	ih.numIdxSvc += 1
}
// CountDownAt is the callback handed to every registered index service; keep
// it simple and fast.
//
// Each call decrements the number of outstanding reports for height (the
// first call for a height starts from numIdxSvc). When that number reaches
// zero and height is the expected height, the counter entry is dropped,
// expectHeight advances by one and height is queued on indexTaskEvents for
// commitIndexRoutine. In every other case the updated count is stored.
//
// The send happens while the mutex is held, so it blocks other callers if the
// channel buffer (MaxIndexLag) is full.
func (ih *IndexHub) CountDownAt(height int64) {
	ih.mtx.Lock()
	defer ih.mtx.Unlock()

	remaining := ih.numIdxSvc
	if pending, tracked := ih.indexTaskCounter[height]; tracked {
		remaining = pending
	}
	remaining--

	// The higher block won't finish index before lower one.
	if remaining != 0 || height != ih.expectHeight {
		ih.indexTaskCounter[height] = remaining
		return
	}

	// delete is a no-op when the height was never recorded.
	delete(ih.indexTaskCounter, height)
	ih.expectHeight++
	ih.indexTaskEvents <- height
}
// SetIndexedHeight encodes h and synchronously writes it to stateDB under
// IndexHeightKey. An encoding failure is logged and nothing is written.
//
// No lock is taken: set and get are not expected to happen at the same time.
func (ih *IndexHub) SetIndexedHeight(h int64) {
	encoded, err := cdc.MarshalBinaryBare(h)
	if err != nil {
		ih.Logger.Error("failed to MarshalBinaryBare for indexed height", "error", err, "height", h)
		return
	}
	ih.stateDB.SetSync(IndexHeightKey, encoded)
}
// GetIndexedHeight reads the indexed height stored in stateDB under
// IndexHeightKey. It returns -1 when no value has ever been stored and panics
// if the stored bytes cannot be decoded.
func (ih *IndexHub) GetIndexedHeight() int64 {
	stored := ih.stateDB.Get(IndexHeightKey)
	if stored == nil {
		return -1
	}

	var indexed int64
	if err := cdc.UnmarshalBinaryBare(stored, &indexed); err != nil {
		// should not happen
		panic(err)
	}
	return indexed
}
// GetHeight returns the indexed height tracked in memory, i.e. one less than
// the next expected height, so callers such as RPC handlers avoid a database
// read. The value is read under the hub's mutex.
func (ih *IndexHub) GetHeight() int64 {
	ih.mtx.Lock()
	indexed := ih.expectHeight - 1
	ih.mtx.Unlock()
	return indexed
}