
Commit

Implement a cachedNodeInfo in predicates
jiangyaoguo committed Nov 27, 2015
1 parent 5f10b70 commit f4c5d00
Showing 6 changed files with 36 additions and 21 deletions.
17 changes: 0 additions & 17 deletions pkg/client/cache/listers.go
@@ -138,23 +138,6 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
     return
 }
 
-// TODO Move this back to scheduler as a helper function that takes a Store,
-// rather than a method of StoreToNodeLister.
-// GetNodeInfo returns cached data for the node 'id'.
-func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
-    node, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
-
-    if err != nil {
-        return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
-    }
-
-    if !exists {
-        return nil, fmt.Errorf("node '%v' is not in cache", id)
-    }
-
-    return node.(*api.Node), nil
-}
-
 // StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
 type StoreToReplicationControllerLister struct {
     Store
6 changes: 4 additions & 2 deletions pkg/kubelet/kubelet.go
@@ -263,6 +263,7 @@ func NewMainKubelet(
         cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
     }
     nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
+    nodeInfo := &predicates.CachedNodeInfo{nodeLister}
 
     // TODO: get the real node object of ourself,
     // and use the real node name and UID.
@@ -301,6 +302,7 @@
         clusterDNS: clusterDNS,
         serviceLister: serviceLister,
         nodeLister: nodeLister,
+        nodeInfo: nodeInfo,
         masterServiceNamespace: masterServiceNamespace,
         streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
         recorder: recorder,
@@ -473,7 +475,6 @@ type serviceLister interface {
 
 type nodeLister interface {
     List() (machines api.NodeList, err error)
-    GetNodeInfo(id string) (*api.Node, error)
 }
 
 // Kubelet is the main kubelet implementation.
@@ -527,6 +528,7 @@ type Kubelet struct {
     masterServiceNamespace string
     serviceLister serviceLister
     nodeLister nodeLister
+    nodeInfo predicates.NodeInfo
 
     // a list of node labels to register
     nodeLabels []string
@@ -822,7 +824,7 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
     if kl.standaloneMode {
         return nil, errors.New("no node entry for kubelet in standalone mode")
     }
-    return kl.nodeLister.GetNodeInfo(kl.nodeName)
+    return kl.nodeInfo.GetNodeInfo(kl.nodeName)
 }
 
 // Starts garbage collection threads.
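Taken together, the kubelet changes narrow what the kubelet asks of its node sources: listing stays behind the small nodeLister interface, while single-node lookups now go through a predicates.NodeInfo value (whose method, judging from the implementations in this commit, is GetNodeInfo(id string) (*api.Node, error)). The following self-contained sketch models that dependency shape; Node, NodeInfo, fakeNodeInfo, and miniKubelet are illustrative placeholders, not the real Kubernetes types.

package main

import (
    "errors"
    "fmt"
)

// Node is a stand-in for api.Node in this sketch.
type Node struct {
    Name   string
    Labels map[string]string
}

// NodeInfo mirrors the single-node lookup the kubelet now depends on.
type NodeInfo interface {
    GetNodeInfo(nodeID string) (*Node, error)
}

// fakeNodeInfo plays the role testNodeInfo plays in kubelet_test.go below:
// it answers lookups from a fixed slice of nodes.
type fakeNodeInfo struct {
    nodes []Node
}

func (f fakeNodeInfo) GetNodeInfo(id string) (*Node, error) {
    for i := range f.nodes {
        if f.nodes[i].Name == id {
            return &f.nodes[i], nil
        }
    }
    return nil, fmt.Errorf("node '%v' is not in cache", id)
}

// miniKubelet shows the dependency shape after this commit: GetNode only
// needs the lookup interface, not a full node lister.
type miniKubelet struct {
    nodeName       string
    standaloneMode bool
    nodeInfo       NodeInfo
}

func (kl *miniKubelet) GetNode() (*Node, error) {
    if kl.standaloneMode {
        return nil, errors.New("no node entry for kubelet in standalone mode")
    }
    return kl.nodeInfo.GetNodeInfo(kl.nodeName)
}

func main() {
    kl := &miniKubelet{
        nodeName: "node-a",
        nodeInfo: fakeNodeInfo{nodes: []Node{{Name: "node-a"}}},
    }
    node, err := kl.GetNode()
    fmt.Println(node.Name, err) // prints: node-a <nil>
}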
10 changes: 9 additions & 1 deletion pkg/kubelet/kubelet_test.go
@@ -112,6 +112,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     kubelet.masterServiceNamespace = api.NamespaceDefault
     kubelet.serviceLister = testServiceLister{}
     kubelet.nodeLister = testNodeLister{}
+    kubelet.nodeInfo = testNodeInfo{}
     kubelet.recorder = fakeRecorder
     if err := kubelet.setupDataDirs(); err != nil {
         t.Fatalf("can't initialize kubelet data dirs: %v", err)
@@ -1045,7 +1046,11 @@ type testNodeLister struct {
     nodes []api.Node
 }
 
-func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
+type testNodeInfo struct {
+    nodes []api.Node
+}
+
+func (ls testNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
     for _, node := range ls.nodes {
         if node.Name == id {
             return &node, nil
@@ -2319,6 +2324,9 @@ func TestHandleNodeSelector(t *testing.T) {
     kl.nodeLister = testNodeLister{nodes: []api.Node{
         {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
     }}
+    kl.nodeInfo = testNodeInfo{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
+    }}
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
     testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
     testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
1 change: 1 addition & 0 deletions pkg/kubelet/runonce_test.go
@@ -49,6 +49,7 @@ func TestRunOnce(t *testing.T) {
         recorder: &record.FakeRecorder{},
         cadvisor: cadvisor,
         nodeLister: testNodeLister{},
+        nodeInfo: testNodeInfo{},
         statusManager: status.NewManager(nil, podManager),
         containerRefManager: kubecontainer.NewRefManager(),
         podManager: podManager,
20 changes: 20 additions & 0 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -20,6 +20,7 @@ import (
     "fmt"
 
     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/client/cache"
     client "k8s.io/kubernetes/pkg/client/unversioned"
     "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -52,6 +53,25 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
     return nodes.Nodes().Get(nodeID)
 }
 
+type CachedNodeInfo struct {
+    *cache.StoreToNodeLister
+}
+
+// GetNodeInfo returns cached data for the node 'id'.
+func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
+    node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
+
+    if err != nil {
+        return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
+    }
+
+    if !exists {
+        return nil, fmt.Errorf("node '%v' is not in cache", id)
+    }
+
+    return node.(*api.Node), nil
+}
+
 func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
     if volume.GCEPersistentDisk != nil {
         disk := volume.GCEPersistentDisk
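CachedNodeInfo works by embedding *cache.StoreToNodeLister: the lister's Get method is promoted onto the wrapper, and GetNodeInfo only adds the error handling, the existence check, and the type assertion back to *api.Node. Below is a minimal, self-contained sketch of the same pattern; store, nodeCache, and Node are placeholder types standing in for the real cache package, not part of it.

package main

import "fmt"

// Node stands in for api.Node.
type Node struct{ Name string }

// store stands in for the client cache's store: Get returns the object,
// whether it exists, and any lookup error.
type store struct {
    objs map[string]*Node
}

func (s *store) Get(obj *Node) (interface{}, bool, error) {
    n, ok := s.objs[obj.Name]
    if !ok {
        return nil, false, nil
    }
    return n, true, nil
}

// nodeCache embeds *store the way CachedNodeInfo embeds *cache.StoreToNodeLister,
// so Get is promoted onto the wrapper.
type nodeCache struct {
    *store
}

// GetNodeInfo mirrors CachedNodeInfo.GetNodeInfo: look up by name, distinguish
// a failed lookup from a missing node, then assert the concrete type.
func (c *nodeCache) GetNodeInfo(id string) (*Node, error) {
    obj, exists, err := c.Get(&Node{Name: id})
    if err != nil {
        return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
    }
    if !exists {
        return nil, fmt.Errorf("node '%v' is not in cache", id)
    }
    return obj.(*Node), nil
}

func main() {
    c := &nodeCache{&store{objs: map[string]*Node{"node-a": {Name: "node-a"}}}}
    fmt.Println(c.GetNodeInfo("node-a")) // &{node-a} <nil>
    fmt.Println(c.GetNodeInfo("node-b")) // <nil> node 'node-b' is not in cache
}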
3 changes: 2 additions & 1 deletion plugin/pkg/scheduler/factory/factory.go
@@ -36,6 +36,7 @@ import (
     "k8s.io/kubernetes/pkg/util/sets"
     "k8s.io/kubernetes/plugin/pkg/scheduler"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/api/validation"
 
@@ -176,7 +177,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
         ControllerLister: f.ControllerLister,
         // All fit predicates only need to consider schedulable nodes.
         NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
-        NodeInfo: f.NodeLister,
+        NodeInfo: &predicates.CachedNodeInfo{f.NodeLister},
     }
     predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
     if err != nil {
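One small Go detail in this hunk: &predicates.CachedNodeInfo{f.NodeLister} is an unkeyed composite literal, so the wrapper's single embedded field is filled positionally. The keyed form would name the field after the embedded type, as in this toy example (lister and wrapper are placeholder types, not the real scheduler ones):

package main

import "fmt"

type lister struct{ name string }

// wrapper embeds *lister, just as CachedNodeInfo embeds *cache.StoreToNodeLister.
type wrapper struct {
    *lister
}

func main() {
    l := &lister{name: "node store"}
    a := &wrapper{l}         // unkeyed form, as written in factory.go and kubelet.go
    b := &wrapper{lister: l} // keyed form: an embedded field is named after its type
    fmt.Println(a.name, b.name)
}

Both spellings build the same value; go vet's composites check generally prefers the keyed form for literals of structs from other packages.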

1 comment on commit f4c5d00

@k8s-teamcity-mesosphere

TeamCity OSS :: Kubernetes Mesos :: 4 - Smoke Tests Build 7049 outcome was SUCCESS
Summary: Tests passed: 1, ignored: 204. Build time: 00:05:17.
