Skip to content

Commit

Permalink
Merge pull request kubernetes#18511 from jszczepkowski/cm-consumer
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Dec 10, 2015
2 parents 0127c45 + aade6e7 commit ba30690
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 35 deletions.
20 changes: 11 additions & 9 deletions test/images/resource-consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Resource Consumer

## Overview

Resource Consumer is a tool which allows to generate cpu/memory utilization in a container.
The reason why it was created is testing kubernetes autoscaling.
Resource Consumer can help with autoscaling tests for:
Expand All @@ -10,8 +9,7 @@ Resource Consumer can help with autoscaling tests for:
- vertical autoscaling of pod - changing its resource limits.

## Usage

Resource Consumer starts an HTTP server and handle sended requests.
Resource Consumer starts an HTTP server and handle sent requests.
It listens on port given as a flag (default 8080).
Action of consuming resources is send to the container by a POST http request.
Each http request creates new process.
Expand All @@ -20,12 +18,10 @@ Http request handler is in file resource_consumer_handler.go
The container consumes specified amount of resources:

- CPU in millicores,
- Memory in megabytes.


- Memory in megabytes,
- Fake custom metrics.

###Consume CPU http request

- suffix "ConsumeCPU",
- parameters "millicores" and "durationSec".

Expand All @@ -36,13 +32,19 @@ and if consumption is too high binary sleeps for 10 millisecond.
One replica of Resource Consumer cannot consume more that 1 cpu.

###Consume Memory http request

- suffix "ConsumeMem",
- parameters "megabytes" and "durationSec".

Consumes specified amount of megabytes for durationSec seconds.
Consume Memory uses stress tool (stress -m 1 --vm-bytes megabytes --vm-hang 0 -t durationSec).
Request leading to consumig more memory then container limit will be ignored.
Request leading to consuming more memory then container limit will be ignored.

###Bump value of a fake custom metric
- suffix "BumpMetric",
- parameters "metric", "delta" and "durationSec".

Bumps metric with given name by delta for durationSec seconds.
Custom metrics in Prometheus format are exposed on "/metrics" endpoint.

###CURL example
```console
Expand Down
2 changes: 1 addition & 1 deletion test/images/resource-consumer/resource_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ var port = flag.Int("port", 8080, "Port number.")

func main() {
flag.Parse()
var resourceConsumerHandler ResourceConsumerHandler
resourceConsumerHandler := NewResourceConsumerHandler()
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), resourceConsumerHandler))
}
125 changes: 100 additions & 25 deletions test/images/resource-consumer/resource_consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)

const (
Expand All @@ -30,17 +32,34 @@ const (
notGivenFunctionArgument = "not given function argument"
consumeCPUAddress = "/ConsumeCPU"
consumeMemAddress = "/ConsumeMem"
bumpMetricAddress = "/BumpMetric"
getCurrentStatusAddress = "/GetCurrentStatus"
metricsAddress = "/metrics"
millicoresQuery = "millicores"
megabytesQuery = "megabytes"
metricNameQuery = "metric"
deltaQuery = "delta"
durationSecQuery = "durationSec"
)

type ResourceConsumerHandler struct{}
type ResourceConsumerHandler struct {
metrics map[string]float64
metricsLock sync.Mutex
}

func NewResourceConsumerHandler() ResourceConsumerHandler {
return ResourceConsumerHandler{metrics: map[string]float64{}}
}

func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// handle exposing metrics in Prometheus format (both GET & POST)
if req.URL.Path == metricsAddress {
handler.handleMetrics(w)
return
}
if req.Method != "POST" {
http.Error(w, badRequest, http.StatusBadRequest)
return
}
// parsing POST request data and URL data
if err := req.ParseForm(); err != nil {
Expand All @@ -62,6 +81,11 @@ func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *htt
handler.handleGetCurrentStatus(w)
return
}
// handle bumpMetric
if req.URL.Path == bumpMetricAddress {
handler.handleBumpMetric(w, req.Form)
return
}
http.Error(w, unknownFunction, http.StatusNotFound)
}

Expand All @@ -72,21 +96,20 @@ func (handler ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, q
if durationSecString == "" || millicoresString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
Expand All @@ -96,23 +119,75 @@ func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, q
if durationSecString == "" || megabytesString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
GetCurrentStatus()
fmt.Fprintln(w, "Warning: not implemented!")
fmt.Fprint(w, getCurrentStatusAddress[1:])
}

func (handler ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
handler.metricsLock.Lock()
defer handler.metricsLock.Unlock()
for k, v := range handler.metrics {
fmt.Fprintf(w, "# HELP %s info message.\n", k)
fmt.Fprintf(w, "# TYPE %s gauge\n", k)
fmt.Fprintf(w, "%s %f\n", k, v)
}
}

func (handler ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
handler.metricsLock.Lock()
if _, ok := handler.metrics[metric]; ok {
handler.metrics[metric] += delta
} else {
handler.metrics[metric] = delta
}
handler.metricsLock.Unlock()

time.Sleep(duration)

handler.metricsLock.Lock()
handler.metrics[metric] -= delta
handler.metricsLock.Unlock()
}

func (handler ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
// geting string data for handleBumpMetric
metric := query.Get(metricNameQuery)
deltaString := query.Get(deltaQuery)
durationSecString := query.Get(durationSecQuery)
if durationSecString == "" || metric == "" || deltaString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
}

// convert data (strings to ints/floats) for handleBumpMetric
durationSec, durationSecError := strconv.Atoi(durationSecString)
delta, deltaError := strconv.ParseFloat(deltaString, 64)
if durationSecError != nil || deltaError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
fmt.Fprintln(w, bumpMetricAddress[1:])
fmt.Fprintln(w, metric, metricNameQuery)
fmt.Fprintln(w, delta, deltaQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

0 comments on commit ba30690

Please sign in to comment.