Skip to content

Commit

Permalink
add snowman engine reset function
Browse files Browse the repository at this point in the history
  • Loading branch information
bysomeone committed Apr 19, 2024
1 parent 85327f3 commit 46b12ac
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 133 deletions.
26 changes: 26 additions & 0 deletions system/consensus/snowman/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,29 @@ func newSnowContext(cfg *types.Chain33Config) *snow.ConsensusContext {

return ctx
}

func (s *snowman) applyConfig(subCfg *types.ConfigSubModule) {

cfg := &Config{}
types.MustDecode(subCfg.Consensus["snowman"], cfg)

if cfg.K > 0 {
s.params.K = cfg.K
}

if cfg.Alpha > 0 {
s.params.Alpha = cfg.Alpha
}

if cfg.BetaVirtuous > 0 {
s.params.BetaVirtuous = cfg.BetaVirtuous
}

if cfg.BetaRogue > 0 {
s.params.BetaRogue = cfg.BetaRogue
}

if cfg.ConcurrentRepolls > 0 {
s.params.ConcurrentRepolls = cfg.ConcurrentRepolls
}
}
96 changes: 23 additions & 73 deletions system/consensus/snowman/snowman.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,77 +70,35 @@ func (s *snowman) Initialize(ctx *consensus.Context) {

s.vs = &vdrSet{}
s.vs.init(ctx)

engineConfig := newSnowmanConfig(s, s.params, newSnowContext(ctx.Base.GetAPI().GetConfig()))
engine, err := smeng.New(engineConfig)
if err != nil {
panic("Initialize snowman engine err:" + err.Error())
}
s.engine = engine
s.initSnowEngine()

s.inMsg = make(chan *queue.Message, 1024)
s.engineNotify = make(chan struct{}, 1024)
go s.startRoutine()
}

