Skip to content

Commit

Permalink
Workflow: adding controller and address 2nd round review
Browse files Browse the repository at this point in the history
  • Loading branch information
sdminonne committed Jan 29, 2016
1 parent 8b3895d commit e8cf1dc
Showing 1 changed file with 143 additions and 129 deletions.
272 changes: 143 additions & 129 deletions docs/proposals/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Documentation for other releases can be found at

## Abstract

A proposal to introduce [workflow](https://en.wikipedia.org/wiki/Workflow_management_system)
This proposal introduces [workflow](https://en.wikipedia.org/wiki/Workflow_management_system)
functionality in kubernetes.
Workflows (aka [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) workflows
since _tasks_ are organized in a Directed Acyclic Graph) are ubiquitous
Expand All @@ -57,7 +57,7 @@ dependency has not been satisified.

## Implementation

This proposal introduces a new REST resource `Workflow`. A `Workflow` is represented as a
In this proposal a new REST resource `Workflow` is introduced. A `Workflow` is represented as a
[graph](https://en.wikipedia.org/wiki/Graph_(mathematics)), more specifically as a DAG.
Vertices of the graph represent steps of the workflow. The workflow steps are represented via a
`WorkflowStep`<sup>1</sup> resource.
Expand All @@ -66,177 +66,186 @@ The edges of the graph represent _dependecies_. To represent edges there is no e
The basic idea of this proposal consists in creation of each step postponing execution
until all predecessors' steps run to completion.


### Postponing execution

At the time of writing, to defer execution there are two discussions in the community:
[#17305](https://github.com/kubernetes/kubernetes/pull/17305): an
_initializer_ is a dynamically registered object which permits to select a custom controller
to be applied to a resource. The controller verifies the dependencies.
The controller checks are applied before the resource is created (even API validated).
Using a proper controller one may defer creation of the resource until prerequisites
are satisfied. Even if not completed [#17305](https://github.com/kubernetes/kubernetes/pull/17305)
already introduces a _dependency_ concept
([see this comment](https://github.com/kubernetes/kubernetes/pull/17305#discussion_r45007826))
which could be reused to implement `Workflow`. In
[#1899](https://github.com/kubernetes/kubernetes/issues/1899):
some use-cases to wait for specific conditions (`complete`, `ready`) are presented.


### Detecting run to completion

To detect run to completion for the resource inside the graph the resource needs to implement
in `status` the slice of `condition`s. [See](../../docs/devel/api-conventions.md#objects)
and [#7856](https://github.com/kubernetes/kubernetes/issues/7856).

### Workflow

A new resource will be introduced in the API. A `Workflow` is a graph.
In the simplest case it's a graph of `Job`s but it can also
be a graph of other entities (for example cross-cluster objects or other `Workflow`s).


```go
// Workflow is a directed acyclic graph

// Workflow is a DAG workflow
type Workflow struct {
unversioned.TypeMeta `json:",inline"`
unversioned.TypeMeta `json:",inline"`

// Standard object's metadata.
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata.
api.ObjectMeta `json:"metadata,omitempty"`
// Standard object's metadata.
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata.
api.ObjectMeta `json:"metadata,omitempty"`

// Spec defines the expected behavior of a Workflow. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
Spec WorkflowSpec `json:"spec,omitempty"`
// Spec defines the expected behavior of a Workflow. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
Spec WorkflowSpec `json:"spec,omitempty"`

// Status represents the current status of the Workflow. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
Status WorkflowStatus `json:"status,omitempty"`
// Status represents the current status of the Workflow. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
Status WorkflowStatus `json:"status,omitempty"`
}

// WorkflowList implements list of Workflow.
type WorkflowList struct {
unversioned.TypeMeta `json:",inline"`
unversioned.TypeMeta `json:",inline"`

// Standard list metadata
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
unversioned.ListMeta `json:"metadata,omitempty"`
// Standard list metadata
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
unversioned.ListMeta `json:"metadata,omitempty"`

// Items is the list of Workflow
Items []Workflow `json:"items"`
// Items is the list of Workflow
Items []Workflow `json:"items"`
}
```


#### `WorkflowSpec`

```go
// WorkflowSpec contains Workflow specification
type WorkflowSpec struct {
// Optional duration in seconds relative to the startTime that the job may be active
// before the system tries to terminate it; value must be positive integer
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
// Standard object's metadata.
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
api.ObjectMeta `json:"metadata,omitempty"`

// Steps contains the vertices of the workflow graph. The key of the map is a string
// to uniquely identify the step. Steps order is defined by their dependencies.
Steps map[string]WorkflowStep `json:"steps,omitempty"`
}
```
//ActiveDealineSeconds contains
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`

* `spec.steps`: is a map of `WorkflowStep`s. _Key_ of the map is a string which identifies the step.
// Steps is a map containing the workflow steps. Key of the
// map is a string which uniquely identifies the workflow step
Steps map[string]WorkflowStep `json:"steps,omitempty"`

// Selector for created jobs (if any)
Selector *LabelSelector `json:"selector,omitempty"`
}

### `WorkflowStep`

The `WorkflowStep` resource acts as a [union](https://en.wikipedia.org/wiki/Tagged_union) of `JobSpec` and `ObjectReference`.

```go
// WorkflowStep contains necessary information to identifiy the node of the workflow graph
type WorkflowStep struct {
// JobTemplate contains the job specificaton that should be run in this Workflow.
// Only one between externalRef and jobTemplate can be set.
JobTemplate JobSpec `json:"jobTemplate,omitempty"`
// JobTemplate contains the job specificaton that should be run in this Workflow.
// Only one between externalRef and jobTemplate can be set.
JobTemplate *JobTemplateSpec `json:"jobTemplate,omitempty"`

// External contains a reference to another schedulable resource.
// Only one between ExternalRef and JobTemplate can be set.
ExternalRef api.ObjectReference `json:"externalRef,omitempty"`
// External contains a reference to another schedulable resource.
// Only one between ExternalRef and JobTemplate can be set.
ExternalRef *api.ObjectReference `json:"externalRef,omitempty"`

// Dependecies represent dependecies of the current workflow step
Dependencies ObjectDependencies `json:"dependencies,omitempty"`
// Dependecies represent dependecies of the current workflow step.
// Irrilevant for ExteranlRef step
Dependencies []string `json:"dependencies,omitempty"`
}
```

* `workflowStep.jobSpec` contains the specification of the job to be executed.
* `workflowStep.externalRef` contains a reference to external resources (for example another `Workflow`).
*

```go
type ObjectDependencies struct {
// DependeciesRef is a slice of unique identifier of the step (key of the spec.steps map)
DependencyIDs []string `json:"dependencyIDs,omitempty"`
ControllerRef *ObjectReference `json:"controllerRef,omitempty"`
//...
}
```
type WorkflowConditionType string

* `dependencies.dependencyIDs`: is a slice with a list of _step_ that must run to completion.
* `dependencies.controllerRef`: will contain the controller for the current `WorkflowStep`. As a first
// These are valid conditions of a workflow.
const (
// WorkflowComplete means the workflow has completed its execution.
WorkflowComplete WorkflowConditionType = "Complete"
)

type WorkflowCondition struct {
// Type of workflow condition, currently only Complete.
Type WorkflowConditionType `json:"type"`

This approach permits to implement other kinds of controller, for example data availability
or other external event. In a first implementation `dependencies.controllerRef` will implement only
the logic to check all dependencies ran to completion: since at the beginning only `Workflow` and `Job`
can be composed the only thing needed to implement is the ability to check wether a `Job` or
a `Workflow` runs to completion.
Our understanding is that detecting the type of object and an approach similar to what
is implemented in `pkg/client/unversioned/conditions.go` and  `pkg/kubectl/scale.go` for _desiredReplicas_ can
be used to to detect if a _step_ must be started.
// Status of the condition, one of True, False, Unknown.
Status api.ConditionStatus `json:"status"`

// Last time the condition was checked.
LastProbeTime unversioned.Time `json:"lastProbeTime,omitempty"`

### `WorkflowStatus`
// Last time the condition transited from one status to another.
LastTransitionTime unversioned.Time `json:"lastTransitionTime,omitempty"`

```go
// WorkflowStatus contains the current status of the Workflow
type WorkflowStatus struct {
// Conditions represent the latest available observations of an object's current state.
Conditions []WorkflowCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
// (brief) reason for the condition's last transition.
Reason string `json:"reason,omitempty"`

// Statuses represent status of different steps
Statuses map[string]WorkflowStepStatus `json:statuses`
// Human readable message indicating details about last transition.
Message string `json:"message,omitempty"`
}

type WorkflowConditionType string
// WorkflowStatus represents the
type WorkflowStatus struct {
// Conditions represent the latest available observations of an object's current state.
Conditions []WorkflowCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

// StartTime represents time when the job was acknowledged by the Workflow controller
// It is not guaranteed to be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// StartTime doesn't consider startime of `ExternalReference`
StartTime *unversioned.Time `json:"startTime,omitempty"`

// CompletionTime represents time when the workflow was completed. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
CompletionTime *unversioned.Time `json:"completionTime,omitempty"`

// Statuses represent status of different steps
Statuses map[string]WorkflowStepStatus `json:statuses`
}

// These are valid conditions of a workflow.
const (
// WorkflowComplete means the workflow has completed its execution.
WorkflowComplete WorkflowConditionType = "Complete"
)
// WorkflowStepStatus contains necessary information for the step status
type WorkflowStepStatus struct {
//Complete is set to true when resource run to complete
Complete bool `json:"complete"`

// WorkflowCondition describes current state of a workflow.
type WorkflowCondition struct {
// Type of workflow condition, currently only Complete.
Type WorkflowConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status api.ConditionStatus `json:"status"`
// Last time the condition was checked.
LastProbeTime unversioned.Time `json:"lastProbeTime,omitempty"`
// Last time the condition transited from one status to another.
LastTransitionTime unversioned.Time `json:"lastTransitionTime,omitempty"`
// (brief) reason for the condition's last transition.
Reason string `json:"reason,omitempty"`
// Human readable message indicating details about last transition.
Message string `json:"message,omitempty"`
// Reference of the step resource
Reference api.ObjectReference `json:"reference"`
}
```

// WorkflowStepStatus contains the status of a WorkflowStep
type WorkflowStepStatus struct {
// ObjectReference contains the reference to the resource
ObjectReference api.ObjectReference `json:"objectReference,omitempty"`
`JobTemplateSpec` is already introduced by
ScheduledJob controller proposal](https://github.com/kubernetes/kubernete2s/pull/11980).
Reported for readability:

```go
type JobTemplateSpec struct {
// Standard object's metadata.
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
api.ObjectMeta

// Spec is a structure defining the expected behavior of a job.
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status
Spec JobSpec
}
```

* `status.statuses`: is a map of `WorkflowStepStatus`es. _Key_ of the map is a string which identifies the step.
_Keys_ are the same used in `spec.steps`.
* `status.conditions`: is a slice of `WorkflowCondition`s. [see #7856](https://github.com/kubernetes/kubernetes/issues/7856)
## Controller

Workflow controller will watch `Workflow` objects and any `Job` objects created by the workflow.
the `Job`s objects created in each step.
Each step can contain either another `Workflow` referenced via `workflowStep.ExternalRef`
or a `Job` created via `workflowStep.jobTemplate`.
For each non finished workflow (similarly to Job, Workflow completion is detected iterating
over all the `status.conditions` condition) we check if deadline is not expired.
If deadline expired the workfow is terminated.
If deadline didn't expires the workflow controller iterates over all workflow steps:
- If step has a status (retrieved via step name (map key) in the `status.statuses`
map check whether the step is already completed.
- If step is completed nothing is done.
- If step is not completed two sub-cases should be analyzed:
* Step containing workflow: check wether workflow terminated and eventually update
the `status.statuses[name].complete` entry if applicable
* Step containing job: check whether job needs to be started or is already started.
- A step/job is started if it has no dependecies or all its dependencies are already
terminated. Workflow controller adds some labels to the Job.
This will permit to obtain the workflow each job belongs to (via `spec.Selector`).
The step name is equally inserted as a label in each job.
- If the job is already running, a completion check is performed. If the job terminated
(checked via conditions `job.status`) the field `status.statusues[name].complete` is updated.
- When all steps are complete: `complete` condition is added to `status.condition` and the
`status.completionTime` is updated. At this point, workflow is finished.


## Changing a Workflow

### Updating

User can modify a workflow only if the `step`s under modification are not already running.


### Deleting

Users can cancel a workflow by deleting it before it's completed. All running jobs will be deleted.
Other workflows referenced in steps will not be deleted as they are not owned by the parent workflow.


## Events

Expand All @@ -247,18 +256,23 @@ The events associated to `Workflow`s will be:
* WorkflowEnded
* WorkflowDeleted

## Kubectl

Kubectl will be modified to display workflows. More particulary the `describe` command
will display all the steps with their status. Steps will be topologically sorted and
each dependency will be decorated with its status (wether or not step is waitin for
dependency).

## Future evolution

In the future we may want to extend _Workflow_ with other kinds of resources, modifying `WorkflowStep` to
support a more general template to create other resources.
One of the major functionalities missing here is the ability to set a recurring `Workflow` (cron-like),
similar to the `ScheduledJob` [#11980](https://github.com/kubernetes/kubernetes/pull/11980) for `Job`.
If the scheduled job is able
to support [various resources](https://github.com/kubernetes/kubernetes/pull/11980#discussion_r46729699)
If the scheduled job is able to support
[various resources](https://github.com/kubernetes/kubernetes/pull/11980#discussion_r46729699)
`Workflow` will benefit from the _schedule_ functionality of `Job`.


### Relevant use cases out of scope of this proposal

* As an admin I want to set quota on workflow resources
Expand Down

0 comments on commit e8cf1dc

Please sign in to comment.