Stop 'drain' deleting pods with local storage.
Unless forced with --delete-local-data. Also refactors the kubectl drain
logic that selects/rejects pods and produces error/warning messages.
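
In effect, draining a node that hosts emptyDir-backed pods now fails up front unless the flag is given. A sketch of the resulting behavior, assuming a hypothetical node `node-1` running one pod `cache-pod` with an emptyDir volume (messages paraphrased from the constants this commit introduces):

```
$ kubectl drain node-1
error: pods with local storage (use --delete-local-data to override): cache-pod
$ kubectl drain node-1 --delete-local-data
WARNING: Deleting pods with local storage: cache-pod
```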
mml committed Jun 8, 2016
1 parent e79f046 commit d09af4a
Showing 6 changed files with 187 additions and 121 deletions.
4 changes: 4 additions & 0 deletions docs/man/man1/kubectl-drain.1
@@ -32,6 +32,10 @@ will make the node schedulable again.


.SH OPTIONS
+.PP
+\fB\-\-delete\-local\-data\fP=false
+Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
+
.PP
\fB\-\-force\fP=false
Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.
3 changes: 2 additions & 1 deletion docs/user-guide/kubectl/kubectl_drain.md
@@ -73,6 +73,7 @@ $ kubectl drain foo --grace-period=900
### Options

```
+--delete-local-data[=false]: Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
--force[=false]: Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.
--grace-period=-1: Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.
--ignore-daemonsets[=false]: Ignore DaemonSet-managed pods.
@@ -110,7 +111,7 @@ $ kubectl drain foo --grace-period=900

* [kubectl](kubectl.md) - kubectl controls the Kubernetes cluster manager

-###### Auto generated by spf13/cobra on 15-Apr-2016
+###### Auto generated by spf13/cobra on 6-Jun-2016

<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/user-guide/kubectl/kubectl_drain.md?pixel)]()
4 changes: 4 additions & 0 deletions docs/yaml/kubectl/kubectl_drain.yaml
@@ -16,6 +16,10 @@ description: |
When you are ready to put the node back into service, use kubectl uncordon, which
will make the node schedulable again.
options:
+- name: delete-local-data
+  default_value: "false"
+  usage: |
+    Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
- name: force
default_value: "false"
usage: |
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
@@ -89,6 +89,7 @@ default-container-mem-limit
delay-shutdown
delete-collection-workers
delete-instances
+delete-local-data
delete-namespace
deleting-pods-burst
deleting-pods-qps
260 changes: 140 additions & 120 deletions pkg/kubectl/cmd/drain.go
@@ -27,9 +27,8 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/controller"
// "k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/fields"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
@@ -43,14 +42,31 @@ type DrainOptions struct {
	Force              bool
	GracePeriodSeconds int
	IgnoreDaemonsets   bool
+	DeleteLocalData    bool
	mapper             meta.RESTMapper
	nodeInfo           *resource.Info
	out                io.Writer
	typer              runtime.ObjectTyper
}

+// Takes a pod and returns a bool indicating whether or not to operate on the
+// pod, an optional warning message, and an optional fatal error.
+type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
+type warning struct {
+	string
+}
+type fatal struct {
+	string
+}
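
To make the contract concrete, here is an illustrative filter of the same shape (not part of this commit; the namespace check is hypothetical):

```go
// Skip pods in namespace "example" with a warning; include everything else.
// Returning include=false with only a warning declines the pod without
// aborting the drain, the same pattern daemonsetFilter below uses when
// --ignore-daemonsets is set.
func exampleNamespaceFilter(pod api.Pod) (bool, *warning, *fatal) {
	if pod.ObjectMeta.Namespace == "example" {
		return false, &warning{"skipping pods in namespace \"example\""}, nil
	}
	return true, nil, nil
}
```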

const (
-	cordon_long = `Mark node as unschedulable.
+	kDaemonsetFatal      = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
+	kDaemonsetWarning    = "Ignoring DaemonSet-managed pods"
+	kLocalStorageFatal   = "pods with local storage (use --delete-local-data to override)"
+	kLocalStorageWarning = "Deleting pods with local storage"
+	kUnmanagedFatal      = "pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet (use --force to override)"
+	kUnmanagedWarning    = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet"
+	cordon_long          = `Mark node as unschedulable.
`
cordon_example = `# Mark node "foo" as unschedulable.
kubectl cordon foo
@@ -136,6 +152,7 @@ func NewCmdDrain(f *cmdutil.Factory, out io.Writer) *cobra.Command {
	}
	cmd.Flags().BoolVar(&options.Force, "force", false, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.")
	cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
+	cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
	cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
	return cmd
}
@@ -195,148 +212,151 @@ func (o *DrainOptions) RunDrain() error {
return nil
}

-// getPodsForDeletion returns all the pods we're going to delete.  If there are
-// any unmanaged pods and the user didn't pass --force, we return that list in
-// an error.
-func (o *DrainOptions) getPodsForDeletion() ([]api.Pod, error) {
-	pods, unreplicatedPodNames, daemonSetPodNames, err := GetPodsForDeletionOnNodeDrain(
-		o.client,
-		o.nodeInfo.Name,
-		o.factory.Decoder(true),
-		o.Force,
-		o.IgnoreDaemonsets,
-	)
-	if err != nil {
-		return []api.Pod{}, err
-	}
-
-	daemonSetErrors := !o.IgnoreDaemonsets && len(daemonSetPodNames) > 0
-	unreplicatedErrors := !o.Force && len(unreplicatedPodNames) > 0
-
-	switch {
-	case daemonSetErrors && unreplicatedErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, daemonSetPodNames, true))
-	case daemonSetErrors && !unreplicatedErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg([]string{}, daemonSetPodNames, true))
-	case unreplicatedErrors && !daemonSetErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, []string{}, true))
-	}
-
-	if len(unreplicatedPodNames) > 0 {
-		fmt.Fprintf(o.out, "WARNING: About to delete these %s\n", unmanagedMsg(unreplicatedPodNames, []string{}, false))
-	}
-	if len(daemonSetPodNames) > 0 {
-		fmt.Fprintf(o.out, "WARNING: Skipping %s\n", unmanagedMsg([]string{}, daemonSetPodNames, false))
-	}
-
-	return pods, nil
-}
+func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
+	switch sr.Reference.Kind {
+	case "ReplicationController":
+		return o.client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "DaemonSet":
+		return o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "Job":
+		return o.client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "ReplicaSet":
+		return o.client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
+	}
+	return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
+}
+
+func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
+	creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation]
+	if !found {
+		return nil, nil
+	}
+
+	// Now verify that the specified creator actually exists.
+	sr := &api.SerializedReference{}
+	if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
+		return nil, err
+	}
+	// We assume the only reason for an error is because the controller is
+	// gone/missing, not for any other cause.  TODO(mml): something more
+	// sophisticated than this
+	_, err := o.getController(sr)
+	if err != nil {
+		return nil, err
+	}
+	return sr, nil
+}
+
+func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
+	sr, err := o.getPodCreator(pod)
+	if err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if sr != nil {
+		return true, nil, nil
+	}
+	if !o.Force {
+		return false, nil, &fatal{kUnmanagedFatal}
+	}
+	return true, &warning{kUnmanagedWarning}, nil
+}
+
+func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
+	// Note that we return false in all cases where the pod is DaemonSet managed,
+	// regardless of flags.  We never delete them, the only question is whether
+	// their presence constitutes an error.
+	sr, err := o.getPodCreator(pod)
+	if err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if sr == nil || sr.Reference.Kind != "DaemonSet" {
+		return true, nil, nil
+	}
+	if _, err := o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name); err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if !o.IgnoreDaemonsets {
+		return false, nil, &fatal{kDaemonsetFatal}
+	}
+	return false, &warning{kDaemonsetWarning}, nil
+}
+
+func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
+	if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
+		return false, nil, nil
+	}
+	return true, nil, nil
+}
+
+func hasLocalStorage(pod api.Pod) bool {
+	for _, volume := range pod.Spec.Volumes {
+		if volume.EmptyDir != nil {
+			return true
+		}
+	}
+
+	return false
+}
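
For reference, this check fires on any pod whose spec declares an emptyDir volume. A minimal construction against the same internal API (hypothetical pod, illustration only):

```go
pod := api.Pod{
	Spec: api.PodSpec{
		Volumes: []api.Volume{{
			Name:         "scratch",
			VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}},
		}},
	},
}
// hasLocalStorage(pod) == true, so draining this pod's node now requires
// --delete-local-data; the emptyDir contents are lost when the pod is deleted.
```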

+func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
+	if !hasLocalStorage(pod) {
+		return true, nil, nil
+	}
+	if !o.DeleteLocalData {
+		return false, nil, &fatal{kLocalStorageFatal}
+	}
+	return true, &warning{kLocalStorageWarning}, nil
+}

-// GetPodsForDeletionOnNodeDrain returns pods that should be deleted on node drain as well as some extra information
-// about possibly problematic pods (unreplicated and deamon sets).
-func GetPodsForDeletionOnNodeDrain(client *client.Client, nodename string, decoder runtime.Decoder, force bool,
-	ignoreDeamonSet bool) (pods []api.Pod, unreplicatedPodNames []string, daemonSetPodNames []string, finalError error) {
+// Map of status message to a list of pod names having that status.
+type podStatuses map[string][]string
+
+func (ps podStatuses) Message() string {
+	msgs := []string{}
+
+	for key, pods := range ps {
+		msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
+	}
+	return strings.Join(msgs, "; ")
+}
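
Concretely, a one-entry status map renders as a single `message: pod, pod` clause, and multiple entries are joined with semicolons. A quick illustration (hypothetical pod names):

```go
ps := podStatuses{kLocalStorageFatal: {"web-0", "cache-1"}}
// ps.Message() == "pods with local storage (use --delete-local-data to override): web-0, cache-1"
```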

-	pods = []api.Pod{}
-	unreplicatedPodNames = []string{}
-	daemonSetPodNames = []string{}
-	podList, err := client.Pods(api.NamespaceAll).List(api.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename})})
-	if err != nil {
-		return []api.Pod{}, []string{}, []string{}, err
-	}
+// getPodsForDeletion returns all the pods we're going to delete.  If there are
+// any pods preventing us from deleting, we return that list in an error.
+func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
+	podList, err := o.client.Pods(api.NamespaceAll).List(api.ListOptions{
+		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})})
+	if err != nil {
+		return pods, err
+	}
+
+	ws := podStatuses{}
+	fs := podStatuses{}

	for _, pod := range podList.Items {
-		_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]
-		if found {
-			// Skip mirror pod
-			continue
-		}
-		replicated := false
-		daemonset_pod := false
-
-		creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation]
-		if found {
-			// Now verify that the specified creator actually exists.
-			var sr api.SerializedReference
-			if err := runtime.DecodeInto(decoder, []byte(creatorRef), &sr); err != nil {
-				return []api.Pod{}, []string{}, []string{}, err
-			}
-			if sr.Reference.Kind == "ReplicationController" {
-				rc, err := client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
-				// Assume the only reason for an error is because the RC is
-				// gone/missing, not for any other cause.  TODO(mml): something more
-				// sophisticated than this
-				if err == nil && rc != nil {
-					replicated = true
-				}
-			} else if sr.Reference.Kind == "DaemonSet" {
-				ds, err := client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
-
-				// Assume the only reason for an error is because the DaemonSet is
-				// gone/missing, not for any other cause.  TODO(mml): something more
-				// sophisticated than this
-				if err == nil && ds != nil {
-					// Otherwise, treat daemonset-managed pods as unmanaged since
-					// DaemonSet Controller currently ignores the unschedulable bit.
-					// FIXME(mml): Add link to the issue concerning a proper way to drain
-					// daemonset pods, probably using taints.
-					daemonset_pod = true
-				}
-			} else if sr.Reference.Kind == "Job" {
-				job, err := client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
-
-				// Assume the only reason for an error is because the Job is
-				// gone/missing, not for any other cause.  TODO(mml): something more
-				// sophisticated than this
-				if err == nil && job != nil {
-					replicated = true
-				}
-			} else if sr.Reference.Kind == "ReplicaSet" {
-				rs, err := client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
-
-				// Assume the only reason for an error is because the RS is
-				// gone/missing, not for any other cause.  TODO(mml): something more
-				// sophisticated than this
-				if err == nil && rs != nil {
-					replicated = true
-				}
-			}
-		}
-
-		switch {
-		case daemonset_pod:
-			daemonSetPodNames = append(daemonSetPodNames, pod.Name)
-		case !replicated:
-			unreplicatedPodNames = append(unreplicatedPodNames, pod.Name)
-			if force {
-				pods = append(pods, pod)
-			}
-		default:
-			pods = append(pods, pod)
-		}
-	}
-	return pods, unreplicatedPodNames, daemonSetPodNames, nil
-}
+		podOk := true
+		for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
+			filterOk, w, f := filt(pod)
+
+			podOk = podOk && filterOk
+			if w != nil {
+				ws[w.string] = append(ws[w.string], pod.Name)
+			}
+			if f != nil {
+				fs[f.string] = append(fs[f.string], pod.Name)
+			}
+		}
+		if podOk {
+			pods = append(pods, pod)
+		}
+	}

-// Helper for generating errors or warnings about unmanaged pods.
-func unmanagedMsg(unreplicatedNames []string, daemonSetNames []string, include_guidance bool) string {
-	msgs := []string{}
-	if len(unreplicatedNames) > 0 {
-		msg := fmt.Sprintf("pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet: %s", strings.Join(unreplicatedNames, ","))
-		if include_guidance {
-			msg += " (use --force to override)"
-		}
-		msgs = append(msgs, msg)
-	}
-	if len(daemonSetNames) > 0 {
-		msg := fmt.Sprintf("DaemonSet-managed pods: %s", strings.Join(daemonSetNames, ","))
-		if include_guidance {
-			msg += " (use --ignore-daemonsets to ignore)"
-		}
-		msgs = append(msgs, msg)
-	}
-
-	return strings.Join(msgs, " and ")
-}
+	if len(fs) > 0 {
+		return []api.Pod{}, errors.New(fs.Message())
+	}
+	if len(ws) > 0 {
+		fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
+	}
+
+	return pods, nil
+}

// deletePods deletes the pods on the api server