🔗 Connecting Functions¶
The magic happens when you chain functions together. Runnable makes this effortless.
What you already know¶
You probably chain functions like this:
import pandas as pd

# SamplePydanticModel is a pydantic model defined alongside these example functions
def write_parameter():
    df = pd.DataFrame({"x": [1, 2, 3]})
    return df, 10, 3.14, "hello", SamplePydanticModel(x=10, foo="bar"), 0.95

def read_parameter(df, integer, floater, stringer, pydantic_param, score):
    print(f"Received: df={len(df)} rows, integer={integer}, score={score}")
    return df.mean()

# Manual chaining
df, integer, floater, stringer, pydantic_param, score = write_parameter()
result = read_parameter(df, integer, floater, stringer, pydantic_param, score)
Runnable does the chaining for you¶
Same functions, automatic parameter passing:
from runnable import Pipeline, PythonTask, pickled, metric

def main():
    # Step 1: Create data with named outputs
    step1 = PythonTask(
        function=write_parameter,
        returns=[pickled("df"), "integer", "floater", "stringer", "pydantic_param", metric("score")],
    )

    # Step 2: Process data - parameters matched automatically!
    step2 = PythonTask(function=read_parameter)

    pipeline = Pipeline(steps=[step1, step2])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
✨ Magic: The df returned by write_parameter automatically becomes the df parameter for read_parameter.
See complete runnable code
"""
The below example shows how to set/get parameters in python
tasks of the pipeline.
The function, set_parameter, returns
- JSON serializable types
- pydantic models
- pandas dataframe, any "object" type
pydantic models are implicitly handled by runnable
but "object" types should be marked as "pickled".
Use pickled even for python data types is advised for
reasonably large collections.
Run the below example as:
python examples/03-parameters/passing_parameters_python.py
"""
from examples.common.functions import read_parameter, write_parameter
from runnable import Pipeline, PythonTask, metric, pickled

def main():
    write_parameters = PythonTask(
        function=write_parameter,
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
        name="set_parameter",
    )

    read_parameters = PythonTask(
        function=read_parameter,
        terminate_with_success=True,
        name="get_parameters",
    )

    pipeline = Pipeline(
        steps=[write_parameters, read_parameters],
    )

    _ = pipeline.execute()

    return pipeline

if __name__ == "__main__":
    main()
Try it now: python examples/03-parameters/passing_parameters_python.py
How it works¶
- Step 1 returns values with names: returns=["df", "score"]
- Step 2 function signature: def analyze(df, score):
- Runnable matches return names to parameter names automatically (see the sketch below)
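A minimal sketch of that name matching, assuming two hypothetical functions create_scores and analyze defined in your own module; task names and terminate_with_success mirror the complete example above:

```python
from runnable import Pipeline, PythonTask, metric

def create_scores():
    # Return values are bound to the names declared in `returns` below
    return [1, 2, 3], 0.93

def analyze(data, score):
    # Parameter names match the declared return names, so runnable injects them
    print(f"{len(data)} rows scored {score}")

def main():
    step1 = PythonTask(
        name="create",
        function=create_scores,
        returns=["data", metric("score")],
    )
    step2 = PythonTask(name="analyze", function=analyze, terminate_with_success=True)

    pipeline = Pipeline(steps=[step1, step2])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
```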
Mix different task types¶
Python functions, notebooks, and shell scripts all work together:
from runnable import Pipeline, PythonTask, NotebookTask, ShellTask, pickled

def main():
    pipeline = Pipeline(steps=[
        PythonTask(function=create_data, returns=[pickled("df")]),
        NotebookTask(notebook="process.ipynb", returns=["processed_df"]),
        ShellTask(command="./analyze.sh", returns=["report_path"]),
        PythonTask(function=send_email),  # Gets report_path automatically
    ])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
See complete runnable code
"""
You can execute this pipeline by:
python examples/02-sequential/traversal.py
A pipeline can have any "tasks" as part of it. In the
below example, we have a mix of stub, python, shell and notebook tasks.
As with simpler tasks, the stdout and stderr of each task are captured
and stored in the catalog.
"""
from examples.common.functions import hello
from runnable import NotebookTask, Pipeline, PythonTask, ShellTask, Stub
def main():
stub_task = Stub(name="hello stub") # [concept:stub-task]
python_task = PythonTask( # [concept:python-task]
name="hello python", function=hello, overrides={"argo": "smaller"}
)
shell_task = ShellTask( # [concept:shell-task]
name="hello shell",
command="echo 'Hello World!'",
)
notebook_task = NotebookTask( # [concept:notebook-task]
name="hello notebook",
notebook="examples/common/simple_notebook.ipynb",
)
# The pipeline has a mix of tasks.
# The order of execution follows the order of the tasks in the list.
pipeline = Pipeline( # [concept:pipeline]
steps=[ # (2)
stub_task, # (1)
python_task,
shell_task,
notebook_task,
]
)
pipeline.execute() # [concept:execution]
return pipeline
if __name__ == "__main__":
main()
Try it now: python examples/02-sequential/traversal.py
Parameter matching
Return names must match parameter names. returns=["data"] → def process(data):
Parameter Type Compatibility
Parameter passing works between task types, but with important constraints based on data types and how each task type receives parameters:
How Parameters Are Passed:
| Task Type | How Parameters Are Received | Input Parameters | Output Parameters |
|---|---|---|---|
| Python | Function arguments | All types (primitive, pickled, pydantic, metric) | All types (primitive, pickled, pydantic, metric) |
| Notebook | Tagged parameter cells (variables replaced) | Python primitives only (int, str, float, list, dict) | All types (primitive, pickled, pydantic, metric) |
| Shell | Environment variables | Python primitives only (int, str, float, list, dict) | Python primitives only (int, str, float, list, dict) |
Notebook Parameter Mechanism:
Notebooks receive parameters through tagged cells where variable values are replaced:
# In your notebook's first cell (tagged as "parameters"):
count = None # This will be replaced with actual value
status = None # This will be replaced with actual value
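Shell tasks, in turn, see upstream primitive parameters as environment variables (per the table above). A minimal sketch under that assumption; count_records, the task names, and the exact environment variable naming (here assumed to be the parameter's own name) are illustrative, not confirmed API details:

```python
from runnable import Pipeline, PythonTask, ShellTask

def count_records():
    # Returns a primitive, bound to the name "count" below
    return 42

def main():
    produce = PythonTask(name="produce", function=count_records, returns=["count"])

    # Assumption: the primitive parameter "count" is visible to the command as $count
    consume = ShellTask(
        name="consume",
        command="echo records seen: $count",
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[produce, consume])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
```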
✅ This works:
from runnable import Pipeline, PythonTask, NotebookTask, pickled

def main():
    Pipeline(steps=[
        PythonTask(function=extract_data, returns=["count", "status"]),  # primitives →
        NotebookTask(notebook="clean.ipynb", returns=[pickled("df")]),   # → notebook receives primitives via parameter cells
        PythonTask(function=analyze, returns=["report"]),                # → python can receive the pickled df
    ]).execute()

if __name__ == "__main__":
    main()
❌ This won't work:
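Based on the compatibility table above, a pickled object cannot be routed into a shell (or notebook) task, because those tasks only receive Python primitives. An illustrative sketch; create_data and summarize.sh are hypothetical names:

```python
from runnable import Pipeline, PythonTask, ShellTask, pickled

def main():
    Pipeline(steps=[
        PythonTask(function=create_data, returns=[pickled("df")]),        # pickled dataframe →
        ShellTask(command="./summarize.sh", terminate_with_success=True), # ✗ shell only sees primitives, not the pickled df
    ]).execute()

if __name__ == "__main__":
    main()
```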
Next: Understand when to use jobs vs pipelines.