Skip to content

Commit

Permalink
Don't verify the payload pattern in case of a response read error.
Browse files Browse the repository at this point in the history
Also, to improve the readability, refactor the HTTP request handling part of the runProbe into a separate function: httpRequest.

(Note: This change also includes some unrelated TODO cleanups.)

PiperOrigin-RevId: 193549658
  • Loading branch information
manugarg committed Apr 19, 2018
1 parent 38d3375 commit 71c84f5
Show file tree
Hide file tree
Showing 26 changed files with 76 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cloudprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (pr *Prober) init() error {
}

// Initialize servers
// TODO: Plumb init context from cmd/cloudprober.
// TODO(manugarg): Plumb init context from cmd/cloudprober.
initCtx, cancelFunc := context.WithCancel(context.TODO())
pr.Servers, err = servers.Init(initCtx, pr.c.GetServer())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
type Logger struct {
logc *logging.Client
logger *logging.Logger
// TODO: Logger should eventually embed the probe id and each probe
// TODO(manugarg): Logger should eventually embed the probe id and each probe
// should get a different Logger object (embedding that probe's probe id) but
// sharing the same logging client. We could then make probe id one of the
// metadata on all logging messages.
Expand Down
4 changes: 2 additions & 2 deletions metrics/eventmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (em *EventMetrics) AddMetric(name string, val Value) *EventMetrics {
defer em.mu.Unlock()

if _, ok := em.metrics[name]; ok {
// TODO: We should probably log such cases. We'll have to
// TODO(manugarg): We should probably log such cases. We'll have to
// plumb logger for that.
return em
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (em *EventMetrics) AddLabel(name string, val string) *EventMetrics {
em.mu.Lock()
defer em.mu.Unlock()
if _, ok := em.labels[name]; ok {
// TODO: We should probably log such cases. We'll have to
// TODO(manugarg): We should probably log such cases. We'll have to
// plumb logger for that.
return em
}
Expand Down
2 changes: 1 addition & 1 deletion metrics/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewMap(mapName string, defaultValue NumValue) *Map {
}

// GetKey returns the given key's value.
// TODO: We should probably add a way to get the list of all the keys in the
// TODO(manugarg): We should probably add a way to get the list of all the keys in the
// map.
func (m *Map) GetKey(key string) NumValue {
m.mu.RLock()
Expand Down
2 changes: 1 addition & 1 deletion metrics/proto/dist.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ message Dist {
// value. Example: 0.5,1,2,4,8.
string explicit_buckets = 1;
// Exponential buckets are not supported yet.
// TODO: Implement support for exponential buckets.
// TODO(manugarg): Implement support for exponential buckets.
ExponentialBuckets exponential_buckets = 2;
}
}
Expand Down
2 changes: 1 addition & 1 deletion probes/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
resultsChan := make(chan probeutils.ProbeResult, len(p.targets))

// This function is used by StatsKeeper to get the latest list of targets.
// TODO: Make p.targets mutex protected as it's read and written by concurrent goroutines.
// TODO(manugarg): Make p.targets mutex protected as it's read and written by concurrent goroutines.
targetsFunc := func() []string {
return p.targets
}
Expand Down
4 changes: 2 additions & 2 deletions probes/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ started for each probe run cycle, while in "server" mode, external process is
started only if it's not running already and Cloudprober communicates with it
over stdin/stdout for each probe cycle.
TODO: Add a way to test this program. Write another program that
TODO(manugarg): Add a way to test this program. Write another program that
implements the probe server protocol and use that for testing.
*/
package external
Expand Down Expand Up @@ -56,7 +56,7 @@ var (
// Note that this value impacts the effective timeout for a target as timeout
// is applied for all the targets in aggregate. For example, 100th target in
// the targets list will have the effective timeout of (timeout - 1ms).
// TODO: Make sure that the last target in the list has an impact of
// TODO(manugarg): Make sure that the last target in the list has an impact of
// less than 1% on its timeout.
TimeBetweenRequests = 10 * time.Microsecond
)
Expand Down
2 changes: 1 addition & 1 deletion probes/external/proto/server.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion probes/external/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ message ProbeReply {
// The result of the probe. Cloudprober parses the payload to retrieve
// variables from it. It expects variables in the following format:
// var1 value1 (for example: total_errors 589)
// TODO: Add an option to export mapped variables, for example:
// TODO(manugarg): Add an option to export mapped variables, for example:
// client-errors map:lang java:200 python:20 golang:3
optional string payload = 3;
};
86 changes: 46 additions & 40 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,55 @@ func isClientTimeout(err error) bool {
return false
}

func (p *Probe) runProbe(resultsChan chan<- probeutils.ProbeResult) {
// httpRequest executes an HTTP request and updates the provided result struct.
func (p *Probe) httpRequest(req *http.Request, result *probeRunResult) {
start := time.Now()
result.total.Inc()
resp, err := p.client.Do(req)
latency := time.Since(start)

if err != nil {
if isClientTimeout(err) {
p.l.Warningf("Target:%s, URL:%s, http.runProbe: timeout error: %v", req.Host, req.URL.String(), err)
result.timeouts.Inc()
return
}
p.l.Warningf("Target(%s): client.Get: %v", req.Host, err)
return
}

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
p.l.Warningf("Target:%s, URL:%s, http.runProbe: error in reading response from target: %v", req.Host, req.URL.String(), err)
return
}

// Calling Body.Close() allows the TCP connection to be reused.
resp.Body.Close()
result.respCodes.IncKey(fmt.Sprintf("%d", resp.StatusCode))
if p.c.GetIntegrityCheckPattern() != "" && resp.StatusCode == http.StatusOK {
err := probeutils.VerifyPayloadPattern(respBody, []byte(p.c.GetIntegrityCheckPattern()))
if err != nil {
// TODO(manugarg): Increment a counter on data corruption.
p.l.Errorf("Target:%s, URL:%s, http.runProbe: possible data corruption, response integrity check failed: %s", req.Host, req.URL.String(), err.Error())
return
}
}

result.success.Inc()
result.latency.AddFloat64(latency.Seconds() / p.opts.LatencyUnit.Seconds())
if p.c.GetExportResponseAsMetrics() {
if len(respBody) <= maxResponseSizeForMetrics {
result.respBodies.IncKey(string(respBody))
}
}
}

func (p *Probe) runProbe(resultsChan chan<- probeutils.ProbeResult) {
// Refresh the list of targets to probe.
p.targets = p.opts.Targets.List()

wg := sync.WaitGroup{}

for _, target := range p.targets {
wg.Add(1)

Expand Down Expand Up @@ -194,43 +236,7 @@ func (p *Probe) runProbe(resultsChan chan<- probeutils.ProbeResult) {
req.Host = target

for i := 0; i < int(p.c.GetRequestsPerProbe()); i++ {
start := time.Now()
result.total.Inc()
resp, err := p.client.Do(req)
latency := time.Since(start)

if err != nil {
if isClientTimeout(err) {
p.l.Warningf("Target:%s, URL:%s, http.runProbe: timeout error: %v", target, req.URL.String(), err)
result.timeouts.Inc()
} else {
p.l.Warningf("Target(%s): client.Get: %v", target, err)
}
} else {
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
p.l.Warningf("Target:%s, URL:%s, http.runProbe: error in reading response from target: %v", target, req.URL.String(), err)
}
// Calling Body.Close() allows the TCP connection to be reused.
resp.Body.Close()
result.respCodes.IncKey(fmt.Sprintf("%d", resp.StatusCode))
if p.c.GetIntegrityCheckPattern() != "" {
err := probeutils.VerifyPayloadPattern(respBody, []byte(p.c.GetIntegrityCheckPattern()))
if err != nil {
// TODO: Increment a counter on data corruption.
p.l.Errorf("Target:%s, URL:%s, http.runProbe: possible data corruption, response integrity check failed: %s", target, req.URL.String(), err.Error())
continue
}
}
result.success.Inc()
result.latency.AddFloat64(latency.Seconds() / p.opts.LatencyUnit.Seconds())
if p.c.GetExportResponseAsMetrics() {
if len(respBody) <= maxResponseSizeForMetrics {
result.respBodies.IncKey(string(respBody))
}
}
}

p.httpRequest(req, &result)
time.Sleep(time.Duration(p.c.GetRequestsIntervalMsec()) * time.Millisecond)
}
resultsChan <- result
Expand All @@ -253,7 +259,7 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
resultsChan := make(chan probeutils.ProbeResult, len(p.targets))

// This function is used by StatsKeeper to get the latest list of targets.
// TODO: Make p.targets mutex protected as it's read and written by concurrent goroutines.
// TODO(manugarg): Make p.targets mutex protected as it's read and written by concurrent goroutines.
targetsFunc := func() []string {
return p.targets
}
Expand Down
2 changes: 1 addition & 1 deletion probes/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *Probe) setSourceFromConfig() error {
if err != nil {
return fmt.Errorf("error getting hostname from OS: %v", err)
}
// TODO: This name should resolve for the listen method to work.
// TODO(manugarg): This name should resolve for the listen method to work.
// We should probably change "listen" to use 0.0.0.0 if p.source doesn't
// resolve.
p.source = hostname
Expand Down
2 changes: 1 addition & 1 deletion probes/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func RegisterUserDefined(name string, probe Probe) {
// RegisterProbeType registers a new probe-type. New probe types are integrated
// with the config subsystem using the protobuf extensions.
//
// TODO: Add a full example of using extensions.
// TODO(manugarg): Add a full example of using extensions.
func RegisterProbeType(extensionFieldNo int, newProbeFunc func() Probe) {
extensionMapMu.Lock()
defer extensionMapMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion probes/udp/proto/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion probes/udp/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ message ProbeConf {
optional int32 stats_export_interval_msec = 2 [default = 10000];
// Port to send UDP Ping to (UDP Echo). Should be same as
// ProberConfig.udp_echo_server_port.
// TODO: Can we just read this from ProberConfig?
// TODO(): Can we just read this from ProberConfig?
optional int32 port = 3 [default = 31122];

// Number of sending side ports to use.
Expand Down
4 changes: 2 additions & 2 deletions probes/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
// maxTargets is the maximum number of targets supported by this probe type.
// If there are more targets, they are pruned from the list to bring targets
// list under maxTargets.
// TODO: Make it configurable with documentation on its implication
// TODO(manugarg): Make it configurable with documentation on its implication
// on resource consumption.
maxTargets = 500
)
Expand Down Expand Up @@ -312,7 +312,7 @@ func (p *Probe) runProbe() {

// Set writeTimeout such that we can go over all targets twice (to provide
// enough buffer) within a probe interval.
// TODO: Consider using per-conn goroutines to send packets over
// TODO(manugarg): Consider using per-conn goroutines to send packets over
// UDP sockets just like recvLoop().
writeTimeout := p.opts.Interval / time.Duration(2*len(p.targets))

Expand Down
2 changes: 1 addition & 1 deletion servers/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Server) Start(ctx context.Context, dataChan chan<- *metrics.EventMetric
}

func readAndEcho(conn *net.UDPConn, l *logger.Logger) {
// TODO: We read and echo back only 4098 bytes. We should look at raising this
// TODO(manugarg): We read and echo back only 4098 bytes. We should look at raising this
// limit or making it configurable. Also of note, ReadFromUDP reads a single UDP datagram
// (up to the max size of 64K-sizeof(UDPHdr)) and discards the rest.
buf := make([]byte, 4098)
Expand Down
4 changes: 2 additions & 2 deletions surfacers/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(config *configpb.SurfacerConf, l *logger.Logger) (*SDSurfacer, error) {
l: l,
}

// TODO: Validate that the config has all the necessary
// TODO(brrcrites): Validate that the config has all the necessary
// values

// Find all the necessary information for writing metrics to Stack
Expand All @@ -103,7 +103,7 @@ func New(config *configpb.SurfacerConf, l *logger.Logger) (*SDSurfacer, error) {
}

// Create monitoring client
// TODO: Currently we don't make use of the context to timeout the
// TODO(manugarg): Currently we don't make use of the context to timeout the
// requests, but we should.
httpClient, err := google.DefaultClient(context.TODO(), monitoring.CloudPlatformScope)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion surfacers/surfacers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func initSurfacer(s *surfacerpb.SurfacerDef) (Surfacer, error) {
logName = strings.ToLower(s.GetType().String())
}

// TODO: Plumb context here too.
// TODO(manugarg): Plumb context here too.
l, err := logger.New(context.TODO(), logName)
if err != nil {
return nil, fmt.Errorf("unable to create cloud logger: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion sysvars/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func counterRuntimeVars(dataChan chan *metrics.EventMetrics, ts time.Time, m *ru
// Time since this module started.
timeSince := time.Since(startTime).Seconds()
em.AddMetric("uptime_msec", metrics.NewFloat(timeSince*1000))
// TODO: Deprecate "uptime" in favor of "uptime_msec".
// TODO(manugarg): Deprecate "uptime" in favor of "uptime_msec".
em.AddMetric("uptime", metrics.NewInt(int64(timeSince)))

// GC memory stats
Expand Down
2 changes: 1 addition & 1 deletion targets/gce/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
// Note that because this uses the GCLOUD API, GCE staging is unable to use this
// target type. See b/26320525 for more on this.
//
// TODO: The cache layer provided by this, instances, lameduck, and resolver
// TODO(izzycecil): The cache layer provided by this, instances, lameduck, and resolver
// are all pretty similar. RTC will need a similar cache. I should
// abstract out this whole cache layer. It will be more testable that
// way, and probably more readable, as well.
Expand Down
2 changes: 1 addition & 1 deletion targets/lameduck/lameduck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO: Add more tests after a bit of refactoring.
// TODO(manugarg): Add more tests after a bit of refactoring.

package lameduck

Expand Down
2 changes: 1 addition & 1 deletion targets/lameduck/proto/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion targets/lameduck/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ message Options {

// Lame duck targets runtime config name. An operator will create a variable
// here to mark a target as lame-ducked.
// TODO: This name needs to be changed.
// TODO(izzycecil): This name needs to be changed.
optional string runtimeconfig_name = 3 [default = "lame-duck-targets"];

// Lame duck expiration time. We ignore variables (targets) that have been
Expand Down
2 changes: 1 addition & 1 deletion targets/rtc/rtc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Nextrow:
}
// List targets
gotlist := targs.List()
// TODO: Need to catch errors with something like this. This
// TODO(izzycecil): Need to catch errors with something like this. This
// requires looking at the logger.
// if (err != nil) != r.wantError {
// t.Errorf("%v: targs.List() gave error %q. r.wantError = %v", r.name, err, r.wantError)
Expand Down
4 changes: 2 additions & 2 deletions targets/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func New(targetsDef *targetspb.TargetsDef, ldLister lameduck.Lister, targetOpts
}
t.lister, t.resolver = s, s
case *targetspb.TargetsDef_RtcTargets:
// TODO: we should really consolidate all these metadata calls
// TODO(izzycecil): we should really consolidate all these metadata calls
// to one place.
proj, err := metadata.ProjectID()
if err != nil {
Expand Down Expand Up @@ -318,7 +318,7 @@ func getExtensionTargets(pb *targetspb.TargetsDef, l *logger.Logger) (Targets, e
// RegisterTargetsType registers a new targets type. New targets types are
// integrated with the config subsystem using the protobuf extensions.
//
// TODO: Add a full example of using extensions.
// TODO(manugarg): Add a full example of using extensions.
func RegisterTargetsType(extensionFieldNo int, newTargetsFunc func(interface{}, *logger.Logger) (Targets, error)) {
extensionMapMu.Lock()
defer extensionMapMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func configHandler(w http.ResponseWriter, r *http.Request) {
// Init initializes cloudprober web interface handler.
func Init() {
http.HandleFunc("/config", configHandler)
// TODO: Add a status handler that shows the running probes and
// TODO(manugarg): Add a status handler that shows the running probes and
// their configs.
}

0 comments on commit 71c84f5

Please sign in to comment.