Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

federation: Updating KubeDNS to try finding a local service first for federation query #27708

Merged
merged 2 commits into from
Jun 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build/kube-dns/Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@

## Version 1.4 (Tue June 21 2016 Nikhil Jindal <nikhiljindal@google.com>)
- Initialising nodesStore (issue #27820)

## Version 1.5 (Thu June 23 2016 Nikhil Jindal <nikhiljindal@google.com>)
- Adding support to return local service (pr #27708)
4 changes: 2 additions & 2 deletions build/kube-dns/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
# If you update this image please bump the tag value before pushing.
#
# Usage:
# [ARCH=amd64] [TAG=1.4] [REGISTRY=gcr.io/google_containers] [BASEIMAGE=busybox] make (container|push)
# [ARCH=amd64] [TAG=1.5] [REGISTRY=gcr.io/google_containers] [BASEIMAGE=busybox] make (container|push)

# Default registry, arch and tag. This can be overwritten by arguments to make
PLATFORM?=linux
ARCH?=amd64
TAG?=1.4
TAG?=1.5
REGISTRY?=gcr.io/google_containers

GOLANG_VERSION=1.6
Expand Down
10 changes: 5 additions & 5 deletions cluster/gce/coreos/kube-manifests/addons/dns/skydns-rc.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
apiVersion: v1
kind: ReplicationController
metadata:
name: kube-dns-v14
name: kube-dns-v15
namespace: kube-system
labels:
k8s-app: kube-dns
version: v14
version: v15
kubernetes.io/cluster-service: "true"
spec:
replicas: ${DNS_REPLICAS}
selector:
k8s-app: kube-dns
version: v14
version: v15
template:
metadata:
labels:
k8s-app: kube-dns
version: v14
version: v15
kubernetes.io/cluster-service: "true"
spec:
containers:
- name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.3
image: gcr.io/google_containers/kubedns-amd64:1.5
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
10 changes: 5 additions & 5 deletions cluster/saltbase/salt/kube-dns/skydns-rc.yaml.base
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@
apiVersion: v1
kind: ReplicationController
metadata:
name: kube-dns-v16
name: kube-dns-v17
namespace: kube-system
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
replicas: __PILLAR__DNS__REPLICAS__
selector:
k8s-app: kube-dns
version: v16
version: v17
template:
metadata:
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
containers:
- name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.4
image: gcr.io/google_containers/kubedns-amd64:1.5
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
10 changes: 5 additions & 5 deletions cluster/saltbase/salt/kube-dns/skydns-rc.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@
apiVersion: v1
kind: ReplicationController
metadata:
name: kube-dns-v16
name: kube-dns-v17
namespace: kube-system
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
replicas: {{ pillar['dns_replicas'] }}
selector:
k8s-app: kube-dns
version: v16
version: v17
template:
metadata:
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
containers:
- name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.4
image: gcr.io/google_containers/kubedns-amd64:1.5
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
10 changes: 5 additions & 5 deletions cluster/saltbase/salt/kube-dns/skydns-rc.yaml.sed
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@
apiVersion: v1
kind: ReplicationController
metadata:
name: kube-dns-v16
name: kube-dns-v17
namespace: kube-system
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
replicas: $DNS_REPLICAS
selector:
k8s-app: kube-dns
version: v16
version: v17
template:
metadata:
labels:
k8s-app: kube-dns
version: v16
version: v17
kubernetes.io/cluster-service: "true"
spec:
containers:
- name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.4
image: gcr.io/google_containers/kubedns-amd64:1.5
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
5 changes: 4 additions & 1 deletion cmd/kube-dns/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
}
ks.healthzPort = config.HealthzPort
ks.dnsPort = config.DNSPort
ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
if err != nil {
glog.Fatalf("Failed to start kubeDNS: %v", err)
}
return &ks
}

Expand Down
154 changes: 135 additions & 19 deletions pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ type KubeDNS struct {
// A Records and SRV Records for (regular) services and headless Services.
cache *TreeCache

// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
reverseRecordMap map[string]*skymsg.Service

// Map of cluster IP to service object. Headless services are not part of this map.
// Used to get a service when given its cluster IP.
// Access to this is coordinated using cacheLock. We use the same lock for cache and this map
// to ensure that they dont get out of sync.
clusterIPServiceMap map[string]*kapi.Service

// caller is responsible for using the cacheLock before invoking methods on cache
// the cache is not thread-safe, and the caller can guarantee thread safety by using
// the cacheLock
Expand All @@ -119,20 +126,28 @@ type KubeDNS struct {
nodesStore kcache.Store
}

func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS {
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) {
// Verify that federation names should not contain dots ('.')
// We can not allow dots since we use that as separator for path segments (svcname.nsname.fedname.svc.domain)
for key := range federations {
if strings.ContainsAny(key, ".") {
return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key)
}
}
kd := &KubeDNS{
kubeClient: client,
domain: domain,
cache: NewTreeCache(),
cacheLock: sync.RWMutex{},
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
federations: federations,
kubeClient: client,
domain: domain,
cache: NewTreeCache(),
cacheLock: sync.RWMutex{},
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
federations: federations,
}
kd.setEndpointsStore()
kd.setServicesStore()
return kd
return kd, nil
}

