Skip to content

Commit

Permalink
Issue queries in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
satnam6502 committed Apr 1, 2015
1 parent 6bdb3bf commit 7049bb6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
5 changes: 4 additions & 1 deletion test/soak/serve_hostnames/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
all:
go build ./serve_hostnames.go
go build serve_hostnames.go

clean:
rm -rf serve_hostnames
79 changes: 56 additions & 23 deletions test/soak/serve_hostnames/serve_hostnames.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ import (
)

var (
queriesAverage = flag.Int("queries", 10, "Number of hostname queries to make in each iteration per pod on average")
queriesAverage = flag.Int("queries", 20, "Number of hostname queries to make in each iteration per pod on average")
podsPerNode = flag.Int("pods_per_node", 1, "Number of serve_hostname pods per node")
upTo = flag.Int("up_to", 1, "Number of iterations or -1 for no limit")
maxPar = flag.Int("max_par", 500, "Maximum number of queries in flight")
)

const (
deleteTimeout = 2 * time.Minute
endpointTimeout = 5 * time.Minute
nodeListTimeout = 2 * time.Minute
podCreateTimeout = 2 * time.Minute
podStartTimeout = 30 * time.Minute
serviceCreateTimeout = 2 * time.Minute
Expand All @@ -73,16 +75,23 @@ func main() {
glog.Fatalf("Failed to make client: %v", err)
}

nodes, err := c.Nodes().List()
var nodes *api.NodeList
for start := time.Now(); time.Since(start) < nodeListTimeout; time.Sleep(2 * time.Second) {
nodes, err = c.Nodes().List()
if err == nil {
break
}
glog.Warningf("Failed to list nodes: %v", err)
}
if err != nil {
glog.Fatalf("Failed to list nodes: %v", err)
glog.Fatalf("Giving up trying to list nodes: %v", err)
}

if len(nodes.Items) == 0 {
glog.Fatalf("Failed to find any nodes.")
}

glog.Infof("Nodes found on this cluster:")
glog.Infof("Found %d nodes on this cluster:", len(nodes.Items))
for i, node := range nodes.Items {
glog.Infof("%d: %s", i, node.Name)
}
Expand Down Expand Up @@ -130,7 +139,7 @@ func main() {
}
// Clean up service
defer func() {
glog.Infof("Cleaning up service %s/server-hostnames", ns)
glog.Infof("Cleaning up service %s/serve-hostnames", ns)
// Make several attempts to delete the service.
for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) {
if err := c.Services(ns).Delete(svc.Name); err == nil {
Expand Down Expand Up @@ -209,9 +218,9 @@ func main() {
}
if pod.Status.Phase != api.PodRunning {
glog.Warningf("Gave up waiting on pod %s/%s to be running (saw %v)", ns, podName, pod.Status.Phase)
return
} else {
glog.Infof("%s/%s is running", ns, podName)
}
glog.Infof("%s/%s is running", ns, podName)
}

// Wait for the endpoints to propagate.
Expand Down Expand Up @@ -239,26 +248,50 @@ func main() {

// Repeatedly make requests.
for iteration := 0; iteration != *upTo; iteration++ {
responses := make(map[string]int, *podsPerNode*len(nodes.Items))
responseChan := make(chan string, queries)
// Use a channel of size *maxPar to throttle the number
// of in-flight requests to avoid overloading the service.
inFlight := make(chan struct{}, *maxPar)
start := time.Now()
for q := 0; q < queries; q++ {
t := time.Now()
hostname, err := c.Get().
Namespace(ns).
Prefix("proxy").
Resource("services").
Name("serve-hostnames").
DoRaw()
glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t))
if err != nil {
glog.Infof("Call failed during iteration %d query %d : %v", iteration, q, err)
} else {
responses[string(hostname)]++
go func() {
inFlight <- struct{}{}
t := time.Now()
hostname, err := c.Get().
Namespace(ns).
Prefix("proxy").
Resource("services").
Name("serve-hostnames").
DoRaw()
glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t))
i := iteration
query := q
if err != nil {
glog.Warningf("Call failed during iteration %d query %d : %v", i, query, err)
// If the query failed return a string which starts with a character
// that can't be part of a hostname.
responseChan <- fmt.Sprintf("!failed in iteration %d to issue query %d: %v", i, query, err)
} else {
responseChan <- string(hostname)
}
<-inFlight
}()
}
responses := make(map[string]int, *podsPerNode*len(nodes.Items))
missing := 0
for q := 0; q < queries; q++ {
r := <-responseChan
glog.V(4).Infof("Got response from %s", r)
responses[r]++
// If the returned hostname starts with '!' then it indicates
// an error response.
if len(r) > 0 && r[0] == '!' {
glog.V(3).Infof("Got response %s", r)
missing++
}

}
for k, v := range responses {
glog.V(4).Infof("%s: %d ", k, v)
if missing > 0 {
glog.Warningf("Missing %d responses out of %d", missing, queries)
}
// Report any nodes that did not respond.
for n, node := range nodes.Items {
Expand Down

0 comments on commit 7049bb6

Please sign in to comment.