Skip to content

Commit

Permalink
Merge pull request moby#16957 from MHBauer/eventsservice-refactor
Browse files Browse the repository at this point in the history
refactor access to daemon member EventsService
  • Loading branch information
calavera committed Oct 19, 2015
2 parents bb085b7 + 2abf5d9 commit b27fa6c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
9 changes: 4 additions & 5 deletions api/server/router/local/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
}

enc := buildOutputEncoder(w)
d := s.daemon
es := d.EventsService
current, l := es.Subscribe()
defer es.Evict(l)

eventFilter := d.GetEventFilter(ef)
current, l, cancel := s.daemon.SubscribeToEvents()
defer cancel()

eventFilter := s.daemon.GetEventFilter(ef)
handleEvent := func(ev *jsonmessage.JSONMessage) error {
if eventFilter.Include(ev) {
if err := enc.Encode(ev); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/docker/docker/pkg/graphdb"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/docker/docker/pkg/nat"
"github.com/docker/docker/pkg/parsers/filters"
Expand Down Expand Up @@ -548,6 +549,11 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
return events.NewFilter(filter, daemon.GetLabels)
}

// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
return daemon.EventsService.Subscribe()
}

// GetLabels for a container or image id
func (daemon *Daemon) GetLabels(id string) map[string]string {
// TODO: TestCase
Expand Down
15 changes: 10 additions & 5 deletions daemon/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ func New() *Events {
}
}

// Subscribe adds new listener to events, returns slice of 64 stored last events
// channel in which you can expect new events in form of interface{}, so you
// need type assertion.
func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
// Subscribe adds new listener to events, returns slice of 64 stored
// last events, a channel in which you can expect new events (in form
// of interface{}, so you need type assertion), and a function to call
// to stop the stream of events.
func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
e.mu.Lock()
current := make([]*jsonmessage.JSONMessage, len(e.events))
copy(current, e.events)
l := e.pub.Subscribe()
e.mu.Unlock()
return current, l

cancel := func() {
e.Evict(l)
}
return current, l, cancel
}

// Evict evicts listener from pubsub
Expand Down
8 changes: 4 additions & 4 deletions daemon/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

func TestEventsLog(t *testing.T) {
e := New()
_, l1 := e.Subscribe()
_, l2 := e.Subscribe()
_, l1, _ := e.Subscribe()
_, l2, _ := e.Subscribe()
defer e.Evict(l1)
defer e.Evict(l2)
count := e.SubscribersCount()
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestEventsLog(t *testing.T) {

func TestEventsLogTimeout(t *testing.T) {
e := New()
_, l := e.Subscribe()
_, l, _ := e.Subscribe()
defer e.Evict(l)

c := make(chan struct{})
Expand All @@ -91,7 +91,7 @@ func TestLogEvents(t *testing.T) {
e.Log(action, id, from)
}
time.Sleep(50 * time.Millisecond)
current, l := e.Subscribe()
current, l, _ := e.Subscribe()
for i := 0; i < 10; i++ {
num := i + eventsLimit + 16
action := fmt.Sprintf("action_%d", num)
Expand Down

0 comments on commit b27fa6c

Please sign in to comment.