Skip to content

Commit

Permalink
Merge pull request kubernetes#6257 from lavalamp/fix6
Browse files Browse the repository at this point in the history
Add a test verifying that etcd watches work even when there's a bunch of unrelated changes
  • Loading branch information
dchen1107 committed Apr 2, 2015
2 parents 96bdee8 + a327657 commit 6ac59c8
Showing 1 changed file with 227 additions and 2 deletions.
229 changes: 227 additions & 2 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@ limitations under the License.
package integration

import (
"fmt"
"log"
"net/http"
"net/http/httptest"
"reflect"
"runtime"
"sync"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
)

func init() {
requireEtcd()
}

func TestClient(t *testing.T) {
func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) {
helper, err := master.NewEtcdHelper(newEtcdClient(), "v1beta1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -47,7 +54,6 @@ func TestClient(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req)
}))
defer s.Close()

m = master.New(&master.Config{
EtcdHelper: helper,
Expand All @@ -60,6 +66,13 @@ func TestClient(t *testing.T) {
AdmissionControl: admit.NewAlwaysAdmit(),
})

return m, s
}

func TestClient(t *testing.T) {
_, s := RunAMaster(t)
defer s.Close()

testCases := []string{
"v1beta1",
"v1beta2",
Expand Down Expand Up @@ -132,3 +145,215 @@ func TestClient(t *testing.T) {
}
}
}

func TestMultiWatch(t *testing.T) {
// Disable this test as long as it demonstrates a problem.
// TODO: Reenable this test when we get #6059 resolved.
return
const watcherCount = 50
runtime.GOMAXPROCS(watcherCount)

deleteAllEtcdKeys()
defer deleteAllEtcdKeys()
_, s := RunAMaster(t)
defer s.Close()

ns := api.NamespaceDefault
client := client.NewOrDie(&client.Config{Host: s.URL, Version: "v1beta1"})

dummyEvent := func(i int) *api.Event {
name := fmt.Sprintf("unrelated-%v", i)
return &api.Event{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%v.%x", name, time.Now().UnixNano()),
Namespace: ns,
},
InvolvedObject: api.ObjectReference{
Name: name,
Namespace: ns,
},
Reason: fmt.Sprintf("unrelated change %v", i),
}
}

type timePair struct {
t time.Time
name string
}

receivedTimes := make(chan timePair, watcherCount*2)
watchesStarted := sync.WaitGroup{}

// make a bunch of pods and watch them
for i := 0; i < watcherCount; i++ {
watchesStarted.Add(1)
name := fmt.Sprintf("multi-watch-%v", i)
got, err := client.Pods(ns).Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
Labels: labels.Set{"watchlabel": name},
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "nothing",
Image: "kubernetes/pause",
}},
},
})

if err != nil {
t.Fatalf("Couldn't make %v: %v", name, err)
}
go func(name, rv string) {
w, err := client.Pods(ns).Watch(
labels.Set{"watchlabel": name}.AsSelector(),
fields.Everything(),
rv,
)
if err != nil {
panic(fmt.Sprintf("watch error for %v: %", name, err))
}
defer w.Stop()
watchesStarted.Done()
e, ok := <-w.ResultChan() // should get the update (that we'll do below)
if !ok {
panic(fmt.Sprintf("%v ended early?", name))
}
if e.Type != watch.Modified {
panic(fmt.Sprintf("Got unexpected watch notification:\n%v: %+v %+v", name, e, e.Object))
}
receivedTimes <- timePair{time.Now(), name}
}(name, got.ObjectMeta.ResourceVersion)
}
log.Printf("%v: %v pods made and watchers started", time.Now(), watcherCount)

// wait for watches to start before we start spamming the system with
// objects below, otherwise we'll hit the watch window restriction.
watchesStarted.Wait()

const (
useEventsAsUnrelatedType = false
usePodsAsUnrelatedType = true
)

// make a bunch of unrelated changes in parallel
if useEventsAsUnrelatedType {
const unrelatedCount = 3000
var wg sync.WaitGroup
defer wg.Wait()
changeToMake := make(chan int, unrelatedCount*2)
changeMade := make(chan int, unrelatedCount*2)
go func() {
for i := 0; i < unrelatedCount; i++ {
changeToMake <- i
}
close(changeToMake)
}()
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
i, ok := <-changeToMake
if !ok {
return
}
if _, err := client.Events(ns).Create(dummyEvent(i)); err != nil {
panic(fmt.Sprintf("couldn't make an event: %v", err))
}
changeMade <- i
}
}()
}

for i := 0; i < 2000; i++ {
<-changeMade
if (i+1)%50 == 0 {
log.Printf("%v: %v unrelated changes made", time.Now(), i+1)
}
}
}
if usePodsAsUnrelatedType {
const unrelatedCount = 3000
var wg sync.WaitGroup
defer wg.Wait()
changeToMake := make(chan int, unrelatedCount*2)
changeMade := make(chan int, unrelatedCount*2)
go func() {
for i := 0; i < unrelatedCount; i++ {
changeToMake <- i
}
close(changeToMake)
}()
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
i, ok := <-changeToMake
if !ok {
return
}
name := fmt.Sprintf("unrelated-%v", i)
_, err := client.Pods(ns).Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "nothing",
Image: "kubernetes/pause",
}},
},
})

if err != nil {
panic(fmt.Sprintf("couldn't make unrelated pod: %v", err))
}
changeMade <- i
}
}()
}

for i := 0; i < 2000; i++ {
<-changeMade
if (i+1)%50 == 0 {
log.Printf("%v: %v unrelated changes made", time.Now(), i+1)
}
}
}

// Now we still have changes being made in parallel, but at least 1000 have been made.
// Make some updates to send down the watches.
sentTimes := make(chan timePair, watcherCount*2)
for i := 0; i < watcherCount; i++ {
go func(i int) {
name := fmt.Sprintf("multi-watch-%v", i)
pod, err := client.Pods(ns).Get(name)
if err != nil {
panic(fmt.Sprintf("Couldn't get %v: %v", name, err))
}
pod.Spec.Containers[0].Image = "kubernetes/pause:1"
sentTimes <- timePair{time.Now(), name}
if _, err := client.Pods(ns).Update(pod); err != nil {
panic(fmt.Sprintf("Couldn't make %v: %v", name, err))
}
}(i)
}

sent := map[string]time.Time{}
for i := 0; i < watcherCount; i++ {
tp := <-sentTimes
sent[tp.name] = tp.t
}
log.Printf("all changes made")
dur := map[string]time.Duration{}
for i := 0; i < watcherCount; i++ {
tp := <-receivedTimes
delta := tp.t.Sub(sent[tp.name])
dur[tp.name] = delta
log.Printf("%v: %v", tp.name, delta)
}
log.Printf("all watches ended")
t.Errorf("durations: %v", dur)
}

0 comments on commit 6ac59c8

Please sign in to comment.