Job
Jobs are isolated units of work. A job can be a Python function, a Jupyter notebook, or a shell script.
Consider a simple function, add_numbers, that adds two numbers and writes a data.csv file as a side effect.
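A minimal sketch of such a function, assuming it adds two numbers and writes data.csv (the real add_numbers lives in functions.py; this body is illustrative only):

# Hypothetical sketch of add_numbers; the real implementation lives in functions.py.
def add_numbers(x: int, y: int) -> int:
    # Write a small file that the catalog can pick up after execution.
    with open("data.csv", "w") as f:
        f.write("x,y,sum\n")
        f.write(f"{x},{y},{x + y}\n")
    return x + y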
The runnable representation of it is:
from functions import add_numbers

from runnable import PythonJob, Catalog

write_catalog = Catalog(put=["data.csv"])

job = PythonJob(
    function=add_numbers,
    returns=["sum_of_numbers"],
    catalog=write_catalog,
)
PythonJob requires a function to call. Input parameters are passed in from the parameters provided at execution time, return values are stored for future reference, and any data objects generated in the process can be saved to the catalog.
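As a hedged illustration of "parameters provided at execution time": values are usually read from a YAML file whose keys mirror the function's arguments. The sketch below assumes add_numbers(x, y) and a parameters_file argument on execute; both are assumptions, so check the runnable configuration docs for the exact mechanism.

# parameters.yaml (hypothetical file; keys mirror add_numbers' arguments):
#   x: 3
#   y: 7
#
# The keyword argument name below is an assumption, not confirmed by this page.
job.execute(parameters_file="parameters.yaml")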
Python functions
You can use Python functions as jobs, enabling flexible encapsulation of logic, parameter passing, result capturing, and cataloging of outputs.
"""
You can execute this pipeline by:
python examples/01-tasks/python_tasks.py
The stdout of "Hello World!" would be captured as execution
log and stored in the catalog.
An example of the catalog structure:
.catalog
└── baked-heyrovsky-0602
└── hello.execution.log
2 directories, 1 file
The hello.execution.log has the captured stdout of "Hello World!".
"""
from examples.common.functions import hello
from runnable import PythonJob
def main():
job = PythonJob(function=hello)
job.execute()
return job
if __name__ == "__main__":
main()
The stdout (e.g., "Hello World!") and logs are captured and stored in the catalog for traceability.
from examples.common.functions import write_files
from runnable import Catalog, PythonJob


def main():
    write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"])

    job = PythonJob(
        function=write_files,
        catalog=write_catalog,
    )

    job.execute()
    return job


if __name__ == "__main__":
    main()
The Catalog object specifies which files or data should be saved after job execution.
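For context, a hedged sketch of what write_files might do (the real function lives in examples/common/functions.py; this version only illustrates that it produces the two cataloged paths):

# Hypothetical sketch of write_files; the real implementation is in examples/common/functions.py.
import os

import pandas as pd


def write_files():
    # Produce the two artifacts referenced by Catalog(put=[...]) above.
    df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
    df.to_csv("df.csv", index=False)

    os.makedirs("data_folder", exist_ok=True)
    with open("data_folder/data.txt", "w") as f:
        f.write("hello from write_files\n")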
"""
The below example shows how to set/get parameters in python
tasks of the pipeline.
The function, set_parameter, returns
- JSON serializable types
- pydantic models
- pandas dataframe, any "object" type
pydantic models are implicitly handled by runnable
but "object" types should be marked as "pickled".
Use pickled even for python data types is advised for
reasonably large collections.
Run the below example as:
python examples/03-parameters/passing_parameters_python.py
"""
from examples.common.functions import write_parameter
from runnable import PythonJob, metric, pickled


def main():
    job = PythonJob(
        function=write_parameter,
        returns=[
            pickled("df"),
            "integer",
            "floater",
            "stringer",
            "pydantic_param",
            metric("score"),
        ],
    )

    job.execute()
    return job


if __name__ == "__main__":
    main()
Parameters can be passed at execution time, and returned values can be automatically handled, serialized, and tracked as metrics.
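For context, a hedged sketch of what write_parameter might return, matching the returns list above (the real function lives in examples/common/functions.py; the pydantic model and values here are assumptions):

# Hypothetical sketch of write_parameter; the real implementation is in examples/common/functions.py.
import pandas as pd
from pydantic import BaseModel


class ExamplePydanticParam(BaseModel):  # assumed model, for illustration only
    x: int
    foo: str


def write_parameter():
    df = pd.DataFrame({"a": [1, 2, 3]})  # "object" type, hence pickled("df")
    integer = 1
    floater = 3.14
    stringer = "hello"
    pydantic_param = ExamplePydanticParam(x=10, foo="bar")
    score = 0.9  # tracked via metric("score")
    return df, integer, floater, stringer, pydantic_param, score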
Notebooks
You can also use Jupyter notebooks as jobs in your pipeline. This allows you to encapsulate notebook logic, capture outputs, and integrate notebooks seamlessly into your workflow.
"""
You can execute this pipeline by:
python examples/11-jobs/notebooks.py
The output of the notebook will be captured as execution
log and stored in the catalog.
"""
from runnable import NotebookJob
def main():
job = NotebookJob(
notebook="examples/common/simple_notebook.ipynb",
)
job.execute()
return job
if __name__ == "__main__":
main()
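The referenced notebook is an ordinary notebook of Python cells; conceptually it could be as small as the single cell sketched below (an assumption, since simple_notebook.ipynb is not shown on this page). Its executed output is what ends up in the catalog as the execution log.

# Hypothetical content of the only cell in simple_notebook.ipynb.
print("Hello World!")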
Shell script
You can also use shell scripts or commands as jobs in your pipeline. This allows you to execute any shell command, capture its output, and integrate it into your workflow.
"""
You can execute this pipeline by:
python examples/01-tasks/scripts.py
The command can be anything that can be executed in a shell.
The stdout/stderr of the execution is captured as execution log and stored in the catalog.
For example:
.catalog
└── seasoned-perlman-1355
└── hello.execution.log
"""
from runnable import ShellJob
def main():
# If this step executes successfully, the pipeline will terminate with success
job = ShellJob(command="echo 'Hello World!'")
job.execute()
return job
if __name__ == "__main__":
main()
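The same pattern works for a script file rather than an inline command; a minimal sketch, assuming a hypothetical my_script.sh exists in the project:

from runnable import ShellJob


def main():
    # my_script.sh is a hypothetical path; any shell-executable command works here.
    job = ShellJob(command="bash my_script.sh")

    job.execute()
    return job


if __name__ == "__main__":
    main()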
For more advanced examples, see the files in examples/11-jobs/.