Skip to content

Commit

Permalink
Move makeCompositeReconciler into taskreconciler package
Browse files Browse the repository at this point in the history
  • Loading branch information
sttts committed Nov 12, 2015
1 parent 12efba8 commit f8ee091
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 53 deletions.
54 changes: 1 addition & 53 deletions contrib/mesos/pkg/scheduler/components/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) {
r1 := k.makeTaskRegistryReconciler()
r2 := k.makePodRegistryReconciler()

k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, taskreconciler.MakeComposite(k.terminate, r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
go k.tasksReconciler.Run(driver, k.terminate)

Expand Down Expand Up @@ -569,58 +569,6 @@ func explicitTaskFilter(t *podtask.T) bool {
}
}

// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error.
func (k *framework) makeCompositeReconciler(actions ...taskreconciler.Action) taskreconciler.Action {
if x := len(actions); x == 0 {
// programming error
panic("no actions specified for composite reconciler")
} else if x == 1 {
return actions[0]
}
chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b taskreconciler.Action) <-chan error {
ech := a(d, c)
ch := make(chan error, 1)
go func() {
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
ech = b(d, c)
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
close(ch)
return
}
}
ch <- fmt.Errorf("aborting composite reconciler action")
}()
return ch
}
result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, actions[0], actions[1])
}
for i := 2; i < len(actions); i++ {
i := i
next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, taskreconciler.Action(result), actions[i])
}
result = next
}
return taskreconciler.Action(result)
}

// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry.
func (k *framework) makeTaskRegistryReconciler() taskreconciler.Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package taskreconciler

import (
"fmt"
"time"

log "github.com/golang/glog"
Expand Down Expand Up @@ -180,3 +181,55 @@ requestLoop:
}
} // for
}

// MakeComposite invokes the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error.
func MakeComposite(done <-chan struct{}, actions ...Action) Action {
if x := len(actions); x == 0 {
// programming error
panic("no actions specified for composite reconciler")
} else if x == 1 {
return actions[0]
}
chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b Action) <-chan error {
ech := a(d, c)
ch := make(chan error, 1)
go func() {
select {
case <-done:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
ech = b(d, c)
select {
case <-done:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
close(ch)
return
}
}
ch <- fmt.Errorf("aborting composite reconciler action")
}()
return ch
}
result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, actions[0], actions[1])
}
for i := 2; i < len(actions); i++ {
i := i
next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, Action(result), actions[i])
}
result = next
}
return Action(result)
}

0 comments on commit f8ee091

Please sign in to comment.