Merge pull request kubernetes#29101 from gmarek/allocator2
Automatic merge from submit-queue

Retry assigning CIDRs

Fix kubernetes#28879, ref kubernetes#29064

cc @bgrant0607 @bprashanth @alex-mohr
k8s-merge-robot authored Jul 19, 2016
2 parents 1026eec + 56006fa commit 8c6336b
Showing 3 changed files with 99 additions and 23 deletions.
50 changes: 45 additions & 5 deletions pkg/controller/node/cidr_allocator.go
@@ -20,10 +20,13 @@ import (
"errors"
"fmt"
"net"
"sync"

"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
@@ -60,6 +63,9 @@ type rangeAllocator struct {
// This increases the throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan nodeAndCIDR
recorder record.EventRecorder
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
sync.Mutex
nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
@@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
clusterCIDR: clusterCIDR,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
}

if serviceCIDR != nil {
@@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
return ra, nil
}

func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
r.Lock()
defer r.Unlock()
if r.nodesInProcessing.Has(nodeName) {
return false
}
r.nodesInProcessing.Insert(nodeName)
return true
}

func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
r.Lock()
defer r.Unlock()
r.nodesInProcessing.Delete(nodeName)
}

func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
defer r.removeNodeFromProcessing(node.Name)
if node.Spec.PodCIDR == "" {
return nil
}
@@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error {

// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
// WARNING: If you add any return paths to this function or defer any more work from it,
// you have to handle nodesInProcessing correctly.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
if node == nil {
return nil
}
if !r.insertNodeToProcessing(node.Name) {
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
return nil
}
if node.Spec.PodCIDR != "" {
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
@@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
return err
}

// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used, so that they won't be
// assignable.
// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used,
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
@@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *api.Node
defer r.removeNodeFromProcessing(data.nodeName)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = r.client.Core().Nodes().Get(data.nodeName)
@@ -217,9 +252,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
if err != nil {
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them when we don't know whether the request went through.
// A NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
}
}
}
return err
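To make the change in cidr_allocator.go easier to follow outside the diff, here is a minimal, self-contained sketch of the guard pattern it introduces: a node name is claimed before any CIDR work starts and released when that work finishes, so a duplicate update event for the same node is skipped. The sketch uses a plain map[string]struct{} in place of k8s.io/kubernetes/pkg/util/sets and stubs out the actual allocation; it illustrates the pattern, not the committed code.

```go
package main

import (
	"fmt"
	"sync"
)

// processingGuard mirrors the nodesInProcessing bookkeeping added to
// rangeAllocator: a mutex-protected set of node names currently being handled.
type processingGuard struct {
	sync.Mutex
	inProcessing map[string]struct{}
}

func newProcessingGuard() *processingGuard {
	return &processingGuard{inProcessing: make(map[string]struct{})}
}

// tryInsert claims a node for processing; it returns false if the node is
// already being processed, so the caller can skip the duplicate event.
func (g *processingGuard) tryInsert(nodeName string) bool {
	g.Lock()
	defer g.Unlock()
	if _, ok := g.inProcessing[nodeName]; ok {
		return false
	}
	g.inProcessing[nodeName] = struct{}{}
	return true
}

// remove releases the claim once allocation (or its failure path) is done.
func (g *processingGuard) remove(nodeName string) {
	g.Lock()
	defer g.Unlock()
	delete(g.inProcessing, nodeName)
}

func main() {
	g := newProcessingGuard()
	fmt.Println(g.tryInsert("node-a")) // true: the first event wins
	fmt.Println(g.tryInsert("node-a")) // false: a duplicate update event is skipped
	g.remove("node-a")                 // allocation finished (or failed)
	fmt.Println(g.tryInsert("node-a")) // true: a later retry can proceed again
}
```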
44 changes: 26 additions & 18 deletions pkg/controller/node/cidr_set.go
@@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
}, nil
}

func (s *cidrSet) release(cidr *net.IPNet) error {
used, err := s.getIndexForCIDR(cidr)
if err != nil {
return err
}

s.Lock()
defer s.Unlock()
s.used.SetBit(&s.used, used, 0)

return nil
}

func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end := 0, s.maxCIDRs
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
begin, end = 0, s.maxCIDRs
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()

if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
}

if s.clusterMaskSize < maskSize {
@@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}

ip := make([]byte, 4)
@@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}
}
return begin, end, nil
}

func (s *cidrSet) release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
s.used.SetBit(&s.used, i, 0)
}
return nil
}

func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}

s.Lock()
defer s.Unlock()
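The cidr_set.go refactor moves the begin/end index computation into a shared helper so that release, like occupy, can clear a whole range of bits when the given CIDR spans several node subnets. The sketch below is a simplified reconstruction for illustration only (it assumes IPv4, a contiguous cluster range, and its own index arithmetic rather than the exact helper in this file): it shows how a /24 block maps to a bit index inside a /8 cluster CIDR, and how releasing a /16 clears a contiguous range of those bits.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math/big"
	"net"
)

// subnetIndex returns the position of an IPv4 address's subnet inside
// clusterCIDR when the cluster is carved into blocks of subNetMaskSize bits.
// Simplified stand-in for the index math in cidr_set.go.
func subnetIndex(clusterCIDR *net.IPNet, ip net.IP, subNetMaskSize int) int {
	addr := binary.BigEndian.Uint32(ip.To4())
	base := binary.BigEndian.Uint32(clusterCIDR.IP.To4())
	return int((addr - base) >> uint(32-subNetMaskSize))
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/8")
	const subNetMaskSize = 24

	// 10.1.2.0/24 is the 258th /24 inside 10.0.0.0/8 (10.0.0.0/24 has index 0).
	fmt.Println(subnetIndex(clusterCIDR, net.ParseIP("10.1.2.0"), subNetMaskSize)) // 258

	// Releasing a /16 means clearing a whole range of /24 indices, which is
	// why release() now walks from a begin index to an end index.
	var used big.Int
	begin := subnetIndex(clusterCIDR, net.ParseIP("10.1.0.0"), subNetMaskSize) // 256
	end := subnetIndex(clusterCIDR, net.ParseIP("10.1.255.0"), subNetMaskSize) // 511
	for i := begin; i <= end; i++ {
		used.SetBit(&used, i, 1) // pretend these blocks were occupied
	}
	for i := begin; i <= end; i++ {
		used.SetBit(&used, i, 0) // release clears the same range
	}
	fmt.Println(used.BitLen() == 0) // true: every block is free again
}
```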
28 changes: 28 additions & 0 deletions pkg/controller/node/nodecontroller.go
@@ -244,6 +244,34 @@ func NewNodeController(
glog.Errorf("Error allocating CIDR: %v", err)
}
},
UpdateFunc: func(_, obj interface{}) {
node := obj.(*api.Node)
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
// - already processed a Node successfully and allocated a CIDR for it
// (cidr is marked as used),
// - already processed a Node, saw a "timeout" response, and the request
// eventually got through; in this case we haven't released
// the allocated CIDR (cidr is still marked as used).
// There's a possible error here:
// - NC sees a new Node and assigns a CIDR X to it,
// - Update Node call fails with a timeout,
// - Node is updated by some other component, NC sees an update and
// assigns CIDR Y to the Node,
// - Both CIDR X and CIDR Y are marked as used in the local cache,
// even though the Node only has CIDR Y.
// The problem here is that in the in-memory cache we see CIDR X as marked,
// which prevents it from being assigned to any new node. The cluster
// state is correct.
// Restart of NC fixes the issue.
if node.Spec.PodCIDR == "" {
err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
if err != nil {
glog.Errorf("Error allocating CIDR: %v", err)
}
}
},
DeleteFunc: func(obj interface{}) {
node := obj.(*api.Node)
err := nc.cidrAllocator.ReleaseCIDR(node)
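The scenario spelled out in the new UpdateFunc comment can be walked through as a tiny standalone simulation (hypothetical values, a plain map standing in for the CIDR bitmap): an Update call that times out leaves CIDR X occupied, a later node update allocates CIDR Y, and X stays marked as used in the local cache until a NodeController restart rebuilds it.

```go
package main

import "fmt"

func main() {
	// Local cache of CIDRs the controller believes are in use.
	occupied := map[string]bool{}

	// 1. NC sees a new Node and assigns CIDR X to it.
	occupied["10.0.0.0/24"] = true

	// 2. The Update Node call fails with a timeout. X is deliberately NOT
	//    released, because the write may still have reached the API server.

	// 3. Another component updates the Node; NC's UpdateFunc fires, still sees
	//    an empty Spec.PodCIDR, and assigns CIDR Y.
	nodePodCIDR := ""
	if nodePodCIDR == "" {
		occupied["10.0.1.0/24"] = true
		nodePodCIDR = "10.0.1.0/24"
	}

	// 4. Both X and Y are now marked as used in the local cache, although the
	//    Node only carries Y. X stays leaked until a NodeController restart
	//    rebuilds the cache from the Nodes' real PodCIDRs.
	fmt.Println("node PodCIDR:", nodePodCIDR)
	fmt.Println("locally occupied:", occupied)
}
```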
