Sdk

runnable.Catalog

Instructs a task to sync data from/to the central catalog. Please refer to concepts for more information.

Attributes:

  • get (List[str]) –

    List of glob patterns to get from central catalog to the compute data folder.

  • put (List[str]) –

    List of glob patterns to put into central catalog from the compute data folder.

Examples:

>>> from runnable import Catalog
>>> catalog = Catalog(compute_data_folder="/path/to/data", get=["*.csv"], put=["*.csv"])
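To make the glob patterns in get and put more concrete, here is a minimal sketch of how a pattern such as "*.csv" selects files. It uses only the standard library fnmatch module; select_files and the file names are hypothetical, and this is not how runnable itself resolves patterns.

```python
from fnmatch import fnmatch


def select_files(file_names, patterns):
    """Return the file names that match at least one glob pattern, in input order."""
    return [
        name
        for name in file_names
        if any(fnmatch(name, pattern) for pattern in patterns)
    ]


# Hypothetical contents of a compute data folder.
files = ["train.csv", "test.csv", "model.pkl", "notes.txt"]
selected = select_files(files, ["*.csv"])
print(selected)
```

Only the files matching one of the patterns would be candidates for syncing; everything else in the folder is left alone.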

runnable.Stub

A node that passes through the pipeline with no action. Just like pass in Python. Please refer to concepts for more information.

A stub node can take an arbitrary number of arguments.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    The path to the notebook relative to the project root.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.


runnable.PythonTask

An execution node of the pipeline that runs a Python function. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • function (callable) –

    The function to execute.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(<name>). It is advised to use pickled(<name>) for big JSON friendly variables.

    For example,

    from runnable import PythonTask, pickled
    
    def f():
        ...
        x = 1
        return x, df  # A simple JSON friendly variable and a python object.
    
    # Names are listed in the same order as the values returned by f.
    task = PythonTask(name="task", function=f, returns=["x", pickled("df")])
    

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync from/to the central catalog; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables and removed after execution.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration by referring to a specific override by name.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = PythonTask(name="task", function=function,
            overrides={"local-container": "custom_docker_image"})
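The rule that names in returns follow the order of the returned values, with JSON as the default and pickling for arbitrary objects, can be pictured with the conceptual sketch below. collect_returns is a hypothetical helper written only for illustration; it assumes positional pairing and is not runnable's internal implementation.

```python
import json
import pickle


def f():
    x = 1
    payload = {1, 2, 3}  # A set is not JSON serializable.
    return x, payload


def collect_returns(values, names, pickled_names=()):
    """Pair returned values with names by position and serialize each one."""
    if len(values) != len(names):
        raise ValueError("returns must name every value returned by the function")
    stored = {}
    for name, value in zip(names, values):
        if name in pickled_names:
            stored[name] = pickle.dumps(value)  # arbitrary Python object
        else:
            stored[name] = json.dumps(value)  # JSON friendly by default
    return stored


stored = collect_returns(f(), ["x", "payload"], pickled_names={"payload"})
print(stored["x"])
```

Swapping the order of the names would silently attach the wrong name to each value, which is why the order in returns matters.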
    

runnable.ShellTask

An execution node of the pipeline that runs a shell script. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • command (str) –

    The shell command to execute.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[str]) –

    A list of the names of environment variables to collect from the task.

    The names should match the order of the variables returned by the function. Shell based tasks can only return JSON friendly variables.

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync from/to the central catalog; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration by referring to a specific override by name.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = ShellTask(name="task", command="export x=1",
            overrides={"local-container": "custom_docker_image"})
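One way to picture how a named override relates to the global executor configuration is the sketch below. It assumes the override's keys are laid over the base configuration, which is an assumption made for illustration; resolve_config is a hypothetical helper and not part of runnable.

```python
# Mirrors the "config" section of the global configuration shown above.
global_config = {
    "docker_image": "runnable/runnable:latest",
    "overrides": {
        "custom_docker_image": {"docker_image": "runnable/runnable:custom"},
    },
}


def resolve_config(config, override_name=None):
    """Return the effective settings, optionally applying a named override."""
    base = {key: value for key, value in config.items() if key != "overrides"}
    if override_name is None:
        return base
    overrides = config.get("overrides", {})
    if override_name not in overrides:
        raise KeyError(f"unknown override: {override_name}")
    # Assumption: keys in the override replace the matching base keys.
    return {**base, **overrides[override_name]}


default_settings = resolve_config(global_config)
task_settings = resolve_config(global_config, "custom_docker_image")
print(task_settings["docker_image"])
```

A task that names no override keeps the global settings; a task that names custom_docker_image would run with the custom image under this assumption.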
    

runnable.NotebookTask

An execution node of the pipeline that runs a notebook. Please refer to concepts for more information.

We internally use Ploomber engine to execute the notebook.

Attributes:

  • name (str) –

    The name of the node.

  • notebook (str) –

    The path to the notebook relative to the project root.

  • optional_ploomber_args (Dict[str, Any]) –

    Any optional ploomber args, please refer to Ploomber engine for more information.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node. Defaults to False.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node. Defaults to False.

  • on_failure (str) –

    The name of the node to execute if the step fails.

  • returns (List[Union[str, TaskReturns]]) –

    A list of the names of variables to return from the task. The names should match the order of the variables returned by the function.

    TaskReturns: can be JSON friendly variables, objects or metrics.

    By default, all variables are assumed to be JSON friendly and will be serialized to JSON. Pydantic models are readily supported and will be serialized to JSON.

    To return a python object, please use pickled(<name>). It is advised to use pickled(<name>) for big JSON friendly variables.

    For example,

    from runnable import pickled
    
    # assume, example.ipynb is the notebook with df and x as variables in some cells.
    
    task = NotebookTask(name="task", notebook="example.ipynb", returns=["x", pickled("df")])
    

    To mark any JSON friendly variable as a metric, please use metric(x). Metric variables should be JSON friendly and can be treated just like any other parameter.

  • catalog (Optional[Catalog]) –

    The files to sync from/to the central catalog; refer to Catalog.

  • secrets (List[str]) –

    List of secrets to pass to the task. They are exposed as environment variables.

  • overrides (Dict[str, Any]) –

    Any overrides to the command. Individual tasks can override the global configuration by referring to a specific override by name.

    For example,

    Global configuration
    executor:
      type: local-container
      config:
        docker_image: "runnable/runnable:latest"
        overrides:
          custom_docker_image:
            docker_image: "runnable/runnable:custom"
    
    Task specific configuration
    task = NotebookTask(name="task", notebook="example.ipynb",
            overrides={"local-container": "custom_docker_image"})
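The secrets attribute exposes values as environment variables for the duration of a task (the PythonTask description adds that they are removed after execution). A minimal sketch of that lifecycle, using only the standard library, could look like the following. exposed_secrets and the variable name are hypothetical, and restoring any previous value on exit is a design choice of this sketch rather than documented behavior.

```python
import os
from contextlib import contextmanager


@contextmanager
def exposed_secrets(secrets):
    """Expose secrets as environment variables, then clean up on exit."""
    previous = {key: os.environ.get(key) for key in secrets}
    os.environ.update(secrets)
    try:
        yield
    finally:
        for key, old_value in previous.items():
            if old_value is None:
                os.environ.pop(key, None)  # was not set before: remove it
            else:
                os.environ[key] = old_value  # was set before: restore it


# Hypothetical secret name and value, for illustration only.
with exposed_secrets({"RUNNABLE_DOCS_EXAMPLE_TOKEN": "not-a-real-token"}):
    seen_inside = os.environ["RUNNABLE_DOCS_EXAMPLE_TOKEN"]

removed_after = "RUNNABLE_DOCS_EXAMPLE_TOKEN" not in os.environ
print(seen_inside, removed_after)
```

The try/finally ensures the cleanup also happens when the task body raises.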
    

runnable.Parallel

A node that executes multiple branches in parallel. Please refer to concepts for more information.

Attributes:

  • name (str) –

    The name of the node.

  • branches (Dict[str, Pipeline]) –

    A dictionary of branches to execute in parallel.

  • terminate_with_failure (bool) –

    Whether to terminate the pipeline with a failure after this node.

  • terminate_with_success (bool) –

    Whether to terminate the pipeline with a success after this node.

  • on_failure (str) –

    The name of the node to execute if any of the branches fail.


runnable.Map

A node that iterates over a list of items and executes a pipeline for each item. Please refer to concepts for more information.

Attributes:

  • branch (Pipeline) –

    The pipeline to execute for each item.

  • iterate_on (str) –

    The name of the parameter to iterate over. The parameter should be defined either by previous steps or statically at the start of execution.

  • iterate_as (str) –

    The name of the iterable to be passed to functions.

  • reducer (Callable) –

    The function to reduce the results of the branches.

  • overrides (Dict[str, Any]) –

    Any overrides to the command.
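The interplay of iterate_on, iterate_as and reducer can be sketched in plain Python as below. run_map is a hypothetical stand-in for the node, the branch is reduced to a single function, and the reducer is assumed to receive the list of branch results; the actual calling convention in runnable may differ.

```python
def run_map(parameters, iterate_on, iterate_as, branch, reducer=list):
    """Run the branch once per item and reduce the collected results."""
    items = parameters[iterate_on]  # the parameter holding the iterable
    # Each item is passed to the branch under the name given by iterate_as.
    results = [branch(**{iterate_as: item}) for item in items]
    return reducer(results)


def double(chunk):
    return chunk * 2


parameters = {"chunks": [1, 2, 3]}
as_list = run_map(parameters, "chunks", "chunk", double)
total = run_map(parameters, "chunks", "chunk", double, reducer=sum)
print(as_list, total)
```

Here "chunks" plays the role of iterate_on and "chunk" the role of iterate_as, so the function inside the branch only needs to accept an argument named chunk.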


runnable.Success

A node that represents a successful execution of the pipeline.

Most often, there is no need to use this node: individual nodes can be instructed to terminate_with_success, and pipelines can be defined with add_terminal_nodes=True.

Attributes:

  • name (str) –

    The name of the node.


runnable.Fail

A node that represents a failed execution of the pipeline.

Most often, there is no need to use this node: individual nodes can be instructed to terminate_with_failure, and pipelines can be defined with add_terminal_nodes=True.

Attributes:

  • name (str) –

    The name of the node.


runnable.Pipeline

A Pipeline is a sequence of Steps.

Attributes:

  • steps (List[Stub | PythonTask | NotebookTask | ShellTask | Parallel | Map]) –

    A list of Steps that make up the Pipeline.

    The order of steps is important as it determines the order of execution. The step whose failure is being handled should be the first step in its on_failure pipeline.

  • on_failure (List[Pipeline]) –

    A list of Pipelines to execute in case of failure.

    For example, for the pipeline step1 >> step2, where step1 should reach step3 in case of failure:

    failure_pipeline = Pipeline(steps=[step1, step3])
    
    pipeline = Pipeline(steps=[step1, step2], on_failure=[failure_pipeline])
    
  • name (str) –

    The name of the Pipeline. Defaults to "".

  • description (str) –

    A description of the Pipeline. Defaults to "".

The pipeline implicitly adds success and fail nodes.
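The on_failure routing described for step1, step2 and step3 can be pictured with the toy executor below. run_steps is hypothetical and models steps as (name, action) pairs; it assumes each failure path starts with the step it handles, and it says nothing about the final status that runnable records for such a run.

```python
def run_steps(steps, on_failure=()):
    """Run (name, action) steps in order and return the names executed."""
    executed = []
    for name, action in steps:
        executed.append(name)
        try:
            action()
        except Exception:
            # Look for the failure path that starts with the failing step.
            for failure_steps in on_failure:
                if failure_steps and failure_steps[0][0] == name:
                    # Skip the failing step itself and run the remainder.
                    executed.extend(run_steps(failure_steps[1:]))
                    break
            return executed  # the happy path stops at the failure
    return executed


def ok():
    pass


def boom():
    raise RuntimeError("step1 failed")


happy = run_steps(
    [("step1", ok), ("step2", ok)],
    on_failure=[[("step1", ok), ("step3", ok)]],
)
failed = run_steps(
    [("step1", boom), ("step2", ok)],
    on_failure=[[("step1", boom), ("step3", ok)]],
)
print(happy, failed)
```

When step1 succeeds the run continues to step2; when it raises, step2 is skipped and step3 runs instead.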

execute

execute(configuration_file: str = '', run_id: str = '', tag: str = '', parameters_file: str = '', log_level: str = defaults.LOG_LEVEL)

Execute the Pipeline.

Execution of the pipeline can either:

  • Traverse and execute all the steps of the pipeline, e.g. local execution.

  • Create the representation of the pipeline for other executors.

Please refer to concepts for more information.

Parameters:

  • configuration_file (str, default: '' ) –

    The path to the configuration file. Defaults to "". The configuration file can be overridden by the environment variable RUNNABLE_CONFIGURATION_FILE.

  • run_id (str, default: '' ) –

    The ID of the run. Defaults to "".

  • tag (str, default: '' ) –

    The tag of the run. Defaults to "". Use to group multiple runs.

  • parameters_file (str, default: '' ) –

    The path to the parameters file. Defaults to "".

  • log_level (str, default: LOG_LEVEL ) –

    The log level. Defaults to defaults.LOG_LEVEL.
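The precedence between the configuration_file argument and the RUNNABLE_CONFIGURATION_FILE environment variable can be sketched as follows. resolve_configuration_file is a hypothetical helper, and the file names are made up; the sketch only encodes the statement that the environment variable overrides the argument.

```python
import os


def resolve_configuration_file(configuration_file="", environ=None):
    """Return the configuration path, letting the environment variable win."""
    env = os.environ if environ is None else environ
    return env.get("RUNNABLE_CONFIGURATION_FILE", configuration_file)


# An explicit mapping is passed here so the example does not depend on
# the real process environment.
from_argument = resolve_configuration_file("local.yaml", environ={})
from_environment = resolve_configuration_file(
    "local.yaml", environ={"RUNNABLE_CONFIGURATION_FILE": "argo.yaml"}
)
print(from_argument, from_environment)
```

This pattern lets the same pipeline definition run against different executors by changing only the environment.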

model_post_init

model_post_init(__context: Any) -> None

The sequence of steps can be:

[step1, step2,..., stepN, [step11, step12,..., step1N], [step21, step22,...,]]

This indicates:

  • step1 > step2 > ... > stepN

  • We expect terminate with success or fail to be explicitly stated on a step.

  • If it is stated, the step cannot have a next step defined apart from "success" and "fail".

The inner list of steps is only to accommodate on-failure behaviors:

  • For the sake of simplicity, let's assume that it has the same behavior as the happy pipeline.

  • A task which was already seen should not be part of this.

  • There should be at least one step which terminates with success.

Any definition of pipeline should have one node that terminates with success.