Skip to content

Commit

Permalink
Prepare for external scheduler
Browse files Browse the repository at this point in the history
1. Change names of Pod statuses (Waiting, Running, Terminated).
2. Store assigned host in etcd.
3. Change pod key to /registry/pods/<podid>. Container location remains
   the same (/registry/hosts/<machine>/kublet).
  • Loading branch information
lavalamp committed Aug 10, 2014
1 parent e35dfed commit 5cdce0e
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 119 deletions.
7 changes: 5 additions & 2 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if api.PodStopped != value.CurrentState.Status {
if api.PodTerminated != value.CurrentState.Status {
result = append(result, value)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/master/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error {
func (p *PodCache) UpdateAllContainers() {
pods, err := p.pods.ListPods(labels.Everything())
if err != nil {
glog.Errorf("Error synchronizing container list: %#v", err)
glog.Errorf("Error synchronizing container list: %v", err)
return
}
for _, pod := range pods {
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error synchronizing container: %#v", err)
glog.Errorf("Error synchronizing container: %v", err)
}
}
}
Expand Down
147 changes: 81 additions & 66 deletions pkg/registry/etcdregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
type EtcdRegistry struct {
helper tools.EtcdHelper
machines MinionRegistry
manifestFactory ManifestFactory
}

Expand All @@ -43,79 +42,112 @@ type EtcdRegistry struct {
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
machines: machines,
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
}
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: registry,
}
return registry
}

func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
func makePodKey(podID string) string {
return "/registry/pods/" + podID
}

// ListPods obtains a list of pods that match selector.
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{}
machines, err := registry.machines.List()
allPods := []api.Pod{}
filteredPods := []api.Pod{}
err := registry.helper.ExtractList("/registry/pods", &allPods)
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil {
return pods, err
}
for _, pod := range machinePods {
if selector.Matches(labels.Set(pod.Labels)) {
pod.CurrentState.Host = machine
pods = append(pods, pod)
}
for _, pod := range allPods {
if selector.Matches(labels.Set(pod.Labels)) {
filteredPods = append(filteredPods, pod)
}
}
return pods, nil
return filteredPods, nil
}

// GetPod gets a specific pod specified by its ID.
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID)
return &pod, err
var pod api.Pod
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
if err != nil {
return nil, err
}
return &pod, nil
}

func makeContainerKey(machine string) string {
return "/registry/hosts/" + machine + "/kubelet"
}

// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
// TODO: When our client supports it, switch to atomic creates.
var pod2 api.Pod
err := registry.helper.ExtractObj(makePodKey(pod.ID), &pod2, false)
if err == nil {
// TODO: this error message looks racy.
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
return fmt.Errorf("a pod named %s already exists (%#v)", pod.ID, pod2)
}
return registry.runPod(pod, machineIn)

// Set status to "Waiting".
pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = ""

err = registry.helper.SetObj(makePodKey(pod.ID), &pod)
if err != nil {
return err
}

// TODO: Until scheduler separation is completed, just assign here.
return registry.AssignPod(pod.ID, machine)
}

func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
podKey := makePodKey(machine, pod.ID)
err := registry.helper.SetObj(podKey, pod)
// AssignPod assigns the given pod to the given machine.
// TODO: hook this up via apiserver, not by calling it from CreatePod().
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
podKey := makePodKey(podID)
var finalPod *api.Pod
err := registry.helper.AtomicUpdate(
podKey,
&api.Pod{},
func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.CurrentState.Host = machine
pod.CurrentState.Status = api.PodWaiting
finalPod = pod
return pod, nil
},
)
if err != nil {
return err
}

manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
// TODO: move this to a watch/rectification loop.
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
if err != nil {
return err
}

contKey := makeContainerKey(machine)
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
})
err = registry.helper.AtomicUpdate(
contKey,
&api.ContainerManifestList{},
func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
},
)
if err != nil {
// Don't strand stuff.
// Don't strand stuff. This is a terrible hack that won't be needed
// when the above TODO is fixed.
err2 := registry.helper.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
Expand All @@ -130,25 +162,32 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {

// DeletePod deletes an existing pod specified by its ID.
func (registry *EtcdRegistry) DeletePod(podID string) error {
_, machine, err := registry.findPod(podID)
var pod api.Pod
podKey := makePodKey(podID)
err := registry.helper.ExtractObj(podKey, &pod, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
return registry.deletePodFromMachine(machine, podID)
}

func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID)
err := registry.helper.Delete(podKey, true)
err = registry.helper.Delete(podKey, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}

machine := pod.CurrentState.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}

// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
Expand All @@ -173,30 +212,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
})
}

func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID)
err = registry.helper.ExtractObj(key, &pod, false)
if err != nil {
return
}
pod.CurrentState.Host = machine
return
}

func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
machines, err := registry.machines.List()
if err != nil {
return api.Pod{}, "", err
}
for _, machine := range machines {
pod, err := registry.getPodForMachine(machine, podID)
if err == nil {
return pod, machine, nil
}
}
return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID)
}

// ListControllers obtains a list of ReplicationControllers.
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController
Expand Down
Loading

0 comments on commit 5cdce0e

Please sign in to comment.