Skip to content

Commit

Permalink
Specify hostname, subdomain via annotation on podspec.
Browse files Browse the repository at this point in the history
The hostname is a DNS A record, if the subdomain maps to a service name
in the same namespace
  • Loading branch information
ArtfulCoder committed Mar 4, 2016
1 parent 4e00333 commit a3c00aa
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 54 deletions.
4 changes: 4 additions & 0 deletions cluster/addons/dns/kube2sky/Changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Version 1.14 (Mar 4 2016 Abhishek Shah <abshah@google.com>)
- If Endpoint has hostnames-map annotation (endpoints.beta.kubernetes.io/hostnames-map),
the hostnames supplied via the annotation will be used to generate A Records for Headless Service.

## Version 1.13 (Mar 1 2016 Prashanth.B <beeps@google.com>)
- Synchronously wait for the Kubernetes service at startup.
- Add a SIGTERM/SIGINT handler.
Expand Down
2 changes: 1 addition & 1 deletion cluster/addons/dns/kube2sky/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

.PHONY: all kube2sky container push clean test

TAG = 1.13
TAG = 1.14
PREFIX = gcr.io/google_containers

all: container
Expand Down
18 changes: 17 additions & 1 deletion cluster/addons/dns/kube2sky/kube2sky.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
skymsg "github.com/skynetservices/skydns/msg"
flag "github.com/spf13/pflag"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/unversioned"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/restclient"
Expand All @@ -46,6 +47,7 @@ import (
kselector "k8s.io/kubernetes/pkg/fields"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -159,14 +161,28 @@ func getSkyMsg(ip string, port int) *skymsg.Service {
}

func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0))
endpointIP := e.Subsets[idx].Addresses[subIdx].IP
b, err := json.Marshal(getSkyMsg(endpointIP, 0))
if err != nil {
return err
}
recordValue := string(b)
recordLabel := getHash(recordValue)
if serializedPodHostnames := e.Annotations[endpoints.PodHostnamesAnnotation]; len(serializedPodHostnames) > 0 {
podHostnames := map[string]endpoints.HostRecord{}
err := json.Unmarshal([]byte(serializedPodHostnames), &podHostnames)
if err != nil {
return err
}
if hostRecord, exists := podHostnames[string(endpointIP)]; exists {
if validation.IsDNS1123Label(hostRecord.HostName) {
recordLabel = hostRecord.HostName
}
}
}
recordKey := buildDNSNameString(subdomain, recordLabel)

glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue)
Expand Down
2 changes: 1 addition & 1 deletion cluster/addons/dns/skydns-rc.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ spec:
- name: etcd-storage
mountPath: /var/etcd/data
- name: kube2sky
image: gcr.io/google_containers/kube2sky:1.13
image: gcr.io/google_containers/kube2sky:1.14
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in
Expand Down
10 changes: 10 additions & 0 deletions pkg/api/endpoints/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ import (
hashutil "k8s.io/kubernetes/pkg/util/hash"
)

const (
	// PodHostnamesAnnotation is the annotation key under which pod hostnames are
	// recorded on an Endpoints object.
	// Its value is the JSON representation of map[string(IP)]HostRecord,
	// example: '{"10.245.1.6":{"HostName":"my-webserver"}}'
	PodHostnamesAnnotation = "endpoints.beta.kubernetes.io/hostnames-map"
)

// HostRecord is the per-IP value stored in the JSON map carried by
// PodHostnamesAnnotation. The struct declares no JSON tags, so the field is
// serialized under the key "HostName".
type HostRecord struct {
	// HostName is the hostname associated with the IP that keys this record.
	HostName string
}

// RepackSubsets takes a slice of EndpointSubset objects, expands it to the full
// representation, and then repacks that into the canonical layout. This
// ensures that code which operates on these objects can rely on the common
Expand Down
12 changes: 12 additions & 0 deletions pkg/api/pod/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ import (
"k8s.io/kubernetes/pkg/util/intstr"
)

