Pipeline

In runnable, we use the following terms interchangeably:

  • workflow and pipeline.
  • node and step.

A workflow is a sequence of steps to perform.

Composite pipelines

runnable pipelines are composable. For example, a pipeline can have a parallel node which in itself has many pipelines running in parallel.
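As an illustration only (plain Python, not the runnable API), composition can be pictured as pipelines being lists of steps, where a parallel step fans out into several sub-pipelines. The `run_pipeline` and `parallel` helpers here are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(steps):
    # A pipeline is just an ordered sequence of callables.
    for step in steps:
        step()


def parallel(*pipelines):
    # Returns a single step that runs each sub-pipeline concurrently,
    # so a pipeline can nest other pipelines.
    def step():
        with ThreadPoolExecutor() as pool:
            list(pool.map(run_pipeline, pipelines))

    return step


results = []
inner_a = [lambda: results.append("a1"), lambda: results.append("a2")]
inner_b = [lambda: results.append("b1")]
outer = [lambda: results.append("start"), parallel(inner_a, inner_b)]
run_pipeline(outer)  # "start" runs first, then both branches run concurrently
```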


Concept

A visual example of a workflow:

stateDiagram-v2
    direction lr
    state "hello stub" as start_at
    state "hello python" as step_2
    state "hello notebook" as step_3
    state "hello shell" as step_4
    state "Success" as success


    [*] --> start_at
    start_at --> step_2 : #9989;
    step_2 --> step_3 : #9989;
    step_3 --> step_4 : #9989;
    step_4 --> success : #9989;
    success --> [*]

Traversal

Start at hello stub.

If it is successful, go to the next step of the pipeline until we reach the success state.

Any failure in the execution of a step will, by default, go to the fail state.
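The traversal rule can be sketched in plain Python (an illustration of the rule, not the runnable implementation; the step functions are hypothetical):

```python
def traverse(steps):
    # Default traversal: execute steps in order; on success move to the
    # next step, on any exception go straight to the fail state.
    for step in steps:
        try:
            step()
        except Exception:
            return "fail"
    return "success"


def hello_stub():  # hypothetical step that always succeeds
    pass


def flaky_step():  # hypothetical step that always fails
    raise RuntimeError("boom")


print(traverse([hello_stub]))              # -> success
print(traverse([hello_stub, flaky_step]))  # -> fail
```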

Syntax

The above pipeline can be written in runnable as shown below. It is a mix of a python function, a notebook, a shell command and a stub.

API Documentation

  • The first step in the steps list is the start of the workflow.
  • The order of execution follows the order of the tasks in the list.
  • The terminal nodes success and fail are added automatically.
"""
You can execute this pipeline by:

    python examples/02-sequential/traversal.py

    A pipeline can have any "tasks" as part of it. In the
    below example, we have a mix of stub, python, shell and notebook tasks.

    As with simpler tasks, the stdout and stderr of each task are captured
    and stored in the catalog.

    .catalog
    └── cold-jennings-1534
        ├── examples
        │   └── common
        │       └── simple_notebook_out.ipynb
        ├── hello_notebook.execution.log
        ├── hello_python.execution.log
        └── hello_shell.execution.log

    4 directories, 4 files

"""

from examples.common.functions import hello
from runnable import NotebookTask, Pipeline, PythonTask, ShellTask, Stub


def main():
    stub_task = Stub(name="hello stub")

    python_task = PythonTask(
        name="hello python",
        function=hello,
    )

    shell_task = ShellTask(
        name="hello shell",
        command="echo 'Hello World!'",
    )

    notebook_task = NotebookTask(
        name="hello notebook",
        notebook="examples/common/simple_notebook.ipynb",
        terminate_with_success=True,
    )

    # The pipeline has a mix of tasks.
    # The order of execution follows the order of the tasks in the list.
    pipeline = Pipeline(
        steps=[  # (2)
            stub_task,  # (1)
            python_task,
            shell_task,
            notebook_task,
        ]
    )

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
  1. Start the pipeline.
  2. The order of the steps is the execution order.

The same pipeline can also be defined in yaml, as below:

  • The first step is the step corresponding to start_at.
  • The steps are defined in the steps mapping.
  • next states the step to execute after a successful execution of a step.
  • success as the next node implies successful completion of the pipeline.
dag:
  description: |
    A pipeline can have any "tasks" as part of it. In the
    below example, we have a mix of stub, python, shell and notebook tasks.

    As with simpler tasks, the stdout and stderr of each task are captured
    and stored in the catalog.

    For example, the catalog structure for this execution would be:

    .catalog
    └── cold-jennings-1534
        ├── examples
        │   └── common
        │       └── simple_notebook_out.ipynb
        ├── hello_notebook.execution.log
        ├── hello_python.execution.log
        └── hello_shell.execution.log

    4 directories, 4 files

    The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!".

    You can run this pipeline as:
      runnable execute -f examples/02-sequential/traversal.yaml

  start_at: hello stub # (1)
  steps:
    hello stub:
      type: stub
      next: hello python # (2)
    hello python:
      type: task
      command_type: python
      command: examples.common.functions.hello # dotted path to the function.
      next: hello shell
    hello shell:
      type: task
      command_type: shell
      command: echo "Hello World!" # Command to run
      next: hello notebook
    hello notebook:
      type: task
      command_type: notebook
      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
      next: success # (3)
  1. Start the pipeline at this step.
  2. State the next node to traverse to on success.
  3. The terminal nodes success and fail are added automatically.


on failure

By default, any failure during the execution of a step will traverse to the fail node, marking the execution as failed.

The fail node is implicitly added to the pipeline.

This behavior can be overridden to follow a different path based on expected failures.
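A plain-Python sketch of this override (not the runnable API; it ignores terminate_with_success for brevity, and the step names are hypothetical):

```python
def traverse(steps, on_failure):
    """steps: ordered mapping of name -> callable.
    on_failure: mapping of step name -> alternate step to jump to on error."""
    names = list(steps)
    i = 0
    while i < len(names):
        name = names[i]
        try:
            steps[name]()
        except Exception:
            alternate = on_failure.get(name)
            if alternate is None:
                return "fail"  # default: any failure goes to the fail state
            i = names.index(alternate)  # jump to the alternate step instead
            continue
        i += 1
    return "success"


def ok():
    pass


def boom():
    raise RuntimeError("expected failure")


steps = {"step_1": boom, "step_2": ok, "step_3": ok, "step_4": ok}
print(traverse(steps, {"step_1": "step_4"}))  # step_1 -> step_4 -> success
print(traverse(steps, {}))                    # step_1 -> fail
```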

On failure success

step 1 fails as the function raises an exception. step 4 is the step to execute in case of the failure, and it terminates the pipeline with success.

This is analogous to:

try:
    raise_exception()
except Exception:
    # suppress exception
    do_something()
"""
This pipeline showcases handling failures in a pipeline.

The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success

step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually succeed too.
step_1 -> step_4 -> success

This pattern is handy when you are expecting a failure of a step
and have ways to handle it.

Run this pipeline:
    python examples/02-sequential/on_failure_succeed.py
"""

from examples.common.functions import raise_ex
from runnable import Pipeline, PythonTask, Stub


def main():
    step_1 = PythonTask(name="step 1", function=raise_ex)  # This will fail

    step_2 = Stub(name="step 2")

    step_3 = Stub(name="step 3", terminate_with_success=True)
    step_4 = Stub(name="step 4", terminate_with_success=True)  # (1)

    step_1.on_failure = step_4.name

    pipeline = Pipeline(
        steps=[step_1, step_2, step_3, [step_4]],
    )
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
  1. Setting terminate_with_success to true traverses to the success node.
dag:
  description: |
    This pipeline showcases handling failures in a pipeline.

    The path taken if none of the steps failed:
    step_1 -> step_2 -> step_3 -> success

    step_1 is a python function that raises an exception.
    And we can instruct the pipeline to execute step_4 if step_1 fails
    and then eventually succeed.
    step_1 -> step_4 -> success

    This pattern is handy when you are expecting a failure of a step
    and have ways to handle it.

    Run this pipeline as:
      runnable execute -f examples/02-sequential/on_failure_succeed.yaml
  start_at: step_1
  steps:
    step_1:
      type: task
      command_type: shell
      command: exit 1 # This will fail!
      next: step_2
      on_failure: step_4
    step_2:
      type: stub
      next: step_3
    step_3:
      type: stub
      next: success
    step_4:
      type: stub
      next: success

On failure fail

step 1 fails as the function raises an exception. step 4 is the step to execute in case of the failure, and it terminates the pipeline with failure.

This is analogous to:

try:
    raise_exception()
except Exception:
    # raise exception after doing something.
    do_something()
    raise
"""
This pipeline showcases handling failures in a pipeline.

The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success

step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually fail.
step_1 -> step_4 -> fail

This pattern is handy when you need to do something before eventually
failing (eg: sending a notification, updating status, etc...)

Run this pipeline as:
    python examples/02-sequential/on_failure_fail.py
"""

from examples.common.functions import raise_ex
from runnable import Pipeline, PythonTask, Stub


def main():
    step_1 = PythonTask(name="step 1", function=raise_ex)  # This will fail

    step_2 = Stub(name="step 2")

    step_3 = Stub(name="step 3", terminate_with_success=True)
    step_4 = Stub(name="step 4", terminate_with_failure=True)  # (1)

    step_1.on_failure = step_4.name

    pipeline = Pipeline(
        steps=[step_1, step_2, step_3, [step_4]],
    )
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
  1. Setting terminate_with_failure to true traverses to the fail node.
dag:
  description: |
    This pipeline showcases handling failures in a pipeline.

    The path taken if none of the steps failed:
    step_1 -> step_2 -> step_3 -> success

    step_1 is a python function that raises an exception.
    And we can instruct the pipeline to execute step_4 if step_1 fails
    and then eventually fail.
    step_1 -> step_4 -> fail

    This pattern is handy when you need to do something before eventually
    failing (eg: sending a notification, updating status, etc...)

    Run this pipeline as:
      runnable execute -f examples/02-sequential/default_fail.yaml
  start_at: step_1
  steps:
    step_1:
      type: task
      command_type: shell
      command: exit 1 # This will fail!
      next: step_2
      on_failure: step_4
    step_2:
      type: stub
      next: step_3
    step_3:
      type: stub
      next: success
    step_4:
      type: stub
      next: fail