Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for external scheduler #557

Merged
merged 3 commits into from
Aug 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopped, or was rejected before it even started running, or has been disconnected and is considered lost (for now; what about when it comes back?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per what @bgrant0607 was saying, we'll disambiguate those cases with a StatusReason/StatusDetails field, to be added in another PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I was just pointing out that the comment (and name "Stopped") isn't 100% accurate, since it might never have been "Running", or it may in fact still be running, but in a disconnected mode. Not sure I have a better name that encompasses all terminal states. In Mesos, we use "Lost" to indicate alternative non-running states.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did change it to Terminated, which is admittedly not much different. I view both words as a current state and not carrying the implication that it previously was in a Running state. E.g., a pod could go directly from waiting to terminated under some unfortunate circumstances. "Lost" sounds like it's always a bad thing to me, whereas "Terminated" sounds like it could be normal or bad depending on exactly why.

PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,12 @@ type PodStatus string

// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)

// PodInfo contains one entry for every container with available info.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if api.PodStopped != value.CurrentState.Status {
if api.PodTerminated != value.CurrentState.Status {
result = append(result, value)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/master/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error {
func (p *PodCache) UpdateAllContainers() {
pods, err := p.pods.ListPods(labels.Everything())
if err != nil {
glog.Errorf("Error synchronizing container list: %#v", err)
glog.Errorf("Error synchronizing container list: %v", err)
return
}
for _, pod := range pods {
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error synchronizing container: %#v", err)
glog.Errorf("Error synchronizing container: %v", err)
}
}
}
Expand Down
153 changes: 86 additions & 67 deletions pkg/registry/etcdregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry, backed by etcd.
type EtcdRegistry struct {
helper tools.EtcdHelper
machines MinionRegistry
manifestFactory ManifestFactory
}

Expand All @@ -43,79 +42,116 @@ type EtcdRegistry struct {
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
machines: machines,
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
}
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: registry,
}
return registry
}

func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
func makePodKey(podID string) string {
return "/registry/pods/" + podID
}

// ListPods obtains a list of pods that match selector.
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{}
machines, err := registry.machines.List()
allPods := []api.Pod{}
filteredPods := []api.Pod{}
err := registry.helper.ExtractList("/registry/pods", &allPods)
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil {
return pods, err
}
for _, pod := range machinePods {
if selector.Matches(labels.Set(pod.Labels)) {
pod.CurrentState.Host = machine
pods = append(pods, pod)
}
for _, pod := range allPods {
if selector.Matches(labels.Set(pod.Labels)) {
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
filteredPods = append(filteredPods, pod)
}
}
return pods, nil
return filteredPods, nil
}

// GetPod gets a specific pod specified by its ID.
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID)
return &pod, err
var pod api.Pod
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
if err != nil {
return nil, err
}
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
return &pod, nil
}

// makeContainerKey builds the etcd key for the given machine's kubelet entry,
// of the form "/registry/hosts/<machine>/kubelet". The machine name is used
// verbatim; no validation or escaping is applied.
func makeContainerKey(machine string) string {
	const (
		hostsPrefix   = "/registry/hosts/"
		kubeletSuffix = "/kubelet"
	)
	return hostsPrefix + machine + kubeletSuffix
}

// CreatePod creates a pod based on a specification and schedules it onto a specific machine.
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
if err == nil {
// TODO: this error message looks racy.
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
// Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = ""

// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = ""

err := registry.helper.CreateObj(makePodKey(pod.ID), &pod)
if err != nil {
return err
}
return registry.runPod(pod, machineIn)

// TODO: Until scheduler separation is completed, just assign here.
return registry.AssignPod(pod.ID, machine)
}

func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
podKey := makePodKey(machine, pod.ID)
err := registry.helper.SetObj(podKey, pod)
// AssignPod assigns the given pod to the given machine.
// TODO: hook this up via apiserver, not by calling it from CreatePod().
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
podKey := makePodKey(podID)
var finalPod *api.Pod
err := registry.helper.AtomicUpdate(
podKey,
&api.Pod{},
func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.DesiredState.Host = machine
finalPod = pod
return pod, nil
},
)
if err != nil {
return err
}

manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
// TODO: move this to a watch/rectification loop.
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
if err != nil {
return err
}

contKey := makeContainerKey(machine)
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
})
err = registry.helper.AtomicUpdate(
contKey,
&api.ContainerManifestList{},
func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
},
)
if err != nil {
// Don't strand stuff.
// Don't strand stuff. This is a terrible hack that won't be needed
// when the above TODO is fixed.
err2 := registry.helper.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
Expand All @@ -130,25 +166,32 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {

// DeletePod deletes an existing pod specified by its ID.
func (registry *EtcdRegistry) DeletePod(podID string) error {
_, machine, err := registry.findPod(podID)
var pod api.Pod
podKey := makePodKey(podID)
err := registry.helper.ExtractObj(podKey, &pod, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
return registry.deletePodFromMachine(machine, podID)
}

func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID)
err := registry.helper.Delete(podKey, true)
err = registry.helper.Delete(podKey, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}

machine := pod.DesiredState.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}

// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
Expand All @@ -173,30 +216,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
})
}

func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID)
err = registry.helper.ExtractObj(key, &pod, false)
if err != nil {
return
}
pod.CurrentState.Host = machine
return
}

func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
machines, err := registry.machines.List()
if err != nil {
return api.Pod{}, "", err
}
for _, machine := range machines {
pod, err := registry.getPodForMachine(machine, podID)
if err == nil {
return pod, machine, nil
}
}
return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID)
}

// ListControllers obtains a list of ReplicationControllers.
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController
Expand Down
Loading