const (
	// PodHostnameAnnotation is the annotation key whose value is a string
	// specifying the hostname to be used for the pod, e.g. 'my-webserver-1'.
	PodHostnameAnnotation = "pod.beta.kubernetes.io/hostname"

	// PodSubdomainAnnotation is the annotation key whose value is a string
	// specifying the subdomain, e.g. "my-web-service".
	// If specified, on the pod itself, "<hostname>.my-web-service.<namespace>.svc.<cluster domain>" would resolve to
	// the pod's IP.
	// If there is a headless service named "my-web-service" in the same namespace as the pod, then
	// "<hostname>.my-web-service.<namespace>.svc.<cluster domain>" would be resolved by the cluster DNS server.
	PodSubdomainAnnotation = "pod.beta.kubernetes.io/subdomain"
)

// FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
Expand Down
62 changes: 57 additions & 5 deletions pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"regexp"
"strings"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/capabilities"
Expand All @@ -36,8 +39,6 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field"

"github.com/golang/glog"
)

// TODO: delete this global variable when we enable the validation of common
Expand Down Expand Up @@ -113,10 +114,34 @@ func ValidateAnnotations(annotations map[string]string, fldPath *field.Path) fie
if totalSize > (int64)(totalAnnotationSizeLimitB) {
allErrs = append(allErrs, field.TooLong(fldPath, "", totalAnnotationSizeLimitB))
}
return allErrs
}

// ValidatePodSpecificAnnotations validates the annotations that are only
// meaningful on pods. The affinity annotation is checked when its value is
// non-empty; the hostname and subdomain annotations are checked whenever the
// key is present and must each be a DNS-1123 label.
func ValidatePodSpecificAnnotations(annotations map[string]string, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}

	if len(annotations[api.AffinityAnnotationKey]) > 0 {
		errs = append(errs, ValidateAffinityInPodAnnotations(annotations, fldPath)...)
	}

	// Hostname is examined before subdomain so errors keep that order.
	dnsLabelKeys := []string{utilpod.PodHostnameAnnotation, utilpod.PodSubdomainAnnotation}
	for _, key := range dnsLabelKeys {
		value, present := annotations[key]
		if !present || validation.IsDNS1123Label(value) {
			continue
		}
		errs = append(errs, field.Invalid(fldPath, key, DNS1123LabelErrorMsg))
	}

	return errs
}

// ValidateEndpointsSpecificAnnotations validates the annotations that are only
// meaningful on Endpoints objects. When the pod-hostnames annotation key is
// present, its value must pass isValidHostnamesMap; otherwise a single
// Invalid error is returned for fldPath.
func ValidateEndpointsSpecificAnnotations(annotations map[string]string, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}

	serialized, present := annotations[endpoints.PodHostnamesAnnotation]
	if !present || isValidHostnamesMap(serialized) {
		return errs
	}

	const detail = `must be a valid json representation of map[string(IP)][HostRecord] e.g. "{"10.245.1.6":{"HostName":"my-webserver"}}"`
	return append(errs, field.Invalid(fldPath, endpoints.PodHostnamesAnnotation, detail))
}

