Skip to content

Commit

Permalink
Merge pull request kubernetes#5433 from wojtek-t/remove_bound_pods
Browse files Browse the repository at this point in the history
Remove BoundPods from Kubelet
  • Loading branch information
vmarmol committed Mar 16, 2015
2 parents 69ce2b0 + 5d95e9e commit bdc1981
Show file tree
Hide file tree
Showing 27 changed files with 193 additions and 224 deletions.
16 changes: 6 additions & 10 deletions pkg/api/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ func init() {
*out = *in.Copy()
return nil
},
// Convert ContainerManifest to BoundPod
//
// This function generates a dummy selfLink using the same method as the
// boundPod registry, in order for the Kubelet to work with well-formed
// boundPods during the integration test.
func(in *ContainerManifest, out *BoundPod, s conversion.Scope) error {
// Convert ContainerManifest to Pod
func(in *ContainerManifest, out *Pod, s conversion.Scope) error {
out.Spec.Containers = in.Containers
out.Spec.Volumes = in.Volumes
out.Spec.RestartPolicy = in.RestartPolicy
Expand All @@ -53,12 +49,12 @@ func init() {
out.UID = in.UUID

if in.ID != "" {
out.SelfLink = "/api/v1beta1/boundPods/" + in.ID
out.SelfLink = "/api/v1beta1/pods/" + in.ID
}

return nil
},
func(in *BoundPod, out *ContainerManifest, s conversion.Scope) error {
func(in *Pod, out *ContainerManifest, s conversion.Scope) error {
out.Containers = in.Spec.Containers
out.Volumes = in.Spec.Volumes
out.RestartPolicy = in.Spec.RestartPolicy
Expand All @@ -70,7 +66,7 @@ func init() {
},

// ContainerManifestList
func(in *ContainerManifestList, out *BoundPods, s conversion.Scope) error {
func(in *ContainerManifestList, out *PodList, s conversion.Scope) error {
if err := s.Convert(&in.Items, &out.Items, 0); err != nil {
return err
}
Expand All @@ -80,7 +76,7 @@ func init() {
}
return nil
},
func(in *BoundPods, out *ContainerManifestList, s conversion.Scope) error {
func(in *PodList, out *ContainerManifestList, s conversion.Scope) error {
if err := s.Convert(&in.Items, &out.Items, 0); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func init() {
&ContainerManifest{},
&ContainerManifestList{},
&BoundPod{},
&BoundPods{},
&List{},
&LimitRange{},
&LimitRangeList{},
Expand Down Expand Up @@ -79,7 +78,6 @@ func (*EventList) IsAnAPIObject() {}
func (*ContainerManifest) IsAnAPIObject() {}
func (*ContainerManifestList) IsAnAPIObject() {}
func (*BoundPod) IsAnAPIObject() {}
func (*BoundPods) IsAnAPIObject() {}
func (*List) IsAnAPIObject() {}
func (*LimitRange) IsAnAPIObject() {}
func (*LimitRangeList) IsAnAPIObject() {}
Expand Down
19 changes: 4 additions & 15 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ type EventList struct {
// ContainerManifest corresponds to the Container Manifest format, documented at:
// https://developers.google.com/compute/docs/containers/container_vms#container_manifest
// This is used as the representation of Kubernetes workloads.
// DEPRECATED: Replaced with BoundPod
// DEPRECATED: Replaced with Pod
type ContainerManifest struct {
// Required: This must be a supported version string, such as "v1beta1".
Version string `json:"version"`
Expand All @@ -1262,7 +1262,7 @@ type ContainerManifest struct {
}

// ContainerManifestList is used to communicate container manifests to kubelet.
// DEPRECATED: Replaced with BoundPods
// DEPRECATED: Replaced with Pods
type ContainerManifestList struct {
TypeMeta `json:",inline"`
ListMeta `json:"metadata,omitempty"`
Expand All @@ -1273,6 +1273,8 @@ type ContainerManifestList struct {
// BoundPod is a collection of containers that should be run on a host. A BoundPod
// defines how a Pod may change after a Binding is created. A Pod is a request to
// execute a pod, whereas a BoundPod is the specification that would be run on a server.
//
// TODO(wojtek-t): Get rid of this type.
type BoundPod struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
Expand All @@ -1281,19 +1283,6 @@ type BoundPod struct {
Spec PodSpec `json:"spec,omitempty"`
}

// BoundPods is a list of Pods bound to a common server. The resource version of
// the pod list is guaranteed to only change when the list of bound pods changes.
type BoundPods struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`

// Host is the name of a node that these pods were bound to.
Host string `json:"host"`

// Items is the list of all pods bound to a given host.
Items []BoundPod `json:"items"`
}

// List holds a list of objects, which may not be known by the server.
type List struct {
TypeMeta `json:",inline"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ type BoundPods struct {
Host string `json:"host" description:"name of a node that these pods were bound to"`

// Items is the list of all pods bound to a given host.
Items []BoundPod `json:"items" description:"list of all pods bound to a given host"`
Items []Pod `json:"items" description:"list of all pods bound to a given host"`
}

// List holds a list of objects, which may not be known by the server.
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ type BoundPods struct {
Host string `json:"host" description:"name of a node that these pods were bound to"`

// Items is the list of all pods bound to a given host.
Items []BoundPod `json:"items" description:"list of all pods bound to a given host"`
Items []Pod `json:"items" description:"list of all pods bound to a given host"`
}

// List holds a list of objects, which may not be known by the server.
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ type BoundPods struct {
Host string `json:"host" description:"name of a node that these pods were bound to"`

// Items is the list of all pods bound to a given host.
Items []BoundPod `json:"items" description:"list of all pods bound to a given host"`
Items []Pod `json:"items" description:"list of all pods bound to a given host"`
}

// ReplicationControllerSpec is the specification of a replication controller.
Expand Down
16 changes: 5 additions & 11 deletions pkg/kubelet/config/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
)

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
Expand All @@ -35,19 +34,14 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i
// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var bpods []api.BoundPod
var pods []api.Pod
for _, o := range objs {
pod := o.(*api.Pod)
bpod := api.BoundPod{}
if err := api.Scheme.Convert(pod, &bpod); err != nil {
glog.Errorf("Unable to interpret Pod from apiserver as a BoundPod: %v: %+v", err, pod)
continue
}
// Make a dummy self link so that references to this bound pod will work.
bpod.SelfLink = "/api/v1beta1/boundPods/" + bpod.Name
bpods = append(bpods, bpod)
// Make a dummy self link so that references to this pod will work.
pod.SelfLink = "/api/v1beta1/pods/" + pod.Name
pods = append(pods, *pod)
}
updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource}
updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.ApiserverSource}
}
cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/kubelet/config/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "q"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}}

expectedBoundPod1v1 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
expectedPod1v1 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/pods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}}
expectedBoundPod1v2 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"},
expectedPod1v2 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/pods/p"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}}
expectedBoundPod2 := api.BoundPod{
ObjectMeta: api.ObjectMeta{Name: "q", SelfLink: "/api/v1beta1/boundPods/q"},
expectedPod2 := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "q", SelfLink: "/api/v1beta1/pods/q"},
Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}}

// Setup fake api client.
Expand All @@ -80,7 +80,7 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1)
expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod1v1)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
}
Expand All @@ -93,8 +93,8 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
}
update = got.(kubelet.PodUpdate)
// Could be sorted either of these two ways:
expectedA := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1, expectedBoundPod2)
expectedB := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v1)
expectedA := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod1v1, expectedPod2)
expectedB := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod2, expectedPod1v1)

