🔀 Conditional Workflows
Make your pipeline choose different paths based on data or results.
The pattern
flowchart TD
A[Start] --> B[Toss Function]
B --> C{toss = ?}
C -->|heads| D[Heads Pipeline]
C -->|tails| E[Tails Pipeline]
D --> F[Continue]
E --> F
from runnable import Conditional, Pipeline, PythonTask, Stub

def main():
    # Step 1: Make a decision
    toss_task = PythonTask(
        function=toss_function,  # Returns "heads" or "tails"
        returns=["toss"],  # Named return for conditional to use
        name="toss_task"
    )

    # Step 2: Branch based on decision
    conditional = Conditional(
        parameter="toss",  # Use the "toss" value from above
        branches={
            "heads": create_heads_pipeline(),  # Run this if toss="heads"
            "tails": create_tails_pipeline()  # Run this if toss="tails"
        },
        name="conditional"
    )

    # Step 3: Continue after branching
    continue_step = Stub(name="continue_processing")

    pipeline = Pipeline(steps=[toss_task, conditional, continue_step])
    pipeline.execute()
    return pipeline

if __name__ == "__main__":
    main()
See complete runnable code
examples/02-sequential/conditional.py
from runnable import Conditional, Pipeline, PythonTask, Stub


def when_heads_function():
    print("when_heads_function called")


def when_tails_function():
    print("when_tails_function called")


def toss_function():
    import os
    import random

    if "FIX_RANDOM_TOSS" in os.environ:
        # Use the fixed value for testing
        toss = os.environ["FIX_RANDOM_TOSS"]
        print(f"Using fixed toss result: {toss}")
        return toss

    # Simulate a coin toss
    toss = random.choice(["heads", "tails"])
    print(f"Toss result: {toss}")
    return toss


def main():
    when_heads_pipeline = PythonTask(  # [concept:branch-pipeline]
        name="when_heads_task",
        function=when_heads_function,
    ).as_pipeline()

    when_tails_pipeline = PythonTask(  # [concept:branch-pipeline]
        name="when_tails_task",
        function=when_tails_function,
    ).as_pipeline()

    conditional = Conditional(  # [concept:conditional]
        name="conditional",
        branches={
            "heads": when_heads_pipeline,
            "tails": when_tails_pipeline,
        },
        parameter="toss",
    )

    toss_task = PythonTask(  # [concept:task-with-returns]
        name="toss_task",
        function=toss_function,
        returns=["toss"],
    )

    continue_to = Stub(name="continue to")  # [concept:stub]

    pipeline = Pipeline(steps=[toss_task, conditional, continue_to])  # [concept:pipeline]
    pipeline.execute()  # [concept:execution]
    return pipeline


if __name__ == "__main__":
    main()
How it works
- toss_task returns a value named "toss"
- Conditional uses parameter="toss" to check that value
- branches={} maps values to different pipelines
- Only one branch executes based on the parameter value
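The selection rule in the list above can be sketched in plain Python. This is not how runnable implements Conditional; `run_conditional` and the lambda branches are hypothetical stand-ins that only illustrate the value-to-branch lookup, and raising on an unmapped value is an assumption of this sketch.

```python
from typing import Callable, Dict


def run_conditional(
    parameters: Dict[str, str],
    parameter: str,
    branches: Dict[str, Callable[[], str]],
) -> str:
    """Run exactly one branch, chosen by the value stored under `parameter`."""
    value = parameters[parameter]
    if value not in branches:
        # Assumption: this sketch treats an unmapped value as an error.
        raise KeyError(f"No branch defined for {parameter}={value!r}")
    return branches[value]()


# Hypothetical stand-ins for the heads and tails pipelines.
branches = {
    "heads": lambda: "heads pipeline ran",
    "tails": lambda: "tails pipeline ran",
}

# The upstream task stored toss="heads", so only the heads branch runs.
result = run_conditional({"toss": "heads"}, "toss", branches)
print(result)
```

The dictionary lookup makes the key point visible: the value of the named parameter must match a key in branches, and no other branch is touched.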
The decision function
Helper function (makes the decision):
import random

def toss_function():
    # Simulate a coin toss
    toss = random.choice(["heads", "tails"])
    print(f"Toss result: {toss}")
    return toss  # Returns "heads" or "tails"
The function returns "heads" or "tails", and the conditional uses that value to pick a branch.
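The version of toss_function in the complete example also honors a FIX_RANDOM_TOSS environment variable, which makes the branch choice repeatable in tests. A trimmed copy of that logic (print statements omitted) shows the effect; the variable names `pinned` and `unpinned` are only for illustration.

```python
import os
import random


def toss_function():
    # Trimmed copy of the complete example: use FIX_RANDOM_TOSS when it is set.
    if "FIX_RANDOM_TOSS" in os.environ:
        return os.environ["FIX_RANDOM_TOSS"]
    # Otherwise simulate a coin toss.
    return random.choice(["heads", "tails"])


# Pin the result so the "tails" branch is always selected.
os.environ["FIX_RANDOM_TOSS"] = "tails"
pinned = toss_function()

# Remove the override to get a random result again.
del os.environ["FIX_RANDOM_TOSS"]
unpinned = toss_function()

print(pinned, unpinned)
```

Pinning the decision this way lets each branch be exercised deliberately instead of waiting for the random choice to land on it.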
Branch pipelines
Helper functions (create the branch pipelines):
def create_heads_pipeline():
    return PythonTask(
        function=when_heads_function,
        name="when_heads_task"
    ).as_pipeline()

def create_tails_pipeline():
    return PythonTask(
        function=when_tails_function,
        name="when_tails_task"
    ).as_pipeline()
Each branch is a complete pipeline that runs independently.
Real-world examples
flowchart TD
A[Data Input] --> B[Quality Check]
B --> C{data_quality}
C -->|good| D[Direct Analysis]
C -->|needs_cleaning| E[Clean → Analysis]
C -->|invalid| F[Error Handling]
D --> G[Results]
E --> G
F --> G
Data validation:
# Example conditional configuration (partial code)
conditional = Conditional(
    parameter="data_quality",  # returns "good", "needs_cleaning", "invalid"
    branches={
        "good": analysis_pipeline,
        "needs_cleaning": cleanup_then_analysis_pipeline,
        "invalid": error_handling_pipeline
    }
)
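The configuration above needs an upstream task that produces data_quality. A minimal sketch of such a decision function follows, written in plain Python so it runs without the library. The names `DataQuality`, `check_data_quality`, and `missing_branches`, as well as the missing-value rule, are hypothetical and chosen only for illustration. The enum keeps the allowed values in one place, and the helper reports any value that has no branch.

```python
from enum import Enum
from typing import Dict, List, Optional


class DataQuality(str, Enum):
    GOOD = "good"
    NEEDS_CLEANING = "needs_cleaning"
    INVALID = "invalid"


def check_data_quality(values: List[Optional[float]]) -> str:
    """Hypothetical rule: no usable values is invalid, some gaps need cleaning."""
    if not values or all(v is None for v in values):
        return DataQuality.INVALID.value
    if any(v is None for v in values):
        return DataQuality.NEEDS_CLEANING.value
    return DataQuality.GOOD.value


def missing_branches(branches: Dict[str, object]) -> List[str]:
    """List the DataQuality values that have no branch mapped to them."""
    return sorted(q.value for q in DataQuality if q.value not in branches)


# Placeholder branch values; in a real pipeline these would be Pipeline objects.
branches = {"good": "analysis", "needs_cleaning": "cleanup_then_analysis"}

print(check_data_quality([1.0, None, 3.0]))  # needs_cleaning
print(missing_branches(branches))  # ['invalid']
```

In a pipeline, a function like this would be wrapped in a PythonTask with returns=["data_quality"], following the same pattern as toss_task.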
Model selection:
# Example conditional configuration (partial code)
conditional = Conditional(
    parameter="dataset_size",  # returns "small", "medium", "large"
    branches={
        "small": simple_model_pipeline,
        "medium": ensemble_pipeline,
        "large": distributed_training_pipeline
    }
)
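A decision function for dataset_size could bucket a row count into the three labels used above. The sketch below is illustrative only: `classify_dataset_size` and both cutoffs are assumptions, not values taken from the library or the original example.

```python
SMALL_MAX_ROWS = 10_000  # assumed cutoff, tune for your workload
MEDIUM_MAX_ROWS = 1_000_000  # assumed cutoff, tune for your workload


def classify_dataset_size(n_rows: int) -> str:
    """Map a row count to "small", "medium", or "large"."""
    if n_rows < 0:
        raise ValueError("n_rows must be non-negative")
    if n_rows <= SMALL_MAX_ROWS:
        return "small"
    if n_rows <= MEDIUM_MAX_ROWS:
        return "medium"
    return "large"


print(classify_dataset_size(500))  # small
print(classify_dataset_size(250_000))  # medium
print(classify_dataset_size(5_000_000))  # large
```

Because every non-negative input falls into one of the three labels, each possible return value has a matching key in branches.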
Environment routing:
# Example conditional configuration (partial code)
conditional = Conditional(
    parameter="environment",  # returns "dev", "staging", "prod"
    branches={
        "dev": fast_testing_pipeline,
        "staging": full_validation_pipeline,
        "prod": production_pipeline
    }
)
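One way to produce the environment value is to read it from an environment variable and reject anything that has no branch. In this sketch, `detect_environment` and the `APP_ENV` variable name are assumptions made for illustration, as is the choice to default to "dev".

```python
import os

VALID_ENVIRONMENTS = ("dev", "staging", "prod")


def detect_environment() -> str:
    # APP_ENV is an assumed variable name; default to "dev" when it is unset.
    value = os.environ.get("APP_ENV", "dev").strip().lower()
    if value not in VALID_ENVIRONMENTS:
        # Fail early instead of passing an unmapped value to the conditional.
        raise ValueError(f"Unsupported environment: {value!r}")
    return value


os.environ["APP_ENV"] = "Staging"
print(detect_environment())  # staging
```

Normalizing case and whitespace before returning keeps the value aligned with the exact keys used in branches.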
Conditional tips
- Always provide a branch for every possible return value
- Use meaningful parameter names like "status", "environment", "model_type"
- Consider using enums in your decision functions for type safety
Next: Learn about failure handling.