From a3276577d889c5951e2fe861d650a0845b249035 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 1 Apr 2015 17:37:22 -0700 Subject: [PATCH] Demonstration of etcd watch problem --- test/integration/client_test.go | 229 +++++++++++++++++++++++++++++++- 1 file changed, 227 insertions(+), 2 deletions(-) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index a5e48c9cf8902..b2eba974a3b3e 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -19,17 +19,24 @@ limitations under the License. package integration import ( + "fmt" + "log" "net/http" "net/http/httptest" "reflect" + "runtime" + "sync" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" ) @@ -37,7 +44,7 @@ func init() { requireEtcd() } -func TestClient(t *testing.T) { +func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) { helper, err := master.NewEtcdHelper(newEtcdClient(), "v1beta1") if err != nil { t.Fatalf("unexpected error: %v", err) @@ -47,7 +54,6 @@ func TestClient(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { m.Handler.ServeHTTP(w, req) })) - defer s.Close() m = master.New(&master.Config{ EtcdHelper: helper, @@ -60,6 +66,13 @@ func TestClient(t *testing.T) { AdmissionControl: admit.NewAlwaysAdmit(), }) + return m, s +} + +func TestClient(t *testing.T) { + _, s := RunAMaster(t) + defer s.Close() + testCases := []string{ "v1beta1", "v1beta2", @@ -132,3 +145,215 @@ func TestClient(t *testing.T) { } } 
} + +func TestMultiWatch(t *testing.T) { + // Disable this test as long as it demonstrates a problem. + // TODO: Reenable this test when we get #6059 resolved. + return + const watcherCount = 50 + runtime.GOMAXPROCS(watcherCount) + + deleteAllEtcdKeys() + defer deleteAllEtcdKeys() + _, s := RunAMaster(t) + defer s.Close() + + ns := api.NamespaceDefault + client := client.NewOrDie(&client.Config{Host: s.URL, Version: "v1beta1"}) + + dummyEvent := func(i int) *api.Event { + name := fmt.Sprintf("unrelated-%v", i) + return &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", name, time.Now().UnixNano()), + Namespace: ns, + }, + InvolvedObject: api.ObjectReference{ + Name: name, + Namespace: ns, + }, + Reason: fmt.Sprintf("unrelated change %v", i), + } + } + + type timePair struct { + t time.Time + name string + } + + receivedTimes := make(chan timePair, watcherCount*2) + watchesStarted := sync.WaitGroup{} + + // make a bunch of pods and watch them + for i := 0; i < watcherCount; i++ { + watchesStarted.Add(1) + name := fmt.Sprintf("multi-watch-%v", i) + got, err := client.Pods(ns).Create(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Labels: labels.Set{"watchlabel": name}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "nothing", + Image: "kubernetes/pause", + }}, + }, + }) + + if err != nil { + t.Fatalf("Couldn't make %v: %v", name, err) + } + go func(name, rv string) { + w, err := client.Pods(ns).Watch( + labels.Set{"watchlabel": name}.AsSelector(), + fields.Everything(), + rv, + ) + if err != nil { + panic(fmt.Sprintf("watch error for %v: %v", name, err)) + } + defer w.Stop() + watchesStarted.Done() + e, ok := <-w.ResultChan() // should get the update (that we'll do below) + if !ok { + panic(fmt.Sprintf("%v ended early?", name)) + } + if e.Type != watch.Modified { + panic(fmt.Sprintf("Got unexpected watch notification:\n%v: %+v %+v", name, e, e.Object)) + } + receivedTimes <- timePair{time.Now(), name} + }(name, 
got.ObjectMeta.ResourceVersion) + } + log.Printf("%v: %v pods made and watchers started", time.Now(), watcherCount) + + // wait for watches to start before we start spamming the system with + // objects below, otherwise we'll hit the watch window restriction. + watchesStarted.Wait() + + const ( + useEventsAsUnrelatedType = false + usePodsAsUnrelatedType = true + ) + + // make a bunch of unrelated changes in parallel + if useEventsAsUnrelatedType { + const unrelatedCount = 3000 + var wg sync.WaitGroup + defer wg.Wait() + changeToMake := make(chan int, unrelatedCount*2) + changeMade := make(chan int, unrelatedCount*2) + go func() { + for i := 0; i < unrelatedCount; i++ { + changeToMake <- i + } + close(changeToMake) + }() + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + i, ok := <-changeToMake + if !ok { + return + } + if _, err := client.Events(ns).Create(dummyEvent(i)); err != nil { + panic(fmt.Sprintf("couldn't make an event: %v", err)) + } + changeMade <- i + } + }() + } + + for i := 0; i < 2000; i++ { + <-changeMade + if (i+1)%50 == 0 { + log.Printf("%v: %v unrelated changes made", time.Now(), i+1) + } + } + } + if usePodsAsUnrelatedType { + const unrelatedCount = 3000 + var wg sync.WaitGroup + defer wg.Wait() + changeToMake := make(chan int, unrelatedCount*2) + changeMade := make(chan int, unrelatedCount*2) + go func() { + for i := 0; i < unrelatedCount; i++ { + changeToMake <- i + } + close(changeToMake) + }() + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + i, ok := <-changeToMake + if !ok { + return + } + name := fmt.Sprintf("unrelated-%v", i) + _, err := client.Pods(ns).Create(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "nothing", + Image: "kubernetes/pause", + }}, + }, + }) + + if err != nil { + panic(fmt.Sprintf("couldn't make unrelated pod: %v", err)) + } + changeMade <- i + } + }() + } + + for i := 0; i < 2000; 
i++ { + <-changeMade + if (i+1)%50 == 0 { + log.Printf("%v: %v unrelated changes made", time.Now(), i+1) + } + } + } + + // Now we still have changes being made in parallel, but at least 1000 have been made. + // Make some updates to send down the watches. + sentTimes := make(chan timePair, watcherCount*2) + for i := 0; i < watcherCount; i++ { + go func(i int) { + name := fmt.Sprintf("multi-watch-%v", i) + pod, err := client.Pods(ns).Get(name) + if err != nil { + panic(fmt.Sprintf("Couldn't get %v: %v", name, err)) + } + pod.Spec.Containers[0].Image = "kubernetes/pause:1" + sentTimes <- timePair{time.Now(), name} + if _, err := client.Pods(ns).Update(pod); err != nil { + panic(fmt.Sprintf("Couldn't update %v: %v", name, err)) + } + }(i) + } + + sent := map[string]time.Time{} + for i := 0; i < watcherCount; i++ { + tp := <-sentTimes + sent[tp.name] = tp.t + } + log.Printf("all changes made") + dur := map[string]time.Duration{} + for i := 0; i < watcherCount; i++ { + tp := <-receivedTimes + delta := tp.t.Sub(sent[tp.name]) + dur[tp.name] = delta + log.Printf("%v: %v", tp.name, delta) + } + log.Printf("all watches ended") + t.Errorf("durations: %v", dur) +}