🔄 Map Patterns¶
Process collections of data with the same workflow - like a for loop for pipelines.
The basic pattern¶
```mermaid
flowchart TD
    A[Start] --> B[Map Step]
    B --> C["chunks = 1, 2, 3"]
    C --> D{For each chunk}
    D --> E[Process chunk=1]
    D --> F[Process chunk=2]
    D --> G[Process chunk=3]
    E --> H[Result: 10]
    F --> I[Result: 20]
    G --> J[Result: 30]
    H --> K["Collect: 10, 20, 30"]
    I --> K
    J --> K
    K --> L[Continue]
```
```python
from runnable import Map, Pipeline, PythonTask


def main():
    # Create a map step that processes each chunk
    map_state = Map(
        name="process_chunks",
        iterate_on="chunks",  # Parameter name containing [1, 2, 3]
        iterate_as="chunk",  # Each iteration gets chunk=1, chunk=2, chunk=3
        branch=create_processing_workflow(),  # Same workflow runs for each chunk
    )

    # Collect results after all iterations
    collect_results = PythonTask(function=collect_all_results, name="collect")

    pipeline = Pipeline(steps=[map_state, collect_results])
    pipeline.execute(parameters_file="parameters.yaml")  # Contains chunks: [1, 2, 3]
    return pipeline


# Helper function to create the processing workflow
def create_processing_workflow():
    # Implementation details in complete example below
    pass


def collect_all_results():
    # Implementation details in complete example below
    pass


if __name__ == "__main__":
    main()
```
See complete runnable code
examples/07-map/map.py
"""
You can execute this pipeline by:
python examples/07-map/map.py
"""
from examples.common.functions import (
assert_default_reducer,
process_chunk,
read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask, Stub
def iterable_branch(execute: bool = True):
"""
Use the pattern of using "execute" to control the execution of the pipeline.
The same pipeline can be run independently from the command line.
WARNING: If the execution is not controlled by "execute", the pipeline will be executed
even during the definition of the branch in parallel steps.
"""
# The python function to process a single chunk of data.
# In the example, we are multiplying the chunk by 10.
process_chunk_task_python = PythonTask(
name="execute_python",
function=process_chunk,
returns=["processed_python"],
)
# return parameters within a map branch have to be unique
# The notebook takes in the value of processed_python as an argument.
# and returns a new parameter "processed_notebook" which is 10*processed_python
process_chunk_task_notebook = NotebookTask(
name="execute_notebook",
notebook="examples/common/process_chunk.ipynb",
returns=["processed_notebook"],
)
# following the pattern, the shell takes in the value of processed_notebook as an argument.
# and returns a new parameter "processed_shell" which is 10*processed_notebook.
shell_command = """
if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
&& [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
echo "yaay"
else
echo "naay"
exit 1;
fi
export processed_shell=$( expr 10 '*' "$processed_notebook")
"""
process_chunk_task_shell = ShellTask(
name="execute_shell",
command=shell_command,
returns=["processed_shell"],
)
# A downstream step of process_<python, notebook, shell> which reads the parameter "processed".
# The value of processed is within the context of the branch.
# For example, for chunk=1, the value of processed_python is chunk*10 = 10
# the value of processed_notebook is processed_python*10 = 100
# the value of processed_shell is processed_notebook*10 = 1000
read_chunk = PythonTask(
name="read processed chunk",
function=read_processed_chunk,
terminate_with_success=True,
)
pipeline = Pipeline(
steps=[
process_chunk_task_python,
process_chunk_task_notebook,
process_chunk_task_shell,
read_chunk,
],
)
if execute:
pipeline.execute()
return pipeline
def main():
# Create a map state which iterates over a list of chunks.
# chunk is the value of the iterable.
map_state = Map( # [concept:map]
name="map_state",
iterate_on="chunks", # [concept:iteration-parameter]
iterate_as="chunk", # [concept:iterator-variable]
branch=iterable_branch(execute=False), # [concept:branch-pipeline]
)
# Outside of the loop, processed is a list of all the processed chunks.
# This is also called as the reduce pattern.
# the value of processed_python is [10, 20, 30]
# the value of processed_notebook is [100, 200, 300]
# the value of processed_shell is [1000, 2000, 3000]
collect = PythonTask( # [concept:collect-step]
name="collect",
function=assert_default_reducer,
)
continue_to = Stub(name="continue to")
pipeline = Pipeline(steps=[map_state, collect]) # [concept:pipeline]
pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") # [concept:execution]
return pipeline
if __name__ == "__main__":
main()
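The functions imported from `examples.common.functions` are not shown above. Based on the comments in the example, plausible sketches look like this; the actual implementations live in `examples/common/functions.py` and may differ in detail:

```python
# Illustrative sketches only; the real implementations may differ.

def process_chunk(chunk: int) -> int:
    """Process a single chunk; in this example, multiply it by 10."""
    return chunk * 10


def read_processed_chunk(chunk: int, processed_python: int, processed_notebook: int, processed_shell: int):
    """Runs inside the branch, so it sees the per-iteration values."""
    assert processed_python == chunk * 10
    assert processed_notebook == processed_python * 10
    assert processed_shell == processed_notebook * 10


def assert_default_reducer(processed_python: list, processed_notebook: list, processed_shell: list):
    """Runs after the map step; the default reducer collects each parameter into a list."""
    assert processed_python == [10, 20, 30]
    assert processed_notebook == [100, 200, 300]
    assert processed_shell == [1000, 2000, 3000]
```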
Try it now:
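```bash
python examples/07-map/map.py
```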
Like writing:
```python
chunks = [1, 2, 3]
results = []
for chunk in chunks:
    result = process_chunk(chunk)
    results.append(result)
```
The branch workflow¶
Each iteration runs this pipeline with different chunk values:
Helper function (creates the workflow for each iteration):
```python
def create_processing_workflow():
    from runnable import NotebookTask, Pipeline, PythonTask, ShellTask

    # Process the chunk through multiple steps
    python_step = PythonTask(
        name="execute_python",
        function=process_chunk,  # chunk=1 → processed_python=10
        returns=["processed_python"],
    )

    notebook_step = NotebookTask(
        name="execute_notebook",
        notebook="process_chunk.ipynb",  # processed_python=10 → processed_notebook=100
        returns=["processed_notebook"],
    )

    shell_step = ShellTask(
        name="execute_shell",
        command="./process_chunk.sh",  # processed_notebook=100 → processed_shell=1000
        returns=["processed_shell"],
    )

    return Pipeline(steps=[python_step, notebook_step, shell_step])
```
Each iteration runs the same pipeline structure:
```mermaid
flowchart LR
    A[chunk] --> B[Python Task]
    B --> C[Notebook Task]
    C --> D[Shell Task]
    D --> E[Read Task]
    E --> F[returns: processed]
```
🔧 Custom reducers¶
By default, map collects all results into lists. Customize this with reducers:
```mermaid
flowchart TD
    A["Results: 10, 20, 30"] --> B{Reducer}
    B --> C["Default: 10, 20, 30"]
    B --> D[Max: 30]
    B --> E[Sum: 60]
    B --> F[Count: 3]
```
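The `lambda *x` form used in these examples suggests the reducer receives one positional argument per iteration. A plain-Python illustration of what a reducer does to a single returned parameter (a conceptual sketch, not runnable's internals):

```python
# Conceptual sketch: values of processed_python collected across chunk=1, 2, 3.
per_iteration = (10, 20, 30)

collect_all = lambda *x: list(x)  # default behaviour: keep everything as a list
take_max = lambda *x: max(x)      # a custom reducer, as in the example below

assert collect_all(*per_iteration) == [10, 20, 30]
assert take_max(*per_iteration) == 30
```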
```python
from runnable import Map, Pipeline


def main():
    # Use a custom reducer to aggregate results
    map_state = Map(
        name="process_with_max",
        iterate_on="chunks",  # [1, 2, 3]
        iterate_as="chunk",
        reducer="lambda *x: max(x)",  # Take the maximum instead of collecting all values
        branch=create_processing_workflow(),
    )

    pipeline = Pipeline(steps=[map_state])
    pipeline.execute(parameters_file="parameters.yaml")  # Contains chunks: [1, 2, 3]
    return pipeline


# Results: processed_python = max(10, 20, 30) = 30
#          processed_notebook = max(100, 200, 300) = 300
#          processed_shell = max(1000, 2000, 3000) = 3000

if __name__ == "__main__":
    main()
```
See complete runnable code
examples/07-map/custom_reducer.py
"""
map states allows to repeat a branch for each value of an iterable.
The below example can written, in python, as:
chunks = [1, 2, 3]
for chunk in chunks:
# Any task within the pipeline can access the value of chunk as an argument.
processed = process_chunk(chunk)
# The value of processed for every iteration is the value returned by the steps
# of the current execution. For example, the value of processed
# for chunk=1, is chunk*10 = 10 for downstream steps.
read_processed_chunk(chunk, processed)
It is possible to use a custom reducer, for example, this reducer is a max of the collection.
# Once the reducer is applied, processed is reduced to a single value.
assert processed == max(chunk * 10 for chunk in chunks)
You can execute this pipeline by:
python examples/07-map/custom_reducer.py
"""
from examples.common.functions import (
assert_custom_reducer,
process_chunk,
read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask
def iterable_branch(execute: bool = True):
"""
Use the pattern of using "execute" to control the execution of the pipeline.
The same pipeline can be run independently from the command line.
WARNING: If the execution is not controlled by "execute", the pipeline will be executed
even during the definition of the branch in parallel steps.
"""
# The python function to process a single chunk of data.
# In the example, we are multiplying the chunk by 10.
process_chunk_task_python = PythonTask(
name="execute_python",
function=process_chunk,
returns=["processed_python"],
)
# return parameters within a map branch have to be unique
# The notebook takes in the value of processed_python as an argument.
# and returns a new parameter "processed_notebook" which is 10*processed_python
process_chunk_task_notebook = NotebookTask(
name="execute_notebook",
notebook="examples/common/process_chunk.ipynb",
returns=["processed_notebook"],
)
# following the pattern, the shell takes in the value of processed_notebook as an argument.
# and returns a new parameter "processed_shell" which is 10*processed_notebook.
shell_command = """
if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
&& [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
echo "yaay"
else
echo "naay"
exit 1;
fi
export processed_shell=$( expr 10 '*' "$processed_notebook")
"""
process_chunk_task_shell = ShellTask(
name="execute_shell",
command=shell_command,
returns=["processed_shell"],
)
# A downstream step of process_<python, notebook, shell> which reads the parameter "processed".
# The value of processed is within the context of the branch.
# For example, for chunk=1, the value of processed_python is chunk*10 = 10
# the value of processed_notebook is processed_python*10 = 100
# the value of processed_shell is processed_notebook*10 = 1000
read_chunk = PythonTask(
name="read processed chunk",
function=read_processed_chunk,
terminate_with_success=True,
)
pipeline = Pipeline(
steps=[
process_chunk_task_python,
process_chunk_task_notebook,
process_chunk_task_shell,
read_chunk,
],
)
if execute:
pipeline.execute()
return pipeline
def main():
# Create a map state which iterates over a list of chunks.
# chunk is the value of the iterable.
# Upon completion of the map state, all the parameters of the tasks
# within the pipeline will be processed by the reducer.
# In this case, the reducer is the max of all the processed chunks.
map_state = Map( # [concept:map-with-custom-reducer]
name="map state",
iterate_on="chunks",
iterate_as="chunk",
reducer="lambda *x: max(x)", # [concept:custom-reducer]
branch=iterable_branch(execute=False),
)
collect = PythonTask( # [concept:collect-step]
name="collect",
function=assert_custom_reducer,
terminate_with_success=True,
)
pipeline = Pipeline(steps=[map_state, collect]) # [concept:pipeline]
pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") # [concept:execution]
return pipeline
if __name__ == "__main__":
main()
Try it now:
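```bash
python examples/07-map/custom_reducer.py
```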
Reducer Options¶
Lambda functions:
"lambda *x: max(x)"→ Maximum value"lambda *x: sum(x)"→ Sum all values"lambda *x: len(x)"→ Count items"lambda *x: x[0]"→ Take first result only
Python functions in dot notation:
You can also reference Python functions using dot notation:
```python
# If you have a function in a module
# my_module.py:
def custom_max_reducer(*results):
    return max(results)


# Use it in Map with dot notation
map_state = Map(
    name="process_with_custom_reducer",
    iterate_on="chunks",
    iterate_as="chunk",
    reducer="my_module.custom_max_reducer",  # Reference the function by its module path
    branch=create_processing_workflow(),
)
```
Built-in function references:
```python
# Use built-in functions
reducer="max"   # Built-in max function
reducer="sum"   # Built-in sum function
reducer="len"   # Built-in len function

# Use functions from standard library modules
reducer="statistics.mean"    # Average of results
reducer="statistics.median"  # Median of results
reducer="operator.add"       # Sum using the operator module
```
When to use map¶
Perfect for:
- Processing file collections
- Batch processing data chunks
- Cross-validation in ML
- Parameter sweeps
- A/B testing multiple variants
Example use cases:
```python
# Process multiple datasets
iterate_on="datasets", iterate_as="dataset"

# Test hyperparameters
iterate_on="learning_rates", iterate_as="lr"

# Handle batch processing
iterate_on="file_paths", iterate_as="file_path"
```
Map vs Parallel
- Map: Same workflow, different data (for loop)
- Parallel: Different workflows, same time (independent tasks)
Next: Learn about conditional workflows.