Skip to content

Reference

Please accompany the reference with examples from the repo.

PythonTask

runnable.PythonTask

An execution node of the pipeline of python functions. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • function (callable) –

    The function to execute.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(<name>). It is advised to use pickled(<name>) for big JSON friendly variables.

    For example,

    from runnable import pickled
    
    def f():
        ...
        x = 1
        return x, df # A simple JSON friendly variable and a python object.
    
    task = PythonTask(name="task", function=f, returns=["x", pickled(df)]))
    

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files sync data from/to, refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables and removed after execution.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration config by referring to the specific override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = PythonTask(name="task", function="function'",
            overrides={'local-container': custom_docker_image})
    
An execution node of the pipeline of python functions.
Please refer to define pipeline/tasks/python for more information.

As part of the dag definition, a python task is defined as follows:

dag:
  steps:
    python_task: # The name of the node
      type: task
      command_type: python # this is default
      command: my_module.my_function # the dotted path to the function. Please refer to the yaml section of
        define pipeline/tasks/python for concrete details.
      returns:
        - name: # The name to assign the return value
          kind: json # the default value is json,
            can be object for python objects and metric for metrics
      secrets:
        - my_secret_key # A list of secrets to expose by secrets manager
      catalog:
        get:
          - A list of glob patterns to get from the catalog to the local file system
        put:
          - A list of glob patterns to put to the catalog from the local file system
      on_failure: The name of the step to traverse in case of failure
      overrides:
        Individual tasks can override the global configuration config by referring to the
        specific override.

        For example,
        #Global configuration
        executor:
        type: local-container
        config:
          docker_image: "runnable/runnable:latest"
          overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"

        ## In the node definition
        overrides:
        local-container:
          docker_image: "runnable/runnable:custom"

        This instruction will override the docker image for the local-container executor.
      next: The next node to execute after this task, use "success" to terminate the pipeline successfully
        or "fail" to terminate the pipeline with an error.

NotebookTask

runnable.NotebookTask

An execution node of the pipeline of notebook. Please refer to concepts for more information.

We internally use Ploomber engine to execute the notebook.

Attributes:

  • name (str) –

    The name of the node.

  • notebook (str) –

    The path to the notebook relative the project root.

  • optional_ploomber_args (Dict[str, Any]) –

    Any optional ploomber args, please refer to Ploomber engine for more information.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(<name>). It is advised to use pickled(<name>) for big JSON friendly variables.

    For example,

    from runnable import pickled
    
    # assume, example.ipynb is the notebook with df and x as variables in some cells.
    
    task = Notebook(name="task", notebook="example.ipynb", returns=["x", pickled(df)]))
    

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files sync data from/to, refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration config by referring to the specific override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = NotebookTask(name="task", notebook="example.ipynb",
            overrides={'local-container': custom_docker_image})
    
An execution node of the pipeline of notebook execution.
Please refer to define pipeline/tasks/notebook for more information.

As part of the dag definition, a notebook task is defined as follows:

dag:
  steps:
    notebook_task: # The name of the node
      type: task
      command_type: notebook
      command: the path to the notebook relative to project root.
      optional_ploomber_args: a dictionary of arguments to be passed to ploomber engine
      returns:
        - name: # The name to assign the return value
          kind: json # the default value is json,
            can be object for python objects and metric for metrics
      secrets:
        - my_secret_key # A list of secrets to expose by secrets manager
      catalog:
        get:
          - A list of glob patterns to get from the catalog to the local file system
        put:
          - A list of glob patterns to put to the catalog from the local file system
      on_failure: The name of the step to traverse in case of failure
      overrides:
        Individual tasks can override the global configuration config by referring to the
        specific override.

        For example,
        #Global configuration
        executor:
        type: local-container
        config:
          docker_image: "runnable/runnable:latest"
          overrides:
            custom_docker_image:
              docker_image: "runnable/runnable:custom"

        ## In the node definition
        overrides:
          local-container:
            docker_image: "runnable/runnable:custom"

        This instruction will override the docker image for the local-container executor.
      next: The next node to execute after this task, use "success" to terminate the pipeline successfully
        or "fail" to terminate the pipeline with an error.

ShellTask

runnable.ShellTask

An execution node of the pipeline of shell script. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    The path to the notebook relative the project root.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[str]) –

    A list of the names of environment variables to collect from the task.

    The names should match the order of the variables returned by the function. Shell based tasks can only return JSON friendly variables.

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files sync data from/to, refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration config by referring to the specific override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = ShellTask(name="task", command="export x=1",
            overrides={'local-container': custom_docker_image})
    
An execution node of the pipeline of notebook execution.
Please refer to define pipeline/tasks/notebook for more information.

As part of the dag definition, a notebook task is defined as follows:

