📁 File Management Made Easy¶
Tired of managing temporary files between tasks? Runnable's catalog system handles it automatically and gives you complete execution traceability.
The old way (manual file management)¶
def create_report():
    df = analyze_data()
    df.to_csv("temp_results.csv")  # Hope this exists later...

def send_report():
    df = pd.read_csv("temp_results.csv")  # Hope this file is there...
    # What if the path changed? What if step 1 failed?
The Runnable way (automatic)¶
Step 1: Create and store files
import os

import pandas as pd

from runnable import Catalog, PythonTask

def write_files():
    # Create your files
    df = pd.DataFrame({"data": [1, 2, 3]})
    df.to_csv("df.csv")

    os.makedirs("data_folder", exist_ok=True)
    with open("data_folder/data.txt", "w") as f:
        f.write("Important data")

# Store files automatically
task1 = PythonTask(
    function=write_files,
    catalog=Catalog(put=["df.csv", "data_folder/data.txt"])
)
Step 2: Retrieve and use files
def read_files():
    # Files are automatically available here!
    df = pd.read_csv("df.csv")  # ✅ File is there
    with open("data_folder/data.txt") as f:
        data = f.read()  # ✅ File is there

# Get files automatically
task2 = PythonTask(
    function=read_files,
    catalog=Catalog(get=["df.csv", "data_folder/data.txt"])
)
How it works¶
- put=["file.csv"] → Runnable stores the file safely
- get=["file.csv"] → Runnable makes the file available in the next task
- No path management → Files appear where your code expects them
Full workflow example¶
from runnable import Pipeline, PythonTask, Catalog

def main():
    # Complete workflow with automatic file management
    pipeline = Pipeline(steps=[
        PythonTask(
            function=write_files,
            catalog=Catalog(put=["df.csv", "data_folder/data.txt"]),
            name="create_files"
        ),
        PythonTask(
            function=read_files,
            catalog=Catalog(get=["df.csv", "data_folder/data.txt"]),
            name="process_files"
        )
    ])

    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
See complete runnable code
"""
Demonstrates moving files within tasks.
- generate_data: creates df.csv and data_folder/data.txt
- delete_local_after_generate: deletes df.csv and data_folder/data.txt
This step ensures that the local files are deleted after the step
- read_data_py: reads df.csv and data_folder/data.txt
- delete_local_after_python_get: deletes df.csv and data_folder/data.txt
This step ensures that the local files are deleted after the step
- read_data_shell: reads df.csv and data_folder/data.txt
- delete_local_after_shell_get: deletes df.csv and data_folder/data.txt
This step ensures that the local files are deleted after the step
- read_data_notebook: reads df.csv and data_folder/data.txt
- delete_local_after_notebook_get: deletes df.csv and data_folder/data.txt
Use this pattern to move files that are not dill friendly.
All the files are stored in catalog.
Run this pipeline as:
python examples/04-catalog/catalog.py
You can execute this pipeline by:
python examples/04-catalog/catalog.py
"""
from examples.common.functions import read_files, write_files
from runnable import Catalog, NotebookTask, Pipeline, PythonTask, ShellTask


def main():
    write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"])
    generate_data = PythonTask(
        name="generate_data",
        function=write_files,
        catalog=write_catalog,
    )

    delete_files_command = """
        rm df.csv || true && \
        rm data_folder/data.txt || true
    """
    # delete the local files after generate
    # since it's a local catalog, we delete to show "get from catalog"
    delete_local_after_generate = ShellTask(
        name="delete_after_generate",
        command=delete_files_command,
    )

    read_catalog = Catalog(get=["df.csv", "data_folder/data.txt"])
    read_data_python = PythonTask(
        name="read_data_py",
        function=read_files,
        catalog=read_catalog,
    )

    delete_local_after_python_get = ShellTask(
        name="delete_after_generate_python",
        command=delete_files_command,
    )

    read_data_shell_command = """
        (ls df.csv >> /dev/null 2>&1 && echo yes) || exit 1 && \
        (ls data_folder/data.txt >> /dev/null 2>&1 && echo yes) || exit 1
    """
    read_data_shell = ShellTask(
        name="read_data_shell",
        command=read_data_shell_command,
        catalog=read_catalog,
    )

    delete_local_after_shell_get = ShellTask(
        name="delete_after_generate_shell",
        command=delete_files_command,
    )

    read_data_notebook = NotebookTask(
        notebook="examples/common/read_files.ipynb",
        name="read_data_notebook",
        catalog=read_catalog,
    )

    delete_local_after_notebook_get = ShellTask(
        name="delete_after_generate_notebook",
        command=delete_files_command,
        terminate_with_success=True,
    )

    pipeline = Pipeline(
        steps=[
            generate_data,
            delete_local_after_generate,
            read_data_python,
            delete_local_after_python_get,
            read_data_shell,
            delete_local_after_shell_get,
            read_data_notebook,
            delete_local_after_notebook_get,
        ]
    )

    _ = pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
Try it now:

python examples/04-catalog/catalog.py
Multiple files and folders¶
# Store multiple files explicitly
catalog=Catalog(put=["results.csv", "plots/", "model.pkl"])
# Retrieve what you need
catalog=Catalog(get=["results.csv", "model.pkl"])
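Putting this together, a downstream task can retrieve only the artifacts it needs. The sketch below assumes two illustrative functions, train_model and score_model, which are placeholders invented for this example and not part of Runnable or its shipped examples:

from pathlib import Path

from runnable import Catalog, Pipeline, PythonTask


def train_model():
    # Illustrative training step: writes several artifacts into the working directory.
    Path("results.csv").write_text("epoch,loss\n1,0.42\n")
    Path("plots").mkdir(exist_ok=True)
    Path("plots/loss.png").write_bytes(b"")  # stand-in for a real plot
    Path("model.pkl").write_bytes(b"")       # stand-in for a pickled model


def score_model():
    # Downstream step: only the metrics and the model are needed, not the plots.
    print(Path("results.csv").read_text())


def main():
    pipeline = Pipeline(steps=[
        PythonTask(
            function=train_model,
            catalog=Catalog(put=["results.csv", "plots/", "model.pkl"]),
            name="train",
        ),
        PythonTask(
            function=score_model,
            catalog=Catalog(get=["results.csv", "model.pkl"]),
            name="score",
        ),
    ])
    pipeline.execute()
    return pipeline


if __name__ == "__main__":
    main()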
No-Copy Mode for Large Files 🚀¶
For large files or datasets, copying can be expensive and unnecessary. Use store_copy=False to track files without copying them:
# Large dataset processing - track but don't copy
task1 = PythonTask(
    function=process_large_dataset,
    catalog=Catalog(put=["large_dataset.parquet", "model.pkl"], store_copy=False)
)

# Next task can still access the files
task2 = PythonTask(
    function=analyze_results,
    catalog=Catalog(get=["large_dataset.parquet"])
)
What happens with store_copy=False:
- ✅ MD5 hash captured for integrity verification
- ✅ Files remain in original location
- ✅ No disk space duplication for large files
- ✅ Faster execution - no time spent copying
- ✅ Still tracked in pipeline execution history
When to use no-copy mode:
- Large datasets (GB+ files) where copying is slow and expensive
- Reference data that doesn't change and is already stored safely
- Network storage where files are already backed up
- Performance-critical pipelines where copy time matters
Example with store_copy=False (note that the setting applies to every file listed in that Catalog):

pipeline = Pipeline(steps=[
    PythonTask(
        function=prepare_data,
        catalog=Catalog(
            put=[
                "config.json",          # small file
                "large_input.parquet",  # large file
            ],
            store_copy=False,  # applies to all files in this Catalog: both are hash-tracked, not copied
        ),
        name="prepare"
    ),
    PythonTask(
        function=process_data,
        catalog=Catalog(get=["config.json", "large_input.parquet"]),
        name="process"
    )
])
Glob-style wildcards¶
Use wildcards to match multiple files automatically:
# Store all CSV files
catalog=Catalog(put=["*.csv"])
# Store all files in data folder
catalog=Catalog(put=["data/*"])
# Store all Python files recursively
catalog=Catalog(put=["**/*.py"])
# Store all files with specific pattern
catalog=Catalog(put=["results_*.json", "plots/*.png"])
Common wildcard patterns:
| Pattern | Matches |
|---|---|
| *.csv | All CSV files in the current directory |
| data/* | All files in the data folder |
| **/*.py | All Python files in the current directory and subdirectories |
| results_*.json | Files like results_train.json, results_test.json |
| plots/*.png | All PNG files in the plots folder |
Example with wildcards:
def main():
    pipeline = Pipeline(steps=[
        PythonTask(
            function=create_multiple_outputs,
            catalog=Catalog(put=["*.csv", "plots/*.png"]),  # Store all CSVs and plot PNGs
            name="generate_data"
        ),
        PythonTask(
            function=process_outputs,
            catalog=Catalog(get=["data_*.csv", "plots/summary.png"]),  # Get specific files
            name="process_data"
        )
    ])

    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
Why this matters¶
Without catalog:
- ❌ Manual path management
- ❌ Files get lost between environments
- ❌ Hard to reproduce workflows
- ❌ Cleanup is manual
With catalog:
- ✅ Automatic file management
- ✅ Works across different environments
- ✅ Perfect reproducibility
- ✅ Automatic cleanup
Automatic execution traceability¶
Runnable automatically captures all execution outputs in the catalog for complete traceability:
What gets captured¶
For every task execution, Runnable stores:
- Execution logs - Complete stdout/stderr output from your tasks
- Output notebooks - Executed notebooks with all outputs and results (for NotebookTask)
- Environment information - Environment variables and execution context
- Timestamps - Precise execution timing information
Where to find execution outputs¶
.catalog/
└── {run-id}/ # Unique run identifier
├── taskname123.execution.log # Task stdout/stderr output
├── output_notebook.ipynb # Executed notebook (if NotebookTask)
└── data_files/ # Your catalog files
Example after running a pipeline:
.catalog/
└── pleasant-nobel-2303/
├── hello_task.execution.log # "Hello World!" output captured
├── data_processing.execution.log # All Python print statements
├── analysis_notebook_out.ipynb # Executed notebook with results
└── results.csv # Your data files
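To browse these artifacts programmatically rather than from a shell, a plain filesystem walk over .catalog/ is enough. A minimal sketch; the run ID is a placeholder taken from the example above, and Runnable reports the actual ID when the pipeline runs:

from pathlib import Path

run_id = "pleasant-nobel-2303"  # placeholder: use the run ID reported for your execution

for artifact in sorted(Path(".catalog", run_id).rglob("*")):
    if artifact.is_file():
        # *.execution.log -> captured stdout/stderr of a task
        # *_out.ipynb     -> executed notebook with cell outputs
        # anything else   -> data files stored via Catalog(put=[...])
        print(artifact.relative_to(".catalog"))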
Complete execution visibility¶
Python tasks: Capture all print(), logging, warnings, and errors:
$ cat .catalog/run-id/python_task.execution.log
[23:02:46] Parameters available for the execution:
{'input_file': 'data.csv'}
Processing 1000 rows...
Model accuracy: 94.2%
WARNING: Low confidence predictions detected
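Nothing special is needed in the task itself for this; ordinary print calls are enough. A small sketch (the run_analysis function and its messages are invented for illustration):

from runnable import Pipeline, PythonTask


def run_analysis():
    # Each of these lines ends up in this task's .execution.log under .catalog/{run_id}/.
    print("Processing 1000 rows...")
    print("Model accuracy: 94.2%")


Pipeline(steps=[PythonTask(function=run_analysis, name="analysis")]).execute()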
Notebook tasks: Store executed notebooks with all outputs:
- Input notebook: analysis.ipynb
- Output notebook: .catalog/run-id/analysis_out.ipynb (with all cell outputs)
- Execution log: .catalog/run-id/notebook_task.execution.log
Shell tasks: Capture all command output and environment:
$ cat .catalog/run-id/shell_task.execution.log
Installing dependencies...
Running analysis script...
=== COLLECT ===
RUNNABLE_RUN_ID=run-id
PWD=/path/to/project
Results saved to output.txt
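The same applies to shell steps: whatever the command writes to stdout or stderr is captured without extra flags. A sketch (the echoed messages are illustrative):

from runnable import Pipeline, ShellTask

# Everything echoed below is captured into this step's .execution.log,
# alongside the environment information Runnable collects for the step.
deploy = ShellTask(
    name="deploy",
    command="echo 'Installing dependencies...' && echo 'Running analysis script...'",
    terminate_with_success=True,
)

Pipeline(steps=[deploy]).execute()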
Why this matters for debugging¶
No more digging through logs! Everything is organized by run ID:
def main():
    pipeline = Pipeline(steps=[
        PythonTask(function=data_processing, name="process"),
        NotebookTask(notebook="analysis.ipynb", name="analyze"),
        ShellTask(command="./deploy.sh", name="deploy")
    ])

    result = pipeline.execute()

    # Check .catalog/{run_id}/ for complete execution trace:
    # - process123.execution.log (Python output)
    # - analyze456_out.ipynb (executed notebook)
    # - deploy789.execution.log (shell output)
    return pipeline

if __name__ == "__main__":
    main()
Best practices
- Use catalog for files that flow between tasks. Keep truly temporary files local.
- Use wildcards (*.csv, data/*) to automatically capture multiple files without manual listing.
- Be specific with wildcards to avoid capturing unwanted files (results_*.csv vs *.csv).
- Use store_copy=False for large files to save disk space and improve performance.
- Check .catalog/{run-id}/ for complete execution traceability - no need to dig through environment-specific logs!
Next: See how the same code can run anywhere with different configurations.