Skip to content

Commit

Permalink
Merge pull request kubernetes#11298 from mesosphere/fix-10776
Browse files Browse the repository at this point in the history
Fix deadlocks and race conditions in mesos master election notifier
  • Loading branch information
erictune committed Jul 15, 2015
2 parents c208c1c + e98c8e7 commit f5e6161
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 40 deletions.
51 changes: 22 additions & 29 deletions contrib/mesos/pkg/election/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ type Service interface {
}

type notifier struct {
lock sync.Mutex
cond *sync.Cond
changed chan struct{} // to notify the service loop about changed state

// desired is updated with every change, current is updated after
// Start()/Stop() finishes. 'cond' is used to signal that a change
// might be needed. This handles the case where mastership flops
// around without calling Start()/Stop() excessively.
desired, current Master
lock sync.Mutex // to protect the desired variable

// for comparison, to see if we are master.
id Master
Expand All @@ -65,7 +65,7 @@ type notifier struct {
// elected master starts/stops matching 'id'. Never returns.
func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) {
n := &notifier{id: Master(id), service: s}
n.cond = sync.NewCond(&n.lock)
n.changed = make(chan struct{})
finished := runtime.After(func() {
runtime.Until(func() {
for {
Expand All @@ -86,14 +86,16 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
glog.Errorf("Unexpected object from election channel: %v", event.Object)
break
}
func() {
n.lock.Lock()
defer n.lock.Unlock()
n.desired = electedMaster
if n.desired != n.current {
n.cond.Signal()
}
}()

n.lock.Lock()
n.desired = electedMaster
n.lock.Unlock()

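// notify serviceLoop, but don't block. If a change
// is queued already it will see the new n.desired.
// NOTE(review): the select below has a single case and no default, so
// the send on n.changed blocks until serviceLoop receives it. A
// `default:` case is needed for this to be non-blocking as described.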
select {
case n.changed <- struct{}{}:
}
}
}
}
Expand All @@ -104,31 +106,22 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})

// serviceLoop waits for changes, and calls Start()/Stop() as needed.
//
// It returns once a receive from abort succeeds. Each time it is woken it
// compares the desired master with n.current and n.id: when this notifier
// is becoming master it calls n.service.Validate and then n.service.Start;
// when it is losing mastership it calls n.service.Stop. Afterwards n.current
// is set to the desired value that was acted upon.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// lines of two implementations are interleaved below: one that holds n.lock
// and waits on n.cond, and one that receives from n.changed. Read it against
// the hunk header rather than as a single compilable function.
func (n *notifier) serviceLoop(abort <-chan struct{}) {
// cond-based variant: the lock is taken here and held via defer until return.
n.lock.Lock()
defer n.lock.Unlock()
for {
select {
case <-abort:
return
default:
// cond-based variant: wait until desired and current differ.
for n.desired == n.current {
// n.cond.Wait is run through runtime.After so that the wait can be
// selected against abort via the returned channel.
ch := runtime.After(n.cond.Wait)
select {
case <-abort:
n.cond.Signal() // ensure that Wait() returns
<-ch
return
case <-ch:
// we were notified and have the lock, proceed..
}
}
if n.current != n.id && n.desired == n.id {
n.service.Validate(n.desired, n.current)
// channel-based variant: woken by a receive from n.changed.
case <-n.changed:
// the lock is held only long enough to snapshot n.desired, so the
// service calls below run without holding it.
n.lock.Lock()
newDesired := n.desired // copy value to avoid race below
n.lock.Unlock()

// becoming master: validate the transition, then start the service.
if n.current != n.id && newDesired == n.id {
n.service.Validate(newDesired, n.current)
n.service.Start()
} else if n.current == n.id && n.desired != n.id {
// losing mastership: stop the service.
} else if n.current == n.id && newDesired != n.id {
n.service.Stop()
}
n.current = n.desired
// record the value that was acted upon (the snapshot, not a fresh read).
n.current = newDesired
}
}
}
30 changes: 19 additions & 11 deletions contrib/mesos/pkg/election/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,24 @@ func Test(t *testing.T) {
changes := make(chan bool, 1500)
done := make(chan struct{})
s := &slowService{t: t, changes: changes, done: done}

// change master to "notme" such that the initial m.Elect call inside Notify
// will trigger an observable event. We will wait for it to make sure the
// Notify loop will see those master changes triggered by the goroutine below.
// NOTE(review): the call below passes "me", not "notme" — confirm which is intended.
m.ChangeMaster(Master("me"))
temporaryWatch := m.mux.Watch()
ch := temporaryWatch.ResultChan()

notifyDone := runtime.After(func() { Notify(m, "", "me", s, done) })

// wait for the event triggered by the initial m.Elect of Notify. Then drain
// the channel to not block anything.
<-ch
temporaryWatch.Stop()
for i := 0; i < len(ch); i += 1 { // go 1.3 and 1.4 compatible loop
<-ch
}

go func() {
defer close(done)
for i := 0; i < 500; i++ {
Expand All @@ -83,16 +99,8 @@ func Test(t *testing.T) {
<-notifyDone
close(changes)

changeList := []bool{}
for {
change, ok := <-changes
if !ok {
break
}
changeList = append(changeList, change)
}

if len(changeList) > 1000 {
t.Errorf("unexpected number of changes: %v", len(changeList))
changesNum := len(changes)
if changesNum > 1000 || changesNum == 0 {
t.Errorf("unexpected number of changes: %v", changesNum)
}
}

0 comments on commit f5e6161

Please sign in to comment.