Expand Down Expand Up @@ -1356,7 +1381,9 @@ func validateImagePullSecrets(imagePullSecrets []api.LocalObjectReference, fldPa

// ValidatePod tests if required fields in the pod are set.
func ValidatePod(pod *api.Pod) field.ErrorList {
allErrs := ValidateObjectMeta(&pod.ObjectMeta, true, ValidatePodName, field.NewPath("metadata"))
fldPath := field.NewPath("metadata")
allErrs := ValidateObjectMeta(&pod.ObjectMeta, true, ValidatePodName, fldPath)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(pod.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpec(&pod.Spec, field.NewPath("spec"))...)
return allErrs
}
Expand Down Expand Up @@ -1520,8 +1547,9 @@ func ValidatePodSecurityContext(securityContext *api.PodSecurityContext, spec *a
// ValidatePodUpdate tests to see if the update is legal for an end user to make. newPod is updated with fields
// that cannot be changed.
func ValidatePodUpdate(newPod, oldPod *api.Pod) field.ErrorList {
allErrs := ValidateObjectMetaUpdate(&newPod.ObjectMeta, &oldPod.ObjectMeta, field.NewPath("metadata"))

fldPath := field.NewPath("metadata")
allErrs := ValidateObjectMetaUpdate(&newPod.ObjectMeta, &oldPod.ObjectMeta, fldPath)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(newPod.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
specPath := field.NewPath("spec")
if len(newPod.Spec.Containers) != len(oldPod.Spec.Containers) {
//TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff
Expand Down Expand Up @@ -1884,6 +1912,7 @@ func ValidatePodTemplateSpec(spec *api.PodTemplateSpec, fldPath *field.Path) fie
allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateLabels(spec.Labels, fldPath.Child("labels"))...)
allErrs = append(allErrs, ValidateAnnotations(spec.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(spec.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpec(&spec.Spec, fldPath.Child("spec"))...)
return allErrs
}
Expand Down Expand Up @@ -2503,6 +2532,7 @@ func ValidateNamespaceFinalizeUpdate(newNamespace, oldNamespace *api.Namespace)
// ValidateEndpoints tests if required fields are set.
//
// It validates the object metadata, the Endpoints-specific annotations and
// the subsets, and returns every error found.
func ValidateEndpoints(endpoints *api.Endpoints) field.ErrorList {
	metaPath := field.NewPath("metadata")
	allErrs := ValidateObjectMeta(&endpoints.ObjectMeta, true, ValidateEndpointsName, metaPath)
	// Annotations live inside the object metadata, so errors about them are
	// reported under "metadata.annotations" (the same path pod validation uses)
	// rather than under a non-existent top-level "annotations" field.
	allErrs = append(allErrs, ValidateEndpointsSpecificAnnotations(endpoints.Annotations, metaPath.Child("annotations"))...)
	allErrs = append(allErrs, validateEndpointSubsets(endpoints.Subsets, field.NewPath("subsets"))...)
	return allErrs
}
Expand Down Expand Up @@ -2586,6 +2616,7 @@ func validateEndpointPort(port *api.EndpointPort, requireName bool, fldPath *fie
// ValidateEndpointsUpdate validates an update from oldEndpoints to
// newEndpoints: the metadata transition, then the new subsets, then the new
// Endpoints-specific annotations. It returns every error found.
func ValidateEndpointsUpdate(newEndpoints, oldEndpoints *api.Endpoints) field.ErrorList {
	metaPath := field.NewPath("metadata")
	allErrs := ValidateObjectMetaUpdate(&newEndpoints.ObjectMeta, &oldEndpoints.ObjectMeta, metaPath)
	allErrs = append(allErrs, validateEndpointSubsets(newEndpoints.Subsets, field.NewPath("subsets"))...)
	// Annotations live inside the object metadata, so errors about them are
	// reported under "metadata.annotations" rather than under a non-existent
	// top-level "annotations" field.
	allErrs = append(allErrs, ValidateEndpointsSpecificAnnotations(newEndpoints.Annotations, metaPath.Child("annotations"))...)
	return allErrs
}

Expand Down Expand Up @@ -2651,3 +2682,24 @@ func ValidateLoadBalancerStatus(status *api.LoadBalancerStatus, fldPath *field.P
}
return allErrs
}

// isValidHostnamesMap reports whether serializedPodHostNames is a non-empty
// string that decodes as JSON into a map of HostRecord values in which every
// key parses as an IP address and every HostName is a DNS-1123 label.
func isValidHostnamesMap(serializedPodHostNames string) bool {
	if serializedPodHostNames == "" {
		return false
	}

	records := make(map[string]endpoints.HostRecord)
	if err := json.Unmarshal([]byte(serializedPodHostNames), &records); err != nil {
		return false
	}

	// A single bad entry (unparseable IP or invalid label) rejects the whole map.
	for addr, record := range records {
		if net.ParseIP(addr) == nil || !validation.IsDNS1123Label(record.HostName) {
			return false
		}
	}
	return true
}
83 changes: 71 additions & 12 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"reflect"
"time"

"encoding/json"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
podutil "k8s.io/kubernetes/pkg/api/pod"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
Expand All @@ -37,8 +40,6 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"

"github.com/golang/glog"
)

const (
Expand Down Expand Up @@ -187,7 +188,8 @@ func (e *EndpointController) updatePod(old, cur interface{}) {

oldPod := cur.(*api.Pod)
// Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAnnotationsAreEqual(newPod.Annotations, oldPod.Annotations) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
Expand All @@ -200,6 +202,17 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
}
}

// hostNameAndDomainAnnotationsAreEqual reports whether the two annotation maps
// agree on both the pod hostname and the pod subdomain annotations. Either map
// may be nil: indexing a nil map yields "", so a nil map, a missing key and an
// empty value all compare equal.
func hostNameAndDomainAnnotationsAreEqual(annotation1, annotation2 map[string]string) bool {
	for _, key := range []string{utilpod.PodHostnameAnnotation, utilpod.PodSubdomainAnnotation} {
		if annotation1[key] != annotation2[key] {
			return false
		}
	}
	return true
}

// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
Expand Down Expand Up @@ -294,6 +307,8 @@ func (e *EndpointController) syncService(key string) {
}

subsets := []api.EndpointSubset{}
podHostNames := map[string]endpoints.HostRecord{}

for i := range pods.Items {
pod := &pods.Items[i]

Expand All @@ -316,14 +331,26 @@ func (e *EndpointController) syncService(key string) {
continue
}

hostname := pod.Annotations[utilpod.PodHostnameAnnotation]
if len(hostname) > 0 &&
pod.Annotations[utilpod.PodSubdomainAnnotation] == service.Name &&
service.Namespace == pod.Namespace {
hostRecord := endpoints.HostRecord{
HostName: hostname,
}
podHostNames[string(pod.Status.PodIP)] = hostRecord
}

epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
epa := api.EndpointAddress{
IP: pod.Status.PodIP,
TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
if api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa},
Expand Down Expand Up @@ -356,14 +383,38 @@ func (e *EndpointController) syncService(key string) {
return
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {

serializedPodHostNames := ""
if len(podHostNames) > 0 {
b, err := json.Marshal(podHostNames)
if err != nil {
glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
e.queue.Add(key) // Retry
return
}
serializedPodHostNames = string(b)
}

podHostNamesAreEqual := verifyPodHostNamesAreEqual(serializedPodHostNames, currentEndpoints.Annotations)

newAnnotations := make(map[string]string)
newAnnotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
reflect.DeepEqual(currentEndpoints.Labels, service.Labels) && podHostNamesAreEqual {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels

if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}
if len(serializedPodHostNames) == 0 {
delete(newEndpoints.Annotations, endpoints.PodHostnamesAnnotation)
} else {
newEndpoints.Annotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
}
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
Expand All @@ -377,6 +428,14 @@ func (e *EndpointController) syncService(key string) {
}
}

// verifyPodHostNamesAreEqual reports whether newPodHostNames matches the
// serialized pod-hostnames value stored in oldAnnotations. A nil map or a
// missing key is treated as the empty string, because indexing a nil map
// yields the zero value.
func verifyPodHostNamesAreEqual(newPodHostNames string, oldAnnotations map[string]string) bool {
	return oldAnnotations[endpoints.PodHostnamesAnnotation] == newPodHostNames
}

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/container/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ type RunContainerOptions struct {
CgroupParent string
// The type of container rootfs
ReadOnly bool
// hostname for pod containers
Hostname string
}

// VolumeInfo contains information about the volume.
Expand Down
Loading

0 comments on commit a3c00aa

Please sign in to comment.