Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experimental refactoring of channel communication #1378

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor persister loop into single select
  • Loading branch information
mschoch committed Apr 20, 2020
commit 0a8784eb9e4ba6a796059526c6cacabe1a445b1b
180 changes: 87 additions & 93 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type persisterOptions struct {
MemoryPressurePauseThreshold uint64
}

func (s *Scorch) persisterLoop() {
func (s *Scorch) persisterLoop(initialEpoch uint64) {
defer s.asyncTasks.Done()

var persistWatchers epochWatchers
Expand All @@ -95,127 +95,121 @@ func (s *Scorch) persisterLoop() {
return
}

// tell the introducer we're waiting for changes after the initial epoch
var introducerEpochWatcher *epochWatcher
introducerEpochWatcher, err = s.introducerNotifier.NotifyUsAfter(initialEpoch, s.closeCh)
if err != nil {
return
}

OUTER:
for {
atomic.AddUint64(&s.stats.TotPersistLoopBeg, 1)
atomic.AddUint64(&s.stats.TotPersistLoopWait, 1)

select {
case <-s.closeCh:
break OUTER
case ew = <-s.persisterNotifier:
persistWatchers.Add(ew)
default:
}
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
}
lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
lastMergedEpoch, persistWatchers, po)
case <-introducerEpochWatcher.notifyCh:
// woken up, next loop should pick up work
atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1)

var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
var ourPersistedCallbacks []index.BatchCallback
lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
lastMergedEpoch, persistWatchers, po)

// check to see if there is a new snapshot to persist
s.rootLock.Lock()
if s.root != nil && s.root.epoch > lastPersistedEpoch {
ourSnapshot = s.root
ourSnapshot.AddRef()
ourPersisted = s.rootPersisted
s.rootPersisted = nil
ourPersistedCallbacks = s.persistedCallbacks
s.persistedCallbacks = nil
atomic.StoreUint64(&s.iStats.persistSnapshotSize, uint64(ourSnapshot.Size()))
atomic.StoreUint64(&s.iStats.persistEpoch, ourSnapshot.epoch)
}
s.rootLock.Unlock()
var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
var ourPersistedCallbacks []index.BatchCallback

// check to see if there is a new snapshot to persist
s.rootLock.Lock()
if s.root != nil && s.root.epoch > lastPersistedEpoch {
ourSnapshot = s.root
ourSnapshot.AddRef()
ourPersisted = s.rootPersisted
s.rootPersisted = nil
ourPersistedCallbacks = s.persistedCallbacks
s.persistedCallbacks = nil
atomic.StoreUint64(&s.iStats.persistSnapshotSize, uint64(ourSnapshot.Size()))
atomic.StoreUint64(&s.iStats.persistEpoch, ourSnapshot.epoch)
}
s.rootLock.Unlock()

if ourSnapshot != nil {
startTime := time.Now()
if ourSnapshot != nil {
startTime := time.Now()

err := s.persistSnapshot(ourSnapshot, po)
for _, ch := range ourPersisted {
if err != nil {
ch <- err
err := s.persistSnapshot(ourSnapshot, po)
for _, ch := range ourPersisted {
if err != nil {
ch <- err
}
close(ch)
}
close(ch)
}
if err != nil {
atomic.StoreUint64(&s.iStats.persistEpoch, 0)
if err == segment.ErrClosed {
// index has been closed
if err != nil {
atomic.StoreUint64(&s.iStats.persistEpoch, 0)
if err == segment.ErrClosed {
// index has been closed
_ = ourSnapshot.DecRef()
break OUTER
}

// save this current snapshot's persistedCallbacks, to invoke during
// the retry attempt
unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)

s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
break OUTER
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}

// save this current snapshot's persistedCallbacks, to invoke during
// the retry attempt
unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)
if unpersistedCallbacks != nil {
// in the event of this being a retry attempt for persisting a snapshot
// that had earlier failed, prepend the persistedCallbacks associated
// with earlier segment(s) to the latest persistedCallbacks
ourPersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)
unpersistedCallbacks = nil
}

s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}
for i := range ourPersistedCallbacks {
ourPersistedCallbacks[i](err)
}

if unpersistedCallbacks != nil {
// in the event of this being a retry attempt for persisting a snapshot
// that had earlier failed, prepend the persistedCallbacks associated
// with earlier segment(s) to the latest persistedCallbacks
ourPersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)
unpersistedCallbacks = nil
}
atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch)

for i := range ourPersistedCallbacks {
ourPersistedCallbacks[i](err)
}
lastPersistedEpoch = ourSnapshot.epoch
for _, ew := range persistWatchers {
close(ew.notifyCh)
}

atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch)
persistWatchers = nil
_ = ourSnapshot.DecRef()

lastPersistedEpoch = ourSnapshot.epoch
for _, ew := range persistWatchers {
close(ew.notifyCh)
}
changed := false
s.rootLock.RLock()
if s.root != nil && s.root.epoch != lastPersistedEpoch {
changed = true
}
s.rootLock.RUnlock()

persistWatchers = nil
_ = ourSnapshot.DecRef()
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))

changed := false
s.rootLock.RLock()
if s.root != nil && s.root.epoch != lastPersistedEpoch {
changed = true
if changed {
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
continue OUTER
}
}
s.rootLock.RUnlock()

s.fireEvent(EventKindPersisterProgress, time.Since(startTime))

if changed {
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
continue OUTER
// tell the introducer we're waiting for changes after lastPersistedEpoch
introducerEpochWatcher, err = s.introducerNotifier.NotifyUsAfter(lastPersistedEpoch, s.closeCh)
if err != nil {
break OUTER
}
}

// tell the introducer we're waiting for changes
var w *epochWatcher
w, err = s.introducerNotifier.NotifyUsAfter(lastPersistedEpoch, s.closeCh)
if err != nil {
break OUTER
}

s.removeOldData() // might as well cleanup while waiting

atomic.AddUint64(&s.stats.TotPersistLoopWait, 1)

select {
case <-s.closeCh:
break OUTER
case <-w.notifyCh:
// woken up, next loop should pick up work
atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1)
case ew = <-s.persisterNotifier:
// if the watchers are already caught up then let them wait,
// else let them continue to do the catch up
persistWatchers.Add(ew)
s.removeOldData() // might as well cleanup while waiting
}

atomic.AddUint64(&s.stats.TotPersistLoopEnd, 1)
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s *Scorch) Open() error {

if !s.readOnly && s.path != "" {
s.asyncTasks.Add(1)
go s.persisterLoop()
go s.persisterLoop(initialEpoch)
s.asyncTasks.Add(1)
go s.mergerLoop(initialEpoch)
}
Expand Down