Skip to content

Commit

Permalink
Make each new instance of kubelet generate a new event channel (inste…
Browse files Browse the repository at this point in the history
…ad of reusing existing).
  • Loading branch information
saad-ali committed Mar 30, 2015
1 parent 10b4fe6 commit e0f71cb
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 152 deletions.
6 changes: 4 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
if err != nil {
glog.Fatalf("Couldn't create scheduler config: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()

endpoints := service.NewEndpointController(cl)
Expand All @@ -221,8 +224,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
}}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{},
record.FromSource(api.EventSource{Component: "controllermanager"}), 10, 5*time.Minute)
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController.Run(5*time.Second, true, false)
cadvisorInterface := new(cadvisor.Fake)

Expand Down
4 changes: 1 addition & 3 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
Expand Down Expand Up @@ -178,8 +177,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, kubeletClient, record.FromSource(api.EventSource{Component: "controllermanager"}),
s.RegisterRetryCount, s.PodEvictionTimeout)
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)

resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
Expand Down
10 changes: 6 additions & 4 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,15 @@ func SimpleKubelet(client *client.Client,
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
kcfg.Recorder = record.FromSource(api.EventSource{Component: "kubelet", Host: kcfg.Hostname})
eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname})
eventBroadcaster.StartLogging(glog.Infof)
if kcfg.KubeClient != nil {
kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname)
glog.Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events(""))
} else {
glog.Infof("No api server defined - no events will be sent.")
glog.Infof("No api server defined - no events will be sent to API server.")
}
kubelet.SetupLogging()
kubelet.SetupCapabilities(kcfg.AllowPrivileged, kcfg.HostNetworkSources)

credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
Expand Down
4 changes: 1 addition & 3 deletions cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
Expand Down Expand Up @@ -129,8 +128,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
}
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient,
record.FromSource(api.EventSource{Component: "controllermanager"}), 10, 5*time.Minute)
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
nodeController.Run(10*time.Second, true, true)

endpoints := service.NewEndpointController(cl)
Expand Down
188 changes: 107 additions & 81 deletions pkg/client/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const maxTriesPerEvent = 12

var sleepDuration = 10 * time.Second

const maxQueuedEvents = 1000

// EventSink knows how to store events (client.Client implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
Expand All @@ -44,50 +46,94 @@ type EventSink interface {
Update(event *api.Event) (*api.Event, error)
}

var emptySource = api.EventSource{}
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, reason, message string)

// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
}

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events recieved from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired.
StartEventWatcher(eventHandler func(*api.Event)) watch.Interface

// StartRecordingToSink starts sending events recieved from this EventBroadcaster to the given
// sink. The return value can be ignored or used to stop recording, if desired.
StartRecordingToSink(sink EventSink) watch.Interface

// StartLogging starts sending events recieved from this EventBroadcaster to the given logging
// function. The return value can be ignored or used to stop recording, if desired.
StartLogging(logf func(format string, args ...interface{})) watch.Interface

// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(source api.EventSource) EventRecorder
}

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)}
}

type eventBroadcasterImpl struct {
*watch.Broadcaster
}

// StartRecording starts sending events to a sink. Call once while initializing
// your binary. Subsequent calls will be ignored. The return value can be ignored
// or used to stop recording, if desired.
// StartRecordingToSink starts sending events recieved from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(sink EventSink) watch.Interface {
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy

previousEvent := getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
}

tries := 0
for {
if recordEvent(sink, event, updateExistingEvent) {
break
}
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
return eventBroadcaster.StartEventWatcher(
func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy

previousEvent := getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)

tries := 0
for {
if recordEvent(sink, event, updateExistingEvent) {
break
}
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
})
})
}

// recordEvent attempts to write event to a sink. It returns true if the event
Expand Down Expand Up @@ -131,22 +177,23 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo
return false
}

// StartLogging just logs local events, using the given logging function. The
// return value can be ignored or used to stop logging, if desired.
func StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return GetEvents(func(e *api.Event) {
logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message)
})
// StartLogging starts sending events recieved from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return eventBroadcaster.StartEventWatcher(
func(e *api.Event) {
logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message)
})
}

// GetEvents lets you see *local* events. Convenience function for testing. The
// return value can be ignored or used to stop logging, if desired.
func GetEvents(f func(*api.Event)) watch.Interface {
w := events.Watch()
// StartEventWatcher starts sending events recieved from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface {
watcher := eventBroadcaster.Watch()
go func() {
defer util.HandleCrash()
for {
watchEvent, open := <-w.ResultChan()
watchEvent, open := <-watcher.ResultChan()
if !open {
return
}
Expand All @@ -156,58 +203,37 @@ func GetEvents(f func(*api.Event)) watch.Interface {
// ever happen.
continue
}
f(event)
eventHandler(event)
}
}()
return w
}

const maxQueuedEvents = 1000

var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)

// EventRecorder knows how to record events for an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, reason, message string)

// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
return watcher
}

// FromSource returns an EventRecorder that records events with the
// given event source.
func FromSource(source api.EventSource) EventRecorder {
return &recorderImpl{source}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder {
return &recorderImpl{source, eventBroadcaster.Broadcaster}
}

type recorderImpl struct {
source api.EventSource
*watch.Broadcaster
}

func (i *recorderImpl) Event(object runtime.Object, reason, message string) {
func (recorder *recorderImpl) Event(object runtime.Object, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message)
return
}

e := makeEvent(ref, reason, message)
e.Source = i.source
event := makeEvent(ref, reason, message)
event.Source = recorder.source

events.Action(watch.Added, e)
recorder.Action(watch.Added, event)
}

func (i *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
i.Event(object, reason, fmt.Sprintf(messageFmt, args...))
func (recorder *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, reason, fmt.Sprintf(messageFmt, args...))
}

func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
Expand Down
Loading

0 comments on commit e0f71cb

Please sign in to comment.