if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
Expand All @@ -107,8 +107,8 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expectedA = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v2, expectedBoundPod2)
expectedB = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v2)
expectedA = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod1v2, expectedPod2)
expectedB = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod2, expectedPod1v2)

if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
Expand All @@ -121,7 +121,7 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubelet.PodUpdate)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2)
expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedPod2)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *PodConfig) Sync() {
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*api.BoundPod
pods map[string]map[string]*api.Pod
mode PodConfigNotificationMode

// ensures that updates are delivered in strict order
Expand All @@ -134,7 +134,7 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*api.BoundPod),
pods: make(map[string]map[string]*api.Pod),
mode: mode,
updates: updates,
sourcesSeen: util.StringSet{},
Expand Down Expand Up @@ -169,12 +169,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.Pod), kubelet.SET, source}
}

case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.Pod), kubelet.SET, source}
}

default:
Expand All @@ -194,7 +194,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

pods := s.pods[source]
if pods == nil {
pods = make(map[string]*api.BoundPod)
pods = make(map[string]*api.Pod)
}

update := change.(kubelet.PodUpdate)
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*api.BoundPod)
pods = make(map[string]*api.Pod)

filtered := filterInvalidPods(update.Pods, source, s.recorder)
for _, ref := range filtered {
Expand Down Expand Up @@ -298,12 +298,12 @@ func (s *podStorage) seenSources(sources ...string) bool {
return s.sourcesSeen.HasAll(sources...)
}

func filterInvalidPods(pods []api.BoundPod, source string, recorder record.EventRecorder) (filtered []*api.BoundPod) {
func filterInvalidPods(pods []api.Pod, source string, recorder record.EventRecorder) (filtered []*api.Pod) {
names := util.StringSet{}
for i := range pods {
pod := &pods[i]
var errlist []error
if errs := validation.ValidateBoundPod(pod); len(errs) != 0 {
if errs := validation.ValidatePod(pod); len(errs) != 0 {
errlist = append(errlist, errs...)
// If validation fails, don't trust it any further -
// even Name could be bad.
Expand Down Expand Up @@ -331,27 +331,27 @@ func filterInvalidPods(pods []api.BoundPod, source string, recorder record.Event
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, kubelet.AllSource}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.Pod), kubelet.SET, kubelet.AllSource}
}

// Object implements config.Accessor
func (s *podStorage) MergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]api.BoundPod, 0)
pods := make([]api.Pod, 0)
for _, sourcePods := range s.pods {
for _, podRef := range sourcePods {
pod, err := api.Scheme.Copy(podRef)
if err != nil {
glog.Errorf("unable to copy pod: %v", err)
}
pods = append(pods, *pod.(*api.BoundPod))
pods = append(pods, *pod.(*api.Pod))
}
}
return pods
}

