diff --git a/probes/grpc/grpc.go b/probes/grpc/grpc.go index d55da041a63..14b17cb8539 100644 --- a/probes/grpc/grpc.go +++ b/probes/grpc/grpc.go @@ -30,6 +30,7 @@ import ( "sync" "time" + "github.com/cloudprober/cloudprober/common/iputils" "github.com/cloudprober/cloudprober/common/oauth" "github.com/cloudprober/cloudprober/common/tlsconfig" "github.com/cloudprober/cloudprober/logger" @@ -196,33 +197,32 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) { activeTargets := make(map[string]bool) // Create results structure and start probe loop for new targets. - for _, tgtEp := range newTargets { - tgt := tgtEp.Name - if tgtEp.Port > 0 { - tgt = net.JoinHostPort(tgtEp.Name, strconv.Itoa(tgtEp.Port)) - } - activeTargets[tgt] = true - if _, ok := p.results[tgt]; ok { + for _, target := range newTargets { + key := target.Key() + + activeTargets[key] = true + if _, ok := p.results[key]; ok { continue } - updatedTargets[tgt] = "ADD" - p.results[tgt] = p.newResult(tgt) + + updatedTargets[key] = "ADD" + p.results[key] = p.newResult(key) probeCtx, probeCancelFunc := context.WithCancel(ctx) for i := 0; i < int(p.c.GetNumConns()); i++ { - go p.oneTargetLoop(probeCtx, tgt, i, p.results[tgt]) + go p.oneTargetLoop(probeCtx, target, i, p.results[key]) } - p.cancelFuncs[tgt] = probeCancelFunc + p.cancelFuncs[key] = probeCancelFunc } // Stop probing for deleted targets by invoking cancelFunc. - for tgt := range p.results { - if activeTargets[tgt] { + for key := range p.results { + if activeTargets[key] { continue } - p.cancelFuncs[tgt]() - updatedTargets[tgt] = "DELETE" - delete(p.results, tgt) - delete(p.cancelFuncs, tgt) + p.cancelFuncs[key]() + updatedTargets[key] = "DELETE" + delete(p.results, key) + delete(p.cancelFuncs, key) } p.targets = newTargets } @@ -232,7 +232,20 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) { // connection error. On success, it returns a client immediately. // Interval between connects is controlled by connect_timeout_msec, defaulting // to probe timeout. -func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, result *probeRunResult) *grpc.ClientConn { +func (p *Probe) connectWithRetry(ctx context.Context, target endpoint.Endpoint, msgPattern string, result *probeRunResult) *grpc.ClientConn { + addr := target.Name + if target.IP != nil { + if p.opts.IPVersion == 0 || iputils.IPVersion(target.IP) == p.opts.IPVersion { + addr = target.IP.String() + } else { + p.l.Warningf("target IP (%v) doesn't match probe IP version (%d), letting system resolve it", target.IP, p.opts.IPVersion) + } + } + + if target.Port > 0 { + addr = net.JoinHostPort(addr, strconv.Itoa(target.Port)) + } + connectTimeout := p.opts.Timeout if p.c.GetConnectTimeoutMsec() > 0 { connectTimeout = time.Duration(p.c.GetConnectTimeoutMsec()) * time.Millisecond @@ -249,9 +262,9 @@ func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, re connCtx, cancelFunc := context.WithTimeout(ctx, connectTimeout) if uriScheme := p.c.GetUriScheme(); uriScheme != "" { - tgt = uriScheme + tgt + addr = uriScheme + addr } - conn, err = grpc.DialContext(connCtx, tgt, p.dialOpts...) + conn, err = grpc.DialContext(connCtx, addr, p.dialOpts...) cancelFunc() if err != nil { @@ -292,11 +305,11 @@ func (p *Probe) healthCheckProbe(ctx context.Context, conn *grpc.ClientConn, msg } // oneTargetLoop connects to and then continuously probes a single target. -func (p *Probe) oneTargetLoop(ctx context.Context, tgt string, index int, result *probeRunResult) { - msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt, index) +func (p *Probe) oneTargetLoop(ctx context.Context, tgt endpoint.Endpoint, index int, result *probeRunResult) { + msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt.Name, index) for _, al := range p.opts.AdditionalLabels { - al.UpdateForTarget(endpoint.Endpoint{Name: tgt}, "", 0) + al.UpdateForTarget(tgt, "", 0) } conn := p.connectWithRetry(ctx, tgt, msgPattern, result) @@ -423,7 +436,12 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics) } // Output results. - for targetName, result := range p.results { + for _, target := range p.targets { + result, ok := p.results[target.Key()] + if !ok { + continue + } + result.Lock() em := metrics.NewEventMetrics(ts). AddMetric("total", result.total.Clone()). @@ -431,12 +449,18 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics) AddMetric(p.opts.LatencyMetricName, result.latency.Clone()). AddMetric("connecterrors", result.connectErrors.Clone()). AddLabel("ptype", "grpc"). - AddLabel("probe", p.name). - AddLabel("dst", targetName) + AddLabel("probe", p.name) result.Unlock() + + if target.Port == 0 { + em.AddLabel("dst", target.Name) + } else { + em.AddLabel("dst", net.JoinHostPort(target.Name, strconv.Itoa(target.Port))) + } + em.LatencyUnit = p.opts.LatencyUnit for _, al := range p.opts.AdditionalLabels { - em.AddLabel(al.KeyValueForTarget(endpoint.Endpoint{Name: targetName})) + em.AddLabel(al.KeyValueForTarget(target)) } p.opts.LogMetrics(em) dataChan <- em