Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtfulCoder committed May 21, 2016
1 parent fc04064 commit 3ada217
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 244 deletions.
5 changes: 1 addition & 4 deletions cluster/saltbase/salt/kube-dns/kubedns-rc.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ spec:
version: v12
kubernetes.io/cluster-service: "true"
spec:
{% if grains['cloud'] is defined and grains['cloud'] in [ 'vsphere', 'photon-controller' ] %}
hostNetwork: true
{% endif %}
containers:
- name: kubedns
image: artfulcoder/kubedns-amd64:1.0
image: gcr.io/google_containers/kubedns-amd64:1.0
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
6 changes: 3 additions & 3 deletions cmd/kube-dns/app/options/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,8 +52,8 @@ func (m clusterDomainVar) Set(v string) error {
v = strings.TrimSuffix(v, ".")
segments := strings.Split(v, ".")
for _, segment := range segments {
if !validation.IsDNS1123Label(segment) {
return fmt.Errorf("Not a valid DNS label")
if errs := validation.IsDNS1123Label(segment); len(errs) > 0 {
return fmt.Errorf("Not a valid DNS label. %v", errs)
}
}
if !strings.HasSuffix(v, ".") {
Expand Down
11 changes: 8 additions & 3 deletions cmd/kube-dns/app/server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@ import (
"syscall"

"github.com/golang/glog"

"github.com/skynetservices/skydns/metrics"
"github.com/skynetservices/skydns/server"
"k8s.io/kubernetes/cmd/kube-dns/app/options"
Expand Down Expand Up @@ -104,7 +103,13 @@ func (server *KubeDNSServer) setupHealthzHandlers() {
fmt.Fprintf(w, "ok\n")
})
http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, server.kd.GetCacheAsJSON())
serializedJSON, err := server.kd.GetCacheAsJSON()
if err == nil {
fmt.Fprint(w, serializedJSON)
} else {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err)
}
})
}

Expand Down
4 changes: 1 addition & 3 deletions cmd/kube-dns/dns.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,9 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/version/verflag"
"runtime"
)

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
config := options.NewKubeDNSConfig()
config.AddFlags(pflag.CommandLine)

Expand Down
136 changes: 87 additions & 49 deletions pkg/dns/dns.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,14 @@ package dns
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"hash/fnv"
"net"
"strings"
"sync"
"time"

etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
skymsg "github.com/skynetservices/skydns/msg"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
Expand All @@ -47,20 +48,40 @@ const (
podSubdomain = "pod"

// Resync period for the kube controller loop.
resyncPeriod = 30 * time.Minute
resyncPeriod = 5 * time.Minute
)

type KubeDNS struct {
// kubeClient makes calls to API Server and registers calls with API Server
// to get Endpoints and Service objects.
kubeClient *kclient.Client
// DNS domain name.

// The domain for which this DNS Server is authoritative.
domain string

// A cache that contains all the endpoints in the system.
endpointsStore kcache.Store

// A cache that contains all the services in the system.
servicesStore kcache.Store
cache *TreeCache
domainPath []string
eController *kframework.Controller
servicesStore kcache.Store

// stores DNS records for the domain.
// A Records and SRV Records for (regular) services and headless Services.
cache *TreeCache

// caller is responsible for using the cacheLock before invoking methods on cache
// the cache is not thread-safe, and the caller can guarantee thread safety by using
// the cacheLock
cacheLock sync.RWMutex

// The domain for which this DNS Server is authoritative, in array format and reversed.
// e.g. if domain is "cluster.local", domainPath is []string{"local", "cluster"}
domainPath []string

// endpointsController invokes registered callbacks when endpoints change.
endpointsController *kframework.Controller

// serviceController invokes registered callbacks when services change.
serviceController *kframework.Controller
}

Expand All @@ -69,6 +90,7 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS {
kubeClient: client,
domain: domain,
cache: NewTreeCache(),
cacheLock: sync.RWMutex{},
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
}
kd.setEndpointsStore()
Expand All @@ -77,11 +99,14 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS {
}

func (kd *KubeDNS) Start() {
go kd.eController.Run(wait.NeverStop)
go kd.endpointsController.Run(wait.NeverStop)
go kd.serviceController.Run(wait.NeverStop)
// Wait synchronously for the Kubernetes service and add a DNS record for it.
// TODO (abshah) UNCOMMENT AFTER TEST COMPLETE
//kd.waitForKubernetesService()
// This ensures that the Start function returns only after having received Service objects
// from APIServer.
// TODO: we might not have to wait for kubernetes service specifically. We should just wait
// for a list operation to be complete from APIServer.
kd.waitForKubernetesService()
}

func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
Expand All @@ -101,9 +126,11 @@ func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
return
}

