Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Fixed 1520: Cannot remove an Ended Task #1529

Merged
merged 1 commit into from
Mar 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
TaskDeleted = "Scheduler.TaskDeleted"
TaskStarted = "Scheduler.TaskStarted"
TaskStopped = "Scheduler.TaskStopped"
TaskEnded = "Scheduler.TaskEnded"
TaskDisabled = "Scheduler.TaskDisabled"
MetricCollected = "Scheduler.MetricsCollected"
MetricCollectionFailed = "Scheduler.MetricCollectionFailed"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (e TaskStoppedEvent) Namespace() string {
return TaskStopped
}

type TaskEndedEvent struct {
TaskID string
Source string
}

func (e TaskEndedEvent) Namespace() string {
return TaskEnded
}

type TaskDisabledEvent struct {
TaskID string
Why string
Expand Down
3 changes: 2 additions & 1 deletion core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
TaskStopped: "Stopped", // stopped but resumable
TaskSpinning: "Running", // running
TaskFiring: "Running", // running (firing can happen so briefly we don't want to try and render it as a string state)
TaskEnded: "Ended", // ended, not resumable because the schedule will not fire again
TaskEnded: "Ended", // ended, but resumable if the schedule is still valid and might fire again
TaskStopping: "Stopping", // channel has been closed, wait for TaskStopped state
}
)
Expand All @@ -65,6 +65,7 @@ type TaskWatcherHandler interface {
CatchCollection([]Metric)
CatchTaskStarted()
CatchTaskStopped()
CatchTaskEnded()
CatchTaskDisabled(string)
}

Expand Down
5 changes: 2 additions & 3 deletions docs/TASKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ A task can be in the following states:
- **running:** a running task
- **stopped:** a task that is not running
- **disabled:** a task in a state not allowed to start. This happens when the task produces consecutive errors. A disabled task must be re-enabled before it can be started again.
- **ended:** a task for which the schedule is ended. At present this happens only for windowed schedule with defined _stop_timestamp_. An ended task is resumable if the schedule is still valid.


![newtaskstatediagram2](https://cloud.githubusercontent.com/assets/21182867/19282545/a4179520-8fa3-11e6-9056-4fc3aa610983.png)

![statediagram](https://cloud.githubusercontent.com/assets/11335874/23362447/0f0b9f74-fcf6-11e6-93d7-889a7ccdc45f.png)

How To | Command
----------------------------------------|------------------------
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *Client) WatchTask(id string) *WatchTasksResult {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
r.Close()
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
}
}
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/rest_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult {
r.eventChan <- ste.EventType
r.close()
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
log.Info(ste.EventType)
r.eventChan <- ste.EventType
}
Expand Down
2 changes: 2 additions & 0 deletions mgmt/rest/v1/rbody/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
ScheduledTaskType = "scheduled_task"
ScheduledTaskStartedType = "scheduled_task_started"
ScheduledTaskStoppedType = "scheduled_task_stopped"
ScheduledTaskEndedType = "scheduled_task_ended"
ScheduledTaskRemovedType = "scheduled_task_removed"
ScheduledTaskWatchingEndedType = "schedule_task_watch_ended"
ScheduledTaskEnabledType = "scheduled_task_enabled"
Expand All @@ -46,6 +47,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

type ScheduledTaskListReturned struct {
Expand Down
8 changes: 7 additions & 1 deletion mgmt/rest/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped:
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskDisabled,
Expand Down
9 changes: 8 additions & 1 deletion mgmt/rest/v2/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

// The amount of time to buffer streaming events before flushing in seconds
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case TaskWatchTaskDisabled, TaskWatchTaskStopped:
case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskDisabled,
Expand Down
29 changes: 29 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
// ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped
ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.")
// ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped
ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -471,6 +473,7 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
serror.New(ErrTaskDisabledNotRunnable),
}
}

if t.state == core.TaskFiring || t.state == core.TaskSpinning {
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand All @@ -481,6 +484,16 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
}
}

// Ensure the schedule is valid at this point and time.
if err := t.schedule.Validate(); err != nil {
errs := []serror.SnapError{
serror.New(err),
}
f := buildErrorsLog(errs, logger)
f.Error("schedule passed not valid")
return errs
}

// Group dependencies by the node they live on
// and subscribe to them.
depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics)
Expand Down Expand Up @@ -559,6 +572,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
return []serror.SnapError{
serror.New(ErrTaskAlreadyStopped),
}
case core.TaskEnded:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Error("task is already ended")
return []serror.SnapError{
serror.New(ErrTaskEndedNotStoppable),
}
case core.TaskDisabled:
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -768,6 +789,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
"_block": "handle-events",
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskEnded(v.TaskID)
case *scheduler_event.TaskDisabledEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
Expand Down
Loading