
Parallel

A parallel node in runnable embeds multiple pipelines as branches.



Concept

The diagram below shows training a baseline model and a CNN model in parallel, then picking the best model for inference.

    flowchart LR

        getFeatures([Get Features]):::green
        trainStep(Train Models):::green
        chooseBest([Evaluate Models]):::green
        inference([Run Inference]):::green
        success([Success]):::green

        prepareBase([Prepare for baseline model]):::yellow
        trainBase([Train XGBoost]):::yellow
        successBase([success]):::yellow
        prepareBase --> trainBase --> successBase

        trainCNN([Train CNN model]):::yellow
        successCNN([CNNModel success]):::yellow
        trainCNN --> successCNN


        getFeatures --> trainStep
        trainStep --> prepareBase
        trainStep --> trainCNN
        successBase --> chooseBest
        successCNN --> chooseBest
        chooseBest --> inference
        inference --> success


        classDef yellow stroke:#FFFF00
        classDef green stroke:#0f0

The branches for training the baseline and CNN models are pipelines themselves and are defined like any other pipeline.

The Train Models step is a parallel step whose branches are the individual pipelines.
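To make the fan-out/fan-in shape of the diagram concrete, here is a plain-Python sketch. It is a conceptual illustration only, not how runnable schedules branches: `train_baseline`, `train_cnn`, `run_parallel` and `choose_best` are hypothetical names, the "scores" are dummy arithmetic, and `concurrent.futures` stands in for whatever executor actually runs the branches.

```python
from concurrent.futures import ThreadPoolExecutor


def train_baseline(features):
    # Hypothetical stand-in for "Prepare for baseline model" -> "Train XGBoost".
    prepared = [x * 2 for x in features]
    return {"model": "baseline", "score": sum(prepared) / len(prepared)}


def train_cnn(features):
    # Hypothetical stand-in for "Train CNN model".
    return {"model": "cnn", "score": max(features)}


def run_parallel(branches, features):
    # Fan out: every branch receives the same input and runs independently.
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, features) for name, fn in branches.items()}
        # Fan in: wait for every branch to finish before moving on.
        return {name: future.result() for name, future in futures.items()}


def choose_best(results):
    # "Evaluate Models": pick the branch with the highest dummy score.
    return max(results, key=lambda name: results[name]["score"])


if __name__ == "__main__":
    results = run_parallel({"baseline": train_baseline, "cnn": train_cnn}, [1.0, 2.0, 3.0])
    print(choose_best(results))  # prints "baseline"
```

The key point is that the next step ("Evaluate Models") only sees results once all branches have returned.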

Syntax

from runnable import Parallel, Pipeline


def get_baseline_pipeline():
    ...
    pipeline = Pipeline(...)
    return pipeline


def get_cnn_pipeline():
    ...
    pipeline = Pipeline(...)
    return pipeline


def main():
    # Branches are Pipeline objects, so call the builder functions here.
    train_models = Parallel(
        name="train models",
        branches={"baseline": get_baseline_pipeline(), "cnn": get_cnn_pipeline()},
        terminate_with_success=True,
    )
    pipeline = Pipeline(steps=[train_models])

    pipeline.execute()

    return pipeline

branch: &baseline
  start_at: prepare
  steps:
    ...

branch: &cnn
  start_at: train
  steps:
    ...

dag:
  description: |
    This example demonstrates the use of the Parallel step.

    parallel step takes a mapping of branches which are pipelines themselves.

  start_at: parallel_step
  steps:
    parallel_step:
      type: parallel
      next: success
      branches:
        baseline: *baseline
        cnn: *cnn

Execution

The pipelines of a parallel step's branches should not be executed while the parallel step is being defined. If you want to execute the individual branches in isolation, use a flag to control execution.

For example, the functions get_baseline_pipeline and get_cnn_pipeline can take an argument execute that defaults to True. When composing the parallel step, pass execute=False.
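The flag pattern can be sketched as follows. To keep the block self-contained, `StandInPipeline` is a hypothetical placeholder for runnable's `Pipeline` that only records whether `execute()` was called; with the real library you would construct a `Pipeline` and pass the resulting branches to `Parallel`.

```python
class StandInPipeline:
    """Hypothetical stand-in for runnable's Pipeline; it only records whether execute() ran."""

    def __init__(self, name):
        self.name = name
        self.executed = False

    def execute(self):
        # A real pipeline would run its steps here.
        self.executed = True


def get_baseline_pipeline(execute=True):
    pipeline = StandInPipeline("baseline")  # steps elided
    if execute:
        # Run only when the pipeline is used on its own.
        pipeline.execute()
    return pipeline


def get_cnn_pipeline(execute=True):
    pipeline = StandInPipeline("cnn")  # steps elided
    if execute:
        pipeline.execute()
    return pipeline


# Composing the parallel step: build both branches without running them.
# With runnable, this dict would be passed as Parallel(branches=...).
branches = {
    "baseline": get_baseline_pipeline(execute=False),
    "cnn": get_cnn_pipeline(execute=False),
}
```

Running a branch in isolation is then just `get_baseline_pipeline()` with the default `execute=True`.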

Traversal

A branch of a parallel step is considered successful only if it reaches its success step at the end. Individual steps in the branch can still fail; they can be handled by on failure and redirected to success if that is the desired behavior.

The parallel step is considered successful only if all the branches of the step have terminated successfully.
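The two traversal rules can be modeled in a few lines of plain Python. This is a simplified model, not runnable's implementation: `run_branch`, `parallel_succeeded` and the dict-based step schema (`next`, `on_failure`) are hypothetical, and the per-step outcomes are supplied by hand rather than produced by real tasks.

```python
def run_branch(steps, start_at, outcomes):
    """Walk a branch and return the terminal node it reaches ("success" or "fail")."""
    current = start_at
    while current in steps:
        step = steps[current]
        if outcomes[current]:
            current = step["next"]
        else:
            # Without an on_failure handler, a failing step ends the branch at "fail".
            current = step.get("on_failure", "fail")
    return current


def parallel_succeeded(terminals):
    # The parallel step succeeds only if every branch reached "success".
    return all(terminal == "success" for terminal in terminals.values())


baseline_steps = {
    "prepare": {"next": "train"},
    "train": {"next": "success"},
}
cnn_steps = {
    # A failure in "train" is redirected to success by its on_failure handler.
    "train": {"next": "success", "on_failure": "success"},
}

terminals = {
    "baseline": run_branch(baseline_steps, "prepare", {"prepare": True, "train": False}),
    "cnn": run_branch(cnn_steps, "train", {"train": False}),
}
```

Here the CNN branch still counts as successful despite its failing step, but the baseline branch ends at `fail`, so the parallel step as a whole is unsuccessful.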

Complete example

"""
This example demonstrates the use of the Parallel step.

The branches of the parallel step are themselves pipelines and can be defined
as shown in 02-sequential/traversal.py.

WARNING: the function returning the pipeline should not execute it
during the definition of the branch in parallel steps.

Run this pipeline as:
    python examples/06-parallel/parallel.py
"""

from examples.common.functions import hello
from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub


def traversal():
    """
    Build the branch pipeline and return it without executing it.

    To run the same pipeline independently from the command line, control the
    call to execute() with an "execute" flag rather than calling it here.

    WARNING: If execution is not controlled by such a flag, the pipeline will be
    executed even during the definition of the branch in parallel steps.
    """
    stub_task = Stub(name="hello stub")

    python_task = PythonTask(
        name="hello python",
        function=hello,
    )

    shell_task = ShellTask(
        name="hello shell",
        command="echo 'Hello World!'",
    )

    notebook_task = NotebookTask(
        name="hello notebook",
        notebook="examples/common/simple_notebook.ipynb",
        terminate_with_success=True,
    )

    # The pipeline has a mix of tasks.
    # The order of execution follows the order of the tasks in the list.
    pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task])

    return pipeline


def main():
    parallel_step = Parallel(
        name="parallel_step",
        terminate_with_success=True,
        branches={"branch1": traversal(), "branch2": traversal()},
    )

    pipeline = Pipeline(steps=[parallel_step])

    pipeline.execute()
    return pipeline


if __name__ == "__main__":
    main()

branch: &branch
  description: |
    Use this pattern to define a reusable branch.

    This pipeline is the same as the one defined in examples/02-sequential/traversal.yaml
  start_at: hello stub
  steps:
    hello stub:
      type: stub
      next: hello python
    hello python:
      type: task
      command_type: python
      command: examples.common.functions.hello # dotted path to the function.
      next: hello shell
    hello shell:
      type: task
      command_type: shell
      command: echo "Hello World!" # Command to run
      next: hello notebook
    hello notebook:
      type: task
      command_type: notebook
      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
      next: success


dag:
  description: |
    This example demonstrates the use of the Parallel step.

    parallel step takes a mapping of branches which are pipelines themselves.

    Run this pipeline as:
      runnable execute -f examples/06-parallel/parallel.yaml


  start_at: parallel_step
  steps:
    parallel_step:
      type: parallel
      next: success
      branches:
        branch1: *branch
        branch2: *branch