diff --git a/probes/grpc/grpc.go b/probes/grpc/grpc.go
index d55da041a63..14b17cb8539 100644
--- a/probes/grpc/grpc.go
+++ b/probes/grpc/grpc.go
@@ -30,6 +30,7 @@ import (
"sync"
"time"
+ "github.com/cloudprober/cloudprober/common/iputils"
"github.com/cloudprober/cloudprober/common/oauth"
"github.com/cloudprober/cloudprober/common/tlsconfig"
"github.com/cloudprober/cloudprober/logger"
@@ -196,33 +197,32 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) {
activeTargets := make(map[string]bool)
// Create results structure and start probe loop for new targets.
- for _, tgtEp := range newTargets {
- tgt := tgtEp.Name
- if tgtEp.Port > 0 {
- tgt = net.JoinHostPort(tgtEp.Name, strconv.Itoa(tgtEp.Port))
- }
- activeTargets[tgt] = true
- if _, ok := p.results[tgt]; ok {
+ for _, target := range newTargets {
+ key := target.Key()
+
+ activeTargets[key] = true
+ if _, ok := p.results[key]; ok {
continue
}
- updatedTargets[tgt] = "ADD"
- p.results[tgt] = p.newResult(tgt)
+
+ updatedTargets[key] = "ADD"
+ p.results[key] = p.newResult(key)
probeCtx, probeCancelFunc := context.WithCancel(ctx)
for i := 0; i < int(p.c.GetNumConns()); i++ {
- go p.oneTargetLoop(probeCtx, tgt, i, p.results[tgt])
+ go p.oneTargetLoop(probeCtx, target, i, p.results[key])
}
- p.cancelFuncs[tgt] = probeCancelFunc
+ p.cancelFuncs[key] = probeCancelFunc
}
// Stop probing for deleted targets by invoking cancelFunc.
- for tgt := range p.results {
- if activeTargets[tgt] {
+ for key := range p.results {
+ if activeTargets[key] {
continue
}
- p.cancelFuncs[tgt]()
- updatedTargets[tgt] = "DELETE"
- delete(p.results, tgt)
- delete(p.cancelFuncs, tgt)
+ p.cancelFuncs[key]()
+ updatedTargets[key] = "DELETE"
+ delete(p.results, key)
+ delete(p.cancelFuncs, key)
}
p.targets = newTargets
}
@@ -232,7 +232,20 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) {
// connection error. On success, it returns a client immediately.
// Interval between connects is controlled by connect_timeout_msec, defaulting
// to probe timeout.
-func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, result *probeRunResult) *grpc.ClientConn {
+func (p *Probe) connectWithRetry(ctx context.Context, target endpoint.Endpoint, msgPattern string, result *probeRunResult) *grpc.ClientConn {
+ addr := target.Name
+ if target.IP != nil {
+ if p.opts.IPVersion == 0 || iputils.IPVersion(target.IP) == p.opts.IPVersion {
+ addr = target.IP.String()
+ } else {
+ p.l.Warningf("target IP (%v) doesn't match probe IP version (%d), letting system resolve it", target.IP, p.opts.IPVersion)
+ }
+ }
+
+ if target.Port > 0 {
+ addr = net.JoinHostPort(addr, strconv.Itoa(target.Port))
+ }
+
connectTimeout := p.opts.Timeout
if p.c.GetConnectTimeoutMsec() > 0 {
connectTimeout = time.Duration(p.c.GetConnectTimeoutMsec()) * time.Millisecond
@@ -249,9 +262,9 @@ func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, re
connCtx, cancelFunc := context.WithTimeout(ctx, connectTimeout)
if uriScheme := p.c.GetUriScheme(); uriScheme != "" {
- tgt = uriScheme + tgt
+ addr = uriScheme + addr
}
- conn, err = grpc.DialContext(connCtx, tgt, p.dialOpts...)
+ conn, err = grpc.DialContext(connCtx, addr, p.dialOpts...)
cancelFunc()
if err != nil {
@@ -292,11 +305,11 @@ func (p *Probe) healthCheckProbe(ctx context.Context, conn *grpc.ClientConn, msg
}
// oneTargetLoop connects to and then continuously probes a single target.
-func (p *Probe) oneTargetLoop(ctx context.Context, tgt string, index int, result *probeRunResult) {
- msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt, index)
+func (p *Probe) oneTargetLoop(ctx context.Context, tgt endpoint.Endpoint, index int, result *probeRunResult) {
+ msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt.Name, index)
for _, al := range p.opts.AdditionalLabels {
- al.UpdateForTarget(endpoint.Endpoint{Name: tgt}, "", 0)
+ al.UpdateForTarget(tgt, "", 0)
}
conn := p.connectWithRetry(ctx, tgt, msgPattern, result)
@@ -423,7 +436,12 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
}
// Output results.
- for targetName, result := range p.results {
+ for _, target := range p.targets {
+ result, ok := p.results[target.Key()]
+ if !ok {
+ continue
+ }
+
result.Lock()
em := metrics.NewEventMetrics(ts).
AddMetric("total", result.total.Clone()).
@@ -431,12 +449,18 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
AddMetric(p.opts.LatencyMetricName, result.latency.Clone()).
AddMetric("connecterrors", result.connectErrors.Clone()).
AddLabel("ptype", "grpc").
- AddLabel("probe", p.name).
- AddLabel("dst", targetName)
+ AddLabel("probe", p.name)
result.Unlock()
+
+ if target.Port == 0 {
+ em.AddLabel("dst", target.Name)
+ } else {
+ em.AddLabel("dst", net.JoinHostPort(target.Name, strconv.Itoa(target.Port)))
+ }
+
em.LatencyUnit = p.opts.LatencyUnit
for _, al := range p.opts.AdditionalLabels {
- em.AddLabel(al.KeyValueForTarget(endpoint.Endpoint{Name: targetName}))
+ em.AddLabel(al.KeyValueForTarget(target))
}
p.opts.LogMetrics(em)
dataChan <- em