Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

federation: fix dns provider initialization issues #27252

Merged
merged 5 commits into from
Jun 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions federation/cluster/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ function create-federation-api-objects {
export FEDERATION_API_NODEPORT=32111
export FEDERATION_NAMESPACE
export FEDERATION_NAME="${FEDERATION_NAME:-federation}"
export DNS_ZONE_NAME="${DNS_ZONE_NAME:-example.com}"

template="go run ${KUBE_ROOT}/federation/cluster/template.go"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scClientset, dns)
servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type ControllerManagerConfiguration struct {
Port int `json:"port"`
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
Address string `json:"address"`
// federation name.
FederationName string `json:"federationName"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I choose any name? Does it have to match a DNS config parameter?
Or any other parameter anywhere else?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhiljindal Yes, any name, provided that all federations of which a cluster is a member have unique names. The federation name does not need to match any DNS configuration parameters, or anything else (that I can think of).

// zone name, like example.com.
ZoneName string `json:"zoneName"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which zone is this? The zone in which federation control plane is running?
k8s clusters could be in multiple zones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: at other places we just use Zone (or Zones) rather than ZoneName

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhil this is the DNS zone name of the federation (see comment directly above). We could perhaps prefix it with "dns" to make that clearer. Zone or Zones usually refer to structs or interfaces, not plain names. Make sense?

// dnsProvider is the provider for dns services.
DnsProvider string `json:"dnsProvider"`
// dnsConfigFile is the path to the dns provider configuration file.
Expand Down Expand Up @@ -90,6 +94,8 @@ func NewCMServer() *CMServer {
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.")
fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think that we should call this "DNSZoneName", "dns-zone-name" etc, to disambiguate it from an availability zone name. Comment/doc field should similarly include "DNS" to make it clearer.

fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ spec:
- --master=https://{{.FEDERATION_APISERVER_DEPLOYMENT_NAME}}:443
- --dns-provider={{.FEDERATION_DNS_PROVIDER}}
- --dns-provider-config={{.FEDERATION_DNS_PROVIDER_CONFIG}}
- --federation-name={{.FEDERATION_NAME}}
- --zone-name={{.DNS_ZONE_NAME}}
ports:
- containerPort: 443
name: https
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
}
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
for i, rrset := range response.Rrsets() {
list[i] = &ResourceRecordSet{rrset, &rrsets}
list[i] = ResourceRecordSet{rrset, &rrsets}
}
return list, nil
}
Expand Down
73 changes: 43 additions & 30 deletions federation/pkg/federation-controller/service/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []str

// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
func (s *ServiceController) getFederationDNSZoneName() (string, error) {
return "example.com", nil // TODO: quinton: Get this from the federation configuration.
// Note: For unit testing this must match the domain populated in the test/stub dnsprovider.
return s.zoneName, nil
}

// getDnsZone is a hack around the fact that dnsprovider does not yet support a Get() method, only a List() method. TODO: Fix that.
Expand Down Expand Up @@ -147,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
*/
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error {
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
if err != nil {
return err
Expand All @@ -161,37 +160,44 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return err
}
if rrset == nil {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if endpointReachable {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
} else {
// the rrset already exists, so make it right.
if len(endpoints) < 1 {
// Need an appropriate CNAME record. Check that we have it.
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
return nil
} else {
Expand All @@ -200,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if uplevelCname != "" {
if uplevelCname != "" && endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
Expand All @@ -215,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
Expand All @@ -225,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if _, err = rrsets.Add(newRrset); err != nil {
return err
if endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -254,12 +267,12 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// the state of the service when we last successfully sync'd it's DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records.
//
if s.dns == nil {
return nil
}
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
Expand All @@ -280,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}

_, endpointReachable := cachedService.endpointMap[clusterName]
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
Expand All @@ -293,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}

for i, endpoint := range endpoints {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil {
return err
}
}
Expand Down
65 changes: 46 additions & 19 deletions federation/pkg/federation-controller/service/endpoint_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
}

func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
Expand All @@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// need to query dns info from dnsprovider and make sure of if deletion is needed
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
delete(cachedService.endpointMap, clusterName)
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
Expand All @@ -120,27 +120,54 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock()
var reachable bool
defer cachedService.rwlock.Unlock()
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
cachedService.endpointMap[clusterName] = 1
}
}
_, ok := cachedService.endpointMap[clusterName]
if !ok {
// first time get endpoints, update dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
}
} else {
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if !reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
return err
}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported
var fakeDnsZones, _ = fakeDns.Zones()

var fakeServiceController = ServiceController{
dns: fakeDns,
dnsZones: fakeDnsZones,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
zoneName: "example.com",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
Expand All @@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints {
}

func TestProcessEndpointUpdate(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
tests := []struct {
name string
Expand All @@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
{
Expand All @@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
}

fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
Expand Down
Loading