From f91162cf789ac6742e4aa822a2542ab14454f909 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Wed, 1 Oct 2014 13:36:51 -0700 Subject: [PATCH 1/2] kubelet: add --runonce flag, exits after starting pod from the manifest --- cmd/kubelet/kubelet.go | 24 +++++++- pkg/kubelet/kubelet.go | 7 +-- pkg/kubelet/runonce.go | 126 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 pkg/kubelet/runonce.go diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 036fe3da17613..45922774e7ad4 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -64,6 +64,7 @@ var ( allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") + runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server") ) func init() { @@ -106,6 +107,17 @@ func main() { verflag.PrintAndExitIfRequested() + if *runonce { + exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive" + if len(etcdServerList) > 0 { + glog.Fatalf(exclusiveFlag, "--etcd_servers") + } + if *enableServer { + glog.Infof("--runonce is set, disabling server") + *enableServer = false + } + } + etcd.SetLogger(util.NewLogger("etcd ")) capabilities.Initialize(capabilities.Capabilities{ @@ -128,7 +140,9 @@ func main() { glog.Fatal("Invalid root directory path.") } *rootDirectory = path.Clean(*rootDirectory) - os.MkdirAll(*rootDirectory, 0750) + if err := os.MkdirAll(*rootDirectory, 0750); err != nil { + glog.Warningf("Error creating root directory: %v", err) + } // source of all configuration cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) @@ -171,6 +185,14 @@ func main() { health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{})) health.AddHealthChecker(&health.TCPHealthChecker{}) + // process pods and exit. + if *runonce { + if _, err := k.RunOnce(cfg.Updates()); err != nil { + glog.Fatalf("--runonce failed: %v", err) + } + return + } + // start the kubelet go util.Forever(func() { k.Run(cfg.Updates()) }, 0) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e210038368bf3..039be7b890d54 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -634,9 +634,8 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } // Check for any containers that need starting - for i := range pods { - pod := &pods[i] - podFullName := GetPodFullName(pod) + for _, pod := range pods { + podFullName := GetPodFullName(&pod) uuid := pod.Manifest.UUID // Add all containers (including net) to the map. @@ -647,7 +646,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { // Run the sync in an async manifest worker. kl.podWorkers.Run(podFullName, func() { - err := kl.syncPod(pod, dockerContainers) + err := kl.syncPod(&pod, dockerContainers) if err != nil { glog.Errorf("Error syncing pod: %v skipping.", err) } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go new file mode 100644 index 0000000000000..f1f48d25b3907 --- /dev/null +++ b/pkg/kubelet/runonce.go @@ -0,0 +1,126 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/golang/glog" +) + +const ( + RunOnceManifestDelay = 1 * time.Second + RunOnceMaxRetries = 1 + RunOnceRetryDelay = 1 * time.Second + RunOnceRetryDelayBackoff = 2 +) + +type RunPodResult struct { + Pod *Pod + Info api.PodInfo + Err error +} + +// RunOnce polls from one configuration update and run the associated pods. +func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { + select { + case u := <-updates: + glog.Infof("processing manifest with %d pods", len(u.Pods)) + result, err := kl.runOnce(u.Pods) + glog.Infof("finished processing %d pods", len(u.Pods)) + return result, err + case <-time.After(RunOnceManifestDelay): + return nil, fmt.Errorf("no pod manifest update after %v", RunOnceManifestDelay) + } +} + +// runOnce runs a given set of pods and returns their status. +func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { + if kl.dockerPuller == nil { + kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) + } + pods = filterHostPortConflicts(pods) + + ch := make(chan RunPodResult) + for _, pod := range pods { + go func() { + info, err := kl.runPod(pod) + ch <- RunPodResult{&pod, info, err} + }() + } + + glog.Infof("waiting for %d pods", len(pods)) + failedPods := []string{} + for i := 0; i < len(pods); i++ { + res := <-ch + results = append(results, res) + if res.Err != nil { + glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err) + failedPods = append(failedPods, res.Pod.Name) + } else { + glog.Infof("started pod %q: %#v", res.Pod.Name, res.Info) + } + } + if len(failedPods) > 0 { + return results, fmt.Errorf("error running pods: %v", failedPods) + } + glog.Infof("%d pods started", len(pods)) + return results, err +} + +// Run a single pod and wait until all containers are running. +func (kl *Kubelet) runPod(pod Pod) (api.PodInfo, error) { + dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) + if err != nil { + return nil, fmt.Errorf("failed to get kubelet docker containers: %v", err) + } + + delay := RunOnceRetryDelay + for i := 0; i < RunOnceMaxRetries; i++ { + err := kl.syncPod(&pod, dockerContainers) + if err != nil { + return nil, fmt.Errorf("error syncing pod: %v", err) + } + info, err := kl.GetPodInfo(GetPodFullName(&pod), pod.Manifest.UUID) + if err != nil { + return nil, fmt.Errorf("error getting pod info: %v", err) + } + if podInfo(info).isRunning() { + return info, nil + } + glog.Infof("pod %q containers not running, waiting for %v", pod.Name, delay) + <-time.After(delay) + delay *= RunOnceRetryDelayBackoff + } + return nil, fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) +} + +// Alias PodInfo for internal usage. +type podInfo api.PodInfo + +// Check if all containers of a pod are running. +func (info podInfo) isRunning() bool { + for _, container := range info { + if container.State.Running == nil { + return false + } + } + return true +} From fa1dcd0a8bbe02d7eb82d3afac2b83ba547f51c7 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Fri, 3 Oct 2014 17:48:05 -0700 Subject: [PATCH 2/2] kubelet: add basic test for runonce --- pkg/kubelet/runonce.go | 11 +++- pkg/kubelet/runonce_test.go | 115 ++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 pkg/kubelet/runonce_test.go diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index f1f48d25b3907..524972eee8296 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -27,7 +27,7 @@ import ( const ( RunOnceManifestDelay = 1 * time.Second - RunOnceMaxRetries = 1 + RunOnceMaxRetries = 10 RunOnceRetryDelay = 1 * time.Second RunOnceRetryDelayBackoff = 2 ) @@ -93,7 +93,9 @@ func (kl *Kubelet) runPod(pod Pod) (api.PodInfo, error) { } delay := RunOnceRetryDelay - for i := 0; i < RunOnceMaxRetries; i++ { + retry := 0 + for { + glog.Infof("syncing pod") err := kl.syncPod(&pod, dockerContainers) if err != nil { return nil, fmt.Errorf("error syncing pod: %v", err) @@ -105,11 +107,14 @@ func (kl *Kubelet) runPod(pod Pod) (api.PodInfo, error) { if podInfo(info).isRunning() { return info, nil } + if retry >= RunOnceMaxRetries { + return nil, fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) + } glog.Infof("pod %q containers not running, waiting for %v", pod.Name, delay) <-time.After(delay) + retry++ delay *= RunOnceRetryDelayBackoff } - return nil, fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) } // Alias PodInfo for internal usage. diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go new file mode 100644 index 0000000000000..4949a99594ca5 --- /dev/null +++ b/pkg/kubelet/runonce_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "strconv" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + docker "github.com/fsouza/go-dockerclient" +) + +type listContainersResult struct { + label string + containers []docker.APIContainers + err error +} + +type inspectContainersResult struct { + label string + container docker.Container + err error +} + +type testDocker struct { + listContainersResults []listContainersResult + inspectContainersResults []inspectContainersResult + dockertools.FakeDockerClient + t *testing.T +} + +func (d *testDocker) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) { + if len(d.listContainersResults) > 0 { + result := d.listContainersResults[0] + d.listContainersResults = d.listContainersResults[1:] + d.t.Logf("ListContainers: %q, returning: (%v, %v)", result.label, result.containers, result.err) + return result.containers, result.err + } + return nil, fmt.Errorf("ListContainers error: no more test results") +} + +func (d *testDocker) InspectContainer(id string) (*docker.Container, error) { + if len(d.inspectContainersResults) > 0 { + result := d.inspectContainersResults[0] + d.inspectContainersResults = d.inspectContainersResults[1:] + d.t.Logf("InspectContainers: %q, returning: (%v, %v)", result.label, result.container, result.err) + return &result.container, result.err + } + return nil, fmt.Errorf("InspectContainer error: no more test results") +} + +func TestRunOnce(t *testing.T) { + kb := &Kubelet{} + container := api.Container{Name: "bar"} + kb.dockerClient = &testDocker{ + listContainersResults: []listContainersResult{ + {label: "pre syncPod", containers: []docker.APIContainers{}}, + {label: "syncPod #1", containers: []docker.APIContainers{}}, + {label: "syncPod #2", containers: []docker.APIContainers{}}, + {label: "post syncPod", containers: []docker.APIContainers{ + { + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.test"}, + ID: "1234", + }, + { + Names: []string{"/k8s_net_foo.test_"}, + ID: "9876", + }, + }}, + }, + inspectContainersResults: []inspectContainersResult{ + {label: "syncPod", container: docker.Container{State: docker.State{Running: true}}}, + {label: "syncPod", container: docker.Container{State: docker.State{Running: true}}}, + }, + t: t, + } + kb.dockerPuller = &dockertools.FakeDockerPuller{} + results, err := kb.runOnce([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if results[0].Err != nil { + t.Errorf("unexpected run pod error: %v", results[0].Err) + } + if results[0].Pod.Name != "foo" { + t.Errorf("unexpected pod: %q", results[0].Pod.Name) + } +}