Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds event listener to avoid sleep statements
Browse files Browse the repository at this point in the history
jcooklin authored and IzabellaRaulin committed Mar 24, 2017
1 parent 4f681cb commit 4b98fbb
Showing 3 changed files with 87 additions and 29 deletions.
11 changes: 8 additions & 3 deletions scheduler/distributed_task_test.go
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ import (
"github.com/intelsdi-x/snap/grpc/controlproxy"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/plugin/helper"
"github.com/intelsdi-x/snap/scheduler/fixtures"
"github.com/intelsdi-x/snap/scheduler/wmap"
. "github.com/smartystreets/goconvey/convey"
)
@@ -302,6 +303,8 @@ func TestDistributedSubscriptions(t *testing.T) {
})
})
Convey("Single run task", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
count := uint(1)
interval := time.Millisecond * 100
sch := schedule.NewWindowedSchedule(interval, nil, nil, count)
@@ -323,9 +326,11 @@ func TestDistributedSubscriptions(t *testing.T) {
So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
})
Convey("Task should be ended after an interval", func() {
// wait for the end of the task
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for the end of the task (or timeout)
select {
case <-lse.Ended:
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
}
So(t.State(), ShouldEqual, core.TaskEnded)

Convey("So all dependencies should have been usubscribed", func() {
43 changes: 43 additions & 0 deletions scheduler/fixtures/fixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// + build medium legacy

/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2017 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fixtures

import "github.com/intelsdi-x/gomit"
import "github.com/intelsdi-x/snap/core/scheduler_event"

type listenToSchedulerEvent struct {
Ended chan struct{}
}

// NewListenToSchedulerEvent
func NewListenToSchedulerEvent() *listenToSchedulerEvent {
return &listenToSchedulerEvent{
Ended: make(chan struct{}),
}
}

func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) {
switch e.Body.(type) {
case *scheduler_event.TaskEndedEvent:
l.Ended <- struct{}{}
}
}
62 changes: 36 additions & 26 deletions scheduler/scheduler_medium_test.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ import (
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/fixtures"
"github.com/intelsdi-x/snap/scheduler/wmap"
)

@@ -250,6 +251,8 @@ func TestCreateTask(t *testing.T) {
})
})
Convey("should not error when the schedule is valid", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)
sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0)
@@ -260,11 +263,11 @@ func TestCreateTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()
Convey("the task should be ended after reaching the end of window", func() {
// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
})
@@ -290,6 +293,8 @@ func TestCreateTask(t *testing.T) {
})
})
Convey("Single run task firing on defined start time", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
count := uint(1)
start := time.Now().Add(startWait)
sch := schedule.NewWindowedSchedule(interval, &start, nil, count)
@@ -300,11 +305,11 @@ func TestCreateTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()
Convey("the task should be ended after reaching the end of window", func() {
// wait for the end of determined window
time.Sleep(startWait)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
})
@@ -400,6 +405,8 @@ func TestStopTask(t *testing.T) {
})
})
Convey("Calling StopTask on an ended task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

@@ -412,11 +419,11 @@ func TestStopTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)

@@ -486,6 +493,8 @@ func TestStartTask(t *testing.T) {
})
})
Convey("Calling StartTask on an ended windowed task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

@@ -498,11 +507,11 @@ func TestStartTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}

// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
@@ -583,6 +592,8 @@ func TestEnableTask(t *testing.T) {
})
})
Convey("Calling EnableTask on an ended task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

@@ -595,12 +606,11 @@ func TestEnableTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
/// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)

// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)

0 comments on commit 4b98fbb

Please sign in to comment.