Skip to content

Commit

Permalink
Modify nodes to register directly with the master.
Browse files Browse the repository at this point in the history
  • Loading branch information
roberthbailey committed Apr 17, 2015
1 parent e1b0320 commit fcae414
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 876 deletions.
9 changes: 9 additions & 0 deletions cluster/saltbase/salt/kubelet/default
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
{% set api_servers = "--api_servers=https://" + ips[0][0] + ":6443" -%}
{% endif -%}

# Disable registration for the kubelet running on the master on GCE.
# TODO(roberthbailey): Make this configurable via an env var in config-default.sh
{% if grains.cloud == 'gce' -%}
{% if grains['roles'][0] == 'kubernetes-master' -%}
{% set api_servers = "" -%}
{% endif -%}
{% endif -%}


{% set config = "--config=/etc/kubernetes/manifests" -%}
{% set hostname_override = "" -%}
{% if grains.minion_ip is defined -%}
Expand Down
309 changes: 2 additions & 307 deletions pkg/cloudprovider/nodecontroller/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"errors"
"fmt"
"net"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
Expand Down Expand Up @@ -136,249 +134,16 @@ func NewNodeController(
}
}

// Run creates initial node list and start syncing instances from cloudprovider, if any.
// It also starts syncing or monitoring cluster node status.
// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider
// or from command line flag). To make cluster bootstrap faster, node controller populates
// node addresses.
// 2. syncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
// Node created here will only have specs.
// 3. monitorNodeStatus() is called periodically to incorporate the results of node status
// pushed from kubelet to master.
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
// Register intial set of nodes with their status set.
var nodes *api.NodeList
var err error
if nc.isRunningCloudProvider() {
if syncNodeList {
if nodes, err = nc.getCloudNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial node from cloudprovider: %v", err)
}
} else {
nodes = &api.NodeList{}
}
} else {
if nodes, err = nc.getStaticNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial static nodes: %v", err)
}
}

if nodes, err = nc.populateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
glog.Errorf("Error registering node list %+v: %v", nodes, err)
}

// Start syncing node list from cloudprovider.
if syncNodeList && nc.isRunningCloudProvider() {
go util.Forever(func() {
if err := nc.syncCloudNodes(); err != nil {
glog.Errorf("Error syncing cloud: %v", err)
}
}, period)
}

// Start monitoring node status.
// Incorporate the results of node status pushed from kubelet to master.
go util.Forever(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod)
}

// registerNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
if len(nodes.Items) == 0 {
return nil
}

registered := util.NewStringSet()
nodes = nc.canonicalizeName(nodes)
for i := 0; i < retryCount; i++ {
for _, node := range nodes.Items {
if registered.Has(node.Name) {
continue
}
_, err := nc.kubeClient.Nodes().Create(&node)
if err == nil || apierrors.IsAlreadyExists(err) {
registered.Insert(node.Name)
glog.Infof("Registered node in registry: %s", node.Name)
} else {
glog.Errorf("Error registering node %s, retrying: %s", node.Name, err)
}
if registered.Len() == len(nodes.Items) {
glog.Infof("Successfully registered all nodes")
return nil
}
}
time.Sleep(retryInterval)
}
if registered.Len() != len(nodes.Items) {
return ErrRegistration
} else {
return nil
}
}

// reconcileExternalServices updates balancers for external services, so that they will match the nodes given.
// Returns true if something went wrong and we should call reconcile again.
func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) {
balancer, ok := nc.cloud.TCPLoadBalancer()
if !ok {
glog.Error("The cloud provider does not support external TCP load balancers.")
return false
}

zones, ok := nc.cloud.Zones()
if !ok {
glog.Error("The cloud provider does not support zone enumeration.")
return false
}
zone, err := zones.GetZone()
if err != nil {
glog.Errorf("Error while getting zone: %v", err)
return false
}

hosts := []string{}
for _, node := range nodes.Items {
hosts = append(hosts, node.Name)
}

services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing services: %v", err)
return true
}
shouldRetry = false
for _, service := range services.Items {
if service.Spec.CreateExternalLoadBalancer {
nonTCPPort := false
for i := range service.Spec.Ports {
if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
nonTCPPort = true
break
}
}
if nonTCPPort {
// TODO: Support UDP here.
glog.Errorf("External load balancers for non TCP services are not currently supported: %v.", service)
continue
}
name := cloudprovider.GetLoadBalancerName(nc.clusterName, service.Namespace, service.Name)
err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts)
if err != nil {
glog.Errorf("External error while updating TCP load balancer: %v.", err)
shouldRetry = true
}
}
}
return shouldRetry
}

// syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
func (nc *NodeController) syncCloudNodes() error {
matches, err := nc.getCloudNodesWithSpec()
if err != nil {
return err
}
nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
nodeMap := make(map[string]*api.Node)
for i := range nodes.Items {
node := nodes.Items[i]
nodeMap[node.Name] = &node
}

// Create nodes which have been created in cloud, but not in kubernetes cluster
// Skip nodes if we hit an error while trying to get their addresses.
nodesChanged := false
for _, node := range matches.Items {
if _, ok := nodeMap[node.Name]; !ok {
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
nodeList := &api.NodeList{}
nodeList.Items = []api.Node{node}
_, err = nc.populateAddresses(nodeList)
if err != nil {
glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
continue
}
node.Status.Addresses = nodeList.Items[0].Status.Addresses

glog.Infof("Create node in registry: %s", node.Name)
_, err = nc.kubeClient.Nodes().Create(&node)
if err != nil {
glog.Errorf("Create node %s error: %v", node.Name, err)
}
nodesChanged = true
}
delete(nodeMap, node.Name)
}

// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
for nodeID := range nodeMap {
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {
glog.Errorf("Delete node %s error: %v", nodeID, err)
}
nc.deletePods(nodeID)
nodesChanged = true
}

// Make external services aware of nodes currently present in the cluster.
if nodesChanged || nc.reconcileServices {
nc.reconcileServices = nc.reconcileExternalServices(matches)
if nc.reconcileServices {
glog.Error("Reconcilation of external services failed and will be retried during the next sync.")
}
}

return nil
}

// populateAddresses queries Address for given list of nodes.
func (nc *NodeController) populateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
if nc.isRunningCloudProvider() {
instances, ok := nc.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
nodeAddresses, err := instances.NodeAddresses(node.Name)
if err != nil {
glog.Errorf("error getting instance addresses for %s: %v", node.Name, err)
} else {
node.Status.Addresses = nodeAddresses
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
node.Status.Addresses = []api.NodeAddress{address}
} else {
addrs, err := nc.lookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}
node.Status.Addresses = []api.NodeAddress{address}
}
}
}
}
return nodes, nil
}

func (nc *NodeController) recordNodeEvent(node *api.Node, event string) {
ref := &api.ObjectReference{
Kind: "Node",
Expand Down Expand Up @@ -591,63 +356,6 @@ func (nc *NodeController) monitorNodeStatus() error {
return nil
}

// getStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (nc *NodeController) getStaticNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
for _, nodeID := range nc.nodes {
node := api.Node{
ObjectMeta: api.ObjectMeta{Name: nodeID},
Spec: api.NodeSpec{
ExternalID: nodeID,
},
Status: api.NodeStatus{
Capacity: nc.staticResources.Capacity,
},
}
result.Items = append(result.Items, node)
}
return result, nil
}

// getCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
instances, ok := nc.cloud.Instances()
if !ok {
return result, ErrCloudInstance
}
matches, err := instances.List(nc.matchRE)
if err != nil {
return result, err
}
for i := range matches {
node := api.Node{}
node.Name = matches[i]
resources, err := instances.GetNodeResources(matches[i])
if err != nil {
return nil, err
}
if resources == nil {
resources = nc.staticResources
}
if resources != nil {
node.Status.Capacity = resources.Capacity
}
instanceID, err := instances.ExternalID(node.Name)
if err != nil {
glog.Errorf("Error getting instance id for %s: %v", node.Name, err)
} else {
node.Spec.ExternalID = instanceID
}
result.Items = append(result.Items, node)
}
return result, nil
}

// deletePods will delete all pods from master running on given node.
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
Expand All @@ -669,19 +377,6 @@ func (nc *NodeController) deletePods(nodeID string) error {
return nil
}

// isRunningCloudProvider checks if cluster is running with cloud provider.
func (nc *NodeController) isRunningCloudProvider() bool {
return nc.cloud != nil && len(nc.matchRE) > 0
}

// canonicalizeName takes a node list and lowercases all nodes' name.
func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
for i := range nodes.Items {
nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name)
}
return nodes
}

// getCondition returns a condition object for the specific condition
// type, nil if the condition is not set.
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
Expand Down
Loading

0 comments on commit fcae414

Please sign in to comment.