
Commit 2ba1a1d

Merge pull request #29063 from bprashanth/automated-cherry-pick-of-#28604-#29062-upstream-release-1.3

Automatic merge from submit-queue

Automated cherry pick of #28604 #29062

Cherry pick of #28604 #29062 on release-1.3.
k8s-merge-robot authored Jul 17, 2016
2 parents 4e6ee92 + 4dd5345 commit 2ba1a1d
Showing 10 changed files with 1,261 additions and 676 deletions.
5 changes: 4 additions & 1 deletion cmd/integration/integration.go
@@ -199,8 +199,11 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
 
 	go podInformer.Run(wait.NeverStop)
 
-	nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
+	nodeController, err := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, nil, 0, false)
+	if err != nil {
+		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+	}
 	nodeController.Run(5 * time.Second)
 	cadvisorInterface := new(cadvisortest.Fake)
5 changes: 4 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -238,10 +238,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	if err != nil {
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
-	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+	nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
 		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
 		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+	if err != nil {
+		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+	}
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
5 changes: 4 additions & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -154,10 +154,13 @@ func (s *CMServer) Run(_ []string) error {
 	}
 	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
 	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
-	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+	nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
 		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
 		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+	if err != nil {
+		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+	}
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 
 	nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)
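All three call sites above change identically: NewNodeController now reports initialization failures (it pre-populates its CIDR map from the existing nodes, per the cidr_allocator.go diff below) instead of returning a half-built controller, and every caller treats that failure as fatal. A minimal sketch of the pattern, with hypothetical names standing in for the real constructor and its arguments:

	package main

	import "log"

	type nodeController struct{}

	func (nc *nodeController) Run() { /* reconcile nodes */ }

	// newNodeController stands in for nodecontroller.NewNodeController; the
	// (value, error) return is the pattern this commit introduces.
	func newNodeController() (*nodeController, error) {
		return &nodeController{}, nil
	}

	func main() {
		nc, err := newNodeController()
		if err != nil {
			// A controller-manager without a node controller is useless, so fail hard.
			log.Fatalf("Failed to initialize nodecontroller: %v", err)
		}
		nc.Run()
	}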
252 changes: 155 additions & 97 deletions pkg/controller/node/cidr_allocator.go
@@ -17,144 +17,202 @@ limitations under the License.
 package node
 
 import (
-	"encoding/binary"
 	"errors"
 	"fmt"
-	"math/big"
 	"net"
-	"sync"
+
+	"k8s.io/kubernetes/pkg/api"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/util/wait"
+
+	"github.com/golang/glog"
 )
 
+// TODO: figure out the good setting for those constants.
+const (
+	// controls how many NodeSpec updates NC can process concurrently.
+	cidrUpdateWorkers   = 10
+	cidrUpdateQueueSize = 5000
+	// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
+	podCIDRUpdateRetry = 5
+)
+
 var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
 
+type nodeAndCIDR struct {
+	cidr     *net.IPNet
+	nodeName string
+}
+
 // CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDR for nodes.
 type CIDRAllocator interface {
-	AllocateNext() (*net.IPNet, error)
-	Occupy(*net.IPNet) error
-	Release(*net.IPNet) error
+	AllocateOrOccupyCIDR(node *api.Node) error
+	ReleaseCIDR(node *api.Node) error
 }
 
 type rangeAllocator struct {
-	clusterCIDR     *net.IPNet
-	clusterIP       net.IP
-	clusterMaskSize int
-	subNetMaskSize  int
-	maxCIDRs        int
-	used            big.Int
-	lock            sync.Mutex
-	nextCandidate   int
+	client      clientset.Interface
+	cidrs       *cidrSet
+	clusterCIDR *net.IPNet
+	maxCIDRs    int
+	// Channel that is used to pass updating Nodes with assigned CIDRs to the background workers.
+	// This increases the throughput of CIDR assignment by not blocking on long operations.
+	nodeCIDRUpdateChannel chan nodeAndCIDR
+	recorder              record.EventRecorder
 }
 
 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
 // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
-func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
-	clusterMask := clusterCIDR.Mask
-	clusterMaskSize, _ := clusterMask.Size()
-
-	ra := &rangeAllocator{
-		clusterCIDR:     clusterCIDR,
-		clusterIP:       clusterCIDR.IP.To4(),
-		clusterMaskSize: clusterMaskSize,
-		subNetMaskSize:  subNetMaskSize,
-		maxCIDRs:        1 << uint32(subNetMaskSize-clusterMaskSize),
-		nextCandidate:   0,
-	}
-	return ra
-}
-
-func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	nextUnused := -1
-	for i := 0; i < r.maxCIDRs; i++ {
-		candidate := (i + r.nextCandidate) % r.maxCIDRs
-		if r.used.Bit(candidate) == 0 {
-			nextUnused = candidate
-			break
-		}
-	}
-	if nextUnused == -1 {
-		return nil, errCIDRRangeNoCIDRsRemaining
-	}
-	r.nextCandidate = (nextUnused + 1) % r.maxCIDRs
-
-	r.used.SetBit(&r.used, nextUnused, 1)
-
-	j := uint32(nextUnused) << uint32(32-r.subNetMaskSize)
-	ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j
-	ip := make([]byte, 4)
-	binary.BigEndian.PutUint32(ip, ipInt)
-
-	return &net.IPNet{
-		IP:   ip,
-		Mask: net.CIDRMask(r.subNetMaskSize, 32),
-	}, nil
-}
-
-func (r *rangeAllocator) Release(cidr *net.IPNet) error {
-	used, err := r.getIndexForCIDR(cidr)
-	if err != nil {
-		return err
-	}
-
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	r.used.SetBit(&r.used, used, 0)
-
-	return nil
-}
-
-func (r *rangeAllocator) MaxCIDRs() int {
-	return r.maxCIDRs
-}
-
-func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
-	begin, end := 0, r.maxCIDRs
-	cidrMask := cidr.Mask
-	maskSize, _ := cidrMask.Size()
-
-	if !r.clusterCIDR.Contains(cidr.IP.Mask(r.clusterCIDR.Mask)) && !cidr.Contains(r.clusterCIDR.IP.Mask(cidr.Mask)) {
-		return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, r.clusterCIDR)
-	}
-
-	if r.clusterMaskSize < maskSize {
-		subNetMask := net.CIDRMask(r.subNetMaskSize, 32)
-		begin, err = r.getIndexForCIDR(&net.IPNet{
-			IP:   cidr.IP.To4().Mask(subNetMask),
-			Mask: subNetMask,
-		})
-		if err != nil {
-			return err
-		}
-
-		ip := make([]byte, 4)
-		ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask))
-		binary.BigEndian.PutUint32(ip, ipInt)
-		end, err = r.getIndexForCIDR(&net.IPNet{
-			IP:   net.IP(ip).To4().Mask(subNetMask),
-			Mask: subNetMask,
-		})
-		if err != nil {
-			return err
-		}
-	}
-
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	for i := begin; i <= end; i++ {
-		r.used.SetBit(&r.used, i, 1)
-	}
-
-	return nil
-}
-
-func (r *rangeAllocator) getIndexForCIDR(cidr *net.IPNet) (int, error) {
-	cidrIndex := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize)
-
-	if cidrIndex >= uint32(r.maxCIDRs) {
-		return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
-	}
-
-	return int(cidrIndex), nil
-}
+// Caller must always pass in a list of existing nodes so the new allocator
+// can initialize its CIDR map. NodeList is only nil in testing.
+func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) {
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
+	eventBroadcaster.StartLogging(glog.Infof)
+
+	ra := &rangeAllocator{
+		client:                client,
+		cidrs:                 newCIDRSet(clusterCIDR, subNetMaskSize),
+		clusterCIDR:           clusterCIDR,
+		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
+		recorder:              recorder,
+	}
+
+	if serviceCIDR != nil {
+		ra.filterOutServiceRange(serviceCIDR)
+	} else {
+		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
+	}
+
+	if nodeList != nil {
+		for _, node := range nodeList.Items {
+			if node.Spec.PodCIDR == "" {
+				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
+				continue
+			} else {
+				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
+			}
+			if err := ra.occupyCIDR(&node); err != nil {
+				// This will happen if:
+				// 1. We find garbage in the podCIDR field. Retrying is useless.
+				// 2. CIDR out of range: This means a node CIDR has changed.
+				// This error will keep crashing controller-manager.
+				return nil, err
+			}
+		}
+	}
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		go func(stopChan <-chan struct{}) {
+			for {
+				select {
+				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
+					if !ok {
+						glog.Warning("NodeCIDRUpdateChannel read returned false.")
+						return
+					}
+					ra.updateCIDRAllocation(workItem)
+				case <-stopChan:
+					return
+				}
+			}
+		}(wait.NeverStop)
+	}
+	return ra, nil
+}
+
+func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR == "" {
+		return nil
+	}
+	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+	if err != nil {
+		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+	}
+	if err := r.cidrs.occupy(podCIDR); err != nil {
+		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
+	}
+	return nil
+}
+
+// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
+// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
+func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR != "" {
+		return r.occupyCIDR(node)
+	}
+	podCIDR, err := r.cidrs.allocateNext()
+	if err != nil {
+		recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
+		return fmt.Errorf("failed to allocate cidr: %v", err)
+	}
+
+	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
+		nodeName: node.Name,
+		cidr:     podCIDR,
+	}
+	return nil
+}
+
+// ReleaseCIDR releases the CIDR of the removed node
+func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR == "" {
+		return nil
+	}
+	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+	if err != nil {
+		return fmt.Errorf("Failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+	}
+
+	glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
+	if err = r.cidrs.release(podCIDR); err != nil {
+		return fmt.Errorf("Failed to release cidr: %v", err)
+	}
+	return err
+}
+
+// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used,
+// so that they won't be assignable.
+func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
+	// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
+	// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
+	// or vice versa (which means that serviceCIDR contains clusterCIDR).
+	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
+		return
+	}
+
+	if err := r.cidrs.occupy(serviceCIDR); err != nil {
+		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
+	}
+}
+
+// Assigns CIDR to Node and sends an update to the API server.
+func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
+	var err error
+	var node *api.Node
+	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
+		// TODO: change it to using PATCH instead of full Node updates.
+		node, err = r.client.Core().Nodes().Get(data.nodeName)
+		glog.Infof("Got Node: %v", node)
+		if err != nil {
+			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
+			continue
+		}
+		node.Spec.PodCIDR = data.cidr.String()
+		if _, err := r.client.Core().Nodes().Update(node); err != nil {
+			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
+		} else {
+			break
+		}
+	}
+	if err != nil {
+		recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
+		glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
+		if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
+			glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
+		}
+	}
+	return err
+}
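The cidrSet used above (newCIDRSet, occupy, allocateNext, release) is introduced in pkg/controller/node/cidr_set.go, one of the 10 changed files not shown in this view. A minimal sketch of such a bitmap-backed subnet allocator, reconstructed from the deleted rangeAllocator logic above; the names and details here are assumptions, not the committed code:

	package main

	import (
		"encoding/binary"
		"errors"
		"fmt"
		"math/big"
		"net"
		"sync"
	)

	// cidrSet tracks the /subNetMaskSize subnets of an IPv4 cluster CIDR in a
	// bitmap: bit i set means the i-th subnet is in use.
	type cidrSet struct {
		sync.Mutex
		clusterIP      net.IP
		subNetMaskSize int
		maxCIDRs       int
		used           big.Int
		nextCandidate  int
	}

	func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
		clusterMaskSize, _ := clusterCIDR.Mask.Size()
		return &cidrSet{
			clusterIP:      clusterCIDR.IP.To4(),
			subNetMaskSize: subNetMaskSize,
			maxCIDRs:       1 << uint(subNetMaskSize-clusterMaskSize),
		}
	}

	// allocateNext hands out the first free subnet, scanning from just past the
	// previous allocation so freed subnets are not immediately recycled.
	func (s *cidrSet) allocateNext() (*net.IPNet, error) {
		s.Lock()
		defer s.Unlock()
		for i := 0; i < s.maxCIDRs; i++ {
			c := (i + s.nextCandidate) % s.maxCIDRs
			if s.used.Bit(c) == 0 {
				s.used.SetBit(&s.used, c, 1)
				s.nextCandidate = (c + 1) % s.maxCIDRs
				ip := make([]byte, 4)
				base := binary.BigEndian.Uint32(s.clusterIP)
				binary.BigEndian.PutUint32(ip, base|uint32(c)<<uint(32-s.subNetMaskSize))
				return &net.IPNet{IP: ip, Mask: net.CIDRMask(s.subNetMaskSize, 32)}, nil
			}
		}
		return nil, errors.New("no CIDRs remaining in the cluster range")
	}

	// indexFor maps a subnet back to its bit position, rejecting out-of-range CIDRs.
	func (s *cidrSet) indexFor(cidr *net.IPNet) (int, error) {
		idx := (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint(32-s.subNetMaskSize)
		if idx >= uint32(s.maxCIDRs) {
			return 0, fmt.Errorf("%v is outside the allocator range", cidr)
		}
		return int(idx), nil
	}

	// occupy marks a subnet as used without allocating it (the committed version
	// also handles CIDRs spanning several subnets, such as a service range).
	func (s *cidrSet) occupy(cidr *net.IPNet) error {
		i, err := s.indexFor(cidr)
		if err != nil {
			return err
		}
		s.Lock()
		defer s.Unlock()
		s.used.SetBit(&s.used, i, 1)
		return nil
	}

	func (s *cidrSet) release(cidr *net.IPNet) error {
		i, err := s.indexFor(cidr)
		if err != nil {
			return err
		}
		s.Lock()
		defer s.Unlock()
		s.used.SetBit(&s.used, i, 0)
		return nil
	}

	func main() {
		_, cluster, _ := net.ParseCIDR("10.244.0.0/16")
		set := newCIDRSet(cluster, 24) // 256 possible /24 subnets
		c, _ := set.allocateNext()
		fmt.Println(c) // 10.244.0.0/24
		_ = set.release(c)
	}

Seen this way, filterOutServiceRange in the diff above is just occupy applied to the service CIDR at construction time, so addresses overlapping the service range are never handed to a node.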
