Skip to content

Commit

Permalink
1. ensure dns record when ingress ip is assigned after ready address …
Browse files Browse the repository at this point in the history
…creation

2. ensure dns record removal when service being removed
  • Loading branch information
mfanjie committed Jun 14, 2016
1 parent 72a0806 commit dd78dd8
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
}
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
for i, rrset := range response.Rrsets() {
list[i] = &ResourceRecordSet{rrset, &rrsets}
list[i] = ResourceRecordSet{rrset, &rrsets}
}
return list, nil
}
Expand All @@ -58,7 +58,7 @@ func (rrsets ResourceRecordSets) Add(rrset dnsprovider.ResourceRecordSet) (dnspr

func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error {
service := rrsets.zone.zones.interface_.service.Changes()
deletions := []interfaces.ResourceRecordSet{rrset.(*ResourceRecordSet).impl}
deletions := []interfaces.ResourceRecordSet{rrset.(ResourceRecordSet).impl}
change := service.NewChange([]interfaces.ResourceRecordSet{}, deletions)
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
if err != nil {
Expand Down
64 changes: 39 additions & 25 deletions federation/pkg/federation-controller/service/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
*/
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error {
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
if err != nil {
return err
Expand All @@ -160,37 +160,44 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return err
}
if rrset == nil {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if endpointReachable {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
} else {
// the rrset already exists, so make it right.
if len(endpoints) < 1 {
// Need an appropriate CNAME record. Check that we have it.
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
return nil
} else {
Expand All @@ -199,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if uplevelCname != "" {
if uplevelCname != "" && endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
Expand All @@ -214,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
Expand All @@ -224,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if _, err = rrsets.Add(newRrset); err != nil {
return err
if endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -279,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}

_, endpointReachable := cachedService.endpointMap[clusterName]
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
Expand All @@ -292,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}

for i, endpoint := range endpoints {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
}

func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
Expand All @@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// need to query dns info from dnsprovider and make sure of if deletion is needed
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
delete(cachedService.endpointMap, clusterName)
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported
var fakeDnsZones, _ = fakeDns.Zones()

var fakeServiceController = ServiceController{
dns: fakeDns,
dnsZones: fakeDnsZones,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
zoneName: "example.com",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
Expand All @@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints {
}

func TestProcessEndpointUpdate(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
tests := []struct {
name string
Expand All @@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
{
Expand All @@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
}

fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
Expand Down
21 changes: 18 additions & 3 deletions federation/pkg/federation-controller/service/service_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (sc *ServiceController) clusterServiceWorker() {
if quit {
return
}
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient)
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.Errorf("Failed to sync service: %+v", err)
}
Expand All @@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() {
}

// Whenever there is change on service, the federation service should be updated
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error {
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, sc *ServiceController) error {
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
cachedService, ok := serviceCache.get(key)
if !ok {
Expand Down Expand Up @@ -88,6 +88,15 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
}

if needUpdate {
for i := 0; i < clientRetryCount; i++ {
if err := sc.ensureDnsRecords(clusterName, cachedService); err == nil {
break
}
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err)
time.Sleep(cachedService.nextDNSUpdateDelay())
clusterCache.serviceQueue.Add(key)
// did not retry here as we still want to persist federation apiserver even ensure dns records fails
}
err := cc.persistFedServiceUpdate(cachedService, fedClient)
if err == nil {
cachedService.appliedState = cachedService.lastState
Expand Down Expand Up @@ -219,7 +228,13 @@ func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedServi
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
var err error
for i := 0; i < clientRetryCount; i++ {
_, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service)
_, err := fedClient.Core().Services(service.Namespace).Get(service.Name)
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
if err == nil {
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi
err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{})
if err == nil || errors.IsNotFound(err) {
glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
return nil
}
time.Sleep(cachedService.nextRetryDelay())
Expand Down

0 comments on commit dd78dd8

Please sign in to comment.