-
Notifications
You must be signed in to change notification settings - Fork 40.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
federation: Updating KubeDNS to try finding a local service first for federation query #27708
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,8 +92,15 @@ type KubeDNS struct { | |
// A Records and SRV Records for (regular) services and headless Services. | ||
cache *TreeCache | ||
|
||
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap. | ||
reverseRecordMap map[string]*skymsg.Service | ||
|
||
// Map of cluster IP to service object. Headless services are not part of this map. | ||
// Used to get a service when given its cluster IP. | ||
// Access to this is coordinated using cacheLock. We use the same lock for cache and this map | ||
// to ensure that they don't get out of sync. | ||
clusterIPServiceMap map[string]*kapi.Service | ||
|
||
// caller is responsible for using the cacheLock before invoking methods on cache | ||
// the cache is not thread-safe, and the caller can guarantee thread safety by using | ||
// the cacheLock | ||
|
@@ -119,20 +126,28 @@ type KubeDNS struct { | |
nodesStore kcache.Store | ||
} | ||
|
||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS { | ||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) { | ||
// Verify that federation names do not contain dots ('.'). | ||
// We cannot allow dots since we use them as the separator for path segments (svcname.nsname.fedname.svc.domain). | ||
for key := range federations { | ||
if strings.ContainsAny(key, ".") { | ||
return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key) | ||
} | ||
} | ||
kd := &KubeDNS{ | ||
kubeClient: client, | ||
domain: domain, | ||
cache: NewTreeCache(), | ||
cacheLock: sync.RWMutex{}, | ||
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), | ||
reverseRecordMap: make(map[string]*skymsg.Service), | ||
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), | ||
federations: federations, | ||
kubeClient: client, | ||
domain: domain, | ||
cache: NewTreeCache(), | ||
cacheLock: sync.RWMutex{}, | ||
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), | ||
reverseRecordMap: make(map[string]*skymsg.Service), | ||
clusterIPServiceMap: make(map[string]*kapi.Service), | ||
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), | ||
federations: federations, | ||
} | ||
kd.setEndpointsStore() | ||
kd.setServicesStore() | ||
return kd | ||
return kd, nil | ||
} | ||
|
||
func (kd *KubeDNS) Start() { | ||
|
@@ -245,6 +260,7 @@ func (kd *KubeDNS) removeService(obj interface{}) { | |
defer kd.cacheLock.Unlock() | ||
kd.cache.deletePath(subCachePath...) | ||
delete(kd.reverseRecordMap, s.Spec.ClusterIP) | ||
delete(kd.clusterIPServiceMap, s.Spec.ClusterIP) | ||
} | ||
} | ||
|
||
|
@@ -319,6 +335,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) { | |
defer kd.cacheLock.Unlock() | ||
kd.cache.setSubCache(service.Name, subCache, subCachePath...) | ||
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord | ||
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service | ||
} | ||
|
||
func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error { | ||
|
@@ -422,7 +439,74 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er | |
glog.Infof("Received DNS Request:%s, exact:%v", name, exact) | ||
trimmed := strings.TrimRight(name, ".") | ||
segments := strings.Split(trimmed, ".") | ||
isFederationQuery := false | ||
federationSegments := []string{} | ||
if !exact && kd.isFederationQuery(segments) { | ||
glog.Infof("federation service query: Received federation query. Going to try to find local service first") | ||
// Try querying the non-federation (local) service first. | ||
// Will try the federation one later, if this fails. | ||
isFederationQuery = true | ||
federationSegments = append(federationSegments, segments...) | ||
// To try local service, remove federation name from segments. | ||
// Federation name is 3rd in the segment (after service name and namespace). | ||
segments = append(segments[:2], segments[3:]...) | ||
} | ||
path := reverseArray(segments) | ||
records, err := kd.getRecordsForPath(path, exact) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if !isFederationQuery { | ||
if len(records) > 0 { | ||
return records, nil | ||
} | ||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} | ||
} | ||
|
||
// For federation query, verify that the local service has endpoints. | ||
validRecord := false | ||
for _, val := range records { | ||
// We know that a headless service has endpoints for sure if a record was returned for it. | ||
// The record contains endpoint IPs. So nothing to check for headless services. | ||
if !kd.isHeadlessServiceRecord(&val) { | ||
ok, err := kd.serviceWithClusterIPHasEndpoints(&val) | ||
if err != nil { | ||
glog.Infof("federation service query: unexpected error while trying to find if service has endpoint: %v") | ||
continue | ||
} | ||
if !ok { | ||
glog.Infof("federation service query: skipping record since service has no endpoint: %v", val) | ||
continue | ||
} | ||
} | ||
validRecord = true | ||
break | ||
} | ||
if validRecord { | ||
// There is a local service with valid endpoints, return its CNAME. | ||
name := strings.Join(reverseArray(path), ".") | ||
// Ensure that this name that we are returning as a CNAME response is a fully qualified | ||
// domain name so that the client's resolver library doesn't have to go through its | ||
// search list all over again. | ||
if !strings.HasSuffix(name, ".") { | ||
name = name + "." | ||
} | ||
glog.Infof("federation service query: Returning CNAME for local service : %s", name) | ||
return []skymsg.Service{{Host: name}}, nil | ||
} | ||
|
||
// If the name query is not an exact query and does not match any records in the local store, | ||
// attempt to send a federation redirect (CNAME) response. | ||
if !exact { | ||
glog.Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response") | ||
return kd.federationRecords(reverseArray(federationSegments)) | ||
} | ||
|
||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} | ||
} | ||
|
||
func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) { | ||
retval := []skymsg.Service{} | ||
if kd.isPodRecord(path) { | ||
ip, err := kd.getPodIP(path) | ||
if err == nil { | ||
|
@@ -448,21 +532,50 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er | |
kd.cacheLock.RLock() | ||
defer kd.cacheLock.RUnlock() | ||
records := kd.cache.getValuesForPathWithWildcards(path...) | ||
glog.V(2).Infof("Received %d records from cache", len(records)) | ||
for _, val := range records { | ||
retval = append(retval, *val) | ||
} | ||
glog.Infof("records:%v, retval:%v, path:%v", records, retval, path) | ||
if len(retval) > 0 { | ||
return retval, nil | ||
return retval, nil | ||
} | ||
|
||
// Returns true if the given record corresponds to a headless service. | ||
// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it. | ||
// This is because the code will panic if we try to acquire it again while we already hold it. | ||
func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool { | ||
// If it is not a headless service, then msg.Host will be the cluster IP. | ||
// So we can check if msg.host exists in our clusterIPServiceMap. | ||
_, ok := kd.clusterIPServiceMap[msg.Host] | ||
// It is headless service if no record was found. | ||
return !ok | ||
} | ||
|
||
// Returns true if the service corresponding to the given message has endpoints. | ||
// Note: Works only for services with ClusterIP. Will return an error for headless service (service without a clusterIP). | ||
// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it. | ||
// This is because the code will panic if we try to acquire it again while we already hold it. | ||
func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) { | ||
svc, ok := kd.clusterIPServiceMap[msg.Host] | ||
if !ok { | ||
// It is a headless service. | ||
return false, fmt.Errorf("method not expected to be called for headless service") | ||
} | ||
|
||
// If the name query is not an exact query and does not match any records in the local store, | ||
// attempt to send a federation redirect (CNAME) response. | ||
if !exact { | ||
return kd.federationRecords(path) | ||
key, err := kcache.MetaNamespaceKeyFunc(svc) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} | ||
e, exists, err := kd.endpointsStore.GetByKey(key) | ||
if err != nil { | ||
return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err) | ||
} | ||
if !exists { | ||
return false, nil | ||
} | ||
if e, ok := e.(*kapi.Endpoints); ok { | ||
return len(e.Subsets) > 0, nil | ||
} | ||
return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e) | ||
} | ||
|
||
// ReverseRecords performs a reverse lookup for the given name. | ||
|
@@ -558,6 +671,9 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) { | |
// 5. Fourth segment is exactly "svc" | ||
// 6. The remaining segments match kd.domainPath. | ||
// 7. And federation must be one of the listed federations in the config. | ||
// Note: Because of the above conditions, this method will treat wildcard queries such as | ||
// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries. | ||
// We can add support for wildcard queries later, if needed. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I would like us to support wildcard queries for federated services, as this will be an easy way for clients to discover all shards of a service globally, with a single DNS query. Pretty cool. Can wait until post-1.3. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @quinton-hoole How are the clients expected to format the wildcard names? Specifically, where do you expect them to put the `*`? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, what do you expect the KubeDNS response to be? |
||
func (kd *KubeDNS) isFederationQuery(path []string) bool { | ||
if len(path) == 4+len(kd.domainPath) && | ||
len(validation.IsDNS952Label(path[0])) == 0 && | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to a lower log level? It seems chatty with no specific information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The information this gives is that this query was interpreted as a federation query.
I would like to keep it for debugging.
Note that we won't see this in the logs unless someone has federation enabled and is sending federation queries.