
Map

A map node in runnable executes a pipeline for every item in a list.

Concept

A relatable example from data science is a grid search over hyperparameters, where the training pipeline needs to run for every hyperparameter value.

    flowchart TD
    gridSearch([Grid Search]):::green
    success([Success]):::green

    subgraph one[Parameter 1]
        process_chunk1([Train model]):::yellow
        success_chunk1([Success]):::yellow

        process_chunk1 --> success_chunk1
    end

    subgraph two[Parameter ...]
        process_chunk2([Train model]):::yellow
        success_chunk2([Success]):::yellow

        process_chunk2 --> success_chunk2
    end

    subgraph three[Parameter n]
        process_chunk3([Train model]):::yellow
        success_chunk3([Success]):::yellow

        process_chunk3 --> success_chunk3
    end

    reduce([Reduce]):::green


    gridSearch --> one --> reduce
    gridSearch --> two --> reduce
    gridSearch --> three --> reduce
    reduce --> success

    classDef yellow stroke:#FFFF00
    classDef green stroke:#0f0

The reduce step is part of the map state definition.
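To ground the diagram, here is a minimal plain-Python sketch of the grid-search case; train_model, params, and the toy objective are illustrative, not part of runnable:

def train_model(param):
    # Train with one hyperparameter value and return a score (toy objective).
    return -((param - 3) ** 2)

params = [1, 2, 3, 4, 5]                   # Grid Search: the iterable
scores = [train_model(p) for p in params]  # one branch per parameter value
best = max(scores)                         # Reduce
assert best == 0                           # param=3 maximizes the toy objective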

Refer to the API documentation for the complete specification of the map node.

Conceptually, a map node can be represented in Python as:

for i in iterable_parameter:
    # a pipeline of steps
    x = execute_first_step(i)
    score = execute_second_step(i, x)

reduce(score) # could be as simple as a list of scores indexed by i, or a custom reducer function/lambda

Syntax

The runnable syntax for the above example, first with the Python SDK:

from runnable import PythonTask, Map, Pipeline

def execute_first_step(i): # (1)
    ...

    return x # (2)

def execute_second_step(i, x): # (3)
    ...

def get_iterable_branch(): # (4)
    first_step_task = PythonTask(name="execute_first_step",
                    function=execute_first_step,
                    returns=["x"])

    second_step_task = PythonTask(name="execute_second_step",
                    function=execute_second_step,
                    terminate_with_success=True)

    pipeline = Pipeline(steps=[first_step_task, second_step_task])
    return pipeline

def generate():
    ...  # body elided in this sketch; produces the list to iterate over

    return iterable_parameter

def main():
    generate_task = PythonTask(name="generate_task",
                        function=generate,
                        returns=["iterable_parameter"]) # (5)

    iterate_task = Map(name="iterate",
                    branch=get_iterable_branch(),
                    iterate_on="iterable_parameter", # (6)
                    iterate_as="i",
                    terminate_with_success=True) # (7)

    pipeline = Pipeline(steps=[generate_task, iterate_task])

    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
  1. Takes in an input parameter i, the current value of the iteration.
  2. Returns a parameter x.
  3. i is the current value of the iteration; x is the value returned by the first step at that iteration.
  4. Returns a pipeline whose tasks depend on the iteration value i.
  5. Returns the parameter iterable_parameter.
  6. Loop over iterable_parameter, executing the branch for each value.
  7. Present the current value as i to all tasks of the branch.
The same pipeline, expressed in YAML:

branch: &branch # (1)
  start_at: execute_first_step
  steps:
    execute_first_step: # (2)
      type: task
      command: execute_first_step
      next: execute_second_step
      returns:
        - x # (3)
    execute_second_step:
      type: task
      command: execute_second_step # (4)
      next: success


dag:
  start_at: generate_task
  steps:
    generate_task:
      type: task
      command: generate
      returns:
        - iterable_parameter # (5)
      next: iterate_task
    iterate_task:
      type: map
      branch: *branch # (6)
      iterate_on: iterable_parameter # (7)
      iterate_as: i # (8)
      next: success
  1. The pipeline to iterate over an iterable parameter.
  2. The task expects i, the current value of the iteration.
  3. The task returns x.
  4. The task expects i, the current value of the iteration, and the x produced at that iteration.
  5. Returns an iterable, iterable_parameter.
  6. The branch to iterate over.
  7. The parameter to iterate on, returned by the task generate_task.
  8. Present the current value of the iteration as i to all tasks of the branch.
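A YAML pipeline like this is run with the runnable CLI. The command below mirrors the one shown in the complete example further down; the file names here are placeholders:

runnable execute -f map_pipeline.yaml -p parameters.yaml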

Reduce

Default behavior

The returns of the tasks of the iterable branch are reduced to a list indexed by the order of the iterable. In the above example, downstream steps of iterate_task would see a parameter x that is a list of all the x values observed during the iteration.

For clarity, the default reducer is: lambda *x: list(x) # returns a list of the args
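A minimal sketch of what this means, assuming a branch that returned x = 10, 20 and 30 across three iterations:

default_reducer = lambda *x: list(x)

# One value per iteration, in the order of the iterable.
assert default_reducer(10, 20, 30) == [10, 20, 30]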

Custom reduce

The map state also accepts an argument reducer, which can be a lambda or a function that accepts *args (a variable-length, non-keyword argument list) and returns a reduced value. The downstream steps of iterate_task then see the reduced value instead of a list.
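For example, a max reducer, the same one used in the custom-reducer example below, collapses the collected values to a single number:

custom_reducer = lambda *x: max(x)

# Downstream steps see one reduced value instead of a list.
assert custom_reducer(10, 20, 30) == 30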

Traversal

A branch of a map step is considered successful only if its success step is reached at the end. Steps of the branch can fail, be handled by on_failure, and be redirected to success if that is the desired behavior.

The map step is considered successful only if all the branches of the step have terminated successfully.

Complete example

This example uses the default reducer.

"""
map states allow you to repeat a branch for each value of an iterable.

The below example can be written, in Python, as:

chunks = [1, 2, 3]

for chunk in chunks:
    # Any task within the pipeline can access the value of chunk as an argument.
    processed = process_chunk(chunk)

    # The value of processed for every iteration is the value returned by the steps
    # of the current execution. For example, the value of processed
    # for chunk=1, is chunk*10 = 10 for downstream steps.
    read_processed_chunk(chunk, processed)

# Outside the loop, processed is a list of all the processed chunks.
# This is also called the reduce pattern.
assert processed == [chunk * 10 for chunk in chunks]

Run this pipeline as:
    python examples/07-map/map.py
"""

from examples.common.functions import (
    assert_default_reducer,
    process_chunk,
    read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask


def iterable_branch(execute: bool = True):
    """
    Use the pattern of using "execute" to control the execution of the pipeline.

    The same pipeline can be run independently from the command line.

    WARNING: If the execution is not controlled by "execute", the pipeline will be executed
    even during the definition of the branch in parallel steps.
    """
    # The python function to process a single chunk of data.
    # In the example, we are multiplying the chunk by 10.
    process_chunk_task_python = PythonTask(
        name="execute_python",
        function=process_chunk,
        returns=["processed_python"],
    )

    # Return parameters within a map branch have to be unique.
    # The notebook takes the value of processed_python as an argument
    # and returns a new parameter, "processed_notebook", which is 10*processed_python.
    process_chunk_task_notebook = NotebookTask(
        name="execute_notebook",
        notebook="examples/common/process_chunk.ipynb",
        returns=["processed_notebook"],
    )

    # Following the pattern, the shell task takes the value of processed_notebook as an argument
    # and returns a new parameter, "processed_shell", which is 10*processed_notebook.
    shell_command = """
    if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
    fi
    export processed_shell=$( expr 10 '*' "$processed_notebook")
    """

    process_chunk_task_shell = ShellTask(
        name="execute_shell",
        command=shell_command,
        returns=["processed_shell"],
    )

    # A downstream step of process_<python, notebook, shell> that reads the processed parameters.
    # The value of processed is within the context of the branch.
    # For example, for chunk=1, the value of processed_python is chunk*10 = 10
    # the value of processed_notebook is processed_python*10 = 100
    # the value of processed_shell is processed_notebook*10 = 1000
    read_chunk = PythonTask(
        name="read processed chunk",
        function=read_processed_chunk,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[process_chunk_task_python, process_chunk_task_notebook, process_chunk_task_shell, read_chunk],
    )

    if execute:
        pipeline.execute()

    return pipeline


def main():
    # Create a map state which iterates over a list of chunks.
    # chunk is the value of the iterable.
    map_state = Map(
        name="map state",
        iterate_on="chunks",
        iterate_as="chunk",
        branch=iterable_branch(execute=False),
    )

    # Outside of the loop, processed is a list of all the processed chunks.
    # This is also called the reduce pattern.
    # the value of processed_python is [10, 20, 30]
    # the value of processed_notebook is [100, 200, 300]
    # the value of processed_shell is [1000, 2000, 3000]
    collect = PythonTask(
        name="collect",
        function=assert_default_reducer,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[map_state, collect])

    pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")

    return pipeline


if __name__ == "__main__":
    main()
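Both the Python and YAML runs load the iterable from examples/common/initial_parameters.yaml. A plausible sketch of its contents, matching chunks = [1, 2, 3] from the docstring (the actual file in the repository may differ):

# examples/common/initial_parameters.yaml (assumed contents)
chunks:
  - 1
  - 2
  - 3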
The same pipeline in YAML:

branch: &branch
  start_at: execute_python
  steps:
    execute_python:
      type: task
      command: examples.common.functions.process_chunk
      returns:
        - name: processed_python
      next: execute_notebook
    execute_notebook:
      type: task
      command_type: notebook
      command: examples/common/process_chunk.ipynb
      returns:
        - name: processed_notebook
      next: execute_shell
    execute_shell:
      type: task
      command_type: shell
      command: |
        if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
        fi
        export processed_shell=$( expr 10 '*' "$processed_notebook")
      returns:
        - name: processed_shell
      next: read_chunk
    read_chunk:
      type: task
      command: examples.common.functions.read_processed_chunk
      next: success


dag:
  description: |
    map states allow you to repeat a branch for each value of an iterable.

    The below example can be written, in Python, as:

    chunks = [1, 2, 3]

    for chunk in chunks:
        # Any task within the pipeline can access the value of chunk as an argument.
        processed = process_chunk(chunk)

        # The value of processed for every iteration is the value returned by the steps
        # of the current execution. For example, the value of processed
        # for chunk=1, is chunk*10 = 10 for downstream steps.
        read_processed_chunk(chunk, processed)

    # Outside the loop, processed is a list of all the processed chunks.
    # This is also called the reduce pattern.
    assert processed == [chunk * 10 for chunk in chunks]

    Run this pipeline as:
      runnable execute -f examples/07-map/map.yaml \
      -p examples/common/initial_parameters.yaml

  start_at: map_state
  steps:
    map_state:
      type: map
      branch: *branch
      iterate_on: chunks
      iterate_as: chunk
      next: collect
    collect:
      type: task
      command: examples.common.functions.assert_default_reducer
      next: success

This example replaces the default reducer with a lambda *x: max(x) reducer.

"""
map states allow you to repeat a branch for each value of an iterable.

The below example can be written, in Python, as:

chunks = [1, 2, 3]

for chunk in chunks:
    # Any task within the pipeline can access the value of chunk as an argument.
    processed = process_chunk(chunk)

    # The value of processed for every iteration is the value returned by the steps
    # of the current execution. For example, the value of processed
    # for chunk=1, is chunk*10 = 10 for downstream steps.
    read_processed_chunk(chunk, processed)

It is possible to use a custom reducer; for example, this reducer takes the max of the collection.
# Once the reducer is applied, processed is reduced to a single value.
assert processed == max(chunk * 10 for chunk in chunks)
"""

from examples.common.functions import (
    assert_custom_reducer,
    process_chunk,
    read_processed_chunk,
)
from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask


def iterable_branch(execute: bool = True):
    """
    Use the pattern of using "execute" to control the execution of the pipeline.

    The same pipeline can be run independently from the command line.

    WARNING: If the execution is not controlled by "execute", the pipeline will be executed
    even during the definition of the branch in parallel steps.
    """
    # The python function to process a single chunk of data.
    # In the example, we are multiplying the chunk by 10.
    process_chunk_task_python = PythonTask(
        name="execute_python",
        function=process_chunk,
        returns=["processed_python"],
    )

    # Return parameters within a map branch have to be unique.
    # The notebook takes the value of processed_python as an argument
    # and returns a new parameter, "processed_notebook", which is 10*processed_python.
    process_chunk_task_notebook = NotebookTask(
        name="execute_notebook",
        notebook="examples/common/process_chunk.ipynb",
        returns=["processed_notebook"],
    )

    # Following the pattern, the shell task takes the value of processed_notebook as an argument
    # and returns a new parameter, "processed_shell", which is 10*processed_notebook.
    shell_command = """
    if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
    fi
    export processed_shell=$( expr 10 '*' "$processed_notebook")
    """

    process_chunk_task_shell = ShellTask(
        name="execute_shell",
        command=shell_command,
        returns=["processed_shell"],
    )

    # A downstream step of process_<python, notebook, shell> that reads the processed parameters.
    # The value of processed is within the context of the branch.
    # For example, for chunk=1, the value of processed_python is chunk*10 = 10
    # the value of processed_notebook is processed_python*10 = 100
    # the value of processed_shell is processed_notebook*10 = 1000
    read_chunk = PythonTask(
        name="read processed chunk",
        function=read_processed_chunk,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[process_chunk_task_python, process_chunk_task_notebook, process_chunk_task_shell, read_chunk],
    )

    if execute:
        pipeline.execute()

    return pipeline


def main():
    # Create a map state which iterates over a list of chunks.
    # chunk is the value of the iterable.
    # Upon completion of the map state, all the parameters of the tasks
    # within the pipeline will be processed by the reducer.
    # In this case, the reducer is the max of all the processed chunks.
    map_state = Map(
        name="map state",
        iterate_on="chunks",
        iterate_as="chunk",
        reducer="lambda *x: max(x)",
        branch=iterable_branch(execute=False),
    )

    collect = PythonTask(
        name="collect",
        function=assert_custom_reducer,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[map_state, collect])

    pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")

    return pipeline


if __name__ == "__main__":
    main()
The same pipeline in YAML:

branch: &branch
  start_at: execute_python
  steps:
    execute_python:
      type: task
      command: examples.common.functions.process_chunk
      returns:
        - name: processed_python
      next: execute_notebook
    execute_notebook:
      type: task
      command_type: notebook
      command: examples/common/process_chunk.ipynb
      returns:
        - name: processed_notebook
      next: execute_shell
    execute_shell:
      type: task
      command_type: shell
      command: |
        if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
            echo "yaay"
        else
            echo "naay"
            exit 1;
        fi
        export processed_shell=$( expr 10 '*' "$processed_notebook")
      returns:
        - name: processed_shell
      next: read_chunk
    read_chunk:
      type: task
      command: examples.common.functions.read_processed_chunk
      next: success

dag:
  description: |
    map states allow you to repeat a branch for each value of an iterable.

    The below example can be written, in Python, as:

    chunks = [1, 2, 3]

    for chunk in chunks:
        # Any task within the pipeline can access the value of chunk as an argument.
        processed = process_chunk(chunk)

        # The value of processed for every iteration is the value returned by the steps
        # of the current execution. For example, the value of processed
        # for chunk=1, is chunk*10 = 10 for downstream steps.
        read_processed_chunk(chunk, processed)

    It is possible to use a custom reducer; for example, this reducer takes the max of the collection.
    # Once the reducer is applied, processed is reduced to a single value.
    assert processed == max(chunk * 10 for chunk in chunks)

    Run this pipeline as:
      runnable execute -f examples/07-map/custom_reducer.yaml \
      -p examples/common/initial_parameters.yaml
  start_at: map_state
  steps:
    map_state:
      type: map
      branch: *branch
      iterate_on: chunks
      iterate_as: chunk
      reducer: "lambda *x: max(x)"
      next: collect
    collect:
      type: task
      command: examples.common.functions.assert_custom_reducer
      next: success