Skip to content

Commit

Permalink
Add additional label support to DNS and UDP_LISTENER probes.
Browse files Browse the repository at this point in the history
Also, while here fix the results channel length in UDP_LISTENER probe.

PiperOrigin-RevId: 279110793
  • Loading branch information
manugarg committed Nov 8, 2019
1 parent 51cb969 commit d6fb625
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 27 deletions.
12 changes: 9 additions & 3 deletions probes/common/statskeeper/statskeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/google/cloudprober/metrics"
"github.com/google/cloudprober/probes/options"
"github.com/google/cloudprober/targets/endpoint"
)

// ProbeResult represents results of a probe run.
Expand Down Expand Up @@ -56,7 +57,7 @@ type ProbeResult interface {
// targets for exporting results, instead of getting a static list in the
// arguments. We do that as the list of targets is usually dynamic and is
// updated on a regular basis.
func StatsKeeper(ctx context.Context, ptype, name string, opts *options.Options, targetsFunc func() []string, resultsChan <-chan ProbeResult, dataChan chan<- *metrics.EventMetrics) {
func StatsKeeper(ctx context.Context, ptype, name string, opts *options.Options, targetsFunc func() []endpoint.Endpoint, resultsChan <-chan ProbeResult, dataChan chan<- *metrics.EventMetrics) {
targetMetrics := make(map[string]*metrics.EventMetrics)
exportTicker := time.NewTicker(opts.StatsExportInterval)
defer exportTicker.Stop()
Expand All @@ -76,12 +77,17 @@ func StatsKeeper(ctx context.Context, ptype, name string, opts *options.Options,
}
case ts := <-exportTicker.C:
for _, t := range targetsFunc() {
em := targetMetrics[t]
em := targetMetrics[t.Name]
if em != nil {
em.AddLabel("ptype", ptype)
em.AddLabel("probe", name)
em.AddLabel("dst", t)
em.AddLabel("dst", t.Name)
em.Timestamp = ts

for _, al := range opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(t.Name))
}

if opts.LogMetrics != nil {
opts.LogMetrics(em)
}
Expand Down
15 changes: 8 additions & 7 deletions probes/common/statskeeper/statskeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/google/cloudprober/metrics"
"github.com/google/cloudprober/probes/options"
"github.com/google/cloudprober/targets/endpoint"
)

// probeRunResult captures the results of a single probe run. The way we work with
Expand Down Expand Up @@ -55,9 +56,9 @@ func (prr probeRunResult) Target() string {
}

func TestStatsKeeper(t *testing.T) {
targets := []string{
"target1",
"target2",
targets := []endpoint.Endpoint{
{Name: "target1"},
{Name: "target2"},
}
pType := "test"
pName := "testProbe"
Expand All @@ -67,7 +68,7 @@ func TestStatsKeeper(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

targetsFunc := func() []string {
targetsFunc := func() []endpoint.Endpoint {
return targets
}
dataChan := make(chan *metrics.EventMetrics, len(targets))
Expand All @@ -77,8 +78,8 @@ func TestStatsKeeper(t *testing.T) {
}
go StatsKeeper(ctx, pType, pName, opts, targetsFunc, resultsChan, dataChan)

for _, t := range targets {
prr := newProbeRunResult(t)
for _, target := range targets {
prr := newProbeRunResult(target.Name)
prr.sent.Inc()
prr.rcvd.Inc()
prr.rtt.IncBy(metrics.NewInt(20000))
Expand All @@ -90,7 +91,7 @@ func TestStatsKeeper(t *testing.T) {
em := <-dataChan
var foundTarget bool
for _, target := range targets {
if em.Label("dst") == target {
if em.Label("dst") == target.Name {
foundTarget = true
break
}
Expand Down
25 changes: 18 additions & 7 deletions probes/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/google/cloudprober/probes/common/statskeeper"
configpb "github.com/google/cloudprober/probes/dns/proto"
"github.com/google/cloudprober/probes/options"
"github.com/google/cloudprober/targets/endpoint"
"github.com/google/cloudprober/validators"
"github.com/miekg/dns"
)
Expand Down Expand Up @@ -74,7 +75,7 @@ type Probe struct {
l *logger.Logger

// book-keeping params
targets []string
targets []endpoint.Endpoint
msg *dns.Msg
client Client
}
Expand Down Expand Up @@ -107,6 +108,16 @@ func (prr probeRunResult) Target() string {
return prr.target
}

func (p *Probe) updateTargets() {
p.targets = p.opts.Targets.ListEndpoints()

for _, target := range p.targets {
for _, al := range p.opts.AdditionalLabels {
al.UpdateForTarget(target.Name, target.Labels)
}
}
}

// Init initializes the probe with the given params.
func (p *Probe) Init(name string, opts *options.Options) error {
c, ok := opts.ProbeConf.(*configpb.ProbeConf)
Expand All @@ -119,7 +130,7 @@ func (p *Probe) Init(name string, opts *options.Options) error {
if p.l = opts.Logger; p.l == nil {
p.l = &logger.Logger{}
}
p.targets = p.opts.Targets.List()
p.updateTargets()

// I believe these objects are safe for concurrent use by multiple goroutines
// (although the documentation doesn't explicitly say so). It uses locks
Expand Down Expand Up @@ -188,19 +199,19 @@ func (p *Probe) validateResponse(resp *dns.Msg, target string, result *probeRunR

func (p *Probe) runProbe(resultsChan chan<- statskeeper.ProbeResult) {
// Refresh the list of targets to probe.
p.targets = p.opts.Targets.List()
p.updateTargets()

wg := sync.WaitGroup{}
for _, target := range p.targets {
wg.Add(1)

// Launch a separate goroutine for each target.
// Write probe results to the "resultsChan" channel.
go func(target string, resultsChan chan<- statskeeper.ProbeResult) {
go func(target endpoint.Endpoint, resultsChan chan<- statskeeper.ProbeResult) {
defer wg.Done()

result := probeRunResult{
target: target,
target: target.Name,
validationFailure: validators.ValidationFailureMap(p.opts.Validators),
}

Expand All @@ -210,7 +221,7 @@ func (p *Probe) runProbe(resultsChan chan<- statskeeper.ProbeResult) {
result.latency = metrics.NewFloat(0)
}

fullTarget := net.JoinHostPort(target, "53")
fullTarget := net.JoinHostPort(target.Name, "53")
result.total.Inc()
resp, latency, err := p.client.Exchange(p.msg, fullTarget)

Expand Down Expand Up @@ -239,7 +250,7 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)

// This function is used by StatsKeeper to get the latest list of targets.
// TODO(manugarg): Make p.targets mutex protected as it's read and written by concurrent goroutines.
targetsFunc := func() []string {
targetsFunc := func() []endpoint.Endpoint {
return p.targets
}

Expand Down
6 changes: 3 additions & 3 deletions probes/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (*mockClient) setSourceIP(net.IP) {}

func runProbe(t *testing.T, testName string, p *Probe, total, success int64) {
p.client = new(mockClient)
p.targets = p.opts.Targets.List()
p.targets = p.opts.Targets.ListEndpoints()

resultsChan := make(chan statskeeper.ProbeResult, len(p.targets))
p.runProbe(resultsChan)
Expand All @@ -82,9 +82,9 @@ func runProbe(t *testing.T, testName string, p *Probe, total, success int64) {
t.Errorf("test(%s): result mismatch got (total, success) = (%d, %d), want (%d, %d)",
testName, result.total.Int64(), result.success.Int64(), total, success)
}
if result.Target() != target {
if result.Target() != target.Name {
t.Errorf("test(%s): unexpected target in probe result. got: %s, want: %s",
testName, result.Target(), target)
testName, result.Target(), target.Name)
}
}
}
Expand Down
31 changes: 25 additions & 6 deletions probes/udplistener/udplistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/google/cloudprober/metrics"
"github.com/google/cloudprober/probes/common/statskeeper"
"github.com/google/cloudprober/probes/options"
"github.com/google/cloudprober/targets/endpoint"

configpb "github.com/google/cloudprober/probes/udplistener/proto"
udpsrv "github.com/google/cloudprober/servers/udp"
Expand All @@ -71,7 +72,7 @@ type Probe struct {
echoMode bool

// map target name to flow state.
targets []string
targets []endpoint.Endpoint
fsm *message.FlowStateMap

// Process and output results synchronization.
Expand Down Expand Up @@ -144,6 +145,16 @@ func (prr probeRunResult) Metrics() *metrics.EventMetrics {
AddMetric("delayed", &prr.delayed)
}

func (p *Probe) updateTargets() {
p.targets = p.opts.Targets.ListEndpoints()

for _, target := range p.targets {
for _, al := range p.opts.AdditionalLabels {
al.UpdateForTarget(target.Name, target.Labels)
}
}
}

// Init initializes the probe with the given params.
func (p *Probe) Init(name string, opts *options.Options) error {
c, ok := opts.ProbeConf.(*configpb.ProbeConf)
Expand Down Expand Up @@ -188,15 +199,15 @@ func (p *Probe) cleanup() {
// initProbeRunResults empties the current probe results objects, updates the
// list of targets and builds a new result object for each target.
func (p *Probe) initProbeRunResults() {
p.targets = p.opts.Targets.List()
p.updateTargets()
if p.echoMode && len(p.targets) > maxTargets {
p.l.Warningf("too many targets (got %d > max %d), responses might be slow.", len(p.targets), maxTargets)
}

p.res = make(map[string]*probeRunResult)
for _, target := range p.targets {
p.res[target] = &probeRunResult{
target: target,
p.res[target.Name] = &probeRunResult{
target: target.Name,
}
}
}
Expand Down Expand Up @@ -358,8 +369,16 @@ func (p *Probe) probeLoop(ctx context.Context, resultsChan chan<- statskeeper.Pr

// Start starts and runs the probe indefinitely.
func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics) {
resultsChan := make(chan statskeeper.ProbeResult, len(p.targets))
targetsFunc := func() []string {
p.updateTargets()

// Make sure we don't create zero length results channel.
minResultsChLen := 10
resultsChLen := len(p.targets)
if resultsChLen < minResultsChLen {
resultsChLen = minResultsChLen
}
resultsChan := make(chan statskeeper.ProbeResult, resultsChLen)
targetsFunc := func() []endpoint.Endpoint {
return p.targets
}

Expand Down
2 changes: 1 addition & 1 deletion probes/udplistener/udplistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan s
}
port := p.conn.LocalAddr().(*net.UDPAddr).Port

p.targets = p.opts.Targets.List()
p.updateTargets()
resultsChan := make(chan statskeeper.ProbeResult, 10)
go p.probeLoop(ctx, resultsChan)
time.Sleep(interval) // Wait for echo loop to be active.
Expand Down

0 comments on commit d6fb625

Please sign in to comment.