func (kd *KubeDNS) Start() {
Expand Down Expand Up @@ -245,6 +260,7 @@ func (kd *KubeDNS) removeService(obj interface{}) {
defer kd.cacheLock.Unlock()
kd.cache.deletePath(subCachePath...)
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
delete(kd.clusterIPServiceMap, s.Spec.ClusterIP)
}
}

Expand Down Expand Up @@ -319,6 +335,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
}

func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
Expand Down Expand Up @@ -422,7 +439,74 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
glog.Infof("Received DNS Request:%s, exact:%v", name, exact)
trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".")
isFederationQuery := false
federationSegments := []string{}
if !exact && kd.isFederationQuery(segments) {
glog.Infof("federation service query: Received federation query. Going to try to find local service first")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to a lower log level ? Seems chatty with no specific information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the information this gives is that this query was interpreted as a federation query.
I would like to keep it for debugging.
Note that we wont see this in the logs unless someone has federation enabled and is sending federation queries.

// Try quering the non-federation (local) service first.
// Will try the federation one later, if this fails.
isFederationQuery = true
federationSegments = append(federationSegments, segments...)
// To try local service, remove federation name from segments.
// Federation name is 3rd in the segment (after service name and namespace).
segments = append(segments[:2], segments[3:]...)
}
path := reverseArray(segments)
records, err := kd.getRecordsForPath(path, exact)
if err != nil {
return nil, err
}
if !isFederationQuery {
if len(records) > 0 {
return records, nil
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}

// For federation query, verify that the local service has endpoints.
validRecord := false
for _, val := range records {
// We know that a headless service has endpoints for sure if a record was returned for it.
// The record contains endpoint IPs. So nothing to check for headless services.
if !kd.isHeadlessServiceRecord(&val) {
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
if err != nil {
glog.Infof("federation service query: unexpected error while trying to find if service has endpoint: %v")
continue
}
if !ok {
glog.Infof("federation service query: skipping record since service has no endpoint: %v", val)
continue
}
}
validRecord = true
break
}
if validRecord {
// There is a local service with valid endpoints, return its CNAME.
name := strings.Join(reverseArray(path), ".")
// Ensure that this name that we are returning as a CNAME response is a fully qualified
// domain name so that the client's resolver library doesn't have to go through its
// search list all over again.
if !strings.HasSuffix(name, ".") {
name = name + "."
}
glog.Infof("federation service query: Returning CNAME for local service : %s", name)
return []skymsg.Service{{Host: name}}, nil
}

// If the name query is not an exact query and does not match any records in the local store,
// attempt to send a federation redirect (CNAME) response.
if !exact {
glog.Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
return kd.federationRecords(reverseArray(federationSegments))
}

return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}

func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
retval := []skymsg.Service{}
if kd.isPodRecord(path) {
ip, err := kd.getPodIP(path)
if err == nil {
Expand All @@ -448,21 +532,50 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
glog.V(2).Infof("Received %d records from cache", len(records))
for _, val := range records {
retval = append(retval, *val)
}
glog.Infof("records:%v, retval:%v, path:%v", records, retval, path)
if len(retval) > 0 {
return retval, nil
return retval, nil
}

// Returns true if the given record corresponds to a headless service.
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
// This is because the code will panic, if we try to acquire it again if we already have it.
func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool {
// If it is not a headless service, then msg.Host will be the cluster IP.
// So we can check if msg.host exists in our clusterIPServiceMap.
_, ok := kd.clusterIPServiceMap[msg.Host]
// It is headless service if no record was found.
return !ok
}

// Returns true if the service corresponding to the given message has endpoints.
// Note: Works only for services with ClusterIP. Will return an error for headless service (service without a clusterIP).
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
// This is because the code will panic, if we try to acquire it again if we already have it.
func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) {
svc, ok := kd.clusterIPServiceMap[msg.Host]
if !ok {
// It is a headless service.
return false, fmt.Errorf("method not expected to be called for headless service")
}

// If the name query is not an exact query and does not match any records in the local store,
// attempt to send a federation redirect (CNAME) response.
if !exact {
return kd.federationRecords(path)
key, err := kcache.MetaNamespaceKeyFunc(svc)
if err != nil {
return false, err
}

return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
e, exists, err := kd.endpointsStore.GetByKey(key)
if err != nil {
return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
}
if !exists {
return false, nil
}
if e, ok := e.(*kapi.Endpoints); ok {
return len(e.Subsets) > 0, nil
}
return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e)
}

// ReverseRecords performs a reverse lookup for the given name.
Expand Down Expand Up @@ -558,6 +671,9 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
// 5. Fourth segment is exactly "svc"
// 6. The remaining segments match kd.domainPath.
// 7. And federation must be one of the listed federations in the config.
// Note: Because of the above conditions, this method will treat wildcard queries such as
// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries.
// We can add support for wildcard queries later, if needed.
Copy link

@ghost ghost Jun 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I would like us to support wildcard queries for federated services, as this will be an easy way for clients to discover all shards of a service globally, with a single DNS query. Pretty cool. Can wait until post-1.3.

Copy link
Contributor

@madhusudancs madhusudancs Jun 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quinton-hoole How are the clients expected to format the wildcard-names? Specifically, where do you expect them to put the *?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what do you expect the KubeDNS response to be?

func (kd *KubeDNS) isFederationQuery(path []string) bool {
if len(path) == 4+len(kd.domainPath) &&
len(validation.IsDNS952Label(path[0])) == 0 &&
Expand Down
Loading