
Tasks

Task nodes are the execution units of the pipeline.

They can be Python functions, notebooks, shell scripts, or stubs.

In the examples below, the highlighted lines are the relevant bits; the rest of the Python (or YAML) code defines and executes a pipeline that runs the Python function, notebook, shell script, or stub.


Python functions

Uses Python functions as tasks.

API Documentation

Example

Structuring

If you are using the Python SDK, it is best to keep the application-specific functions in a different module from the pipeline definition.
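
For instance, the hello function imported in the example below could live in its own module, separate from the pipeline definition. A minimal sketch of such a module (the actual contents of examples/common/functions.py may differ):

# functions.py -- application-specific code, kept apart from the pipeline definition
def hello():
    """Print a greeting; its stdout is captured as the execution log."""
    print("Hello World!")

The pipeline definition then only imports the function and wires it into a task, as shown below.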

"""
You can execute this pipeline by:

    python examples/01-tasks/python_tasks.py

The stdout of "Hello World!" would be captured as execution
log and stored in the catalog.

An example of the catalog structure:

.catalog
└── baked-heyrovsky-0602
    └── hello.execution.log

2 directories, 1 file


The hello.execution.log has the captured stdout of "Hello World!".
"""

from examples.common.functions import hello
from runnable import Pipeline, PythonTask


def main():
    # Create a task that calls the function "hello"
    # If this step executes successfully,
    # the pipeline will terminate with success
    hello_task = PythonTask(
        name="hello",
        function=hello,
        terminate_with_success=True,
    )

    # The pipeline has only one step.
    pipeline = Pipeline(steps=[hello_task])

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

Dotted path

Assuming the below project structure:

    ..
    ├── outer_functions.py
    │   ├── outer_function()
    ├── module_inner
    │   ├── inner_functions.py
    │   │    ├── inner_function()
    ..

  • The command for the outer_function should be outer_functions.outer_function

  • The command for inner_function should be module_inner.inner_functions.inner_function

dag:
  description: |
    You can run this pipeline by:
       runnable execute -f examples/01-tasks/python_tasks.yaml

    The stdout of "Hello World!" would be captured as
    execution log and stored in the catalog.

    An example of the catalog structure:

    .catalog
    └── baked-heyrovsky-0602
        └── hello.execution.log

    2 directories, 1 file

    The hello.execution.log has the captured stdout of "Hello World!".
  start_at: hello_task
  steps:
    hello_task:
      type: task
      command: examples.common.functions.hello # dotted path to the function.
      next: success

Notebook

Jupyter notebooks are supported as tasks. We internally use Ploomber engine for executing notebooks.

The output notebook is saved to the same location as the input notebook, with _out appended to its name, and is also saved in the catalog for logging and ease of debugging.
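
For instance, a task pointing at notebooks/eda.ipynb (an illustrative path) would write its executed copy next to the input, with _out appended to the file name, and place the same notebook in the catalog. A minimal sketch:

from runnable import NotebookTask

# "notebooks/eda.ipynb" is an illustrative path, relative to the project root.
# The executed copy (with _out appended to its name) is written next to the
# input notebook and is also stored in the catalog.
eda_task = NotebookTask(
    name="eda",
    notebook="notebooks/eda.ipynb",
    terminate_with_success=True,
)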

API Documentation

Example

"""
You can execute this pipeline by:

    python examples/01-tasks/notebook.py

The notebook is executed in the same environment so any installed packages are available for the
notebook.

Upon successful execution, the output notebook with cell outputs is stored in the catalog.
For example, the catalog structure for this execution would be:

.catalog
└── meek-rosalind-0853
    ├── examples
    │   └── common
    │       └── simple_notebook_out.ipynb
    └── notebook.execution.log

The notebook simple_notebook_<step name>_out.ipynb has the captured stdout of "Hello World!".
"""

from runnable import NotebookTask, Pipeline


def main():
    # Execute the notebook present in examples/common/simple_notebook.ipynb.
    # The path is relative to the project root.
    # If this step executes successfully, the pipeline will terminate with success
    hello_task = NotebookTask(
        name="hello",
        notebook="examples/common/simple_notebook.ipynb",
        terminate_with_success=True,
    )

    # The pipeline has only one step.
    pipeline = Pipeline(steps=[hello_task])

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
dag:
  description: |
    This is a sample pipeline with one step that executes a notebook.

    The notebook is executed in the same environment so any installed
    packages are available for the notebook.

    Upon successful execution, the output notebook with cell outputs
    is stored in the catalog.

    For example, the catalog structure for this execution would be:

    .catalog
    └── meek-rosalind-0853
        ├── examples
        │   └── common
        │       └── simple_notebook_out.ipynb
        └── notebook.execution.log

    The notebook simple_notebook_<step_name>_out.ipynb has the captured stdout of "Hello World!".

    You can run this pipeline as:
      runnable execute -f examples/01-tasks/notebook.yaml

  start_at: notebook
  steps:
    notebook:
      type: task
      command_type: notebook
      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
      next: success

Shell

Python functions and Jupyter notebooks provide a rich interface to the Python ecosystem, while shell tasks provide an interface to non-Python executables.

API Documentation

Example

"""
You can execute this pipeline by:

    python examples/01-tasks/scripts.py

The command can be anything that can be executed in a shell.
The stdout/stderr of the execution is captured as execution log and stored in the catalog.

For example:

.catalog
└── seasoned-perlman-1355
    └── hello.execution.log

"""

from runnable import Pipeline, ShellTask


def main():
    # If this step executes successfully, the pipeline will terminate with success
    hello_task = ShellTask(
        name="hello",
        command="echo 'Hello World!'",
        terminate_with_success=True,
    )

    # The pipeline has only one step.
    pipeline = Pipeline(steps=[hello_task])

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
dag:
  description: |
    This is a sample pipeline with one step that
    executes a shell command.

    You can run this pipeline by:
    runnable execute -f examples/01-tasks/scripts.yaml

    For example:
    .catalog
    └── seasoned-perlman-1355
        └── hello.execution.log

  start_at: shell
  steps:
    shell:
      type: task
      command_type: shell
      command: echo "hello world!!" # Any command that can be executed in a shell.
      next: success
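
Since the command can be anything a shell can execute, a single task can also chain commands or call non-Python executables. A minimal sketch (the commands and paths are illustrative):

from runnable import ShellTask

# Anything the shell can run works here, including chained commands
# or non-Python executables; "data" and the commands below are illustrative.
prepare_task = ShellTask(
    name="prepare",
    command="mkdir -p data && echo 'prepared' > data/marker.txt",
    terminate_with_success=True,
)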

Stub

Stub nodes in runnable are just like pass or ... in Python code. They are placeholders, useful when you want to debug or design your pipeline.

Stub nodes can take an arbitrary number of parameters and always succeed.

API Documentation

Example

Intuition

Designing a pipeline is similar to writing a modular program. Stub nodes are handy for creating a placeholder for a step that will be implemented in the future.

During debugging, changing a node to a stub lets you focus on the actual bug without having to execute the additional steps.
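
As a sketch, a real step can be swapped for a Stub of the same name while you debug the rest of the pipeline. Here, evaluate stands in for a hypothetical application function:

from runnable import Pipeline, PythonTask, Stub


def evaluate():  # hypothetical application function
    print("evaluating")


def main():
    # The real "train" step is temporarily replaced with a Stub;
    # it accepts arbitrary keyword arguments and always succeeds.
    train_task = Stub(name="train", note="stubbed while debugging evaluate")

    evaluate_task = PythonTask(
        name="evaluate",
        function=evaluate,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[train_task, evaluate_task])
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()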

"""
This is a simple pipeline that does 3 steps in sequence.

    step 1 >> step 2 >> step 3 >> success

    All the steps are mocked and they will just pass through.
    Use this pattern to define the skeleton of your pipeline
    and flesh out the steps later.

    Note that you can give any arbitrary keys to the steps
    (like step 2).
    This is handy to mock steps within mature pipelines.

    You can run this pipeline by:
       python examples/01-tasks/stub.py
"""

from runnable import Pipeline, Stub


def main():
    # this will always succeed
    step1 = Stub(name="step1")

    # It takes arbitrary arguments
    # Useful for temporarily silencing steps within
    # mature pipelines
    step2 = Stub(name="step2", what="is this thing")

    step3 = Stub(name="step3", terminate_with_success=True)

    pipeline = Pipeline(steps=[step1, step2, step3])

    pipeline.execute()

    # A function that creates a pipeline should always return a
    # Pipeline object
    return pipeline


if __name__ == "__main__":
    main()
dag:
  description: |
    This is a simple pipeline that does 3 steps in sequence.

    step 1 >> step 2 >> step 3 >> success

    All the steps are mocked and they will just pass through.
    Use this pattern to define the skeleton of your pipeline
    and flesh out the steps later.

    Note that you can give any arbitrary keys to the steps
    (like step 2).
    This is handy to mock steps within mature pipelines.

    You can run this pipeline by:
       runnable execute -f examples/01-tasks/stub.yaml
  start_at: step 1
  steps:
    step 1:
      type: stub # This will always succeed
      next: step 2
    step 2:
      type: stub
      what: is this thing? # It takes arbitrary keys
      It: does not matter!!
      next: step 3
    step 3:
      type: stub
      next: success