Skip to content

Commit

Permalink
Merge pull request kubernetes#812 from smarterclayton/sync_loop_max_wait
Browse files Browse the repository at this point in the history
Kubelet should have a max think time before auto resync
  • Loading branch information
lavalamp committed Aug 7, 2014
2 parents 9ede472 + d7f4671 commit 14c379d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
2 changes: 0 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func startComponents(manifestURL string) (apiServerURL string) {
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(cfg1.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250)
}, 0)
Expand All @@ -127,7 +126,6 @@ func startComponents(manifestURL string) (apiServerURL string) {
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(cfg2.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
}, 0)
Expand Down
8 changes: 2 additions & 6 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,12 @@ func main() {
dockerClient,
cadvisorClient,
etcdClient,
*rootDirectory)
*rootDirectory,
*syncFrequency)

// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)

// resynchronize periodically
// TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without
// an update
go util.Forever(cfg.Sync, *syncFrequency)

// start the kubelet server
if *enableServer {
go util.Forever(func() {
Expand Down
31 changes: 19 additions & 12 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ func NewMainKubelet(
dc DockerInterface,
cc CadvisorInterface,
ec tools.EtcdClient,
rd string) *Kubelet {
rd string,
ri time.Duration) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
cadvisorClient: cc,
etcdClient: ec,
rootDirectory: rd,
resyncInterval: ri,
podWorkers: newPodWorkers(),
}
}
Expand All @@ -79,19 +81,21 @@ func NewMainKubelet(
// TODO: add more integration tests, and expand parameter list as needed.
func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
dockerPuller: &FakeDockerPuller{},
podWorkers: newPodWorkers(),
hostname: hn,
dockerClient: dc,
dockerPuller: &FakeDockerPuller{},
resyncInterval: 3 * time.Second,
podWorkers: newPodWorkers(),
}
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
dockerClient DockerInterface
rootDirectory string
podWorkers podWorkers
hostname string
dockerClient DockerInterface
rootDirectory string
podWorkers podWorkers
resyncInterval time.Duration

// Optional, no events will be sent without it
etcdClient tools.EtcdClient
Expand Down Expand Up @@ -561,14 +565,15 @@ func filterHostPortConflicts(pods []Pod) []Pod {
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
var pods []Pod
for {
var pods []Pod
select {
case u := <-updates:
switch u.Op {
case SET:
glog.Infof("Containers changed [%s]", kl.hostname)
pods = u.Pods
pods = filterHostPortConflicts(pods)

case UPDATE:
//TODO: implement updates of containers
Expand All @@ -578,10 +583,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
default:
panic("syncLoop does not support incremental changes")
}
case <-time.After(kl.resyncInterval):
if pods == nil {
continue
}
}

pods = filterHostPortConflicts(pods)

err := handler.SyncPods(pods)
if err != nil {
glog.Errorf("Couldn't sync containers : %v", err)
Expand Down

0 comments on commit 14c379d

Please sign in to comment.