Skip to content

Commit

Permalink
Merge pull request #19746 from madhusudancs/replicaset-client
Browse files Browse the repository at this point in the history
Implement ReplicaSet client.
  • Loading branch information
fabioy committed Jan 29, 2016
2 parents b4188ec + c25a0ce commit e827c1c
Show file tree
Hide file tree
Showing 8 changed files with 580 additions and 0 deletions.
56 changes: 56 additions & 0 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,62 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
return
}

// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets.
type StoreToReplicaSetLister struct {
Store
}

// Exists checks if the given ReplicaSet exists in the store.
func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) {
_, exists, err := s.Store.Get(rs)
if err != nil {
return false, err
}
return exists, nil
}

// List lists all ReplicaSets in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) {
for _, rs := range s.Store.List() {
rss = append(rss, *(rs.(*extensions.ReplicaSet)))
}
return rss, nil
}

// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found.
func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
var selector labels.Selector
var rs extensions.ReplicaSet

if len(pod.Labels) == 0 {
err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name)
return
}

for _, m := range s.Store.List() {
rs = *m.(*extensions.ReplicaSet)
if rs.Namespace != pod.Namespace {
continue
}
selector, err = extensions.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
err = fmt.Errorf("failed to convert pod selector to selector: %v", err)
return
}

// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
rss = append(rss, rs)
}
if len(rss) == 0 {
err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

// StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets.
type StoreToDaemonSetLister struct {
Store
Expand Down
112 changes: 112 additions & 0 deletions pkg/client/cache/listers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,118 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
}
}

func TestStoreToReplicaSetLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToReplicaSetLister{store}
testCases := []struct {
inRSs []*extensions.ReplicaSet
list func() ([]extensions.ReplicaSet, error)
outRSNames sets.String
expectErr bool
}{
// Basic listing with all labels and no selectors
{
inRSs: []*extensions.ReplicaSet{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func() ([]extensions.ReplicaSet, error) {
return lister.List()
},
outRSNames: sets.NewString("basic"),
},
// No pod labels
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "baz"}},
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString(),
expectErr: true,
},
// No ReplicaSet selectors
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString(),
expectErr: true,
},
// Matching labels to selectors and namespace
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
Labels: map[string]string{"foo": "bar"},
Namespace: "ns",
},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString("bar"),
},
}
for _, c := range testCases {
for _, r := range c.inRSs {
store.Add(r)
}

gotRSs, err := c.list()
if err != nil && c.expectErr {
continue
} else if c.expectErr {
t.Error("Expected error, got none")
continue
} else if err != nil {
t.Errorf("Unexpected error %#v", err)
continue
}
gotNames := make([]string, len(gotRSs))
for ix := range gotRSs {
gotNames[ix] = gotRSs[ix].Name
}
if !c.outRSNames.HasAll(gotNames...) || len(gotNames) != len(c.outRSNames) {
t.Errorf("Unexpected got ReplicaSets %+v expected %+v", gotNames, c.outRSNames)
}
}
}

func TestStoreToDaemonSetLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToDaemonSetLister{store}
Expand Down
23 changes: 23 additions & 0 deletions pkg/client/unversioned/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationContro
}
}

// ReplicaSetHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a ReplicaSet's ReplicaSelector equals the Replicas count.
func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions.ReplicaSet) wait.ConditionFunc {

// If we're given a ReplicaSet where the status lags the spec, it either means that the
// ReplicaSet is stale, or that the ReplicaSet manager hasn't noticed the update yet.
// Polling status.Replicas is not safe in the latter case.
desiredGeneration := replicaSet.Generation

return func() (bool, error) {
rs, err := c.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name)
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to
// pass, or, after this check has passed, a modification causes the ReplicaSet manager to
// create more pods. This will not be an issue once we've implemented graceful delete for
// ReplicaSets, but till then concurrent stop operations on the same ReplicaSet might have
// unintended side effects.
return rs.Status.ObservedGeneration >= desiredGeneration && rs.Status.Replicas == rs.Spec.Replicas, nil
}
}

// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
func JobHasDesiredParallelism(c ExtensionsInterface, job *extensions.Job) wait.ConditionFunc {
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/unversioned/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ExtensionsInterface interface {
JobsNamespacer
IngressNamespacer
ThirdPartyResourceNamespacer
ReplicaSetsNamespacer
}

// ExtensionsClient is used to interact with experimental Kubernetes features.
Expand Down Expand Up @@ -71,6 +72,10 @@ func (c *ExtensionsClient) ThirdPartyResources(namespace string) ThirdPartyResou
return newThirdPartyResources(c, namespace)
}

func (c *ExtensionsClient) ReplicaSets(namespace string) ReplicaSetInterface {
return newReplicaSets(c, namespace)
}

// NewExtensions creates a new ExtensionsClient for the given config. This client
// provides access to experimental Kubernetes features.
// Features of Extensions group are not supported and may be changed or removed in
Expand Down
100 changes: 100 additions & 0 deletions pkg/client/unversioned/replica_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package unversioned

import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/watch"
)

// ReplicaSetsNamespacer has methods to work with ReplicaSet resources in a namespace
type ReplicaSetsNamespacer interface {
ReplicaSets(namespace string) ReplicaSetInterface
}

// ReplicaSetInterface has methods to work with ReplicaSet resources.
type ReplicaSetInterface interface {
List(opts api.ListOptions) (*extensions.ReplicaSetList, error)
Get(name string) (*extensions.ReplicaSet, error)
Create(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
Update(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
UpdateStatus(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
Delete(name string, options *api.DeleteOptions) error
Watch(opts api.ListOptions) (watch.Interface, error)
}

// replicaSets implements ReplicaSetsNamespacer interface
type replicaSets struct {
client *ExtensionsClient
ns string
}

// newReplicaSets returns a ReplicaSetClient
func newReplicaSets(c *ExtensionsClient, namespace string) *replicaSets {
return &replicaSets{c, namespace}
}

// List takes a selector, and returns the list of ReplicaSets that match that selector.
func (c *replicaSets) List(opts api.ListOptions) (result *extensions.ReplicaSetList, err error) {
result = &extensions.ReplicaSetList{}
err = c.client.Get().Namespace(c.ns).Resource("replicasets").VersionedParams(&opts, api.Scheme).Do().Into(result)
return
}

// Get returns information about a particular ReplicaSet.
func (c *replicaSets) Get(name string) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Get().Namespace(c.ns).Resource("replicasets").Name(name).Do().Into(result)
return
}

// Create creates a new ReplicaSet.
func (c *replicaSets) Create(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Post().Namespace(c.ns).Resource("replicasets").Body(rs).Do().Into(result)
return
}

// Update updates an existing ReplicaSet.
func (c *replicaSets) Update(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Put().Namespace(c.ns).Resource("replicasets").Name(rs.Name).Body(rs).Do().Into(result)
return
}

// UpdateStatus updates an existing ReplicaSet status
func (c *replicaSets) UpdateStatus(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Put().Namespace(c.ns).Resource("replicasets").Name(rs.Name).SubResource("status").Body(rs).Do().Into(result)
return
}

// Delete deletes an existing ReplicaSet.
func (c *replicaSets) Delete(name string, options *api.DeleteOptions) (err error) {
return c.client.Delete().Namespace(c.ns).Resource("replicasets").Name(name).Body(options).Do().Error()
}

// Watch returns a watch.Interface that watches the requested ReplicaSets.
func (c *replicaSets) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.client.Get().
Prefix("watch").
Namespace(c.ns).
Resource("replicasets").
VersionedParams(&opts, api.Scheme).
Watch()
}
Loading

0 comments on commit e827c1c

Please sign in to comment.