
Reference


PythonTask

runnable.PythonTask

An execution node of the pipeline that executes a Python function. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • function (callable) –

    The function to execute.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(&lt;name&gt;). Using pickled(&lt;name&gt;) is also advised for large JSON friendly variables.

    For example,

    from runnable import pickled
    
    def f():
        ...  # df, a python object, is created here
        x = 1
        return x, df  # A simple JSON friendly variable and a python object.
    
    task = PythonTask(name="task", function=f, returns=["x", pickled("df")])
    

    To mark any JSON friendly variable as a metric, please use metric(&lt;name&gt;). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync data from/to; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables and removed after execution.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global executor configuration by referring to a named override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = PythonTask(name="task", function=f,
            overrides={'local-container': 'custom_docker_image'})
    

Attributes:

  • name: the name of the task
  • command: the dotted path reference to the function.
  • next: the next node to call if the function succeeds. Use success to terminate the pipeline successfully, or fail to terminate it with a failure.
  • on_failure: The next node in case of failure.
  • catalog: mapping of cataloging items
  • overrides: mapping of step overrides from global configuration.
dag:
  steps:
    <name>:
      type: task
      command: <>
      next: <>
      on_failure: <>
      catalog: # Any cataloging to be done.
      overrides: # mapping of overrides of global configuration
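
The JSON friendly versus pickled distinction above can be sketched with the standard library alone; this illustrates the serialization behaviour, not runnable's actual implementation:

```python
import json
import pickle

# JSON friendly values (numbers, strings, lists, dicts) serialize to JSON;
# arbitrary python objects (e.g. a DataFrame) must be pickled instead.
x = 1                       # JSON friendly
df = {"col": [1, 2, 3]}     # stand-in for a non-JSON object

json_payload = json.dumps(x)         # what a plain return name stores
pickled_payload = pickle.dumps(df)   # what pickled("df") stores, as bytes

restored = pickle.loads(pickled_payload)
```

Pydantic models follow the JSON friendly path, since they serialize to JSON directly.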

NotebookTask

runnable.NotebookTask

An execution node of the pipeline that executes a Jupyter notebook. Please refer to concepts for more information.

We internally use Ploomber engine to execute the notebook.

Attributes:

  • name (str) –

    The name of the node.

  • notebook (str) –

    The path to the notebook relative to the project root.

  • optional_ploomber_args (Dict[str, Any]) –

    Any optional ploomber args, please refer to Ploomber engine for more information.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(<name>). It is advised to use pickled(<name>) for big JSON friendly variables.

    For example,

    from runnable import pickled
    
    # assume example.ipynb is the notebook with df and x as variables in some cells.
    
    task = NotebookTask(name="task", notebook="example.ipynb", returns=["x", pickled("df")])
    
    

    To mark any JSON friendly variable as a metric, please use metric(&lt;name&gt;). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync data from/to; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables and removed after execution.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global executor configuration by referring to a named override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = NotebookTask(name="task", notebook="example.ipynb",
            overrides={'local-container': 'custom_docker_image'})
    

Attributes:

  • name: the name of the task
  • command: the path to the notebook relative to the project root.
  • next: the next node to call if the function succeeds. Use success to terminate the pipeline successfully, or fail to terminate it with a failure.
  • on_failure: The next node in case of failure.
  • catalog: mapping of cataloging items
  • overrides: mapping of step overrides from global configuration.
dag:
  steps:
    <name>:
      type: task
      command: <>
      next: <>
      on_failure: <>
      catalog: # Any cataloging to be done.
      overrides: # mapping of overrides of global configuration

ShellTask

runnable.ShellTask

An execution node of the pipeline that executes a shell command. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    The shell command to execute.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[str]) –

    A list of the names of environment variables to collect from the task.

    The names should match environment variables set by the command. Shell based tasks can only return JSON friendly variables.

    To mark any JSON friendly variable as a metric, please use metric(&lt;name&gt;). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync data from/to; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables and removed after execution.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global executor configuration by referring to a named override.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = ShellTask(name="task", command="export x=1",
            overrides={'local-container': 'custom_docker_image'})
    

Attributes:

  • name: the name of the task
  • command: the shell command to execute.
  • next: the next node to call if the command succeeds. Use success to terminate the pipeline successfully, or fail to terminate it with a failure.
  • on_failure: The next node in case of failure.
  • catalog: mapping of cataloging items
  • overrides: mapping of step overrides from global configuration.
dag:
  steps:
    <name>:
      type: task
      command: <>
      next: <>
      on_failure: <>
      catalog: # Any cataloging to be done.
      overrides: # mapping of overrides of global configuration
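
One way to picture how a shell task's returns are collected, using only the standard library; the exact mechanism runnable uses may differ:

```python
import subprocess

# The script exports variables; after it runs, the environment variables
# named in `returns` are collected as JSON friendly values.
script = "export x=1; export y=hello; env"
output = subprocess.run(["sh", "-c", script], capture_output=True, text=True).stdout
env = dict(line.split("=", 1) for line in output.splitlines() if "=" in line)

# Only the declared names are kept.
returns = {name: env[name] for name in ["x", "y"]}
```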

Stub

runnable.Stub

A node that passes through the pipeline with no action. Just like pass in Python. Please refer to concepts for more information.

A stub node can take an arbitrary number of arguments.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    Ignored. A stub takes no action, regardless of the arguments passed to it.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.


Catalog

runnable.Catalog

Used to instruct a task to sync data from/to the central catalog. Please refer to concepts for more information.

Attributes:

  • get (List[str]) –

    List of glob patterns to get from central catalog to the compute data folder.

  • put (List[str]) –

    List of glob patterns to put into central catalog from the compute data folder.

Examples:

>>> from runnable import Catalog
>>> catalog = Catalog(compute_data_folder="/path/to/data", get=["*.csv"], put=["*.csv"])
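
What get and put do can be sketched with glob and shutil; the folder layout below is illustrative, not runnable's actual catalog structure:

```python
import glob
import os
import shutil
import tempfile

# put=["*.csv"]: copy matching files from the compute data folder into
# the central catalog; get is the same operation in the other direction.
compute_data_folder = tempfile.mkdtemp()
central_catalog = tempfile.mkdtemp()

open(os.path.join(compute_data_folder, "a.csv"), "w").close()
open(os.path.join(compute_data_folder, "notes.txt"), "w").close()

for path in glob.glob(os.path.join(compute_data_folder, "*.csv")):
    shutil.copy(path, central_catalog)

synced = sorted(os.listdir(central_catalog))
```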

Pipeline

runnable.Pipeline

A Pipeline is a sequence of Steps.

Attributes:

  • steps (List[Stub | PythonTask | NotebookTask | ShellTask | Parallel | Map]) –

    A list of Steps that make up the Pipeline.

    The order of steps is important as it determines the order of execution. The failing step should also be the first step of any on_failure pipeline.

  • on_failure (List[Pipeline]) –

    A list of Pipelines to execute in case of failure.

    For example, for the pipeline step1 >> step2, where step1 should reach step3 in case of failure:

    failure_pipeline = Pipeline(steps=[step1, step3])
    
    pipeline = Pipeline(steps=[step1, step2], on_failure=[failure_pipeline])
    
  • name (str) –

    The name of the Pipeline. Defaults to "".

  • description (str) –

    A description of the Pipeline. Defaults to "".

The pipeline implicitly adds success and fail nodes.
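
The traversal and on_failure semantics can be sketched in plain Python; this is an illustration of the behaviour described above, not the library's executor:

```python
# Run steps in order. On the first failure, hand over to the failure
# pipeline; otherwise the implicit success node is reached.
def run(steps, on_failure=None):
    for step in steps:
        try:
            step()
        except Exception:
            return run(on_failure) if on_failure else "fail"
    return "success"

log = []

def step1():
    log.append("step1")

def step2():
    raise RuntimeError("boom")

def step3():
    log.append("step3")

# step2 fails, so the failure branch (step3) runs instead of continuing.
status = run([step1, step2], on_failure=[step3])
```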

execute

execute(configuration_file: str = '', run_id: str = '', tag: str = '', parameters_file: str = '', log_level: str = defaults.LOG_LEVEL)

Execute the Pipeline.

Execution of the pipeline can either:

  • traverse and execute all the steps of the pipeline, e.g. local execution, or

  • create the representation of the pipeline for other executors.

Please refer to concepts for more information.

Parameters:

  • configuration_file (str, default: '' ) –

    The path to the configuration file. Defaults to "". The configuration file can be overridden by the environment variable RUNNABLE_CONFIGURATION_FILE.

  • run_id (str, default: '' ) –

    The ID of the run. Defaults to "".

  • tag (str, default: '' ) –

    The tag of the run. Defaults to "". Use to group multiple runs.

  • parameters_file (str, default: '' ) –

    The path to the parameters file. Defaults to "".

  • log_level (str, default: LOG_LEVEL ) –

    The log level. Defaults to defaults.LOG_LEVEL.
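
The RUNNABLE_CONFIGURATION_FILE override noted above can be sketched as below; the resolution function is illustrative, not the library's code, with the environment variable winning as described:

```python
import os

# When set, the environment variable overrides the configuration_file
# argument passed to execute().
def resolve_configuration_file(argument: str = "") -> str:
    return os.environ.get("RUNNABLE_CONFIGURATION_FILE") or argument

os.environ["RUNNABLE_CONFIGURATION_FILE"] = "env-config.yaml"
chosen = resolve_configuration_file("cli-config.yaml")
```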

Parallel

runnable.Parallel

A node that executes multiple branches in parallel. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • branches (Dict[str, Pipeline]) –

    A dictionary of branches to execute in parallel.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node.

  • on_failure (str) –

    The name of the node to execute if any of the branches fail.
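
Parallel semantics can be sketched with a thread pool: run every named branch concurrently and wait for all of them. The branches here are plain callables, purely for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

# Named branches, as in the `branches` dictionary of Parallel.
branches = {"train": lambda: "model", "evaluate": lambda: "report"}

# Submit every branch, then wait for all results before moving on.
with ThreadPoolExecutor() as pool:
    futures = {name: pool.submit(fn) for name, fn in branches.items()}
    results = {name: future.result() for name, future in futures.items()}
```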


Map

runnable.Map

A node that iterates over a list of items and executes a pipeline for each item. Please refer to concepts for more information.

Attributes:

  • branch (Pipeline) –

    The pipeline to execute for each item.

  • iterate_on (str) –

    The name of the parameter to iterate over. The parameter should be defined either by previous steps or statically at the start of execution.

  • iterate_as (str) –

    The parameter name under which each item is passed to the branch's functions.

  • reducer (Callable) –

    The function to reduce the results of the branches.

  • overrides (Dict[str, Any]) –

    Any overrides to the command.
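
Map semantics can be sketched as: run the branch once per item of the iterate_on parameter, bind each item to iterate_as, then combine the branch results with the reducer. All names below are illustrative:

```python
# branch: the pipeline run per item; reducer: combines branch results.
def map_node(branch, items, reducer=lambda *results: list(results)):
    return reducer(*(branch(item) for item in items))

# Each item of the iterated parameter is passed to the branch in turn.
doubled = map_node(lambda chunk: chunk * 2, [1, 2, 3])
```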