dag:
  steps:
    notebook_task: # The name of the node
      type: task
      command_type: notebook
      command: the path to the notebook relative to project root.
      optional_ploomber_args: a dictionary of arguments to be passed to ploomber engine
      returns:
        - name: # The name to assign the return value
          kind: json # the default value is json,
            can be object for python objects and metric for metrics
      secrets:
        - my_secret_key # A list of secrets to expose by secrets manager
      catalog:
        get:
          - A list of glob patterns to get from the catalog to the local file system
        put:
          - A list of glob patterns to put to the catalog from the local file system
      on_failure: The name of the step to traverse in case of failure
      overrides:
        Individual tasks can override the global configuration config by referring to the
        specific override.

        For example,
        #Global configuration
        executor:
        type: local-container
        config:
          docker_image: "runnable/runnable:latest"
          overrides:
            custom_docker_image:
              docker_image: "runnable/runnable:custom"

        ## In the node definition
        overrides:
          local-container:
            docker_image: "runnable/runnable:custom"

        This instruction will override the docker image for the local-container executor.
      next: The next node to execute after this task, use "success" to terminate the pipeline successfully
        or "fail" to terminate the pipeline with an error.

Stub

runnable.Stub

A node that passes through the pipeline with no action. Just like pass in Python. Please refer to concepts for more information.

A stub node can tak arbitrary number of arguments.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    The path to the notebook relative the project root.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

An stub execution node of the pipeline.
Please refer to define pipeline/tasks/stub for more information.

As part of the dag definition, a stub task is defined as follows:

dag:
  steps:
    stub_task: # The name of the node
    type: stub
    on_failure: The name of the step to traverse in case of failure
    next: The next node to execute after this task, use "success" to terminate the pipeline successfully
      or "fail" to terminate the pipeline with an error.

It can take arbritary number of parameters, which is handy to temporarily silence a task node.

Catalog

runnable.Catalog

Use to instruct a task to sync data from/to the central catalog. Please refer to concepts for more information.

Attributes:

  • get (List[str]) –

    List of glob patterns to get from central catalog to the compute data folder.

  • put (List[str]) –

    List of glob patterns to put into central catalog from the compute data folder.

Examples:

>>> from runnable import Catalog
>>> catalog = Catalog(compute_data_folder="/path/to/data", get=["*.csv"], put=["*.csv"])

Pipeline

runnable.Pipeline

A Pipeline is a sequence of Steps.

Attributes:

  • steps (List[Stub | PythonTask | NotebookTask | ShellTask | Parallel | Map]]) –

    A list of Steps that make up the Pipeline.

    The order of steps is important as it determines the order of execution. Any on failure behavior should the first step in on_failure pipelines.

  • on_failure (List[List[Pipeline]) –

    A list of Pipelines to execute in case of failure.

    For example, for the below pipeline: step1 >> step2 and step1 to reach step3 in case of failure.

    failure_pipeline = Pipeline(steps=[step1, step3])
    
    pipeline = Pipeline(steps=[step1, step2, on_failure=[failure_pipeline])
    
  • name (str) –

    The name of the Pipeline. Defaults to "".

  • description (str) –

    A description of the Pipeline. Defaults to "".

The pipeline implicitly add success and fail nodes.

execute

execute(configuration_file: str = '', run_id: str = '', tag: str = '', parameters_file: str = '', log_level: str = defaults.LOG_LEVEL)

Execute the Pipeline.

Execution of pipeline could either be:

Traverse and execute all the steps of the pipeline, eg. local execution.

Or create the representation of the pipeline for other executors.

Please refer to concepts for more information.

Parameters:

  • configuration_file (str, default: '' ) –

    The path to the configuration file. Defaults to "". The configuration file can be overridden by the environment variable RUNNABLE_CONFIGURATION_FILE.

  • run_id (str, default: '' ) –

    The ID of the run. Defaults to "".

  • tag (str, default: '' ) –

    The tag of the run. Defaults to "". Use to group multiple runs.

  • parameters_file (str, default: '' ) –

    The path to the parameters file. Defaults to "".

  • log_level (str, default: LOG_LEVEL ) –

    The log level. Defaults to defaults.LOG_LEVEL.

Parallel

runnable.Parallel

A node that executes multiple branches in parallel. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • branches (Dict[str, Pipeline]) –

    A dictionary of branches to execute in parallel.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node.

  • on_failure (str) –

    The name of the node to execute if any of the branches fail.


Map

runnable.Map

A node that iterates over a list of items and executes a pipeline for each item. Please refer to concepts for more information.

Attributes:

  • branch (Pipeline) –

    The pipeline to execute for each item.

  • iterate_on (str) –

    The name of the parameter to iterate over. The parameter should be defined either by previous steps or statically at the start of execution.

  • iterate_as (str) –

    The name of the iterable to be passed to functions.

  • reducer (Callable) –

    The function to reduce the results of the branches.

  • overrides (Dict[str, Any]) –

    Any overrides to the command.