Workflow: adding controller and address 2nd round review
@@ -30,7 +30,7 @@ Documentation for other releases can be found at
|
|||||||
|
|
||||||
## Abstract
|
## Abstract
|
||||||
|
|
||||||
A proposal to introduce [workflow](https://en.wikipedia.org/wiki/Workflow_management_system)
|
This proposal introduces [workflow](https://en.wikipedia.org/wiki/Workflow_management_system)
|
||||||
functionality in kubernetes.
|
functionality in kubernetes.
|
||||||
Workflows (aka [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) workflows
|
Workflows (aka [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) workflows
|
||||||
since _tasks_ are organized in a Directed Acyclic Graph) are ubiquitous
|
since _tasks_ are organized in a Directed Acyclic Graph) are ubiquitous
|
||||||
@@ -57,7 +57,7 @@ dependency has not been satisified.
|
|||||||
|
|
||||||
## Implementation
|
## Implementation
|
||||||
|
|
||||||
This proposal introduces a new REST resource `Workflow`. A `Workflow` is represented as a
|
In this proposal a new REST resource `Workflow` is introduced. A `Workflow` is represented as a
|
||||||
[graph](https://en.wikipedia.org/wiki/Graph_(mathematics)), more specifically as a DAG.
|
[graph](https://en.wikipedia.org/wiki/Graph_(mathematics)), more specifically as a DAG.
|
||||||
Vertices of the graph represent steps of the workflow. The workflow steps are represented via a
|
Vertices of the graph represent steps of the workflow. The workflow steps are represented via a
|
||||||
`WorkflowStep`<sup>1</sup> resource.
|
`WorkflowStep`<sup>1</sup> resource.
|
||||||
@@ -66,38 +66,15 @@ The edges of the graph represent _dependecies_. To represent edges there is no e
|
|||||||
The basic idea of this proposal is to create each step while postponing its execution
|
The basic idea of this proposal is to create each step while postponing its execution
|
||||||
until all of its predecessor steps have run to completion.
|
until all of its predecessor steps have run to completion.
|
||||||
|
|
||||||
|
|
||||||
### Postponing execution
|
|
||||||
|
|
||||||
At the time of writing, to defer execution there are two discussions in the community:
|
|
||||||
[#17305](https://github.com/kubernetes/kubernetes/pull/17305): an
|
|
||||||
_initializer_ is a dynamically registered object which permits to select a custom controller
|
|
||||||
to be applied to a resource. The controller verifies the dependencies.
|
|
||||||
The controller checks are applied before the resource is created (even API validated).
|
|
||||||
Using a proper controller one may defer creation of the resource until prerequisites
|
|
||||||
are satisfied. Even if not completed [#17305](https://github.com/kubernetes/kubernetes/pull/17305)
|
|
||||||
already introduces a _dependency_ concept
|
|
||||||
([see this comment](https://github.com/kubernetes/kubernetes/pull/17305#discussion_r45007826))
|
|
||||||
which could be reused to implement `Workflow`. In
|
|
||||||
[#1899](https://github.com/kubernetes/kubernetes/issues/1899):
|
|
||||||
some use-cases to wait for specific conditions (`complete`, `ready`) are presented.
|
|
||||||
|
|
||||||
|
|
||||||
### Detecting run to completion
|
|
||||||
|
|
||||||
To detect run to completion for the resource inside the graph the resource needs to implement
|
|
||||||
in `status` the slice of `condition`s. [See](../../docs/devel/api-conventions.md#objects)
|
|
||||||
and [#7856](https://github.com/kubernetes/kubernetes/issues/7856).
|
|
||||||
|
|
||||||
### Workflow
|
### Workflow
|
||||||
|
|
||||||
A new resource will be introduced in the API. A `Workflow` is a graph.
|
A new resource will be introduced in the API. A `Workflow` is a graph.
|
||||||
In the simplest case it's a graph of `Job`s but it can also
|
In the simplest case it's a graph of `Job`s but it can also
|
||||||
be a graph of other entities (for example cross-cluster objects or other `Workflow`s).
|
be a graph of other entities (for example cross-cluster objects or other `Workflow`s).
|
||||||
|
|
||||||
|
|
||||||
```go
|
```go
|
||||||
// Workflow is a directed acyclic graph
|
|
||||||
|
// Workflow is a DAG workflow
|
||||||
type Workflow struct {
|
type Workflow struct {
|
||||||
unversioned.TypeMeta `json:",inline"`
|
unversioned.TypeMeta `json:",inline"`
|
||||||
|
|
||||||
@@ -123,84 +100,37 @@ type WorkflowList struct {
|
|||||||
// Items is the list of Workflow
|
// Items is the list of Workflow
|
||||||
Items []Workflow `json:"items"`
|
Items []Workflow `json:"items"`
|
||||||
}
|
}
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
#### `WorkflowSpec`
|
|
||||||
|
|
||||||
```go
|
|
||||||
// WorkflowSpec contains Workflow specification
|
// WorkflowSpec contains Workflow specification
|
||||||
type WorkflowSpec struct {
|
type WorkflowSpec struct {
|
||||||
// Optional duration in seconds relative to the startTime that the job may be active
|
// Standard object's metadata.
|
||||||
// before the system tries to terminate it; value must be positive integer
|
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||||
|
api.ObjectMeta `json:"metadata,omitempty"`
|
||||||
|
|
||||||
|
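// ActiveDeadlineSeconds is the optional duration in seconds, relative to the start time, that the workflow may be active before the system tries to terminate it.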
|
||||||
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
|
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
|
||||||
|
|
||||||
// Steps contains the vertices of the workflow graph. The key of the map is a string
|
// Steps is a map containing the workflow steps. Key of the
|
||||||
// to uniquely identify the step. Steps order is defined by their dependencies.
|
// map is a string which uniquely identifies the workflow step
|
||||||
Steps map[string]WorkflowStep `json:"steps,omitempty"`
|
Steps map[string]WorkflowStep `json:"steps,omitempty"`
|
||||||
|
|
||||||
|
// Selector for created jobs (if any)
|
||||||
|
Selector *LabelSelector `json:"selector,omitempty"`
|
||||||
}
|
}
|
||||||
```
|
|
||||||
|
|
||||||
* `spec.steps`: is a map of `WorkflowStep`s. _Key_ of the map is a string which identifies the step.
|
|
||||||
|
|
||||||
|
|
||||||
### `WorkflowStep`
|
|
||||||
|
|
||||||
The `WorkflowStep` resource acts as a [union](https://en.wikipedia.org/wiki/Tagged_union) of `JobSpec` and `ObjectReference`.
|
|
||||||
|
|
||||||
```go
|
|
||||||
// WorkflowStep contains necessary information to identify the node of the workflow graph
|
// WorkflowStep contains necessary information to identify the node of the workflow graph
|
||||||
type WorkflowStep struct {
|
type WorkflowStep struct {
|
||||||
// JobTemplate contains the job specification that should be run in this Workflow.
|
// JobTemplate contains the job specification that should be run in this Workflow.
|
||||||
// Only one of externalRef and jobTemplate can be set.
|
// Only one of externalRef and jobTemplate can be set.
|
||||||
JobTemplate JobSpec `json:"jobTemplate,omitempty"`
|
JobTemplate *JobTemplateSpec `json:"jobTemplate,omitempty"`
|
||||||
|
|
||||||
// ExternalRef contains a reference to another schedulable resource.
|
// ExternalRef contains a reference to another schedulable resource.
|
||||||
// Only one of ExternalRef and JobTemplate can be set.
|
// Only one of ExternalRef and JobTemplate can be set.
|
||||||
ExternalRef api.ObjectReference `json:"externalRef,omitempty"`
|
ExternalRef *api.ObjectReference `json:"externalRef,omitempty"`
|
||||||
|
|
||||||
// Dependencies represent dependencies of the current workflow step
|
// Dependencies represent dependencies of the current workflow step.
|
||||||
Dependencies ObjectDependencies `json:"dependencies,omitempty"`
|
// Irrelevant for ExternalRef steps.
|
||||||
}
|
Dependencies []string `json:"dependencies,omitempty"`
|
||||||
```
|
|
||||||
|
|
||||||
* `workflowStep.jobSpec` contains the specification of the job to be executed.
|
|
||||||
* `workflowStep.externalRef` contains a reference to external resources (for example another `Workflow`).
|
|
||||||
*
|
|
||||||
|
|
||||||
```go
|
|
||||||
type ObjectDependencies struct {
|
|
||||||
// DependencyIDs is a slice of unique identifiers of steps (keys of the spec.steps map)
|
|
||||||
DependencyIDs []string `json:"dependencyIDs,omitempty"`
|
|
||||||
ControllerRef *ObjectReference `json:"controllerRef,omitempty"`
|
|
||||||
//...
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
* `dependencies.dependencyIDs`: is a slice of _steps_ that must run to completion.
|
|
||||||
* `dependencies.controllerRef`: will contain the controller for the current `WorkflowStep`. As a first
|
|
||||||
|
|
||||||
|
|
||||||
This approach permits to implement other kinds of controller, for example data availability
|
|
||||||
or other external event. In a first implementation `dependencies.controllerRef` will implement only
|
|
||||||
the logic to check all dependencies ran to completion: since at the beginning only `Workflow` and `Job`
|
|
||||||
can be composed, the only thing needed to implement is the ability to check whether a `Job` or
|
|
||||||
a `Workflow` runs to completion.
|
|
||||||
Our understanding is that detecting the type of object and an approach similar to what
|
|
||||||
is implemented in `pkg/client/unversioned/conditions.go` and `pkg/kubectl/scale.go` for _desiredReplicas_ can
|
|
||||||
be used to detect if a _step_ must be started.
|
|
||||||
|
|
||||||
|
|
||||||
### `WorkflowStatus`
|
|
||||||
|
|
||||||
```go
|
|
||||||
// WorkflowStatus contains the current status of the Workflow
|
|
||||||
type WorkflowStatus struct {
|
|
||||||
// Conditions represent the latest available observations of an object's current state.
|
|
||||||
Conditions []WorkflowCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
|
|
||||||
|
|
||||||
// Statuses represent status of different steps
|
|
||||||
Statuses map[string]WorkflowStepStatus `json:"statuses"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkflowConditionType string
|
type WorkflowConditionType string
|
||||||
@@ -211,32 +141,111 @@ const (
|
|||||||
WorkflowComplete WorkflowConditionType = "Complete"
|
WorkflowComplete WorkflowConditionType = "Complete"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WorkflowCondition describes current state of a workflow.
|
|
||||||
type WorkflowCondition struct {
|
type WorkflowCondition struct {
|
||||||
// Type of workflow condition, currently only Complete.
|
// Type of workflow condition, currently only Complete.
|
||||||
Type WorkflowConditionType `json:"type"`
|
Type WorkflowConditionType `json:"type"`
|
||||||
|
|
||||||
// Status of the condition, one of True, False, Unknown.
|
// Status of the condition, one of True, False, Unknown.
|
||||||
Status api.ConditionStatus `json:"status"`
|
Status api.ConditionStatus `json:"status"`
|
||||||
|
|
||||||
// Last time the condition was checked.
|
// Last time the condition was checked.
|
||||||
LastProbeTime unversioned.Time `json:"lastProbeTime,omitempty"`
|
LastProbeTime unversioned.Time `json:"lastProbeTime,omitempty"`
|
||||||
|
|
||||||
// Last time the condition transitioned from one status to another.
|
// Last time the condition transitioned from one status to another.
|
||||||
LastTransitionTime unversioned.Time `json:"lastTransitionTime,omitempty"`
|
LastTransitionTime unversioned.Time `json:"lastTransitionTime,omitempty"`
|
||||||
|
|
||||||
// (brief) reason for the condition's last transition.
|
// (brief) reason for the condition's last transition.
|
||||||
Reason string `json:"reason,omitempty"`
|
Reason string `json:"reason,omitempty"`
|
||||||
|
|
||||||
// Human readable message indicating details about last transition.
|
// Human readable message indicating details about last transition.
|
||||||
Message string `json:"message,omitempty"`
|
Message string `json:"message,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// WorkflowStepStatus contains the status of a WorkflowStep
|
// WorkflowStatus represents the current status of the Workflow
|
||||||
|
type WorkflowStatus struct {
|
||||||
|
// Conditions represent the latest available observations of an object's current state.
|
||||||
|
Conditions []WorkflowCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
|
||||||
|
|
||||||
|
// StartTime represents the time when the workflow was acknowledged by the Workflow controller.
|
||||||
|
// It is not guaranteed to be set in happens-before order across separate operations.
|
||||||
|
// It is represented in RFC3339 form and is in UTC.
|
||||||
|
// StartTime does not consider the start time of `ExternalRef` steps.
|
||||||
|
StartTime *unversioned.Time `json:"startTime,omitempty"`
|
||||||
|
|
||||||
|
// CompletionTime represents time when the workflow was completed. It is not guaranteed to
|
||||||
|
// be set in happens-before order across separate operations.
|
||||||
|
// It is represented in RFC3339 form and is in UTC.
|
||||||
|
CompletionTime *unversioned.Time `json:"completionTime,omitempty"`
|
||||||
|
|
||||||
|
// Statuses represent status of different steps
|
||||||
|
Statuses map[string]WorkflowStepStatus `json:"statuses"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkflowStepStatus contains necessary information for the step status
|
||||||
type WorkflowStepStatus struct {
|
type WorkflowStepStatus struct {
|
||||||
// ObjectReference contains the reference to the resource
|
// Complete is set to true when the resource has run to completion
|
||||||
ObjectReference api.ObjectReference `json:"objectReference,omitempty"`
|
Complete bool `json:"complete"`
|
||||||
|
|
||||||
|
// Reference of the step resource
|
||||||
|
Reference api.ObjectReference `json:"reference"`
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
* `status.statuses`: is a map of `WorkflowStepStatus`es. _Key_ of the map is a string which identifies the step.
|
`JobTemplateSpec` is already introduced by
|
||||||
_Keys_ are the same used in `spec.steps`.
|
[ScheduledJob controller proposal](https://github.com/kubernetes/kubernetes/pull/11980).
|
||||||
* `status.conditions`: is a slice of `WorkflowCondition`s. [see #7856](https://github.com/kubernetes/kubernetes/issues/7856)
|
Reproduced here for readability:
|
||||||
|
|
||||||
|
```go
|
||||||
|
type JobTemplateSpec struct {
|
||||||
|
// Standard object's metadata.
|
||||||
|
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||||
|
api.ObjectMeta
|
||||||
|
|
||||||
|
// Spec is a structure defining the expected behavior of a job.
|
||||||
|
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status
|
||||||
|
Spec JobSpec
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Controller
|
||||||
|
|
||||||
|
The workflow controller will watch `Workflow` objects and any `Job` objects created in each step of the workflow.
|
||||||
|
Each step can contain either another `Workflow` referenced via `workflowStep.externalRef`
|
||||||
|
or a `Job` created via `workflowStep.jobTemplate`.
|
||||||
|
For each unfinished workflow (as with Job, Workflow completion is detected by iterating
|
||||||
|
over the conditions in `status.conditions`), the controller checks whether the deadline has expired.
|
||||||
|
If the deadline has expired, the workflow is terminated.
|
||||||
|
If the deadline has not expired, the workflow controller iterates over all workflow steps:
|
||||||
|
- If the step has a status (retrieved via the step name, i.e. the map key, in the `status.statuses`
|
||||||
|
map), check whether the step is already completed.
|
||||||
|
- If the step is completed, nothing is done.
|
||||||
|
- If the step is not completed, there are two sub-cases:
|
||||||
|
* Step containing a workflow: check whether the workflow has terminated and, if so, update
|
||||||
|
the `status.statuses[name].complete` entry.
|
||||||
|
* Step containing a job: check whether the job needs to be started or is already started.
|
||||||
|
- A step/job is started if it has no dependencies or all its dependencies have already
|
||||||
|
terminated. The workflow controller adds some labels to the Job.
|
||||||
|
These labels make it possible to find the workflow each job belongs to (via `spec.Selector`).
|
||||||
|
The step name is also added as a label to each job.
|
||||||
|
- If the job is already running, a completion check is performed. If the job has terminated
|
||||||
|
(checked via the conditions in `job.status`), the field `status.statuses[name].complete` is updated.
|
||||||
|
- When all steps are complete, a `Complete` condition is added to `status.conditions` and the
|
||||||
|
`status.completionTime` is updated. At this point, the workflow is finished.
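
The per-step checks above can be sketched in Go as follows. This is a minimal illustration, not the controller's actual code: `Step`, `StepStatus`, `readySteps`, and `workflowComplete` are hypothetical names that reduce `WorkflowStep` and `WorkflowStepStatus` to the fields needed here, and the sketch assumes that a step with no entry in `status.statuses` has not been started yet.

```go
package main

import (
	"fmt"
	"sort"
)

// Step is a hypothetical, simplified stand-in for WorkflowStep: only the
// dependency list (keys of spec.steps) matters for this sketch.
type Step struct {
	Dependencies []string
}

// StepStatus is a hypothetical, simplified stand-in for WorkflowStepStatus.
type StepStatus struct {
	Complete bool
}

// readySteps returns the names of steps that have no status entry yet
// (assumed to mean "not started") and whose dependencies are all complete.
func readySteps(steps map[string]Step, statuses map[string]StepStatus) []string {
	var ready []string
	for name, step := range steps {
		if _, started := statuses[name]; started {
			continue // already started or finished: nothing to start
		}
		ok := true
		for _, dep := range step.Dependencies {
			// A missing entry yields the zero value, i.e. Complete == false.
			if !statuses[dep].Complete {
				ok = false
				break
			}
		}
		if ok {
			ready = append(ready, name)
		}
	}
	sort.Strings(ready) // map iteration order is random; sort for stable output
	return ready
}

// workflowComplete reports whether every step has a status marked complete.
func workflowComplete(steps map[string]Step, statuses map[string]StepStatus) bool {
	for name := range steps {
		if !statuses[name].Complete {
			return false
		}
	}
	return true
}

func main() {
	steps := map[string]Step{
		"fetch":   {},
		"process": {Dependencies: []string{"fetch"}},
		"report":  {Dependencies: []string{"process"}},
	}
	statuses := map[string]StepStatus{"fetch": {Complete: true}}

	fmt.Println("ready to start:", readySteps(steps, statuses))
	fmt.Println("workflow complete:", workflowComplete(steps, statuses))
}
```

In this example only `process` is ready, because `report` still waits on `process`. A real controller would also apply the deadline check and create the `Job` for each ready step.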
|
||||||
|
|
||||||
|
|
||||||
|
## Changing a Workflow
|
||||||
|
|
||||||
|
### Updating
|
||||||
|
|
||||||
|
Users can modify a workflow only if the steps being modified are not already running.
|
||||||
|
|
||||||
|
|
||||||
|
### Deleting
|
||||||
|
|
||||||
|
Users can cancel a workflow by deleting it before it's completed. All running jobs will be deleted.
|
||||||
|
Other workflows referenced in steps will not be deleted as they are not owned by the parent workflow.
|
||||||
|
|
||||||
|
|
||||||
## Events
|
## Events
|
||||||
|
|
||||||
@@ -247,6 +256,12 @@ The events associated to `Workflow`s will be:
|
|||||||
* WorkflowEnded
|
* WorkflowEnded
|
||||||
* WorkflowDeleted
|
* WorkflowDeleted
|
||||||
|
|
||||||
|
## Kubectl
|
||||||
|
|
||||||
|
Kubectl will be modified to display workflows. In particular, the `describe` command
|
||||||
|
will display all the steps with their status. Steps will be topologically sorted and
|
||||||
|
each dependency will be decorated with its status (whether or not the step is waiting for the
|
||||||
|
dependency).
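
One way `describe` could order the steps is Kahn's algorithm, sketched below under the assumption that the dependencies are available as a map from step name to the names it depends on (mirroring `spec.steps`). `topoSort` is a hypothetical helper, not part of kubectl; it also rejects unknown dependencies and cycles, neither of which a valid DAG contains.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoSort orders step names so that every step appears after all of its
// dependencies. deps maps a step name to the names it depends on.
// It returns an error for an unknown dependency or a dependency cycle.
func topoSort(deps map[string][]string) ([]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string, len(deps))
	for name, ds := range deps {
		indegree[name] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", name, d)
			}
			dependents[d] = append(dependents[d], name)
		}
	}

	// Start with the steps that have no dependencies.
	var queue []string
	for name, n := range indegree {
		if n == 0 {
			queue = append(queue, name)
		}
	}
	sort.Strings(queue) // sort for deterministic output

	order := make([]string, 0, len(deps))
	for len(queue) > 0 {
		name := queue[0]
		queue = queue[1:]
		order = append(order, name)

		next := dependents[name]
		sort.Strings(next)
		for _, dep := range next {
			indegree[dep]--
			if indegree[dep] == 0 {
				queue = append(queue, dep)
			}
		}
	}

	// Steps left with a non-zero indegree are part of a cycle.
	if len(order) != len(deps) {
		return nil, errors.New("dependency cycle detected among workflow steps")
	}
	return order, nil
}

func main() {
	order, err := topoSort(map[string][]string{
		"report":  {"process"},
		"process": {"fetch"},
		"fetch":   nil,
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(order)
}
```

The same check could be reused at validation time to reject a `Workflow` whose steps do not form a DAG.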
|
||||||
|
|
||||||
## Future evolution
|
## Future evolution
|
||||||
|
|
||||||
@@ -254,11 +269,10 @@ In the future we may want to extend _Workflow_ with other kinds of resources, mo
|
|||||||
support a more general template to create other resources.
|
support a more general template to create other resources.
|
||||||
One of the major functionalities missing here is the ability to set a recurring `Workflow` (cron-like),
|
One of the major functionalities missing here is the ability to set a recurring `Workflow` (cron-like),
|
||||||
similar to the `ScheduledJob` [#11980](https://github.com/kubernetes/kubernetes/pull/11980) for `Job`.
|
similar to the `ScheduledJob` [#11980](https://github.com/kubernetes/kubernetes/pull/11980) for `Job`.
|
||||||
If the scheduled job is able
|
If the scheduled job is able to support
|
||||||
to support [various resources](https://github.com/kubernetes/kubernetes/pull/11980#discussion_r46729699),
|
[various resources](https://github.com/kubernetes/kubernetes/pull/11980#discussion_r46729699),
|
||||||
`Workflow` will benefit from the _schedule_ functionality of `Job`.
|
`Workflow` will benefit from the _schedule_ functionality of `Job`.
|
||||||
|
|
||||||
|
|
||||||
### Relevant use cases out of scope of this proposal
|
### Relevant use cases out of scope of this proposal
|
||||||
|
|
||||||
* As an admin I want to set quota on workflow resources
|
* As an admin I want to set quota on workflow resources
|
||||||
|