
🔗 Connecting Functions

The magic happens when you chain functions together. Runnable makes this effortless.

What you already know

You probably chain functions like this:

import pandas as pd
from pydantic import BaseModel

class SamplePydanticModel(BaseModel):
    x: int
    foo: str

def write_parameter():
    df = pd.DataFrame({"x": [1, 2, 3]})
    return df, 10, 3.14, "hello", SamplePydanticModel(x=10, foo="bar"), 0.95

def read_parameter(df, integer, floater, stringer, pydantic_param, score):
    print(f"Received: df={len(df)} rows, integer={integer}, score={score}")
    return df.mean()

# Manual chaining
df, integer, floater, stringer, pydantic_param, score = write_parameter()
result = read_parameter(df, integer, floater, stringer, pydantic_param, score)

Runnable does the chaining for you

Same functions, automatic parameter passing:

from runnable import Pipeline, PythonTask, pickled, metric

def main():
    # Step 1: Create data with named outputs
    step1 = PythonTask(
        function=write_parameter,
        returns=[pickled("df"), "integer", "floater", "stringer", "pydantic_param", metric("score")]
    )

    # Step 2: Process data - parameters matched automatically!
    step2 = PythonTask(function=read_parameter)

    pipeline = Pipeline(steps=[step1, step2])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()

Magic: The df returned by write_parameter automatically becomes the df parameter for read_parameter.

See the complete runnable code (examples/03-parameters/passing_parameters_python.py):
"""
The example below shows how to set and get parameters in the
python tasks of a pipeline.

The function write_parameter returns
    - JSON serializable types
    - pydantic models
    - a pandas dataframe, i.e. any "object" type

pydantic models are handled implicitly by runnable,
but "object" types should be marked as "pickled".

Using pickled even for native python data types is advised for
reasonably large collections.

Run the below example as:
    python examples/03-parameters/passing_parameters_python.py

"""

from examples.common.functions import read_parameter, write_parameter
from runnable import Pipeline, PythonTask, metric, pickled


def main():
    write_parameters = PythonTask(
        function=write_parameter,
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
        name="set_parameter",
    )

    read_parameters = PythonTask(
        function=read_parameter,
        terminate_with_success=True,
        name="get_parameters",
    )

    pipeline = Pipeline(
        steps=[write_parameters, read_parameters],
    )

    _ = pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

Try it now:

uv run examples/03-parameters/passing_parameters_python.py

How it works

  1. Step 1 returns values with names: returns=["df", "score"]
  2. Step 2 function signature: def analyze(df, score):
  3. Runnable matches return names to parameter names automatically
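
Because matching is by name against the function's signature, a downstream step only needs to declare the parameters it actually uses. A minimal sketch reusing write_parameter from above (the subset behavior is inferred from this matching rule; summarize is a hypothetical function):

from runnable import Pipeline, PythonTask, pickled, metric

def summarize(score):
    # Declares only "score"; the other named returns from
    # write_parameter (df, integer, ...) are simply not injected.
    print(f"score={score}")

def main():
    pipeline = Pipeline(steps=[
        PythonTask(
            function=write_parameter,
            returns=[pickled("df"), "integer", "floater", "stringer",
                     "pydantic_param", metric("score")],
        ),
        PythonTask(function=summarize, terminate_with_success=True),
    ])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()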

Mix different task types

Python functions, notebooks, and shell scripts all work together:

from runnable import Pipeline, PythonTask, NotebookTask, ShellTask, pickled

def main():
    pipeline = Pipeline(steps=[
        PythonTask(function=create_data, returns=[pickled("df")]),
        NotebookTask(notebook="process.ipynb", returns=["processed_df"]),
        ShellTask(command="./analyze.sh", returns=["report_path"]),
        PythonTask(function=send_email)  # gets report_path automatically
    ])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()

See the complete runnable code (examples/02-sequential/traversal.py):
"""
You can execute this pipeline by:

    python examples/02-sequential/traversal.py

A pipeline can contain any mix of task types. In the example
below, we have stub, python, shell, and notebook tasks.

As with simpler tasks, the stdout and stderr of each task are captured
and stored in the catalog.
"""

from examples.common.functions import hello
from runnable import NotebookTask, Pipeline, PythonTask, ShellTask, Stub


def main():
    stub_task = Stub(name="hello stub")  # a placeholder step that does nothing

    python_task = PythonTask(
        name="hello python", function=hello, overrides={"argo": "smaller"}
    )

    shell_task = ShellTask(
        name="hello shell",
        command="echo 'Hello World!'",
    )

    notebook_task = NotebookTask(
        name="hello notebook",
        notebook="examples/common/simple_notebook.ipynb",
    )

    # The pipeline has a mix of tasks.
    # The order of execution follows the order of the tasks in the list.
    pipeline = Pipeline(
        steps=[
            stub_task,
            python_task,
            shell_task,
            notebook_task,
        ]
    )

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()

Try it now:

uv run examples/02-sequential/traversal.py

Parameter matching

Return names must match parameter names: a step that declares returns=["data"] feeds any downstream function defined as def process(data): ...
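
By the same rule (an inference from the matching behavior above; the exact error surface isn't documented here), a mismatched name leaves the downstream parameter unresolved:

from runnable import Pipeline, PythonTask

def extract():
    return [1, 2, 3]

def process_frame(df):  # expects "df", but upstream named its return "data"
    return sum(df)

def main():
    Pipeline(steps=[
        PythonTask(function=extract, returns=["data"]),
        PythonTask(function=process_frame, terminate_with_success=True),
    ]).execute()  # expected to fail: nothing upstream provides "df"

if __name__ == "__main__":
    main()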

Parameter Type Compatibility

Parameter passing works between task types, but with important constraints based on data types and how each task type receives parameters:

How Parameters Are Passed:

| Task Type | How Parameters Are Received | Input Parameters | Output Parameters |
| --------- | --------------------------- | ---------------- | ----------------- |
| Python | Function arguments | All types (primitive, pickled, pydantic, metric) | All types (primitive, pickled, pydantic, metric) |
| Notebook | Tagged parameter cells (variables replaced) | Python primitives only (int, str, float, list, dict) | All types (primitive, pickled, pydantic, metric) |
| Shell | Environment variables | Python primitives only (int, str, float, list, dict) | Python primitives only (int, str, float, list, dict) |
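
To illustrate the shell row (a sketch; the exact environment-variable naming is an assumption — check how your runnable version exposes parameters to shell commands):

from runnable import Pipeline, PythonTask, ShellTask

def produce():
    return 42

def main():
    pipeline = Pipeline(steps=[
        PythonTask(function=produce, returns=["count"]),
        # Assumption: the "count" parameter is visible to the shell as $count.
        ShellTask(
            name="use count",
            command="echo count is $count",
            terminate_with_success=True,
        ),
    ])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()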

Notebook Parameter Mechanism:

Notebooks receive parameters through tagged cells where variable values are replaced:

# In your notebook's first cell (tagged as "parameters"):
count = None      # This will be replaced with actual value
status = None     # This will be replaced with actual value
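
Conceptually, the executed copy of the notebook behaves as if the tagged cell had been rewritten with the incoming values (an illustration, not literal runnable output):

# parameters cell, after injection:
count = 42        # value produced by an upstream task
status = "ok"     # value produced by an upstream task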

✅ This works:

from runnable import Pipeline, PythonTask, NotebookTask, pickled

def main():
    Pipeline(steps=[
        PythonTask(function=extract_data, returns=["count", "status"]),    # primitives →
        NotebookTask(notebook="clean.ipynb", returns=[pickled("df")]),     # → notebook receives via parameter cells
        PythonTask(function=analyze, returns=["report"])                   # → python can receive the pickled df
    ]).execute()

if __name__ == "__main__":
    main()

❌ This won't work:

from runnable import Pipeline, PythonTask, NotebookTask, pickled

def main():
    Pipeline(steps=[
        PythonTask(function=create_model, returns=[pickled("model")]),     # pickled object →
        NotebookTask(notebook="use_model.ipynb")                           # → notebooks can't receive pickled objects!
    ]).execute()

if __name__ == "__main__":
    main()
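
A common workaround is to keep the object out of the parameter space: persist it yourself and pass a primitive reference instead. A sketch with hypothetical pieces (train_model, the artifact path, and the notebook's loading contract are all assumptions):

import joblib
from runnable import Pipeline, PythonTask, NotebookTask

def create_and_save_model():
    model = train_model()                    # hypothetical training function
    path = "artifacts/model.joblib"          # hypothetical artifact location
    joblib.dump(model, path)
    return path                              # a plain string: notebook-safe

def main():
    Pipeline(steps=[
        PythonTask(function=create_and_save_model, returns=["model_path"]),
        # The notebook's tagged cell declares model_path = None and loads
        # the model itself, e.g. joblib.load(model_path).
        NotebookTask(notebook="use_model.ipynb", terminate_with_success=True),
    ]).execute()

if __name__ == "__main__":
    main()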

Next: Understand when to use jobs vs pipelines.