Skip to content

Commit

Permalink
Scheduler fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
xetorthio committed Aug 10, 2017
1 parent ce8ec8a commit 0bc0342
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,8 @@ func (s *scheduler) Start() {
s.Schedule(session)
})
s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) {
session, err := s.storage.SessionGet(sessionId)
if err != nil {
log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err)
return
}
err = s.Unschedule(session)
session := &types.Session{Id: sessionId}
err := s.Unschedule(session)
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -146,7 +142,18 @@ func (s *scheduler) cron(ctx context.Context, session *scheduledSession) {
}

func (s *scheduler) processSession(ctx context.Context, session *types.Session) {
for _, ins := range session.Instances {
updatedSession, err := s.storage.SessionGet(session.Id)
if err != nil {
if storage.NotFound(err) {
log.Printf("Session [%s] was not found in storage. Unscheduling.\n", session.Id)
s.Unschedule(session)
} else {
log.Printf("Cannot process session. Got %s\n", err)
}
return
}

for _, ins := range updatedSession.Instances {
go s.processInstance(ctx, ins)
}
}
Expand All @@ -169,6 +176,9 @@ func (s *scheduler) Schedule(session *types.Session) error {
scheduledSession.cancel = cancel
scheduledSession.ticker = time.NewTicker(1 * time.Second)
go s.cron(ctx, scheduledSession)

log.Printf("Scheduled session [%s]\n", session.Id)

return nil
}

Expand All @@ -185,5 +195,7 @@ func (s *scheduler) Unschedule(session *types.Session) error {
scheduledSession.ticker.Stop()
delete(s.scheduledSessions, session.Id)

log.Printf("Unscheduled session [%s]\n", session.Id)

return nil
}

0 comments on commit 0bc0342

Please sign in to comment.