func (s *snowman) applyConfig(subCfg *types.ConfigSubModule) {

cfg := &Config{}
types.MustDecode(subCfg.Consensus["snowman"], cfg)
func (s *snowman) initSnowEngine() {

if cfg.K > 0 {
s.params.K = cfg.K
}

if cfg.Alpha > 0 {
s.params.Alpha = cfg.Alpha
}

if cfg.BetaVirtuous > 0 {
s.params.BetaVirtuous = cfg.BetaVirtuous
}

if cfg.BetaRogue > 0 {
s.params.BetaRogue = cfg.BetaRogue
}

if cfg.ConcurrentRepolls > 0 {
s.params.ConcurrentRepolls = cfg.ConcurrentRepolls
engineConfig := newSnowmanConfig(s, s.params, newSnowContext(s.ctx.Base.GetAPI().GetConfig()))
engine, err := smeng.New(engineConfig)
if err != nil {
panic("Initialize snowman engine err:" + err.Error())
}
s.engine = engine
}

func (s *snowman) getChainSyncStatus() bool {

reply, err := s.ctx.Base.GetAPI().IsSync()
func (s *snowman) resetEngine() {

if err != nil {
snowLog.Error("getChainSyncStatus", "err", err)
return false
snowLog.Debug("reset snowman engine")
s.initSnowEngine()
s.vm.reset()
if err := s.engine.Start(s.ctx.Base.Context, 0); err != nil {
snowLog.Error("resetEngine", "start engine err", err)
}

return reply.GetIsOk()
}

func (s *snowman) startRoutine() {

// check chain sync status
//for !s.getChainSyncStatus() {
// snowLog.Debug("startRoutine wait chain state syncing...")
// time.Sleep(5 * time.Second)
//}

// check connected peers
//for {
//
// peers, err := s.vs.getConnectedPeers()
// if err == nil && len(peers) >= s.params.K {
// break
// }
// snowLog.Debug("startRoutine wait more snowman peer connected...",
// "currConnected", len(peers), "minRequiredNum", s.params.K, "err", err)
// time.Sleep(5 * time.Second)
//}

for {
c, err := getLastChoice(s.ctx.Base.GetQueueClient())
if err == nil && len(c.Hash) > 0 {
Expand All @@ -155,33 +113,18 @@ func (s *snowman) startRoutine() {
}

go s.dispatchSyncMsg()
//s.lock.Lock()
//s.initDone = true
//s.lock.Unlock()
snowLog.Debug("snowman startRoutine done")

}

func (s *snowman) AddBlock(blk *types.Block) {

//s.lock.RLock()
//defer s.lock.RUnlock()
//if !s.initDone {
// return
//}
if s.vm.addNewBlock(blk) {
s.engineNotify <- struct{}{}
}
}

func (s *snowman) SubMsg(msg *queue.Message) {

//s.lock.RLock()
//defer s.lock.RUnlock()
//if !s.initDone {
// snowLog.Debug("snowman SubMsg ignore", "id", msg.ID, "name", types.GetEventName(int(msg.ID)))
// return
//}
s.inMsg <- msg
}

Expand Down Expand Up @@ -212,14 +155,15 @@ func (s *snowman) handleSyncMsg(msg *queue.Message) {

defer func() {
if r := recover(); r != nil {
var buf [4048]byte
n := runtime.Stack(buf[:], false)
snowLog.Error("handleInMsg", "err", r, "stack", buf[:n])
snowLog.Error("handleInMsg panic", "err", r, "stack", getStack())
}
}()

switch msg.ID {

case types.EventSnowmanResetEngine:
s.resetEngine()

case types.EventSnowmanChits:

req := msg.Data.(*types.SnowChits)
Expand Down Expand Up @@ -341,3 +285,9 @@ func (s *snowman) handleSyncMsg(msg *queue.Message) {

}
}

func getStack() string {
var buf [4048]byte
n := runtime.Stack(buf[:], false)
return string(buf[:n])
}
4 changes: 3 additions & 1 deletion system/consensus/snowman/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ func (s *vdrSet) getConnectedPeers() ([]*types.Peer, error) {
peers := make([]*types.Peer, 0, count)
for _, p := range peerlist.GetPeers() {

if p.Self || p.Blocked || p.Header.GetHeight() < s.self.Finalized.GetHeight() {
if p.Self || p.Blocked ||
p.Header.GetHeight() < s.self.Finalized.GetHeight() ||
p.GetFinalized().GetHeight() < s.self.Finalized.GetHeight()-128 {
continue
}
peers = append(peers, p)
Expand Down
76 changes: 17 additions & 59 deletions system/consensus/snowman/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package snowman

import (
"container/list"
"encoding/hex"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -34,18 +33,14 @@ var (
// implements the snowman.ChainVM interface
type chain33VM struct {
blankVM
api client.QueueProtocolAPI
cfg *types.Chain33Config
qclient queue.Client
pendingBlock map[string]*types.Block
api client.QueueProtocolAPI
cfg *types.Chain33Config
qclient queue.Client

pendingBlocks *list.List
lock sync.RWMutex
preferenceID ids.ID
acceptedHeight int64
preferChan chan ids.ID

decidedHashes *lru.Cache
decidedHashes *lru.Cache
}

func (vm *chain33VM) newSnowBlock(blk *types.Block, status choices.Status) *snowBlock {
Expand All @@ -61,8 +56,7 @@ func (vm *chain33VM) Init(ctx *consensus.Context) {
vm.api = ctx.Base.GetAPI()
vm.cfg = vm.api.GetConfig()
vm.qclient = ctx.Base.GetQueueClient()
vm.pendingBlock = make(map[string]*types.Block, 8)
vm.preferChan = make(chan ids.ID, 32)

c, err := lru.New(1024)
if err != nil {
panic("chain33VM Init New lru err" + err.Error())
Expand All @@ -75,7 +69,6 @@ func (vm *chain33VM) Init(ctx *consensus.Context) {
panic(err)
}
vm.acceptedHeight = choice.Height
go vm.handleNotifyNewBlock(ctx.Base.Context)
}

// Initialize implements the snowman.ChainVM interface
Expand All @@ -95,21 +88,8 @@ func (vm *chain33VM) Initialize(

}

func (vm *chain33VM) handleNotifyNewBlock(ctx context.Context) {

for {

select {

case <-ctx.Done():
return

case preferID := <-vm.preferChan:

snowLog.Debug("handleNotifyNewBlock", "hash", hex.EncodeToString(preferID[:]))
}

}
func (vm *chain33VM) reset() {
vm.decidedHashes.Purge()
}

// SetState communicates to VM its next state it starts
Expand All @@ -129,7 +109,7 @@ func (vm *chain33VM) GetBlock(_ context.Context, blkID ids.ID) (snowcon.Block, e

details, err := vm.api.GetBlockByHashes(&types.ReqHashes{Hashes: [][]byte{blkID[:]}})
if err != nil || len(details.GetItems()) < 1 || details.GetItems()[0].GetBlock() == nil {
snowLog.Error("vmGetBlock", "hash", blkID.Hex(), "GetBlockByHashes err", err)
snowLog.Error("vmGetBlock", "hash", blkID.Hex(), "stack", getStack())
return nil, database.ErrNotFound
}
sb := vm.newSnowBlock(details.GetItems()[0].GetBlock(), choices.Processing)
Expand Down Expand Up @@ -172,16 +152,9 @@ func (vm *chain33VM) addNewBlock(blk *types.Block) bool {
vm.lock.Lock()
defer vm.lock.Unlock()
vm.pendingBlocks.PushBack(vm.newSnowBlock(blk, choices.Processing))
snowLog.Debug("vm addNewBlock", "ah", ah, "bh", blk.GetHeight(), "pendingNum", vm.pendingBlocks.Len())
snowLog.Debug("vm addNewBlock", "height", blk.GetHeight(), "hash", blk.Hash(vm.cfg),
"acceptedHeight", ah, "pendingNum", vm.pendingBlocks.Len())
return true

//key := string(blk.ParentHash)
//exist, ok := vm.pendingBlock[key]
//if ok {
// snowLog.Debug("addNewBlock replace block", "height", blk.Height, "old", hex.EncodeToString(exist.Hash(vm.cfg)),
// "new", hex.EncodeToString(blk.Hash(vm.cfg)))
//}
//vm.pendingBlock[key] = blk
}

// BuildBlock Attempt to create a new block from data contained in the VM.
Expand Down Expand Up @@ -211,19 +184,14 @@ func (vm *chain33VM) BuildBlock(context.Context) (snowcon.Block, error) {
// This should always be a block that has no children known to consensus.
func (vm *chain33VM) SetPreference(ctx context.Context, blkID ids.ID) error {

vm.lock.Lock()
vm.preferenceID = blkID
vm.lock.Unlock()

snowLog.Debug("vmSetPreference", "blkHash", blkID.Hex())

err := vm.qclient.Send(vm.qclient.NewMessage("blockchain",
types.EventSnowmanPreferBlk, &types.ReqBytes{Data: blkID[:]}), false)

if err != nil {
snowLog.Error("vmSetPreference", "blkHash", blkID.Hex(), "send queue err", err)
return err
}
//err := vm.qclient.Send(vm.qclient.NewMessage("blockchain",
// types.EventSnowmanPreferBlk, &types.ReqBytes{Data: blkID[:]}), false)
//
//if err != nil {
// snowLog.Error("vmSetPreference", "blkHash", blkID.Hex(), "send queue err", err)
// return err
//}

return nil
}
Expand Down Expand Up @@ -311,13 +279,3 @@ func (vm *chain33VM) rejectBlock(height int64, blkID ids.ID) {

vm.decidedHashes.Add(blkID, false)
}

func (vm *chain33VM) removeExpireBlock() {

for key, blk := range vm.pendingBlock {

if blk.Height <= vm.acceptedHeight {
delete(vm.pendingBlock, key)
}
}
}

0 comments on commit 46b12ac

Please sign in to comment.