func bestPodIdentString(pod *api.BoundPod) string {
func bestPodIdentString(pod *api.Pod) string {
namespace := pod.Namespace
if namespace == "" {
namespace = "<empty-namespace>"
Expand Down
14 changes: 7 additions & 7 deletions pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
}
}

type sortedPods []api.BoundPod
type sortedPods []api.Pod

func (s sortedPods) Len() int {
return len(s)
Expand All @@ -51,8 +51,8 @@ func (s sortedPods) Less(i, j int) bool {
return s[i].Namespace < s[j].Namespace
}

func CreateValidPod(name, namespace, source string) api.BoundPod {
return api.BoundPod{
func CreateValidPod(name, namespace, source string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{
UID: types.UID(name), // for the purpose of testing, this is unique enough
Name: name,
Expand All @@ -66,8 +66,8 @@ func CreateValidPod(name, namespace, source string) api.BoundPod {
}
}

func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPod) kubelet.PodUpdate {
newPods := make([]api.BoundPod, len(pods))
func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.Pod) kubelet.PodUpdate {
newPods := make([]api.Pod, len(pods))
for i := range pods {
newPods[i] = pods[i]
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestInvalidPodFiltered(t *testing.T) {
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))

// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))

podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod))
}
Expand Down
Loading

0 comments on commit bdc1981

Please sign in to comment.