Skip to content

Commit

Permalink
Write BoundPods to etcd instead of ContainerManifestList
Browse files Browse the repository at this point in the history
Rename ManifestFactory -> BoundPodFactory and change the general structure
of the call to focus on BoundPod.

TBD: Should the etcd path be changed to be more accurate?
TBD: Only a subset of Pod data is copied to the BoundPod
  • Loading branch information
smarterclayton committed Oct 9, 2014
1 parent 4304114 commit ea8f2f3
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 110 deletions.
17 changes: 16 additions & 1 deletion pkg/api/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,20 @@ func init() {
}
out.ResourceVersion = in.ResourceVersion
return nil
})
},

// Convert Pod to BoundPod
func(in *Pod, out *BoundPod, s conversion.Scope) error {
if err := s.Convert(&in.DesiredState.Manifest, out, 0); err != nil {
return err
}
// Only copy a subset of fields, and override manifest attributes with the pod
// metadata
out.UID = in.UID
out.ID = in.ID
out.Namespace = in.Namespace
out.CreationTimestamp = in.CreationTimestamp
return nil
},
)
}
6 changes: 3 additions & 3 deletions pkg/constraint/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

// Allowed returns true if manifests is a collection of manifests
// Allowed returns true if pods is a collection of bound pods
// which can run without conflict on a single minion.
func Allowed(manifests []api.ContainerManifest) bool {
return !PortsConflict(manifests)
func Allowed(pods []api.BoundPod) bool {
return !PortsConflict(pods)
}
36 changes: 18 additions & 18 deletions pkg/constraint/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,69 +30,69 @@ func containerWithHostPorts(ports ...int) api.Container {
return c
}

func manifestWithContainers(containers ...api.Container) api.ContainerManifest {
m := api.ContainerManifest{}
func podWithContainers(containers ...api.Container) api.BoundPod {
m := api.BoundPod{}
for _, c := range containers {
m.Containers = append(m.Containers, c)
m.Spec.Containers = append(m.Spec.Containers, c)
}
return m
}

func TestAllowed(t *testing.T) {
table := []struct {
allowed bool
manifests []api.ContainerManifest
allowed bool
pods []api.BoundPod
}{
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(1, 2, 3),
containerWithHostPorts(4, 5, 6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(7, 8, 9),
containerWithHostPorts(10, 11, 12),
),
},
},
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(3, 3),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(6),
),
},
},
}

for _, item := range table {
if e, a := item.allowed, Allowed(item.manifests); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests)
if e, a := item.allowed, Allowed(item.pods); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.pods)
}
}
}
6 changes: 3 additions & 3 deletions pkg/constraint/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (

// PortsConflict returns true iff two containers attempt to expose
// the same host port.
func PortsConflict(manifests []api.ContainerManifest) bool {
func PortsConflict(pods []api.BoundPod) bool {
hostPorts := map[int]struct{}{}
for _, manifest := range manifests {
for _, container := range manifest.Containers {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort == 0 {
continue
Expand Down
6 changes: 3 additions & 3 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
func New(c *Config) *Master {
minionRegistry := makeMinionRegistry(c)
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
manifestFactory := &pod.BasicManifestFactory{
boundPodFactory := &pod.BasicBoundPodFactory{
ServiceRegistry: serviceRegistry,
}
m := &Master{
podRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: serviceRegistry,
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
minionRegistry: minionRegistry,
client: c.Client,
}
Expand Down
36 changes: 18 additions & 18 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ import (
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory pod.ManifestFactory
boundPodFactory pod.BoundPodFactory
}

// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *Registry {
func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry {
registry := &Registry{
EtcdHelper: helper,
}
registry.manifestFactory = manifestFactory
registry.boundPodFactory = boundPodFactory
return registry
}

Expand Down Expand Up @@ -174,18 +174,18 @@ func (r *Registry) assignPod(podID string, machine string) error {
return err
}
// TODO: move this to a watch/rectification loop.
manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod)
pod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
contKey := makeContainerKey(machine)
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
if !constraint.Allowed(manifests.Items) {
err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := *in.(*api.BoundPods)
pods.Items = append(pods.Items, *pod)
if !constraint.Allowed(pods.Items) {
return nil, fmt.Errorf("The assignment would cause a constraint violation")
}
return &manifests, nil
return &pods, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack that
Expand Down Expand Up @@ -222,13 +222,13 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
}
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, manifest := range manifests.Items {
if manifest.ID != podID {
newManifests = append(newManifests, manifest)
for _, pod := range pods.Items {
if pod.ID != podID {
newPods = append(newPods, pod)
} else {
found = true
}
Expand All @@ -237,10 +237,10 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", podID, manifests)
glog.Warningf("Couldn't find: %s in %#v", podID, pods)
}
manifests.Items = newManifests
return manifests, nil
pods.Items = newPods
return pods, nil
})
}

Expand Down
66 changes: 33 additions & 33 deletions pkg/registry/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
return registry
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestEtcdCreatePod(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
TypeMeta: api.TypeMeta{
Expand Down Expand Up @@ -156,15 +156,15 @@ func TestEtcdCreatePod(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}

Expand Down Expand Up @@ -286,15 +286,15 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}

Expand All @@ -308,9 +308,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "bar"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
Expand Down Expand Up @@ -352,15 +352,15 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}

Expand All @@ -374,9 +374,9 @@ func TestEtcdDeletePod(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
Expand All @@ -394,9 +394,9 @@ func TestEtcdDeletePod(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 0 {
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
}
Expand All @@ -411,10 +411,10 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
Expand All @@ -433,13 +433,13 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 1 {
t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests)
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if manifests.Items[0].ID != "bar" {
t.Errorf("Deleted wrong manifest: %#v", manifests)
if boundPods.Items[0].ID != "bar" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
}

Expand Down
Loading

0 comments on commit ea8f2f3

Please sign in to comment.