Merge pull request kubernetes#4808 from jszczepkowski/node-probes-disabled

Node controller supports disabling node probes.
ddysher committed Feb 27, 2015
2 parents 5fee09d + e0548c3 commit 164ccd7
Showing 5 changed files with 161 additions and 11 deletions.
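The change threads one new boolean, SyncNodeStatus, from a controller-manager flag down to NodeController.Run. As a rough sketch (not part of the commit), a caller that wants to disable kubelet probes after this change would wire the controller the same way cmd/kubernetes/kubernetes.go does below, but with the new third argument set to false:

	// Hypothetical caller; constructor arguments mirror cmd/kubernetes/kubernetes.go below.
	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
	// New third argument: false disables status probes, so the controller runs
	// the pod-eviction loop instead (equivalent to --sync_node_status=false).
	nodeController.Run(10*time.Second, true, false)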
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
@@ -206,7 +206,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
nodeResources := &api.NodeResources{}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
- nodeController.Run(5*time.Second, true)
+ nodeController.Run(5*time.Second, true, true)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
5 changes: 4 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -54,6 +54,7 @@ type CMServer struct {
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
+ SyncNodeStatus bool
PodEvictionTimeout time.Duration

// TODO: Discover these by pinging the host machines, and rip out these params.
@@ -75,6 +76,7 @@ func NewCMServer() *CMServer {
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
+ SyncNodeStatus: true,
KubeletConfig: client.KubeletConfig{
Port: ports.KubeletPort,
EnableHttps: false,
@@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
+ fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "If true, node controller sends probes to kubelets and updates NodeStatus.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
// TODO: in the meantime, use resource.QuantityFlag() instead of these
fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
@@ -158,7 +161,7 @@ func (s *CMServer) Run(_ []string) error {

nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
- nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
+ nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)

resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
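Because the flag defaults to true, existing deployments keep the probing behavior unless they opt out explicitly. A minimal standalone sketch of the same pflag pattern (the flag-set name and final print are illustrative, not part of the commit):

	package main

	import (
		"fmt"

		"github.com/spf13/pflag"
	)

	func main() {
		fs := pflag.NewFlagSet("controllermanager", pflag.ExitOnError)
		syncNodeStatus := true // default, mirroring NewCMServer above
		fs.BoolVar(&syncNodeStatus, "sync_node_status", syncNodeStatus, "If true, node controller sends probes to kubelets and updates NodeStatus.")
		// An operator disabling probes would pass --sync_node_status=false:
		if err := fs.Parse([]string{"--sync_node_status=false"}); err != nil {
			panic(err)
		}
		fmt.Println(syncNodeStatus) // prints: false
	}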
2 changes: 1 addition & 1 deletion cmd/kubernetes/kubernetes.go
@@ -127,7 +127,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
- nodeController.Run(10*time.Second, true)
+ nodeController.Run(10*time.Second, true, true)

endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
51 changes: 43 additions & 8 deletions pkg/cloudprovider/controller/nodecontroller.go
@@ -77,7 +77,7 @@ func NewNodeController(

// Run creates the initial node list and starts syncing instances from the cloud provider, if any.
// It also starts syncing cluster node status.
- func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
+ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
// Register the initial set of nodes with their status set.
var nodes *api.NodeList
var err error
@@ -96,7 +96,6 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
glog.Errorf("Error loading initial static nodes: %v", err)
}
}
- nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
@@ -114,12 +113,21 @@
}, period)
}

- // Start syncing node status.
- go util.Forever(func() {
- if err = s.SyncNodeStatus(); err != nil {
- glog.Errorf("Error syncing status: %v", err)
- }
- }, period)
+ if syncNodeStatus {
+ // Start syncing node status.
+ go util.Forever(func() {
+ if err = s.SyncNodeStatus(); err != nil {
+ glog.Errorf("Error syncing status: %v", err)
+ }
+ }, period)
+ } else {
+ // Start checking node reachability and evicting pods from unreachable nodes.
+ go util.Forever(func() {
+ if err = s.EvictTimeoutedPods(); err != nil {
+ glog.Errorf("Error evicting pods from unreachable nodes: %v", err)
+ }
+ }, period)
+ }
}

// RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
@@ -216,6 +224,33 @@ func (s *NodeController) SyncNodeStatus() error {
return nil
}

+ // EvictTimeoutedPods verifies that nodes are reachable by checking the time of the last probe,
+ // and deletes pods from nodes that have been unreachable for longer than the eviction timeout.
+ func (s *NodeController) EvictTimeoutedPods() error {
+ nodes, err := s.kubeClient.Nodes().List()
+ if err != nil {
+ return err
+ }
+ for _, node := range nodes.Items {
+ if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
+ s.deletePods(node.Name)
+ }
+ }
+ return nil
+ }
+
+ // latestReadyTime returns the time of the node's most recent Ready probe,
+ // falling back to the node's creation time if it has never been probed.
+ func latestReadyTime(node *api.Node) util.Time {
+ readyTime := node.ObjectMeta.CreationTimestamp
+ for _, condition := range node.Status.Conditions {
+ if condition.Type == api.NodeReady &&
+ condition.Status == api.ConditionFull &&
+ condition.LastProbeTime.After(readyTime.Time) {
+ readyTime = condition.LastProbeTime
+ }
+ }
+ return readyTime
+ }
+
// PopulateIPs queries IPs for given list of nodes.
func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
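The eviction rule added above reduces to a single time comparison: a node's pods are deleted once the current time is later than its latest Ready probe time (or creation time) plus the pod eviction timeout. A self-contained sketch of that predicate using only the standard library (the five-minute timeout matches what the callers above pass to NewNodeController):

	package main

	import (
		"fmt"
		"time"
	)

	// nodeTimedOut mirrors the check in EvictTimeoutedPods: true once `now` is
	// past the latest ready time plus the eviction timeout.
	func nodeTimedOut(latestReady time.Time, evictionTimeout time.Duration, now time.Time) bool {
		return now.After(latestReady.Add(evictionTimeout))
	}

	func main() {
		now := time.Now()
		timeout := 5 * time.Minute

		fmt.Println(nodeTimedOut(now.Add(-time.Minute), timeout, now))    // false: probed recently, pods stay
		fmt.Println(nodeTimedOut(now.Add(-10*time.Minute), timeout, now)) // true: probe too old, pods are deleted
	}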
112 changes: 112 additions & 0 deletions pkg/cloudprovider/controller/nodecontroller_test.go
@@ -709,6 +709,118 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
}
}

func TestEvictTimeoutedPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
expectedRequestCount int
expectedActions []client.FakeAction
}{
// Node created a long time ago, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created recently, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Now(),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
// Node created a long time ago, with status last updated a long time ago.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created a long time ago, with status updated recently.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Now(),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
if err := nodeController.EvictTimeoutedPods(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}

func TestSyncNodeStatusDeletePods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
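The four cases above form a table-driven test in the usual Go style: each entry pairs a fake node fixture with the pod-store actions the eviction pass is expected to take. Assuming a standard checkout, they can be exercised in isolation with go test -run TestEvictTimeoutedPods ./pkg/cloudprovider/controller (package path as in the file header above).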
