Skip to content

Commit

Permalink
Merge pull request #27252 from mfanjie/fix-ensuredns
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

federation: fix dns provider initialization issues

This PR is based on the integration test with Google DNS API. This is the first time of full integration test.
So multiple issues was found and I combined all of them in this single PR

1. add dns provider initialization and add ensureDns call when removing federation service.
2. add new flags federation-name and zone-name to controller manager, both are used as part of the dns record name
3. fix assertion failure at rrsets.go#L61, which will cause panic
4. change getFederationDNSZoneName to get zoneName from config instead of hard code
5. change logic of ensureDnsRrsets, only add new dns record when endpointReachable(set to true when ready address is catched) is true
6. fix bug in processEndpointUpdate, only call ensuredns when ready address is caught
7. change behavior of syncService, there is cases that endpoint is created before ingress IP assignment, so before there is defect for this case, ensureDns was not called when service being updated, so if Ingress IP is assigned after endpoint ready address is caught, the corresponding A records can not be created
8. add a checking before update federation service

@nikhiljindal , can you help to add 1.3 milestone when @quinton-hoole is on leave?
Thanks.

[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]()
  • Loading branch information
k8s-merge-robot authored Jun 14, 2016
2 parents 5fbde0a + 8e26283 commit c7e3b2f
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 63 deletions.
1 change: 1 addition & 0 deletions federation/cluster/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ function create-federation-api-objects {
export FEDERATION_API_NODEPORT=32111
export FEDERATION_NAMESPACE
export FEDERATION_NAME="${FEDERATION_NAME:-federation}"
export DNS_ZONE_NAME="${DNS_ZONE_NAME:-example.com}"

template="go run ${KUBE_ROOT}/federation/cluster/template.go"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scClientset, dns)
servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type ControllerManagerConfiguration struct {
Port int `json:"port"`
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
Address string `json:"address"`
// federation name.
FederationName string `json:"federationName"`
// zone name, like example.com.
ZoneName string `json:"zoneName"`
// dnsProvider is the provider for dns services.
DnsProvider string `json:"dnsProvider"`
// dnsConfigFile is the path to the dns provider configuration file.
Expand Down Expand Up @@ -90,6 +94,8 @@ func NewCMServer() *CMServer {
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.")
fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.")
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ spec:
- --master=https://{{.FEDERATION_APISERVER_DEPLOYMENT_NAME}}:443
- --dns-provider={{.FEDERATION_DNS_PROVIDER}}
- --dns-provider-config={{.FEDERATION_DNS_PROVIDER_CONFIG}}
- --federation-name={{.FEDERATION_NAME}}
- --zone-name={{.DNS_ZONE_NAME}}
ports:
- containerPort: 443
name: https
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
}
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
for i, rrset := range response.Rrsets() {
list[i] = &ResourceRecordSet{rrset, &rrsets}
list[i] = ResourceRecordSet{rrset, &rrsets}
}
return list, nil
}
Expand Down
73 changes: 43 additions & 30 deletions federation/pkg/federation-controller/service/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func (s *ServiceController) getClusterZoneNames(clusterName string) (zones []str

// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
func (s *ServiceController) getFederationDNSZoneName() (string, error) {
return "example.com", nil // TODO: quinton: Get this from the federation configuration.
// Note: For unit testing this must match the domain populated in the test/stub dnsprovider.
return s.zoneName, nil
}

// getDnsZone is a hack around the fact that dnsprovider does not yet support a Get() method, only a List() method. TODO: Fix that.
Expand Down Expand Up @@ -147,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
*/
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error {
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
if err != nil {
return err
Expand All @@ -161,37 +160,44 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return err
}
if rrset == nil {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if endpointReachable {
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
if len(endpoints) < 1 {
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
if uplevelCname != "" {
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
// else we want no record, and we have no record, so we're all good.
} else {
// We have valid endpoint addresses, so just add them as A records.
// But first resolve DNS names, as some cloud providers (like AWS) expose
// load balancers behind DNS names, not IP addresses.
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
if err != nil {
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
rrset, err = rrsets.Add(newRrset)
if err != nil {
return err
}
}
} else {
// the rrset already exists, so make it right.
if len(endpoints) < 1 {
// Need an appropriate CNAME record. Check that we have it.
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
return nil
} else {
Expand All @@ -200,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if uplevelCname != "" {
if uplevelCname != "" && endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
Expand All @@ -215,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
if rrset == newRrset {
if !endpointReachable {
if err = rrsets.Remove(rrset); err != nil {
return err
}
}
// The existing rrset is equal to the required one - our work is done here
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
Expand All @@ -225,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
if err = rrsets.Remove(rrset); err != nil {
return err
}
if _, err = rrsets.Add(newRrset); err != nil {
return err
if endpointReachable {
if _, err = rrsets.Add(newRrset); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -254,12 +267,12 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
// the state of the service when we last successfully sync'd it's DNS records.
// So this time around we only need to patch that (add new records, remove deleted records, and update changed records.
//
if s.dns == nil {
return nil
}
if s == nil {
return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
if s.dns == nil {
return nil
}
if cachedService == nil {
return fmt.Errorf("nil cachedService passed to ServiceController.ensureDnsRecords(clusterName: %s, cachedService: %v)", clusterName, cachedService)
}
Expand All @@ -280,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
if err != nil {
return err
}

_, endpointReachable := cachedService.endpointMap[clusterName]
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
// dnsNames is the path up the DNS search tree, starting at the leaf
dnsNames := []string{
Expand All @@ -293,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}

for i, endpoint := range endpoints {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil {
return err
}
}
Expand Down
65 changes: 46 additions & 19 deletions federation/pkg/federation-controller/service/endpoint_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
}

func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
var err error
cachedService.rwlock.Lock()
defer cachedService.rwlock.Unlock()
Expand All @@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
// need to query dns info from dnsprovider and make sure of if deletion is needed
if ok {
// endpoints lost, clean dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
delete(cachedService.endpointMap, clusterName)
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
Expand All @@ -120,27 +120,54 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error {
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.rwlock.Lock()
var reachable bool
defer cachedService.rwlock.Unlock()
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
cachedService.endpointMap[clusterName] = 1
}
}
_, ok := cachedService.endpointMap[clusterName]
if !ok {
// first time get endpoints, update dns record
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
cachedService.endpointMap[clusterName] = 1
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
}
} else {
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) > 0 {
reachable = true
break
}
}
if !reachable {
// first time get endpoints, update dns record
glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err != nil {
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
for i := 0; i < clientRetryCount; i++ {
time.Sleep(cachedService.nextDNSUpdateDelay())
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
}
return err
}
return err
}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported
var fakeDnsZones, _ = fakeDns.Zones()

var fakeServiceController = ServiceController{
dns: fakeDns,
dnsZones: fakeDnsZones,
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
dns: fakeDns,
dnsZones: fakeDnsZones,
federationName: "fed1",
zoneName: "example.com",
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
clusterCache: &clusterClientCache{
clientMap: make(map[string]*clusterCache),
},
Expand All @@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints {
}

func TestProcessEndpointUpdate(t *testing.T) {
clusterName := "foo"
cc := clusterClientCache{
clientMap: make(map[string]*clusterCache),
clientMap: map[string]*clusterCache{
clusterName: {
cluster: &v1alpha1.Cluster{
Status: v1alpha1.ClusterStatus{
Zones: []string{"foozone"},
Region: "fooregion",
},
},
},
},
}
tests := []struct {
name string
Expand All @@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
endpointMap: make(map[string]int),
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
{
Expand All @@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) {
},
},
buildEndpoint([][]string{{"ip1", ""}}),
"foo",
clusterName,
1,
},
}

fakeServiceController.clusterCache = &cc
for _, test := range tests {
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
Expand Down
Loading

0 comments on commit c7e3b2f

Please sign in to comment.