Skip to content

Commit

Permalink
kubelet: add GetNodeInfo implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Simoncelli <fsimonce@redhat.com>
  • Loading branch information
simon3z committed Mar 10, 2015
1 parent eb0b6f2 commit 1b18440
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 23 deletions.
4 changes: 4 additions & 0 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.Pod
return r, nil
}

func (fakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
return api.NodeInfo{}, nil
}

func (fakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return probe.Success, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func init() {
&Service{},
&NodeList{},
&Node{},
&NodeInfo{},
&Status{},
&Endpoints{},
&EndpointsList{},
Expand Down Expand Up @@ -70,6 +71,7 @@ func (*ServiceList) IsAnAPIObject() {}
func (*Endpoints) IsAnAPIObject() {}
func (*EndpointsList) IsAnAPIObject() {}
func (*Node) IsAnAPIObject() {}
func (*NodeInfo) IsAnAPIObject() {}
func (*NodeList) IsAnAPIObject() {}
func (*Binding) IsAnAPIObject() {}
func (*Status) IsAnAPIObject() {}
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,13 @@ type NodeStatus struct {
Addresses []NodeAddress `json:"addresses,omitempty"`
}

// NodeInfo is the information collected on the node.
type NodeInfo struct {
TypeMeta `json:",inline"`
// Capacity represents the available resources of a node
Capacity ResourceList `json:"capacity,omitempty"`
}

type NodePhase string

// These are the valid phases of node.
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1beta1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func init() {
&EndpointsList{},
&Minion{},
&MinionList{},
&NodeInfo{},
&Binding{},
&Status{},
&Event{},
Expand Down Expand Up @@ -77,6 +78,7 @@ func (*ServiceList) IsAnAPIObject() {}
func (*Endpoints) IsAnAPIObject() {}
func (*EndpointsList) IsAnAPIObject() {}
func (*Minion) IsAnAPIObject() {}
func (*NodeInfo) IsAnAPIObject() {}
func (*MinionList) IsAnAPIObject() {}
func (*Binding) IsAnAPIObject() {}
func (*Status) IsAnAPIObject() {}
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,13 @@ type NodeStatus struct {
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node"`
}

// NodeInfo is the information collected on the node.
type NodeInfo struct {
TypeMeta `json:",inline"`
// Capacity represents the available resources.
Capacity ResourceList `json:"capacity,omitempty" description:"resource capacity of a node represented as a map of resource name to quantity of resource"`
}

type NodePhase string

// These are the valid phases of node.
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1beta2/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func init() {
&Endpoints{},
&EndpointsList{},
&Minion{},
&NodeInfo{},
&MinionList{},
&Binding{},
&Status{},
Expand Down Expand Up @@ -77,6 +78,7 @@ func (*ServiceList) IsAnAPIObject() {}
func (*Endpoints) IsAnAPIObject() {}
func (*EndpointsList) IsAnAPIObject() {}
func (*Minion) IsAnAPIObject() {}
func (*NodeInfo) IsAnAPIObject() {}
func (*MinionList) IsAnAPIObject() {}
func (*Binding) IsAnAPIObject() {}
func (*Status) IsAnAPIObject() {}
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,13 @@ type NodeStatus struct {
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node"`
}

// NodeInfo is the information collected on the node.
type NodeInfo struct {
TypeMeta `json:",inline"`
// Capacity represents the available resources.
Capacity ResourceList `json:"capacity,omitempty" description:"resource capacity of a node represented as a map of resource name to quantity of resource"`
}

// Described the current lifecycle phase of a node.
//
// https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/node.md#node-phase
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1beta3/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func init() {
&Endpoints{},
&EndpointsList{},
&Node{},
&NodeInfo{},
&NodeList{},
&Binding{},
&Status{},
Expand Down Expand Up @@ -75,6 +76,7 @@ func (*ServiceList) IsAnAPIObject() {}
func (*Endpoints) IsAnAPIObject() {}
func (*EndpointsList) IsAnAPIObject() {}
func (*Node) IsAnAPIObject() {}
func (*NodeInfo) IsAnAPIObject() {}
func (*NodeList) IsAnAPIObject() {}
func (*Binding) IsAnAPIObject() {}
func (*Status) IsAnAPIObject() {}
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,13 @@ type NodeStatus struct {
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node"`
}

// NodeInfo is the information collected on the node.
type NodeInfo struct {
TypeMeta `json:",inline"`
// Capacity represents the available resources of a node
Capacity ResourceList `json:"capacity,omitempty"`
}

type NodePhase string

// These are the valid phases of node.
Expand Down
49 changes: 34 additions & 15 deletions pkg/client/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)

// ErrPodInfoNotAvailable may be returned when the requested pod info is not available.
Expand All @@ -38,6 +39,7 @@ var ErrPodInfoNotAvailable = errors.New("no pod info available")
type KubeletClient interface {
KubeletHealthChecker
PodInfoGetter
NodeInfoGetter
}

// KubeletHealthchecker is an interface for healthchecking kubelets
Expand All @@ -53,6 +55,10 @@ type PodInfoGetter interface {
GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error)
}

type NodeInfoGetter interface {
GetNodeInfo(host string) (api.NodeInfo, error)
}

// HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Expand Down Expand Up @@ -102,33 +108,41 @@ func (c *HTTPKubeletClient) url(host, path, query string) string {

// GetPodInfo gets information about the specified pod.
func (c *HTTPKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error) {
query := url.Values{"podID": {podID}, "podNamespace": {podNamespace}}
request, err := http.NewRequest("GET", c.url(host, "/api/v1beta1/podInfo", query.Encode()), nil)
status := api.PodStatusResult{}
query := url.Values{"podID": {podID}, "podNamespace": {podNamespace}}
response, err := c.getEntity(host, "/api/v1beta1/podInfo", query.Encode(), &status)
if response.StatusCode == http.StatusNotFound {
return status, ErrPodInfoNotAvailable
}
return status, err
}

// GetNodeInfo gets information about the specified node.
func (c *HTTPKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
info := api.NodeInfo{}
_, err := c.getEntity(host, "/api/v1beta1/nodeInfo", "", &info)
return info, err
}

func (c *HTTPKubeletClient) getEntity(host, path, query string, entity runtime.Object) (*http.Response, error) {
request, err := http.NewRequest("GET", c.url(host, path, query), nil)
if err != nil {
return status, err
return nil, err
}
response, err := c.Client.Do(request)
if err != nil {
return status, err
return response, err
}
defer response.Body.Close()
if response.StatusCode == http.StatusNotFound {
return status, ErrPodInfoNotAvailable
}
if response.StatusCode >= 300 || response.StatusCode < 200 {
return status, fmt.Errorf("kubelet %q server responded with HTTP error code %d for pod %s/%s", host, response.StatusCode, podNamespace, podID)
return response, fmt.Errorf("kubelet %q server responded with HTTP error code %d", host, response.StatusCode)
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return status, err
return response, err
}
// Check that this data can be unmarshalled
err = latest.Codec.DecodeInto(body, &status)
if err != nil {
return status, err
}
return status, nil
err = latest.Codec.DecodeInto(body, entity)
return response, err
}

func (c *HTTPKubeletClient) HealthCheck(host string) (probe.Result, error) {
Expand All @@ -145,6 +159,11 @@ func (c FakeKubeletClient) GetPodStatus(host, podNamespace string, podID string)
return api.PodStatusResult{}, errors.New("Not Implemented")
}

// GetNodeInfo is a fake implementation of PodInfoGetter.GetNodeInfo
func (c FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
return api.NodeInfo{}, errors.New("Not Implemented")
}

func (c FakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return probe.Unknown, errors.New("Not Implemented")
}
24 changes: 19 additions & 5 deletions pkg/cloudprovider/controller/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type NodeController struct {
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
kubeletClient client.KubeletHealthChecker
kubeletClient client.KubeletClient
registerRetryCount int
podEvictionTimeout time.Duration
}
Expand All @@ -65,7 +65,7 @@ func NewNodeController(
nodes []string,
staticResources *api.NodeResources,
kubeClient client.Interface,
kubeletClient client.KubeletHealthChecker,
kubeletClient client.KubeletClient,
registerRetryCount int,
podEvictionTimeout time.Duration) *NodeController {
return &NodeController{
Expand Down Expand Up @@ -219,7 +219,7 @@ func (s *NodeController) SyncNodeStatus() error {
if err != nil {
return err
}
nodes = s.DoChecks(nodes)
nodes = s.UpdateNodesStatus(nodes)
nodes, err = s.PopulateAddresses(nodes)
if err != nil {
return err
Expand Down Expand Up @@ -304,20 +304,34 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList,
return nodes, nil
}

// DoChecks performs health checking for given list of nodes.
func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList {
// UpdateNodesStatus performs health checking for given list of nodes.
func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList {
var wg sync.WaitGroup
wg.Add(len(nodes.Items))
for i := range nodes.Items {
go func(node *api.Node) {
node.Status.Conditions = s.DoCheck(node)
if err := s.updateNodeInfo(node); err != nil {
glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
}
wg.Done()
}(&nodes.Items[i])
}
wg.Wait()
return nodes
}

func (s *NodeController) updateNodeInfo(node *api.Node) error {
nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name)
if err != nil {
return err
}
for key, value := range nodeInfo.Capacity {
node.Spec.Capacity[key] = value
}
return nil
}

// DoCheck performs health checking for given node.
func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
var conditions []api.NodeCondition
Expand Down
4 changes: 4 additions & 0 deletions pkg/cloudprovider/controller/nodecontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (c *FakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.
return api.PodStatusResult{}, errors.New("Not Implemented")
}

func (c *FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
return api.NodeInfo{}, errors.New("Not Implemented")
}

func (c *FakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return c.Status, c.Err
}
Expand Down
47 changes: 44 additions & 3 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
Expand All @@ -46,8 +47,9 @@ import (

// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
host HostInterface
mux *http.ServeMux
host HostInterface
mux *http.ServeMux
machineInfo *cadvisorApi.MachineInfo
}

type TLSOptions struct {
Expand Down Expand Up @@ -114,6 +116,7 @@ func (s *Server) InstallDefaultHandlers() {
healthz.InstallHandler(s.mux)
s.mux.HandleFunc("/podInfo", s.handlePodInfoOld)
s.mux.HandleFunc("/api/v1beta1/podInfo", s.handlePodInfoVersioned)
s.mux.HandleFunc("/api/v1beta1/nodeInfo", s.handleNodeInfoVersioned)
s.mux.HandleFunc("/boundPods", s.handleBoundPods)
s.mux.HandleFunc("/stats/", s.handleStats)
s.mux.HandleFunc("/spec/", s.handleSpec)
Expand Down Expand Up @@ -329,9 +332,47 @@ func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) {
s.host.ServeLogs(w, req)
}

// getCachedMachineInfo assumes that the machine info can't change without a reboot
func (s *Server) getCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
if s.machineInfo == nil {
info, err := s.host.GetMachineInfo()
if err != nil {
return nil, err
}
s.machineInfo = info
}
return s.machineInfo, nil
}

// handleNodeInfoVersioned handles node info requests against the Kubelet.
func (s *Server) handleNodeInfoVersioned(w http.ResponseWriter, req *http.Request) {
info, err := s.getCachedMachineInfo()
if err != nil {
s.error(w, err)
return
}
capacity := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
data, err := json.Marshal(api.NodeInfo{
Capacity: capacity,
})
if err != nil {
s.error(w, err)
return
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
}

// handleSpec handles spec requests against the Kubelet.
func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
info, err := s.host.GetMachineInfo()
info, err := s.getCachedMachineInfo()
if err != nil {
s.error(w, err)
return
Expand Down

0 comments on commit 1b18440

Please sign in to comment.