Parameters

Parameters are data that can be passed from one task to another.

Concept

For example, in the snippet below, the parameters x and y are passed from generate to consume.

x, y = generate() # returns x and y as output
consume(x, y) # consumes x, y as input arguments.

The data types of x and y can be:

  • JSON serializable: int, string, float, list, dict, including pydantic models.
  • Objects: any dill-friendly object.
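The distinction between the two kinds can be sketched with the standard library. This is only an illustration, not runnable's implementation: parameter_kind is a hypothetical helper, and the stdlib pickle module stands in for dill.

```python
import json
import pickle  # stdlib stand-in for dill, used only for this sketch


def parameter_kind(value):
    """Return "json" if the value survives json.dumps, else "object" if it pickles."""
    try:
        json.dumps(value)
        return "json"
    except TypeError:
        pickle.dumps(value)  # raises if the value cannot be pickled either
        return "object"


kinds = {
    "integer": parameter_kind(1),
    "chunks": parameter_kind([1, 2, 3]),
    "nested": parameter_kind({"x": 10, "foo": "bar"}),
    "labels": parameter_kind({"a", "b"}),  # a set is not JSON serializable
}
print(kinds)
```

Values classified as "object" here correspond to the ones that would need to be marked as pickled when returned from a task.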

Compatibility

The table below summarizes the input and output types of the different task types. For example, notebooks can only take JSON serializable parameters as input but can return JSON, pydantic models, or objects.

Task      Input                                           Output
python    json, pydantic, object via function arguments   json, pydantic, object as returns
notebook  json via cell tagged with parameters            json, pydantic, object as returns
shell     json via environment variables                  json via environment variables as returns

Project parameters

Project parameters can be defined using a yaml file. These parameters can then be overridden by tasks of the pipeline.

They can also be provided by environment variables prefixed by RUNNABLE_PRM_. Environment variables override yaml parameters.

Type casting

Annotating the arguments of a python function ensures that the arguments have the right data type.

It is advised to cast the parameters explicitly in notebook and shell tasks.

Deeply nested yaml objects are supported.

integer: 1
floater : 3.14
stringer : hello
pydantic_param:
  x: 10
  foo: bar

chunks: [1, 2, 3]

The same parameters can also be defined as environment variables:

export RUNNABLE_PRM_integer="1"
export RUNNABLE_PRM_floater="3.14"
export RUNNABLE_PRM_stringer="hello"
export RUNNABLE_PRM_pydantic_param='{"x": 10, "foo": "bar"}'
export RUNNABLE_PRM_chunks="[1, 2, 3]"

Parameters defined by environment variables override parameters defined in yaml. This is useful for quick experimentation without changing code.
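The override order can be sketched as follows. This is a minimal illustration rather than runnable's actual code: resolve_parameters and parse_value are hypothetical helpers, and the rule of parsing each value as JSON with a fallback to the plain string is an assumption.

```python
import json

PREFIX = "RUNNABLE_PRM_"


def parse_value(raw):
    """Try to read the raw string as JSON; fall back to the plain string (assumed rule)."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def resolve_parameters(yaml_params, environ):
    """Start from the yaml parameters, then let prefixed env vars override them."""
    resolved = dict(yaml_params)
    for key, raw in environ.items():
        if key.startswith(PREFIX):
            resolved[key[len(PREFIX):]] = parse_value(raw)
    return resolved


yaml_params = {"integer": 1, "floater": 3.14, "stringer": "hello"}
fake_environ = {
    "RUNNABLE_PRM_integer": "42",       # overrides the yaml value
    "RUNNABLE_PRM_chunks": "[1, 2, 3]",  # only defined in the environment
    "RUNNABLE_PRM_stringer": "world",
    "HOME": "/home/user",                # ignored: no prefix
}
resolved = resolve_parameters(yaml_params, fake_environ)
print(resolved)
```

A plain dictionary is used instead of os.environ so the sketch has no side effects; passing os.environ would work the same way.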

Accessing parameters

Python functions receive the project parameters through arguments of the same name.

Nested parameters without an annotation are passed in as a dictionary.

"""
The below example showcases setting up known initial parameters for a pipeline
of only python tasks

The initial parameters as defined in the yaml file are:
    integer: 1
    floater : 3.14
    stringer : hello
    pydantic_param:
        x: 10
        foo: bar

runnable allows using pydantic models for deeply nested parameters and
casts appropriately based on annotation. eg: read_initial_params_as_pydantic

If no annotation is provided, the parameter is assumed to be a dictionary.
eg: read_initial_params_as_json

You can set the initial parameters from environment variables as well.
eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable

Run this pipeline as:
    python examples/03-parameters/static_parameters_python.py

"""

import os

from examples.common.functions import (
    read_initial_params_as_json,
    read_initial_params_as_pydantic,
)
from runnable import Pipeline, PythonTask


def main():
    """
    Signature of read_initial_params_as_pydantic
    def read_initial_params_as_pydantic(
        integer: int,
        floater: float,
        stringer: str,
        pydantic_param: ComplexParams,
        envvar: str,
    ):
    """
    read_params_as_pydantic = PythonTask(
        function=read_initial_params_as_pydantic,
        name="read_params_as_pydantic",
    )

    """
    Signature of read_initial_params_as_json
    def read_initial_params_as_json(
        integer: int,
        floater: float,
        stringer: str,
        pydantic_param: Dict[str, Union[int, str]],
    ):
    """
    read_params_as_json = PythonTask(
        function=read_initial_params_as_json,
        terminate_with_success=True,
        name="read_params_json",
    )

    pipeline = Pipeline(
        steps=[read_params_as_pydantic, read_params_as_json],
    )

    _ = pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")

    return pipeline


if __name__ == "__main__":
    # Any parameter prefixed by "RUNNABLE_PRM_" will be picked up by runnable
    os.environ["RUNNABLE_PRM_envvar"] = "from env"
    main()
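The effect of annotations on nested parameters can be sketched as below. This is not how runnable is implemented: bind_parameters is a hypothetical helper, and a stdlib dataclass stands in for the pydantic model ComplexParams so that the sketch has no third-party dependencies.

```python
import inspect
from dataclasses import dataclass, is_dataclass


@dataclass
class ComplexParams:
    """Stand-in for the pydantic model used in the examples."""
    x: int
    foo: str


def bind_parameters(func, params):
    """Pick the parameters named in func's signature and cast them by annotation."""
    bound = {}
    for name, arg in inspect.signature(func).parameters.items():
        value = params[name]
        annotation = arg.annotation
        if is_dataclass(annotation) and isinstance(value, dict):
            value = annotation(**value)  # nested dict -> typed model
        elif annotation in (int, float, str):
            value = annotation(value)    # simple cast for scalars
        bound[name] = value              # no annotation: passed through as-is
    return bound


def typed(integer: int, pydantic_param: ComplexParams):
    return integer, pydantic_param


def untyped(integer, pydantic_param):
    return integer, pydantic_param


params = {"integer": "1", "pydantic_param": {"x": 10, "foo": "bar"}}
typed_args = bind_parameters(typed, params)
untyped_args = bind_parameters(untyped, params)
print(typed_args)
print(untyped_args)
```

With annotations the nested value arrives as a typed model; without them it stays a dictionary, which matches the behaviour described for read_initial_params_as_json.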

Notebook tasks have a cell tagged with parameters, whose values are substituted at run time.

Shell tasks have access to the parameters as environment variables.

"""
The below example showcases setting up known initial parameters for a pipeline
of notebook and shell based commands.

The initial parameters as defined in the yaml file are:
    integer: 1
    floater : 3.14
    stringer : hello
    pydantic_param:
        x: 10
        foo: bar

runnable exposes the nested parameters as dictionary for notebook based tasks
and as a json string for the shell based tasks.

You can set the initial parameters from environment variables as well.
eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable


Run this pipeline as:
    python examples/03-parameters/static_parameters_non_python.py
"""

from runnable import NotebookTask, Pipeline, ShellTask


def main():
    read_params_in_notebook = NotebookTask(
        name="read_params_in_notebook",
        notebook="examples/common/read_parameters.ipynb",
    )

    shell_command = """
    if [ "$integer" = 1 ] \
    && [ "$floater" = 3.14 ] \
    && [ "$stringer" = "hello" ] \
    && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then
        echo "yaay"
        exit 0;
    else
        echo "naay"
        exit 1;
    fi
    """
    read_params_in_shell = ShellTask(
        name="read_params_in_shell",
        command=shell_command,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[read_params_in_notebook, read_params_in_shell],
    )

    _ = pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")

    return pipeline


if __name__ == "__main__":
    main()
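The string values that the shell command above compares against can be reproduced with a small sketch. params_to_env is a hypothetical helper, and encoding non-string values with json.dumps is an assumption made for illustration, not a statement about runnable's internals.

```python
import json


def params_to_env(params):
    """Encode parameters as strings suitable for environment variables."""
    env = {}
    for name, value in params.items():
        if isinstance(value, str):
            env[name] = value              # strings are passed through unquoted
        else:
            env[name] = json.dumps(value)  # numbers, lists and dicts become JSON text
    return env


env = params_to_env(
    {
        "integer": 1,
        "floater": 3.14,
        "stringer": "hello",
        "pydantic_param": {"x": 10, "foo": "bar"},
    }
)
print(env)
```

Because every value is a string by the time the shell sees it, comparisons and arithmetic in shell tasks should cast explicitly, as advised in the type casting section.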

Access & returns

access

Parameters returned by upstream tasks are accessed in the same way as project parameters.

returns

Tasks can return parameters which can then be accessed by downstream tasks.

The syntax is inspired by:

def generate():
    ...
    return x, y

def consume(x, y):
    ...

x, y = generate() # returns x and y as output
consume(x, y) # consumes x, y as input arguments.

and implemented in runnable as:

from runnable import PythonTask

# The returns syntax can be used for notebook and shell scripts too.
generate_task = PythonTask(function=generate, returns=["x", "y"], name="generate")
consume_task = PythonTask(function=consume, name="consume")

The equivalent yaml definition:

generate:
  type: task
  command: generate
  next: consume
  returns:
    - name: x
    - name: y
consume:
  ...

order of returns

The order of returns should match the order of the python function returning them.
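Why the order matters can be seen in a short sketch. name_returns is a hypothetical helper that pairs names with values by position; it illustrates the idea and is not runnable's implementation.

```python
def generate():
    x, y = 1, "hello"
    return x, y


def name_returns(returns, values):
    """Pair each declared return name with the value at the same position."""
    if len(returns) != len(values):
        raise ValueError("number of names and returned values differ")
    return dict(zip(returns, values))


correct = name_returns(["x", "y"], generate())
swapped = name_returns(["y", "x"], generate())
print(correct)
print(swapped)
```

Declaring the names in a different order than the function returns them silently assigns the wrong value to each name, so downstream tasks would receive swapped parameters.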

marking returns as metric or object

JSON style parameters can be marked as a metric in python functions, notebooks, and shell tasks. Metric parameters can be accessed as normal parameters in downstream steps.

Returns marked as pickled in python functions and notebooks are serialized using dill.

Example

import pandas as pd

# Assuming a function returns a pandas DataFrame and a score
def generate():
    ...
    return df, score

# Downstream step consuming the df and score
def consume(df: pd.DataFrame, score: float):
    ...

from runnable import metric, pickled, PythonTask

generate_task = PythonTask(
    function=generate,
    returns=[
        pickled("df"),    # pickle df
        metric("score"),  # mark score as metric
    ],
    name="generate",
)

consume_task = PythonTask(function=consume, name="consume")

The equivalent yaml definition:

generate:
  type: task
  command: generate
  next: consume
  returns:
    - name: df
      kind: object
    - name: score
      kind: metric
consume:
  ...

Complete Example

"""
The below example shows how to set/get parameters in python
tasks of the pipeline.

The function, set_parameter, returns
    - JSON serializable types
    - pydantic models
    - pandas dataframe, any "object" type

pydantic models are implicitly handled by runnable
but "object" types should be marked as "pickled".

Using pickled is advised even for python data types
when the collections are reasonably large.

Run the below example as:
    python examples/03-parameters/passing_parameters_python.py

"""

from examples.common.functions import read_parameter, write_parameter
from runnable import Pipeline, PythonTask, metric, pickled


def main():
    write_parameters = PythonTask(
        function=write_parameter,
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
        name="set_parameter",
    )

    read_parameters = PythonTask(
        function=read_parameter,
        terminate_with_success=True,
        name="get_parameters",
    )

    pipeline = Pipeline(
        steps=[write_parameters, read_parameters],
    )

    _ = pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

The equivalent yaml definition:

dag:
  description: |
    The below example shows how to set/get parameters in python
    tasks of the pipeline.

    The function, set_parameter, returns
        - JSON serializable
        - pydantic models
        - pandas dataframe, any "object" type

    pydantic models are implicitly handled by runnable
    but "object" types should be marked as "pickled".

    Using pickled is advised even for python data types
    when the collections are reasonably large.

    Run the pipeline as:
      runnable execute -f examples/03-parameters/passing_parameters_python.yaml
  start_at: write_parameters
  steps:
    write_parameters:
      type: task
      command: examples.common.functions.write_parameter
      returns:
        - name: df
          kind: object
        - name: integer
        - name: floater
        - name: stringer
        - name: pydantic_param
        - name: score
          kind: metric

      next: read_parameters
    read_parameters:
      type: task
      command: examples.common.functions.read_parameter
      next: success

In notebook tasks, parameters are accessed through a cell tagged with parameters. Only JSON style parameters can be injected.

Any python variable defined during the execution of the notebook matching the name in returns is inferred as a parameter. The variable can be either JSON type or objects.
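The inference of returns from notebook variables can be pictured with a sketch that executes a string of code instead of a real notebook. run_and_collect is a hypothetical helper; the actual notebook execution in runnable works differently and is not shown here.

```python
def run_and_collect(source, injected, returns):
    """Execute source with injected variables and collect the named returns."""
    namespace = dict(injected)  # plays the role of the cell tagged with parameters
    exec(source, namespace)     # plays the role of executing the notebook
    missing = [name for name in returns if name not in namespace]
    if missing:
        raise KeyError(f"returns not defined by the code: {missing}")
    return {name: namespace[name] for name in returns}


notebook_source = """
score = integer * 0.5
labels = {"a", "b"}      # a non-JSON object, would need to be marked as pickled
scratch = "not returned"
"""
collected = run_and_collect(notebook_source, {"integer": 4}, ["score", "labels"])
print(collected)
```

Only the variables named in returns are collected; anything else defined during execution, such as scratch here, is ignored.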

"""
Demonstrates passing parameters to and from a notebook.

runnable can extract JSON serializable types, pydantic models, objects from notebook.
eg: write_parameters_from_notebook

But can only inject JSON type parameters to a notebook.
eg: read_parameters_in_notebook
pydantic parameters are injected as dictionary.

Run the below example as:
    python examples/03-parameters/passing_parameters_notebook.py

"""

from examples.common.functions import read_parameter
from runnable import NotebookTask, Pipeline, PythonTask, metric, pickled


def main():
    write_parameters_from_notebook = NotebookTask(
        notebook="examples/common/write_parameters.ipynb",
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
        name="set_parameter",
    )

    read_parameters = PythonTask(
        function=read_parameter,
        name="get_parameters",
    )

    read_parameters_in_notebook = NotebookTask(
        notebook="examples/common/read_parameters.ipynb",
        terminate_with_success=True,
        name="read_parameters_in_notebook",
    )

    pipeline = Pipeline(
        steps=[
            write_parameters_from_notebook,
            read_parameters,
            read_parameters_in_notebook,
        ],
    )

    _ = pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

The equivalent yaml definition:

dag:
  description: |
    Demonstrates passing parameters to and from a notebook.

    runnable can extract JSON serializable types, pydantic models, objects from notebook.
    eg: write_parameters_from_notebook

    But can only inject JSON type parameters to a notebook.
    eg: read_parameters_in_notebook
    pydantic parameters are injected as dictionary.

    Run the below example as:
        runnable execute examples/03-parameters/passing_parameters_notebook.yaml
  start_at: write_parameters_from_notebook
  steps:
    write_parameters_from_notebook:
      type: task
      command_type: notebook
      command: examples/common/write_parameters.ipynb
      returns:
        - name: df
          kind: object
        - name: integer
        - name: floater
        - name: stringer
        - name: pydantic_param
        - name: score
          kind: metric
      next: read_parameters
    read_parameters:
      type: task
      command: examples.common.functions.read_parameter
      next: read_parameters_in_notebook
    read_parameters_in_notebook:
      type: task
      command_type: notebook
      command: examples/common/read_parameters.ipynb
      next: success

Shell tasks can only access and return JSON style parameters.

"""
Demonstrates passing parameters to and from shell scripts.

We can extract only JSON serializable parameters from shell scripts.
eg: write_parameters_in_shell

We can only read json style parameters from shell scripts.
eg: read_parameters_in_shell
pydantic parameters are injected as json.

Run the below example as:
    python examples/03-parameters/passing_parameters_shell.py

"""

from examples.common.functions import read_unpickled_parameter
from runnable import Pipeline, PythonTask, ShellTask, metric


def main():
    export_env_command = """
    export integer=1
    export floater=3.14
    export stringer="hello"
    export pydantic_param='{"x": 10, "foo": "bar"}'
    export score=0.9
    """
    write_parameters_in_shell = ShellTask(
        command=export_env_command,
        returns=[
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
        name="write_parameter",
    )

    read_parameters = PythonTask(
        function=read_unpickled_parameter,
        name="read_parameters",
    )

    read_parameters_command = """
    if [ "$integer" = 1 ] \
        && [ "$floater" = 3.14 ] \
        && [ "$stringer" = "hello" ] \
        && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then
            echo "yaay"
            exit 0;
        else
            echo "naay"
            exit 1;
    fi
    """
    read_parameters_in_shell = ShellTask(
        name="read_parameters_in_shell",
        command=read_parameters_command,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[write_parameters_in_shell, read_parameters, read_parameters_in_shell],
    )

    _ = pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

The equivalent yaml definition:

dag:
  description: |
    Demonstrates passing parameters to and from shell scripts.

    We can extract only json style parameters from shell scripts.
    eg: write_parameters_in_shell

    We can only read json style parameters from shell scripts.
    eg: read_parameters_in_shell
    pydantic parameters are injected as json.

    Run the pipeline as:
      runnable execute -f examples/03-parameters/passing_parameters_shell.yaml

  start_at: write_parameters_in_shell
  steps:
    write_parameters_in_shell:
      type: task
      command_type: shell
      command: |
        export integer=1
        export floater=3.14
        export stringer="hello"
        export pydantic_param='{"x": 10, "foo": "bar"}'
        export score=0.9
      returns:
        - name: integer
        - name: floater
        - name: stringer
        - name: pydantic_param
        - name: score
          kind: metric
      next: read_parameters
    read_parameters:
      type: task
      command: examples.common.functions.read_unpickled_parameter
      next: read_parameters_in_shell
    read_parameters_in_shell:
      type: task
      command_type: shell
      command: |
        if [ "$integer" = 1 ] \
          && [ "$floater" = 3.14 ] \
          && [ "$stringer" = "hello" ] \
          && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then
              echo "yaay"
              exit 0;
        else
              echo "naay"
              exit 1;
        fi
      next: success