Job

Jobs are isolated units of work, which can be Python functions, Jupyter notebooks, or shell scripts.

Consider a simple function:

def add_numbers(x: int, y: int) -> int:
    # save some data in data.csv
    return x + y

Its runnable representation is:

from functions import add_numbers
from runnable import PythonJob, Catalog

write_catalog = Catalog(put=["data.csv"])
job = PythonJob(
    function=add_numbers,
    returns=["sum_of_numbers"],
    catalog=write_catalog,
)

PythonJob requires a function to call. Its input arguments are bound from the parameters provided at execution time.

The return values are stored for future reference, and any data objects or files generated in the process can be saved to the catalog.
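For example, x and y could be supplied through a parameters file; a minimal sketch (the file name is hypothetical, and the parameters_file argument is an assumption based on how pipeline execution accepts parameters in runnable, so verify it against your installed version):

# parameters.yaml (hypothetical)
#   x: 3
#   y: 5

job.execute(parameters_file="parameters.yaml")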


Python functions

You can use Python functions as jobs in a pipeline, enabling flexible encapsulation of logic, parameter passing, result capturing, and cataloging of outputs.

"""
You can execute this example by:

    python examples/01-tasks/python_tasks.py

The stdout of "Hello World!" would be captured as execution
log and stored in the catalog.

An example of the catalog structure:

.catalog
└── baked-heyrovsky-0602
    └── hello.execution.log

2 directories, 1 file


The hello.execution.log has the captured stdout of "Hello World!".
"""

from examples.common.functions import hello
from runnable import PythonJob


def main():
    job = PythonJob(function=hello)

    job.execute()

    return job


if __name__ == "__main__":
    main()

The stdout (e.g., "Hello World!") and logs are captured and stored in the catalog for traceability.
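The imported hello is an ordinary Python function; a minimal sketch of what it might look like (the real implementation lives in examples/common/functions.py, so treat this as an illustration):

def hello():
    # printing to stdout is enough; the output lands in the execution log
    print("Hello World!")

Beyond stdout, files written during execution can be saved via a Catalog: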

from examples.common.functions import write_files
from runnable import Catalog, PythonJob


def main():
    write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"])
    job = PythonJob(
        function=write_files,
        catalog=write_catalog,
    )

    job.execute()

    return job


if __name__ == "__main__":
    main()

The Catalog object specifies which files or data should be saved after job execution.
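A hypothetical sketch of a function like write_files, producing the files named in the put list above (the real implementation lives in examples/common/functions.py):

import os

import pandas as pd


def write_files():
    # produce the artifacts listed in Catalog(put=...)
    pd.DataFrame({"a": [1, 2], "b": [3, 4]}).to_csv("df.csv", index=False)
    os.makedirs("data_folder", exist_ok=True)
    with open("data_folder/data.txt", "w") as f:
        f.write("hello")

Catalog also accepts a get list, so a downstream job can retrieve previously cataloged files into its working directory.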

"""
The below example shows how to set parameters in Python jobs.

The function, write_parameter, returns
    - JSON serializable types
    - pydantic models
    - pandas dataframes, or any "object" type

Pydantic models are implicitly handled by runnable,
but "object" types should be marked as "pickled".

Using pickled even for built-in Python data types is advised for
reasonably large collections.

Run the below example as:
    python examples/03-parameters/passing_parameters_python.py

"""

from examples.common.functions import write_parameter
from runnable import PythonJob, metric, pickled


def main():
    job = PythonJob(
        function=write_parameter,
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
    )

    job.execute()

    return job


if __name__ == "__main__":
    main()

Parameters can be passed at execution time, and returned values can be automatically handled, serialized, and tracked as metrics.
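To make the mapping concrete, here is a hypothetical sketch of a function like write_parameter; the returned values map positionally onto the returns list above (the real implementation lives in examples/common/functions.py, so the names and shapes here are assumptions):

import pandas as pd
from pydantic import BaseModel


class EggsModel(BaseModel):
    ham: str


def write_parameter():
    df = pd.DataFrame({"a": [1, 2]})  # an "object" type, hence pickled("df")
    # returned positionally: df, integer, floater, stringer, pydantic_param, score
    return df, 1, 3.14, "hello", EggsModel(ham="eggs"), 0.9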


Notebooks

You can also use Jupyter notebooks as jobs in your pipeline. This allows you to encapsulate notebook logic, capture outputs, and integrate notebooks seamlessly into your workflow.

"""
You can execute this example by:

    python examples/11-jobs/notebooks.py

The output of the notebook is captured as the execution
log and stored in the catalog.
"""

from runnable import NotebookJob


def main():
    job = NotebookJob(
        notebook="examples/common/simple_notebook.ipynb",
    )

    job.execute()

    return job


if __name__ == "__main__":
    main()

The output of the notebook, along with the executed notebook itself, is captured as the execution log and stored in the catalog for traceability.
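If NotebookJob accepts the same returns option as PythonJob (an assumption; verify against your runnable version), a notebook can also emit parameters; a sketch, assuming the notebook defines df and score:

from runnable import NotebookJob, metric, pickled

job = NotebookJob(
    notebook="examples/common/simple_notebook.ipynb",
    returns=[pickled("df"), metric("score")],  # assumes these names exist in the notebook
)
job.execute()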


Shell script

You can also use shell scripts or commands as jobs in your pipeline. This allows you to execute any shell command, capture its output, and integrate it into your workflow.

"""
You can execute this example by:

    python examples/01-tasks/scripts.py

The command can be anything that can be executed in a shell.
The stdout/stderr of the execution is captured as the execution log and stored in the catalog.

For example:

.catalog
└── seasoned-perlman-1355
    └── hello.execution.log

"""

from runnable import ShellJob


def main():
    # If the command executes successfully, the job terminates with success
    job = ShellJob(command="echo 'Hello World!'")

    job.execute()

    return job


if __name__ == "__main__":
    main()

The stdout and stderr of the shell command are captured as the execution log and stored in the catalog for traceability.
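Like the other job types, a ShellJob can catalog files its command produces; a sketch, assuming ShellJob accepts the same catalog argument as PythonJob (verify against your runnable version):

from runnable import Catalog, ShellJob

job = ShellJob(
    command="echo 'hello' > hello.txt",  # writes a file as well as stdout
    catalog=Catalog(put=["hello.txt"]),  # assumption: ShellJob supports catalog
)
job.execute()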

For more advanced examples, see the files in examples/11-jobs/.