Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin/kubernetes: Fix dns programming duration metric #4255

Merged
merged 13 commits into from
Dec 1, 2020
Prev Previous commit
Next Next commit
move cleanup back to toFuncs; get data reqd to record latency before …
…calling toFuncs

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
  • Loading branch information
chrisohaver committed Nov 9, 2020
commit 259dc4bb4da7b7d8832b3de49b4a0539d6718eb7
27 changes: 15 additions & 12 deletions plugin/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,20 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
apiObj runtime.Object
listWatch cache.ListWatch
to object.ToFunc
latency object.RecordLatencyFunc
latency *object.EndpointLatencyRecorder
)
if opts.useEndpointSlices {
apiObj = &discovery.EndpointSlice{}
listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.EndpointSliceToEndpoints
latency = dns.recordEndpointSliceDNSProgrammingLatency
latency = dns.EndpointSliceLatencyRecorder()
} else {
apiObj = &api.Endpoints{}
listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.ToEndpoints
latency = dns.recordEndpointDNSProgrammingLatency
latency = dns.EndpointsLatencyRecorder()
}
dns.epLister, dns.epController = object.NewIndexerInformer(
&listWatch,
Expand All @@ -172,12 +172,19 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}

func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) {
recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj)
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
return dns.SvcIndex(object.ServiceKey(o.GetName(), o.GetNamespace()))
},
}
}

func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) {
recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj)
func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
return dns.SvcIndex(object.ServiceKey(o.GetLabels()[discovery.LabelServiceName], o.GetNamespace()))
},
}
}

func podIPIndexFunc(obj interface{}) ([]string, error) {
Expand Down Expand Up @@ -517,10 +524,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
}
}

func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service {
return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace()))
}

// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
Expand Down
18 changes: 8 additions & 10 deletions plugin/kubernetes/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) {
epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})

dns := dnsControl{svcLister: svcIdx}
latencyFunc := dns.recordEndpointSliceDNSProgrammingLatency
svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{})
epProc := object.DefaultProcessor(object.EndpointSliceToEndpoints, latencyFunc)(epIdx, cache.ResourceEventHandlerFuncs{})
epProc := object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{})

durationSinceFunc = func(t time.Time) time.Duration {
object.DurationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
DNSProgrammingLatency.Reset()
object.DNSProgrammingLatency.Reset()

endpoints1 := []discovery.Endpoint{{
Addresses: []string{"1.2.3.4"},
Expand All @@ -84,7 +83,7 @@ func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) {
createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone)
createEndpointSlice(t, epProc, "headless-wrong-annotation", "wrong-value", nil)

if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
}
Expand All @@ -96,14 +95,13 @@ func TestDnsProgrammingLatencyEndpoints(t *testing.T) {
epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})

dns := dnsControl{svcLister: svcIdx}
latencyFunc := dns.recordEndpointDNSProgrammingLatency
svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{})
epProc := object.DefaultProcessor(object.ToEndpoints, latencyFunc)(epIdx, cache.ResourceEventHandlerFuncs{})
epProc := object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{})

durationSinceFunc = func(t time.Time) time.Duration {
object.DurationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
DNSProgrammingLatency.Reset()
object.DNSProgrammingLatency.Reset()

subset1 := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
Expand All @@ -128,7 +126,7 @@ func TestDnsProgrammingLatencyEndpoints(t *testing.T) {
createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone)
createEndpoints(t, epProc, "headless-wrong-annotation", "wrong-value", nil)

if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions plugin/kubernetes/object/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func ToEndpoints(obj interface{}) (interface{}, error) {
}
}

*end = api.Endpoints{}

return e, nil
}

Expand Down Expand Up @@ -138,6 +140,8 @@ func EndpointSliceToEndpoints(obj interface{}) (interface{}, error) {
}
}

*ends = discovery.EndpointSlice{}

return e, nil
}

Expand Down
28 changes: 8 additions & 20 deletions plugin/kubernetes/object/informer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package object

