Skip to content

Commit

Permalink
Merge pull request kubernetes#557 from lavalamp/podLocation
Browse files Browse the repository at this point in the history
Prepare for external scheduler
  • Loading branch information
brendandburns committed Aug 11, 2014
2 parents 1c8704b + b5352a8 commit 3222f61
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 126 deletions.
7 changes: 5 additions & 2 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
// PodPending is the "Pending" status value.
// NOTE(review): looks close in meaning to PodWaiting — confirm which one callers should use.
PodPending PodStatus = "Pending"
// PodStopped is the "Stopped" status value.
// NOTE(review): looks close in meaning to PodTerminated — confirm which one callers should use.
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
// PodPending is the "Pending" status value.
// NOTE(review): looks close in meaning to PodWaiting — confirm which one callers should use.
PodPending PodStatus = "Pending"
// PodStopped is the "Stopped" status value.
// NOTE(review): looks close in meaning to PodTerminated — confirm which one callers should use.
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if api.PodStopped != value.CurrentState.Status {
if api.PodTerminated != value.CurrentState.Status {
result = append(result, value)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/master/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error {
// UpdateAllContainers lists every pod known to the pod registry and calls
// updatePodInfo for each one, using the pod's current host and ID.
//
// A failure to list pods is logged and ends the pass. A failure to update a
// single pod is logged and the pass continues with the next pod;
// client.ErrPodInfoNotAvailable is deliberately not logged.
func (p *PodCache) UpdateAllContainers() {
	pods, err := p.pods.ListPods(labels.Everything())
	if err != nil {
		// Log each failure once, with %v so the error's message (not its
		// Go-syntax representation) ends up in the log.
		glog.Errorf("Error synchronizing container list: %v", err)
		return
	}
	for _, pod := range pods {
		err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
		if err != nil && err != client.ErrPodInfoNotAvailable {
			glog.Errorf("Error synchronizing container: %v", err)
		}
	}
}
Expand Down
153 changes: 86 additions & 67 deletions pkg/registry/etcdregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry, backed by etcd.
type EtcdRegistry struct {
// helper is the etcd helper through which registry objects are read, written,
// atomically updated and deleted.
helper tools.EtcdHelper
// machines is the minion registry; its List method supplies the machines that are
// searched when looking up pods per host.
machines MinionRegistry
// manifestFactory builds the container manifest for a pod placed on a machine.
manifestFactory ManifestFactory
}

Expand All @@ -43,79 +42,116 @@ type EtcdRegistry struct {
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
machines: machines,
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
}
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: registry,
}
return registry
}

func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
func makePodKey(podID string) string {
return "/registry/pods/" + podID
}

// ListPods obtains a list of pods that match selector.
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{}
machines, err := registry.machines.List()
allPods := []api.Pod{}
filteredPods := []api.Pod{}
err := registry.helper.ExtractList("/registry/pods", &allPods)
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil {
return pods, err
}
for _, pod := range machinePods {
if selector.Matches(labels.Set(pod.Labels)) {
pod.CurrentState.Host = machine
pods = append(pods, pod)
}
for _, pod := range allPods {
if selector.Matches(labels.Set(pod.Labels)) {
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
filteredPods = append(filteredPods, pod)
}
}
return pods, nil
return filteredPods, nil
}

// GetPod gets a specific pod specified by its ID.
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID)
return &pod, err
var pod api.Pod
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
if err != nil {
return nil, err
}
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
return &pod, nil
}

// makeContainerKey returns the etcd key that holds the container manifest
// list for the given machine.
func makeContainerKey(machine string) string {
	const (
		hostsPrefix   = "/registry/hosts/"
		kubeletSuffix = "/kubelet"
	)
	return hostsPrefix + machine + kubeletSuffix
}

// CreatePod creates a pod based on a specification and schedules it onto a specific machine.
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
if err == nil {
// TODO: this error message looks racy.
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
// Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = ""

// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = ""

err := registry.helper.CreateObj(makePodKey(pod.ID), &pod)
if err != nil {
return err
}
return registry.runPod(pod, machineIn)

// TODO: Until scheduler separation is completed, just assign here.
return registry.AssignPod(pod.ID, machine)
}

func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
podKey := makePodKey(machine, pod.ID)
err := registry.helper.SetObj(podKey, pod)
// AssignPod assigns the given pod to the given machine.
// TODO: hook this up via apiserver, not by calling it from CreatePod().
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
podKey := makePodKey(podID)
var finalPod *api.Pod
err := registry.helper.AtomicUpdate(
podKey,
&api.Pod{},
func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.DesiredState.Host = machine
finalPod = pod
return pod, nil
},
)
if err != nil {
return err
}

manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
// TODO: move this to a watch/rectification loop.
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
if err != nil {
return err
}

contKey := makeContainerKey(machine)
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
})
err = registry.helper.AtomicUpdate(
contKey,
&api.ContainerManifestList{},
func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
},
)
if err != nil {
// Don't strand stuff.
// Don't strand stuff. This is a terrible hack that won't be needed
// when the above TODO is fixed.
err2 := registry.helper.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
Expand All @@ -130,25 +166,32 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {

// DeletePod deletes an existing pod specified by its ID.
func (registry *EtcdRegistry) DeletePod(podID string) error {
_, machine, err := registry.findPod(podID)
var pod api.Pod
podKey := makePodKey(podID)
err := registry.helper.ExtractObj(podKey, &pod, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
return registry.deletePodFromMachine(machine, podID)
}

func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID)
err := registry.helper.Delete(podKey, true)
err = registry.helper.Delete(podKey, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}

machine := pod.DesiredState.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}

// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
Expand All @@ -173,30 +216,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
})
}

// getPodForMachine extracts the pod stored under the key built from machine
// and podID. On success the pod's CurrentState.Host is set to machine; on
// failure the extraction error is returned along with the pod value as
// ExtractObj left it.
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
	if err = registry.helper.ExtractObj(makePodKey(machine, podID), &pod, false); err == nil {
		pod.CurrentState.Host = machine
	}
	return pod, err
}

// findPod searches the machines returned by the minion registry, in order,
// for a pod with the given ID. It returns the pod and the first machine on
// which the lookup succeeds.
//
// An error from listing machines is returned as-is. Any per-machine lookup
// error is treated as "not on this machine"; if no machine yields the pod a
// not-found error for podID is returned.
func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
	hosts, listErr := registry.machines.List()
	if listErr != nil {
		return api.Pod{}, "", listErr
	}
	for _, host := range hosts {
		found, lookupErr := registry.getPodForMachine(host, podID)
		if lookupErr != nil {
			continue
		}
		return found, host, nil
	}
	return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID)
}

// ListControllers obtains a list of ReplicationControllers.
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController
Expand Down
Loading

0 comments on commit 3222f61

Please sign in to comment.