func (kd *KubeDNS) GetCacheAsJSON() string {
json, _ := kd.cache.Serialize("")
return json
func (kd *KubeDNS) GetCacheAsJSON() (string, error) {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
json, err := kd.cache.Serialize()
return json, err
}

func (kd *KubeDNS) setServicesStore() {
Expand All @@ -124,7 +151,7 @@ func (kd *KubeDNS) setServicesStore() {
func (kd *KubeDNS) setEndpointsStore() {
// Returns a cache.ListWatch that gets all changes to endpoints.
endpointsWatch := kcache.NewListWatchFromClient(kd.kubeClient, "endpoints", kapi.NamespaceAll, kselector.Everything())
kd.endpointsStore, kd.eController = kframework.NewInformer(
kd.endpointsStore, kd.endpointsController = kframework.NewInformer(
endpointsWatch,
&kapi.Endpoints{},
resyncPeriod,
Expand All @@ -138,24 +165,35 @@ func (kd *KubeDNS) setEndpointsStore() {
)
}

func (kd *KubeDNS) newService(obj interface{}) {
func assertIsService(obj interface{}) (*kapi.Service, bool) {
if service, ok := obj.(*kapi.Service); ok {
return service, ok
} else {
glog.Errorf("Type assertion failed! Expected 'Service', got %T", service)
return nil, ok
}
}

func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok {
// if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
kd.newHeadlessService(service)
return
}
if len(service.Spec.Ports) == 0 {
glog.Info("Unexpected service with no ports, this should not have happend: %v", service)
glog.Warning("Unexpected service with no ports, this should not have happend: %v", service)
}
kd.newPortalService(service)
}
}

func (kd *KubeDNS) removeService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
if s, ok := assertIsService(obj); ok {
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
kd.cache.DeletePath(subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.deletePath(subCachePath...)
}
}

Expand Down Expand Up @@ -194,7 +232,7 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
return nil, nil
}
if svc, ok := obj.(*kapi.Service); ok {
if svc, ok := assertIsService(obj); ok {
return svc, nil
}
return nil, fmt.Errorf("got a non service object in services store %v", obj)
Expand All @@ -203,18 +241,20 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache()
recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0)
subCache.SetEntry(recordLabel, recordValue)
subCache.setEntry(recordLabel, recordValue)

// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
if port.Name != "" && port.Protocol != "" {
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
subCache.SetEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
subCache.setEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
}

func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
Expand All @@ -233,26 +273,28 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel
}
subCache.SetEntry(endpointName, recordValue)
subCache.setEntry(endpointName, recordValue)
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
subCache.SetEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
subCache.setEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
}
}
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
kd.cache.SetSubCache(svc.Name, subCache, subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(svc.Name, subCache, subCachePath...)
return nil
}

func getHostname(address *kapi.EndpointAddress, podHostnames map[string]endpoints.HostRecord) (string, bool) {
if len(address.Hostname) > 0 {
return address.Hostname, true
}
if hostRecord, exists := podHostnames[address.IP]; exists && validation.IsDNS1123Label(hostRecord.HostName) {
if hostRecord, exists := podHostnames[address.IP]; exists && len(validation.IsDNS1123Label(hostRecord.HostName)) == 0 {
return hostRecord.HostName, true
}
return "", false
Expand All @@ -272,12 +314,12 @@ func getPodHostnamesFromAnnotation(annotations map[string]string) (map[string]en
return hostnames, nil
}

func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, cNameLabels ...string) *skymsg.Service {
cName := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".")
for _, cNameLabel := range cNameLabels {
cName = cNameLabel + "." + cName
func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, labels ...string) *skymsg.Service {
host := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".")
for _, cNameLabel := range labels {
host = cNameLabel + "." + host
}
recordValue, _ := getSkyMsg(cName, portNumber)
recordValue, _ := getSkyMsg(host, portNumber)
return recordValue
}

Expand Down Expand Up @@ -312,9 +354,10 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
segments := strings.Split(trimmed, ".")
path := reverseArray(segments)
if kd.isPodRecord(path) {
response, err := kd.getPodRecord(path)
ip, err := kd.getPodIP(path)
if err == nil {
return []skymsg.Service{*response}, nil
skyMsg, _ := getSkyMsg(ip, 0)
return []skymsg.Service{*skyMsg}, nil
}
return nil, err
}
Expand All @@ -324,15 +367,17 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
if key == "" {
return []skymsg.Service{}, nil
}
if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok {
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}

// tmp, _ := kd.cache.Serialize("")
// glog.Infof("Searching path:%q, %v", path, tmp)
records := kd.cache.GetValuesForPathWithRegex(path...)
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *(val.(*skymsg.Service)))
Expand All @@ -350,7 +395,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
segments := strings.Split(strings.TrimRight(name, "."), ".")

for _, k := range segments {
if k == "*" || k == "any" {
if k == "*" {
return nil, fmt.Errorf("reverse can not contain wildcards")
}
}
Expand All @@ -374,20 +419,13 @@ func (kd *KubeDNS) isPodRecord(path []string) bool {
return true
}

func (kd *KubeDNS) getPodRecord(path []string) (*skymsg.Service, error) {
func (kd *KubeDNS) getPodIP(path []string) (string, error) {
ipStr := path[len(path)-1]
ip := strings.Replace(ipStr, "-", ".", -1)
if parsed := net.ParseIP(ip); parsed != nil {
msg := &skymsg.Service{
Host: ip,
Port: 0,
Priority: 10,
Weight: 10,
Ttl: 30,
}
return msg, nil
return ip, nil
}
return nil, fmt.Errorf("Invalid IP Address %v", ip)
return "", fmt.Errorf("Invalid IP Address %v", ip)
}

// Returns record in a format that SkyDNS understands.
Expand Down
Loading

0 comments on commit 3ada217

Please sign in to comment.