import (
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
Expand All @@ -27,10 +25,15 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
type RecordLatencyFunc func(meta.Object)

// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion.
func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder {
func DefaultProcessor(convert ToFunc, recordLatency *EndpointLatencyRecorder) ProcessorBuilder {
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
if recordLatency != nil {
if o, ok := d.Object.(meta.Object); ok {
recordLatency.init(o)
}
}
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
obj, err := convert(d.Object)
Expand All @@ -49,7 +52,7 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor
h.OnAdd(obj)
}
if recordLatency != nil {
recordLatency(d.Object.(meta.Object))
recordLatency.record()
}
case cache.Deleted:
var obj interface{}
Expand All @@ -67,28 +70,13 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor
}
h.OnDelete(obj)
if !ok && recordLatency != nil {
recordLatency(d.Object.(meta.Object))
recordLatency.record()
}
}
cleanObj(d.Object)
}
return nil
}
}
}

func cleanObj(i interface{}) {
switch item := i.(type) {
case *discovery.EndpointSlice:
*item = discovery.EndpointSlice{}
case *api.Endpoints:
*item = api.Endpoints{}
case *api.Service:
*item = api.Service{}
case *api.Pod:
*item = api.Pod{}
}
return
}

const defaultResyncPeriod = 0
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package kubernetes
package object

import (
"time"

"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -25,42 +26,50 @@ var (
// * headless_without_selector
DNSProgrammingLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: plugin.Namespace,
Subsystem: pluginName,
Subsystem: "kubernetes",
Name: "dns_programming_duration_seconds",
// From 1 millisecond to ~17 minutes.
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "Histogram of the time (in seconds) it took to program a dns instance.",
}, []string{"service_kind"})

// durationSinceFunc returns the duration elapsed since the given time.
// DurationSinceFunc returns the duration elapsed since the given time.
// Added as a global variable to allow injection for testing.
durationSinceFunc = time.Since
DurationSinceFunc = time.Since
)

func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) {
// getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
var lastChangeTriggerTime time.Time
stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime]
// EndpointLatencyRecorder records latency metric for endpoint objects
type EndpointLatencyRecorder struct {
TT time.Time
ServiceFunc func(meta.Object) []*Service
Services []*Service
}

func (l *EndpointLatencyRecorder) init(o meta.Object) {
l.Services = l.ServiceFunc(o)
l.TT = time.Time{}
stringVal, ok := o.GetAnnotations()[api.EndpointsLastChangeTriggerTime]
if ok {
ts, err := time.Parse(time.RFC3339Nano, stringVal)
tt, err := time.Parse(time.RFC3339Nano, stringVal)
if err != nil {
log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q",
endpoints.GetNamespace(), endpoints.GetName(), api.EndpointsLastChangeTriggerTime, stringVal)
// In case of error val = time.Zero, which is ignored in the upstream code.
o.GetNamespace(), o.GetName(), api.EndpointsLastChangeTriggerTime, stringVal)
// In case of error val = time.Zero, which is ignored downstream.
}
lastChangeTriggerTime = ts
l.TT = tt
}
}

func (l *EndpointLatencyRecorder) record() {
// isHeadless indicates whether the endpoints object belongs to a headless
// service (i.e. clusterIp = None). Note that this can be a false negatives if the service
// informer is lagging, i.e. we may not see a recently created service. Given that the services
// don't change very often (comparing to much more frequent endpoints changes), cases when this method
// will return wrong answer should be relatively rare. Because of that we intentionally accept this
// flaw to keep the solution simple.
isHeadless := len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone
isHeadless := len(l.Services) == 1 && l.Services[0].ClusterIP == api.ClusterIPNone

if endpoints == nil || !isHeadless || lastChangeTriggerTime.IsZero() {
if !isHeadless || l.TT.IsZero() {
return
}

Expand All @@ -69,5 +78,5 @@ func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object)
// LastChangeTriggerTime annotation is set). It means that the corresponding service is a
// "headless service with selector".
DNSProgrammingLatency.WithLabelValues("headless_with_selector").
Observe(durationSinceFunc(lastChangeTriggerTime).Seconds())
Observe(DurationSinceFunc(l.TT).Seconds())
}
3 changes: 3 additions & 0 deletions plugin/kubernetes/object/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func ToPod(obj interface{}) (interface{}, error) {
// during add/update event processing
return pod, errPodTerminating
}

*apiPod = api.Pod{}

return pod, nil
}

Expand Down
2 changes: 2 additions & 0 deletions plugin/kubernetes/object/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func ToService(obj interface{}) (interface{}, error) {

}

*svc = api.Service{}

return s, nil
}

Expand Down