From aade6e78b2b94fff3c5dabcf40ad875570ecffe2 Mon Sep 17 00:00:00 2001 From: Jerzy Szczepkowski Date: Thu, 10 Dec 2015 15:46:22 +0100 Subject: [PATCH] Added handling custom metrics to resource consumer. Added handling custom metrics in Prometheus format to resource consumer. --- test/images/resource-consumer/README.md | 20 +-- .../resource-consumer/resource_consumer.go | 2 +- .../resource_consumer_handler.go | 125 ++++++++++++++---- 3 files changed, 112 insertions(+), 35 deletions(-) diff --git a/test/images/resource-consumer/README.md b/test/images/resource-consumer/README.md index de2d9ba593dd6..a0b11de5c6cdd 100644 --- a/test/images/resource-consumer/README.md +++ b/test/images/resource-consumer/README.md @@ -1,7 +1,6 @@ # Resource Consumer ## Overview - Resource Consumer is a tool which allows to generate cpu/memory utilization in a container. The reason why it was created is testing kubernetes autoscaling. Resource Consumer can help with autoscaling tests for: @@ -10,8 +9,7 @@ Resource Consumer can help with autoscaling tests for: - vertical autoscaling of pod - changing its resource limits. ## Usage - -Resource Consumer starts an HTTP server and handle sended requests. +Resource Consumer starts an HTTP server and handle sent requests. It listens on port given as a flag (default 8080). Action of consuming resources is send to the container by a POST http request. Each http request creates new process. @@ -20,12 +18,10 @@ Http request handler is in file resource_consumer_handler.go The container consumes specified amount of resources: - CPU in millicores, -- Memory in megabytes. - - +- Memory in megabytes, +- Fake custom metrics. ###Consume CPU http request - - suffix "ConsumeCPU", - parameters "millicores" and "durationSec". @@ -36,13 +32,19 @@ and if consumption is too high binary sleeps for 10 millisecond. One replica of Resource Consumer cannot consume more that 1 cpu. ###Consume Memory http request - - suffix "ConsumeMem", - parameters "megabytes" and "durationSec". Consumes specified amount of megabytes for durationSec seconds. Consume Memory uses stress tool (stress -m 1 --vm-bytes megabytes --vm-hang 0 -t durationSec). -Request leading to consumig more memory then container limit will be ignored. +Request leading to consuming more memory then container limit will be ignored. + +###Bump value of a fake custom metric +- suffix "BumpMetric", +- parameters "metric", "delta" and "durationSec". + +Bumps metric with given name by delta for durationSec seconds. +Custom metrics in Prometheus format are exposed on "/metrics" endpoint. ###CURL example ```console diff --git a/test/images/resource-consumer/resource_consumer.go b/test/images/resource-consumer/resource_consumer.go index 61cf558164dba..a76dc1d186011 100644 --- a/test/images/resource-consumer/resource_consumer.go +++ b/test/images/resource-consumer/resource_consumer.go @@ -27,6 +27,6 @@ var port = flag.Int("port", 8080, "Port number.") func main() { flag.Parse() - var resourceConsumerHandler ResourceConsumerHandler + resourceConsumerHandler := NewResourceConsumerHandler() log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), resourceConsumerHandler)) } diff --git a/test/images/resource-consumer/resource_consumer_handler.go b/test/images/resource-consumer/resource_consumer_handler.go index 928064aed5da6..464a27e7580d1 100644 --- a/test/images/resource-consumer/resource_consumer_handler.go +++ b/test/images/resource-consumer/resource_consumer_handler.go @@ -21,6 +21,8 @@ import ( "net/http" "net/url" "strconv" + "sync" + "time" ) const ( @@ -30,17 +32,34 @@ const ( notGivenFunctionArgument = "not given function argument" consumeCPUAddress = "/ConsumeCPU" consumeMemAddress = "/ConsumeMem" + bumpMetricAddress = "/BumpMetric" getCurrentStatusAddress = "/GetCurrentStatus" + metricsAddress = "/metrics" millicoresQuery = "millicores" megabytesQuery = "megabytes" + metricNameQuery = "metric" + deltaQuery = "delta" durationSecQuery = "durationSec" ) -type ResourceConsumerHandler struct{} +type ResourceConsumerHandler struct { + metrics map[string]float64 + metricsLock sync.Mutex +} + +func NewResourceConsumerHandler() ResourceConsumerHandler { + return ResourceConsumerHandler{metrics: map[string]float64{}} +} func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // handle exposing metrics in Prometheus format (both GET & POST) + if req.URL.Path == metricsAddress { + handler.handleMetrics(w) + return + } if req.Method != "POST" { http.Error(w, badRequest, http.StatusBadRequest) + return } // parsing POST request data and URL data if err := req.ParseForm(); err != nil { @@ -62,6 +81,11 @@ func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *htt handler.handleGetCurrentStatus(w) return } + // handle bumpMetric + if req.URL.Path == bumpMetricAddress { + handler.handleBumpMetric(w, req.Form) + return + } http.Error(w, unknownFunction, http.StatusNotFound) } @@ -72,21 +96,20 @@ func (handler ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, q if durationSecString == "" || millicoresString == "" { http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) return - } else { - // convert data (strings to ints) for consumeCPU - durationSec, durationSecError := strconv.Atoi(durationSecString) - millicores, millicoresError := strconv.Atoi(millicoresString) - if durationSecError != nil || millicoresError != nil { - http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) - return - } - go ConsumeCPU(millicores, durationSec) - fmt.Fprintln(w, consumeCPUAddress[1:]) - fmt.Fprintln(w, millicores, millicoresQuery) - fmt.Fprintln(w, durationSec, durationSecQuery) + } + // convert data (strings to ints) for consumeCPU + durationSec, durationSecError := strconv.Atoi(durationSecString) + millicores, millicoresError := strconv.Atoi(millicoresString) + if durationSecError != nil || millicoresError != nil { + http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + return } + go ConsumeCPU(millicores, durationSec) + fmt.Fprintln(w, consumeCPUAddress[1:]) + fmt.Fprintln(w, millicores, millicoresQuery) + fmt.Fprintln(w, durationSec, durationSecQuery) } func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) { @@ -96,19 +119,20 @@ func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, q if durationSecString == "" || megabytesString == "" { http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) return - } else { - // convert data (strings to ints) for consumeMem - durationSec, durationSecError := strconv.Atoi(durationSecString) - megabytes, megabytesError := strconv.Atoi(megabytesString) - if durationSecError != nil || megabytesError != nil { - http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) - return - } - go ConsumeMem(megabytes, durationSec) - fmt.Fprintln(w, consumeMemAddress[1:]) - fmt.Fprintln(w, megabytes, megabytesQuery) - fmt.Fprintln(w, durationSec, durationSecQuery) } + + // convert data (strings to ints) for consumeMem + durationSec, durationSecError := strconv.Atoi(durationSecString) + megabytes, megabytesError := strconv.Atoi(megabytesString) + if durationSecError != nil || megabytesError != nil { + http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + return + } + + go ConsumeMem(megabytes, durationSec) + fmt.Fprintln(w, consumeMemAddress[1:]) + fmt.Fprintln(w, megabytes, megabytesQuery) + fmt.Fprintln(w, durationSec, durationSecQuery) } func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) { @@ -116,3 +140,54 @@ func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWri fmt.Fprintln(w, "Warning: not implemented!") fmt.Fprint(w, getCurrentStatusAddress[1:]) } + +func (handler ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) { + handler.metricsLock.Lock() + defer handler.metricsLock.Unlock() + for k, v := range handler.metrics { + fmt.Fprintf(w, "# HELP %s info message.\n", k) + fmt.Fprintf(w, "# TYPE %s gauge\n", k) + fmt.Fprintf(w, "%s %f\n", k, v) + } +} + +func (handler ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) { + handler.metricsLock.Lock() + if _, ok := handler.metrics[metric]; ok { + handler.metrics[metric] += delta + } else { + handler.metrics[metric] = delta + } + handler.metricsLock.Unlock() + + time.Sleep(duration) + + handler.metricsLock.Lock() + handler.metrics[metric] -= delta + handler.metricsLock.Unlock() +} + +func (handler ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) { + // geting string data for handleBumpMetric + metric := query.Get(metricNameQuery) + deltaString := query.Get(deltaQuery) + durationSecString := query.Get(durationSecQuery) + if durationSecString == "" || metric == "" || deltaString == "" { + http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) + return + } + + // convert data (strings to ints/floats) for handleBumpMetric + durationSec, durationSecError := strconv.Atoi(durationSecString) + delta, deltaError := strconv.ParseFloat(deltaString, 64) + if durationSecError != nil || deltaError != nil { + http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) + return + } + + go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second) + fmt.Fprintln(w, bumpMetricAddress[1:]) + fmt.Fprintln(w, metric, metricNameQuery) + fmt.Fprintln(w, delta, deltaQuery) + fmt.Fprintln(w, durationSec, durationSecQuery) +}