🛡️ Failure Handling¶
Control what happens when tasks fail, with graceful recovery and alternative paths.
Default behavior¶
By default, any task failure stops the entire pipeline:
flowchart TD
A[Step 1] --> B[Step 2]
B --> C[❌ FAILS]
C --> D[Pipeline Stops]
E[Step 3] -.->|Never runs| F[❌]
from runnable import Pipeline, PythonTask, Stub
from examples.common.functions import hello, raise_ex
def main():
# Normal pipeline - fails on any error
step1 = PythonTask(name="step_1", function=hello)
step2 = PythonTask(name="step_2", function=raise_ex) # This will fail!
step3 = Stub(name="step_3") # Never runs
pipeline = Pipeline(steps=[step1, step2, step3])
pipeline.execute() # Stops at step2 failure
return pipeline
if __name__ == "__main__":
main()
See complete runnable code
examples/02-sequential/default_fail.py
"""
When defining a Pipeline(), it automatically adds a success node and failure node.
By default any failure in a step is considered to be a failure in the pipeline.
In the below example, the progression would be as follows:
step 1 >> step 2 >> fail
Corresponds to:
try:
step1()
step2() # Raises the exception
step3()
except Exception as e:
raise e
You can run this example by:
python examples/02-sequential/default_fail.py
"""
from examples.common.functions import hello, raise_ex
from runnable import Pipeline, PythonTask, Stub
def main():
step1 = PythonTask(name="step 1", function=hello)
step2 = PythonTask(name="step 2", function=raise_ex) # This will fail
step3 = Stub(name="step 3") # This step will not be executed
pipeline = Pipeline(steps=[step1, step2, step3])
pipeline.execute()
return pipeline
if __name__ == "__main__":
main()
Try it now:
Flow: step 1 → step 2 (fails) → pipeline stops
Custom failure handling¶
Specify what to do when a task fails:
flowchart TD
A[Step 1] --> B[❌ FAILS]
B --> C[Recovery Pipeline]
C --> D[✅ Success]
E[Step 2] -.->|Skipped| F[❌]
G[Step 3] -.->|Skipped| H[❌]
from runnable import Pipeline, PythonTask, Stub
from examples.common.functions import raise_ex
def main():
# Create recovery pipeline
recovery_pipeline = Stub(name="recovery_step").as_pipeline()
# Set up failure handling
step_1 = PythonTask(name="step_1", function=raise_ex) # This will fail
step_1.on_failure = recovery_pipeline # Run this instead
step_2 = Stub(name="step_2") # Skipped (normal flow interrupted)
step_3 = Stub(name="step_3") # Skipped (normal flow interrupted)
pipeline = Pipeline(steps=[step_1, step_2, step_3])
pipeline.execute() # Runs: step_1 → fails → recovery_pipeline → success!
return pipeline
if __name__ == "__main__":
main()
See complete runnable code
examples/02-sequential/on_failure_succeed.py
"""
This pipeline showcases handling failures in a pipeline.
The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success
step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually succeed too.
step_1 -> step_4 -> success
This pattern is handy when you are expecting a failure of a step
and have ways to handle it.
Corresponds to:
try:
step1() # Raises the exception
step2()
step3()
except Exception as e:
step4()
Run this pipeline:
python examples/02-sequential/on_failure_succeed.py
"""
from examples.common.functions import raise_ex
from runnable import Pipeline, PythonTask, Stub
def main():
step_1 = PythonTask(name="step_1", function=raise_ex) # [concept:failing-task]
step_2 = Stub(name="step_2")
step_3 = Stub(name="step_3")
on_failure_pipeline = Stub(name="step_4").as_pipeline() # [concept:failure-pipeline]
step_1.on_failure = on_failure_pipeline # [concept:failure-handling]
pipeline = Pipeline( # [concept:pipeline]
steps=[step_1, step_2, step_3],
)
pipeline.execute() # [concept:execution]
return pipeline
if __name__ == "__main__":
main()
Try it now:
Flow: step_1 (fails) → step_4 → success
How it works¶
- Task fails → Normal execution stops
on_failurepipeline executes instead- Recovery pipeline can succeed or fail
- Pipeline continues if recovery succeeds
Real-world patterns¶
flowchart TD
A[Fetch Latest Data] --> B{Success?}
B -->|✅| C[Process Latest]
B -->|❌| D[Use Cached Data]
C --> E[Results]
D --> E
Data fallback¶
# Example failure handling configuration (partial code)
fetch_latest_data_task.on_failure = use_cached_data_pipeline
Retry with different settings¶
# Example failure handling configuration (partial code)
fast_model_task.on_failure = robust_model_pipeline
Error reporting¶
# Example failure handling configuration (partial code)
critical_task.on_failure = send_alert_pipeline
Graceful degradation¶
# Example failure handling configuration (partial code)
feature_extraction_task.on_failure = use_simple_features_pipeline
Failure pipeline example¶
Helper function (creates a recovery pipeline):
def create_fallback_pipeline():
log_error = PythonTask(
name="log_error",
function=log_failure_details
)
use_backup = PythonTask(
name="use_backup",
function=load_backup_data,
returns=["data"]
)
return Pipeline(steps=[log_error, use_backup])
# Usage example (partial code)
main_task.on_failure = create_fallback_pipeline()
When to use failure handling¶
Essential for: - Production pipelines that must complete - Data pipelines with unreliable sources - ML pipelines with multiple model options - External API integrations - File processing with backup sources
Failure strategy
- Fail fast: For development and testing
- Graceful recovery: For production systems
- Log everything: Always capture failure details
- Test failures: Verify your recovery paths work
Next: Learn about mocking and testing.