
🔄 Map Patterns

Process collections of data with the same workflow - like a for loop for pipelines.

The basic pattern

flowchart TD
    A[Start] --> B[Map Step]
    B --> C["chunks = 1, 2, 3"]
    C --> D{For each chunk}
    D --> E[Process chunk=1]
    D --> F[Process chunk=2]
    D --> G[Process chunk=3]
    E --> H[Result: 10]
    F --> I[Result: 20]
    G --> J[Result: 30]
    H --> K["Collect: 10, 20, 30"]
    I --> K
    J --> K
    K --> L[Continue]
from runnable import Map, Pipeline, PythonTask

def main():
    # Create a map step that processes each chunk
    map_state = Map(
        name="process_chunks",
        iterate_on="chunks",           # Parameter name containing [1, 2, 3]
        iterate_as="chunk",           # Each iteration gets chunk=1, chunk=2, chunk=3
        branch=create_processing_workflow()  # Same workflow runs for each chunk
    )

    # Collect results after all iterations
    collect_results = PythonTask(function=collect_all_results, name="collect")

    pipeline = Pipeline(steps=[map_state, collect_results])
    pipeline.execute(parameters_file="parameters.yaml")  # Contains chunks: [1, 2, 3]
    return pipeline

# Helper function to create the processing workflow
def create_processing_workflow():
    # Implementation details in complete example below
    pass

def collect_all_results():
    # Implementation details in complete example below
    pass

if __name__ == "__main__":
    main()
See complete runnable code
examples/07-map/map.py
"""
You can execute this pipeline by:

    python examples/07-map/map.py
"""

from examples.common.functions import (
    assert_default_reducer,
    process_chunk,
    read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask, Stub


def iterable_branch(execute: bool = True):
    """
    Use the "execute" argument to control whether the pipeline runs when this function is called.

    The same pipeline can be run independently from the command line.

    WARNING: If execution is not gated by "execute", the pipeline will be executed
    even while the branch is being defined inside a map or parallel step.
    """
    # The python function to process a single chunk of data.
    # In the example, we are multiplying the chunk by 10.
    process_chunk_task_python = PythonTask(
        name="execute_python",
        function=process_chunk,
        returns=["processed_python"],
    )

    # return parameters within a map branch have to be unique
    # The notebook takes in the value of processed_python as an argument.
    # and returns a new parameter "processed_notebook" which is 10*processed_python
    process_chunk_task_notebook = NotebookTask(
        name="execute_notebook",
        notebook="examples/common/process_chunk.ipynb",
        returns=["processed_notebook"],
    )

    # following the pattern, the shell takes in the value of processed_notebook as an argument.
    # and returns a new parameter "processed_shell" which is 10*processed_notebook.
    shell_command = """
    if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
    fi
    export processed_shell=$( expr 10 '*' "$processed_notebook")
    """

    process_chunk_task_shell = ShellTask(
        name="execute_shell",
        command=shell_command,
        returns=["processed_shell"],
    )

    # A downstream step of process_<python, notebook, shell> which reads the parameter "processed".
    # The value of processed is within the context of the branch.
    # For example, for chunk=1, the value of processed_python is chunk*10 = 10
    # the value of processed_notebook is processed_python*10 = 100
    # the value of processed_shell is processed_notebook*10 = 1000
    read_chunk = PythonTask(
        name="read processed chunk",
        function=read_processed_chunk,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[
            process_chunk_task_python,
            process_chunk_task_notebook,
            process_chunk_task_shell,
            read_chunk,
        ],
    )

    if execute:
        pipeline.execute()

    return pipeline


def main():
    # Create a map state which iterates over a list of chunks.
    # chunk is the value of the iterable.
    map_state = Map(  # [concept:map]
        name="map_state",
        iterate_on="chunks",  # [concept:iteration-parameter]
        iterate_as="chunk",  # [concept:iterator-variable]
        branch=iterable_branch(execute=False),  # [concept:branch-pipeline]
    )

    # Outside of the loop, processed is a list of all the processed chunks.
    # This is also called the reduce pattern.
    # the value of processed_python is [10, 20, 30]
    # the value of processed_notebook is [100, 200, 300]
    # the value of processed_shell is [1000, 2000, 3000]
    collect = PythonTask(  # [concept:collect-step]
        name="collect",
        function=assert_default_reducer,
    )

    continue_to = Stub(name="continue to")

    pipeline = Pipeline(steps=[map_state, collect])  # [concept:pipeline]

    pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")  # [concept:execution]

    return pipeline


if __name__ == "__main__":
    main()

Try it now:

uv run examples/07-map/map.py

Like writing:

chunks = [1, 2, 3]
results = []
for chunk in chunks:
    result = process_chunk(chunk)
    results.append(result)
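
Both execute calls above read the iterable from a parameters file; as noted in the comments, it only needs to define chunks: [1, 2, 3]. A minimal sketch of generating an equivalent file with PyYAML (the file name here is illustrative, not the repository's):

# Sketch: write a parameters file equivalent to the one referenced above.
# Only the "chunks" list is required for iterate_on="chunks".
import yaml

with open("parameters.yaml", "w") as handle:
    yaml.safe_dump({"chunks": [1, 2, 3]}, handle)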

The branch workflow

Each iteration runs this pipeline with different chunk values:

Helper function (creates the workflow for each iteration):

def create_processing_workflow():
    from runnable import Pipeline, PythonTask, NotebookTask, ShellTask

    # Process chunk through multiple steps
    python_step = PythonTask(
        function=process_chunk,           # chunk=1 → processed_python=10
        returns=["processed_python"]
    )

    notebook_step = NotebookTask(
        notebook="process_chunk.ipynb",   # processed_python=10 → processed_notebook=100
        returns=["processed_notebook"]
    )

    shell_step = ShellTask(
        command="./process_chunk.sh",     # processed_notebook=100 → processed_shell=1000
        returns=["processed_shell"]
    )

    return Pipeline(steps=[python_step, notebook_step, shell_step])

Each iteration runs the same pipeline structure:

flowchart LR
    A[chunk] --> B[Python Task]
    B --> C[Notebook Task]
    C --> D[Shell Task]
    D --> E[Read Task]
    E --> F[returns: processed]
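
The branch relies on process_chunk and read_processed_chunk imported from examples.common.functions. Based on the comments above (each step multiplies its input by 10), a plausible sketch of those helpers could look like the following; the exact signatures are assumptions, not the repository code:

def process_chunk(chunk: int) -> int:
    # Each iteration receives the current value of "chunk" as an argument.
    return chunk * 10


def read_processed_chunk(chunk: int, processed_python: int, processed_notebook: int, processed_shell: int):
    # Downstream step: parameters produced inside the branch are injected by name.
    assert processed_python == chunk * 10
    assert processed_notebook == processed_python * 10
    assert processed_shell == processed_notebook * 10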

🔧 Custom reducers

By default, map collects all results into lists. Customize this with reducers:

flowchart TD
    A["Results: 10, 20, 30"] --> B{Reducer}
    B --> C["Default: 10, 20, 30"]
    B --> D[Max: 30]
    B --> E[Sum: 60]
    B --> F[Count: 3]
from runnable import Map, Pipeline

def main():
    # Use custom reducer to aggregate results
    map_state = Map(
        name="process_with_max",
        iterate_on="chunks",                    # [1, 2, 3]
        iterate_as="chunk",
        reducer="lambda *x: max(x)",           # Take maximum instead of collecting all
        branch=create_processing_workflow()
    )

    pipeline = Pipeline(steps=[map_state])
    pipeline.execute(parameters_file="parameters.yaml")  # Contains chunks: [1, 2, 3]
    return pipeline

# Results: processed_python = max(10, 20, 30) = 30
#          processed_notebook = max(100, 200, 300) = 300
#          processed_shell = max(1000, 2000, 3000) = 3000

if __name__ == "__main__":
    main()
See complete runnable code
examples/07-map/custom_reducer.py
"""
A map state allows you to repeat a branch for each value of an iterable.

The example below can be written, in plain Python, as:

chunks = [1, 2, 3]

for chunk in chunks:
    # Any task within the pipeline can access the value of chunk as an argument.
    processed = process_chunk(chunk)

    # The value of processed for every iteration is the value returned by the steps
    # of the current execution. For example, the value of processed
    # for chunk=1, is chunk*10 = 10 for downstream steps.
    read_processed_chunk(chunk, processed)

It is possible to use a custom reducer; for example, this reducer takes the max of the collection.
# Once the reducer is applied, processed is reduced to a single value.
assert processed == max(chunk * 10 for chunk in chunks)

You can execute this pipeline by:

    python examples/07-map/custom_reducer.py
"""

from examples.common.functions import (
    assert_custom_reducer,
    process_chunk,
    read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask


def iterable_branch(execute: bool = True):
    """
    Use the "execute" argument to control whether the pipeline runs when this function is called.

    The same pipeline can be run independently from the command line.

    WARNING: If execution is not gated by "execute", the pipeline will be executed
    even while the branch is being defined inside a map or parallel step.
    """
    # The python function to process a single chunk of data.
    # In the example, we are multiplying the chunk by 10.
    process_chunk_task_python = PythonTask(
        name="execute_python",
        function=process_chunk,
        returns=["processed_python"],
    )

    # return parameters within a map branch have to be unique
    # The notebook takes in the value of processed_python as an argument.
    # and returns a new parameter "processed_notebook" which is 10*processed_python
    process_chunk_task_notebook = NotebookTask(
        name="execute_notebook",
        notebook="examples/common/process_chunk.ipynb",
        returns=["processed_notebook"],
    )

    # following the pattern, the shell takes in the value of processed_notebook as an argument.
    # and returns a new parameter "processed_shell" which is 10*processed_notebook.
    shell_command = """
    if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
    fi
    export processed_shell=$( expr 10 '*' "$processed_notebook")
    """

    process_chunk_task_shell = ShellTask(
        name="execute_shell",
        command=shell_command,
        returns=["processed_shell"],
    )

    # A downstream step of process_<python, notebook, shell> which reads the parameter "processed".
    # The value of processed is within the context of the branch.
    # For example, for chunk=1, the value of processed_python is chunk*10 = 10
    # the value of processed_notebook is processed_python*10 = 100
    # the value of processed_shell is processed_notebook*10 = 1000
    read_chunk = PythonTask(
        name="read processed chunk",
        function=read_processed_chunk,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[
            process_chunk_task_python,
            process_chunk_task_notebook,
            process_chunk_task_shell,
            read_chunk,
        ],
    )

    if execute:
        pipeline.execute()

    return pipeline


def main():
    # Create a map state which iterates over a list of chunks.
    # chunk is the value of the iterable.
    # Upon completion of the map state, all the parameters of the tasks
    # within the pipeline will be processed by the reducer.
    # In this case, the reducer is the max of all the processed chunks.
    map_state = Map(  # [concept:map-with-custom-reducer]
        name="map state",
        iterate_on="chunks",
        iterate_as="chunk",
        reducer="lambda *x: max(x)",  # [concept:custom-reducer]
        branch=iterable_branch(execute=False),
    )

    collect = PythonTask(  # [concept:collect-step]
        name="collect",
        function=assert_custom_reducer,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[map_state, collect])  # [concept:pipeline]

    pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")  # [concept:execution]

    return pipeline


if __name__ == "__main__":
    main()

Try it now:

uv run examples/07-map/custom_reducer.py

Reducer Options

Lambda functions:

  • "lambda *x: max(x)" → Maximum value
  • "lambda *x: sum(x)" → Sum all values
  • "lambda *x: len(x)" → Count items
  • "lambda *x: x[0]" → Take first result only

Python functions in dot notation:

You can also reference Python functions using dot notation:

# If you have a function in a module
# my_module.py:
def custom_max_reducer(*results):
    return max(results)

# Use it in Map with dot notation
map_state = Map(
    name="process_with_custom_reducer",
    iterate_on="chunks",
    iterate_as="chunk",
    reducer="my_module.custom_max_reducer",  # Reference function by module path
    branch=create_processing_workflow()
)

Built-in function references:

# Use built-in functions
reducer="max"           # Built-in max function
reducer="sum"           # Built-in sum function
reducer="len"           # Built-in len function

# Use functions from standard library modules
reducer="statistics.mean"     # Average of results
reducer="statistics.median"   # Median of results
reducer="operator.add"        # Sum using operator module

When to use map

Perfect for:

  • Processing file collections
  • Batch processing data chunks
  • Cross-validation in ML
  • Parameter sweeps
  • A/B testing multiple variants

Example use cases:

# Process multiple datasets
iterate_on="datasets", iterate_as="dataset"

# Test hyperparameters
iterate_on="learning_rates", iterate_as="lr"

# Handle batch processing
iterate_on="file_paths", iterate_as="file_path"

Map vs Parallel

  • Map: Same workflow, different data (for loop)
  • Parallel: Different workflows, same time (independent tasks)
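
To make the contrast concrete, a small sketch; the branch contents are stand-in Stub pipelines, and the assumption is that Parallel accepts a mapping of branch names to Pipelines:

from runnable import Map, Parallel, Pipeline, Stub

def tiny_branch(name: str) -> Pipeline:
    # Hypothetical one-step branch, just to make the structures concrete.
    return Pipeline(steps=[Stub(name=name, terminate_with_success=True)])

# Map: one branch pipeline, repeated for every value of the "chunks" parameter.
map_node = Map(
    name="map_over_chunks",
    iterate_on="chunks",
    iterate_as="chunk",
    branch=tiny_branch("process"),
)

# Parallel: different branch pipelines running side by side, no iterable involved.
parallel_node = Parallel(
    name="independent_branches",
    branches={
        "train": tiny_branch("train"),
        "evaluate": tiny_branch("evaluate"),
    },
)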

Next: Learn about conditional workflows.