Skip to content

Commit

Permalink
[probes.grpc] Use discovered target's IP for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
manugarg committed Mar 10, 2023
1 parent f1b578e commit f48ad5d
Showing 1 changed file with 51 additions and 27 deletions.
78 changes: 51 additions & 27 deletions probes/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/cloudprober/cloudprober/common/iputils"
"github.com/cloudprober/cloudprober/common/oauth"
"github.com/cloudprober/cloudprober/common/tlsconfig"
"github.com/cloudprober/cloudprober/logger"
Expand Down Expand Up @@ -196,33 +197,32 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) {

activeTargets := make(map[string]bool)
// Create results structure and start probe loop for new targets.
for _, tgtEp := range newTargets {
tgt := tgtEp.Name
if tgtEp.Port > 0 {
tgt = net.JoinHostPort(tgtEp.Name, strconv.Itoa(tgtEp.Port))
}
activeTargets[tgt] = true
if _, ok := p.results[tgt]; ok {
for _, target := range newTargets {
key := target.Key()

activeTargets[key] = true
if _, ok := p.results[key]; ok {
continue
}
updatedTargets[tgt] = "ADD"
p.results[tgt] = p.newResult(tgt)

updatedTargets[key] = "ADD"
p.results[key] = p.newResult(key)
probeCtx, probeCancelFunc := context.WithCancel(ctx)
for i := 0; i < int(p.c.GetNumConns()); i++ {
go p.oneTargetLoop(probeCtx, tgt, i, p.results[tgt])
go p.oneTargetLoop(probeCtx, target, i, p.results[key])
}
p.cancelFuncs[tgt] = probeCancelFunc
p.cancelFuncs[key] = probeCancelFunc
}

// Stop probing for deleted targets by invoking cancelFunc.
for tgt := range p.results {
if activeTargets[tgt] {
for key := range p.results {
if activeTargets[key] {
continue
}
p.cancelFuncs[tgt]()
updatedTargets[tgt] = "DELETE"
delete(p.results, tgt)
delete(p.cancelFuncs, tgt)
p.cancelFuncs[key]()
updatedTargets[key] = "DELETE"
delete(p.results, key)
delete(p.cancelFuncs, key)
}
p.targets = newTargets
}
Expand All @@ -232,7 +232,20 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context) {
// connection error. On success, it returns a client immediately.
// Interval between connects is controlled by connect_timeout_msec, defaulting
// to probe timeout.
func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, result *probeRunResult) *grpc.ClientConn {
func (p *Probe) connectWithRetry(ctx context.Context, target endpoint.Endpoint, msgPattern string, result *probeRunResult) *grpc.ClientConn {
addr := target.Name
if target.IP != nil {
if p.opts.IPVersion == 0 || iputils.IPVersion(target.IP) == p.opts.IPVersion {
addr = target.IP.String()
} else {
p.l.Warningf("target IP (%v) doesn't match probe IP version (%d), letting system resolve it", target.IP, p.opts.IPVersion)
}
}

if target.Port > 0 {
addr = net.JoinHostPort(addr, strconv.Itoa(target.Port))
}

connectTimeout := p.opts.Timeout
if p.c.GetConnectTimeoutMsec() > 0 {
connectTimeout = time.Duration(p.c.GetConnectTimeoutMsec()) * time.Millisecond
Expand All @@ -249,9 +262,9 @@ func (p *Probe) connectWithRetry(ctx context.Context, tgt, msgPattern string, re
connCtx, cancelFunc := context.WithTimeout(ctx, connectTimeout)

if uriScheme := p.c.GetUriScheme(); uriScheme != "" {
tgt = uriScheme + tgt
addr = uriScheme + addr
}
conn, err = grpc.DialContext(connCtx, tgt, p.dialOpts...)
conn, err = grpc.DialContext(connCtx, addr, p.dialOpts...)

cancelFunc()
if err != nil {
Expand Down Expand Up @@ -292,11 +305,11 @@ func (p *Probe) healthCheckProbe(ctx context.Context, conn *grpc.ClientConn, msg
}

// oneTargetLoop connects to and then continuously probes a single target.
func (p *Probe) oneTargetLoop(ctx context.Context, tgt string, index int, result *probeRunResult) {
msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt, index)
func (p *Probe) oneTargetLoop(ctx context.Context, tgt endpoint.Endpoint, index int, result *probeRunResult) {
msgPattern := fmt.Sprintf("%s,%s%s,%03d", p.src, p.c.GetUriScheme(), tgt.Name, index)

for _, al := range p.opts.AdditionalLabels {
al.UpdateForTarget(endpoint.Endpoint{Name: tgt}, "", 0)
al.UpdateForTarget(tgt, "", 0)
}

conn := p.connectWithRetry(ctx, tgt, msgPattern, result)
Expand Down Expand Up @@ -423,20 +436,31 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
}

// Output results.
for targetName, result := range p.results {
for _, target := range p.targets {
result, ok := p.results[target.Key()]
if !ok {
continue
}

result.Lock()
em := metrics.NewEventMetrics(ts).
AddMetric("total", result.total.Clone()).
AddMetric("success", result.success.Clone()).
AddMetric(p.opts.LatencyMetricName, result.latency.Clone()).
AddMetric("connecterrors", result.connectErrors.Clone()).
AddLabel("ptype", "grpc").
AddLabel("probe", p.name).
AddLabel("dst", targetName)
AddLabel("probe", p.name)
result.Unlock()

if target.Port == 0 {
em.AddLabel("dst", target.Name)
} else {
em.AddLabel("dst", net.JoinHostPort(target.Name, strconv.Itoa(target.Port)))
}

em.LatencyUnit = p.opts.LatencyUnit
for _, al := range p.opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(endpoint.Endpoint{Name: targetName}))
em.AddLabel(al.KeyValueForTarget(target))
}
p.opts.LogMetrics(em)
dataChan <- em
Expand Down

0 comments on commit f48ad5d

Please sign in to comment.