List all nodes and occupy cidr map before starting allocations
bprashanth committed Jul 16, 2016
1 parent 51629f5 commit 2f9516d
Showing 6 changed files with 84 additions and 25 deletions.
5 changes: 4 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -238,10 +238,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
if err != nil {
glog.Fatalf("Failed to initialize nodecontroller: %v", err)
}
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

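Both this call site and the mesos controller manager below now treat a failed NodeController construction as fatal, so an unrecoverable CIDR state crashes the controller manager instead of silently misallocating ranges. A minimal, self-contained sketch of that constructor-returns-error pattern, using hypothetical stand-ins (controller, newController) rather than the real Kubernetes types:

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// controller is a hypothetical stand-in for the node controller.
type controller struct {
	syncPeriod time.Duration
}

// Run would start the controller's sync loops; elided here.
func (c *controller) Run() { fmt.Println("running with sync period", c.syncPeriod) }

// newController mirrors the new constructor shape: construction itself can fail,
// for example when the CIDR map cannot be initialized from existing nodes.
func newController(allocateNodeCIDRs bool, syncPeriod time.Duration) (*controller, error) {
	if allocateNodeCIDRs {
		// The real constructor lists nodes and occupies their podCIDRs here,
		// returning any error it hits instead of ignoring it.
	}
	return &controller{syncPeriod: syncPeriod}, nil
}

func main() {
	nc, err := newController(true, 10*time.Second)
	if err != nil {
		// Matches the call sites in this commit: initialization failure is fatal.
		log.Fatalf("Failed to initialize nodecontroller: %v", err)
	}
	nc.Run()
}
```

Crashing on construction is deliberate: a restart either succeeds once the bad state is fixed, or keeps failing loudly enough for an operator to notice.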
5 changes: 4 additions & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -153,9 +153,12 @@ func (s *CMServer) Run(_ []string) error {
}
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
if err != nil {
glog.Fatalf("Failed to initialize nodecontroller: %v", err)
}
nodeController.Run(s.NodeSyncPeriod.Duration)

nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)
47 changes: 37 additions & 10 deletions pkg/controller/node/cidr_allocator.go
@@ -64,7 +64,9 @@ type rangeAllocator struct {

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for nodes.
// The caller must ensure subNetMaskSize is not less than the cluster CIDR mask size.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
eventBroadcaster.StartLogging(glog.Infof)
@@ -82,6 +84,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
} else {
glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
}

if nodeList != nil {
for _, node := range nodeList.Items {
if node.Spec.PodCIDR == "" {
glog.Infof("Node %v has no CIDR, ignoring", node.Name)
continue
} else {
glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
}
if err := ra.occupyCIDR(&node); err != nil {
// This will happen if:
// 1. We find garbage in the podCIDR field. Retrying is useless.
// 2. The CIDR is out of range. This means a node's CIDR has changed.
// Either way, returning the error will keep crashing the controller-manager.
return nil, err
}
}
}
for i := 0; i < cidrUpdateWorkers; i++ {
go func(stopChan <-chan struct{}) {
for {
@@ -99,21 +119,28 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
}(wait.NeverStop)
}

return ra
return ra, nil
}

func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
if node.Spec.PodCIDR == "" {
return nil
}
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
if err := r.cidrs.occupy(podCIDR); err != nil {
return fmt.Errorf("failed to mark cidr as occupied: %v", err)
}
return nil
}

// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
if node.Spec.PodCIDR != "" {
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
if err := r.cidrs.occupy(podCIDR); err != nil {
return fmt.Errorf("failed to mark cidr as occupied: %v", err)
}
return nil
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
if err != nil {
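The substance of this file's change is that the allocator no longer starts from an empty CIDR map: every podCIDR already recorded on an existing node is occupied before any new allocation is served, so a restarted controller manager cannot hand out a range that is already in use. Below is a simplified, self-contained sketch of that idea; cidrSet, node, and occupyExisting are toy stand-ins for the allocator's internal bitmap and api.Node, not the real implementation:

```go
package main

import (
	"fmt"
	"net"
)

// cidrSet is a toy stand-in for the allocator's internal CIDR bitmap.
type cidrSet struct {
	used map[string]bool
}

func newCIDRSet() *cidrSet { return &cidrSet{used: map[string]bool{}} }

// occupy marks a CIDR as taken, failing if it is already in the set.
func (s *cidrSet) occupy(cidr *net.IPNet) error {
	key := cidr.String()
	if s.used[key] {
		return fmt.Errorf("cidr %s is already occupied", key)
	}
	s.used[key] = true
	return nil
}

// node is a minimal stand-in for api.Node.
type node struct {
	Name    string
	PodCIDR string
}

// occupyExisting mirrors the loop added to NewCIDRRangeAllocator: nodes that
// already carry a podCIDR have it marked as used before any new allocations.
func occupyExisting(set *cidrSet, nodes []node) error {
	for _, n := range nodes {
		if n.PodCIDR == "" {
			continue // node has no CIDR yet; it will be allocated one later
		}
		_, podCIDR, err := net.ParseCIDR(n.PodCIDR)
		if err != nil {
			return fmt.Errorf("failed to parse node %s, CIDR %s", n.Name, n.PodCIDR)
		}
		if err := set.occupy(podCIDR); err != nil {
			return fmt.Errorf("failed to mark cidr as occupied: %v", err)
		}
	}
	return nil
}

func main() {
	set := newCIDRSet()
	nodes := []node{{"node-a", "10.244.0.0/24"}, {"node-b", ""}}
	if err := occupyExisting(set, nodes); err != nil {
		// In the real controller this error is returned and treated as fatal.
		panic(err)
	}
	fmt.Println("occupied:", set.used)
}
```

As in the real occupyCIDR, a node without a podCIDR is skipped rather than treated as an error, while an unparsable or duplicate CIDR aborts initialization.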
6 changes: 3 additions & 3 deletions pkg/controller/node/cidr_allocator_test.go
@@ -128,7 +128,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
expectedAllocatedCIDR string
allocatedCIDRs []string
}) {
allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
// this is a bit of white box testing
for _, allocated := range tc.allocatedCIDRs {
_, cidr, err := net.ParseCIDR(allocated)
@@ -209,7 +209,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
subNetMaskSize int
allocatedCIDRs []string
}) {
allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
// this is a bit of white box testing
for _, allocated := range tc.allocatedCIDRs {
_, cidr, err := net.ParseCIDR(allocated)
@@ -320,7 +320,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
allocatedCIDRs []string
cidrsToRelease []string
}) {
allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
// this is a bit of white box testing
for _, allocated := range tc.allocatedCIDRs {
_, cidr, err := net.ParseCIDR(allocated)
32 changes: 29 additions & 3 deletions pkg/controller/node/nodecontroller.go
@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
@@ -58,6 +59,8 @@ const (
nodeEvictionPeriod = 100 * time.Millisecond
// Burst value for all eviction rate limiters
evictionRateLimiterBurst = 1
// The maximum amount of time the nodecontroller polls the list-nodes endpoint
// while waiting for the apiserver to come up.
apiserverStartupGracePeriod = 10 * time.Minute
)

type zoneState string
@@ -140,6 +143,9 @@ type NodeController struct {
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
// This method returns an error if it is unable to initialize the CIDR bitmap with
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeController(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
@@ -151,7 +157,7 @@ func NewNodeController(
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) *NodeController {
allocateNodeCIDRs bool) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
@@ -277,10 +283,30 @@ func NewNodeController(
)

if allocateNodeCIDRs {
nc.cidrAllocator = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
var nodeList *api.NodeList
var err error
// We must poll because the apiserver might not be up. The error returned on
// timeout causes the controller manager to restart.
if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
nodeList, err = kubeClient.Core().Nodes().List(api.ListOptions{
FieldSelector: fields.Everything(),
LabelSelector: labels.Everything(),
})
if err != nil {
glog.Errorf("Failed to list all nodes: %v", err)
return false, nil
}
return true, nil
}); pollErr != nil {
return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
}
nc.cidrAllocator, err = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
if err != nil {
return nil, err
}
}

return nc
return nc, nil
}

// Run starts an asynchronous loop that monitors the status of cluster nodes.
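NewNodeController now refuses to proceed until it has listed all nodes, retrying for up to apiserverStartupGracePeriod because the apiserver may still be starting; if the deadline passes, the error bubbles up and the controller manager exits. Here is a stdlib-only sketch of that poll-with-deadline behaviour; pollUntil and listNodes are illustrative stand-ins that only approximate the semantics of wait.Poll:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil retries fn every interval until it succeeds or timeout elapses,
// roughly mirroring how wait.Poll is used in NewNodeController.
func pollUntil(interval, timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %v, last error: %v", timeout, err)
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	// listNodes is a hypothetical stand-in for kubeClient.Core().Nodes().List;
	// here it fails twice to simulate an apiserver that is still coming up.
	listNodes := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("apiserver not ready")
		}
		return nil
	}
	// The real controller uses a 10-second interval and apiserverStartupGracePeriod;
	// short values keep this demo quick.
	if err := pollUntil(50*time.Millisecond, 2*time.Second, listNodes); err != nil {
		// In the controller this error propagates up and is treated as fatal.
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("listed nodes after", attempts, "attempts")
}
```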
14 changes: 7 additions & 7 deletions pkg/controller/node/nodecontroller_test.go
@@ -600,7 +600,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@@ -673,7 +673,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute,
testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}

for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
},
}

nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)

nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
