Connecting the Workflow
So far we've been treating ML training as one big function. In reality, ML workflows have distinct steps: data loading, preprocessing, training, and evaluation. Let's break our monolithic function into a proper pipeline.
Why Break It Up?
Our current approach has limitations:
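from functions import load_data, preprocess_data, train_model, evaluate_model

def train_ml_model_flexible():
    # All steps in one function: each output lives only in memory
    df = load_data()                               # Step 1
    preprocessed = preprocess_data(df)             # Step 2
    model = train_model(preprocessed)              # Step 3
    results = evaluate_model(model, preprocessed)  # Step 4
    return results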
Problems:
- If training fails, you lose preprocessing work
- Hard to debug specific steps
- Can't reuse preprocessing for different models
- No visibility into step-by-step progress
The Solution: Pipeline with Tasks
Let's use the individual functions we already have and connect them as a pipeline:
from runnable import Pipeline, PythonTask, pickled
from functions import load_data, preprocess_data, train_model, evaluate_model


def main():
    # Each task wraps one existing function; its return value is stored
    # under the name given in returns so later tasks can request it.
    pipeline = Pipeline(steps=[
        PythonTask(
            function=load_data,
            name="load_data",
            returns=[pickled("df")],
        ),
        PythonTask(
            function=preprocess_data,
            name="preprocess",
            returns=[pickled("preprocessed_data")],
        ),
        PythonTask(
            function=train_model,
            name="train",
            returns=[pickled("model_data")],
        ),
        PythonTask(
            function=evaluate_model,
            name="evaluate",
            returns=[pickled("evaluation_results")],
        ),
    ])
    pipeline.execute()
    return pipeline


if __name__ == "__main__":
    main()
Try it:
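The pipeline imports four functions from a functions module built in earlier chapters. To experiment with the wiring on its own, here is a hypothetical, stdlib-only stand-in you could save as functions.py. Only the function and parameter names come from this chapter; the bodies (a list of (feature, label) rows instead of a DataFrame, a single-threshold "model") are assumptions made purely for illustration.

```python
# functions.py: a HYPOTHETICAL stdlib-only stand-in for the tutorial's module.
# Only the signatures follow the text; every body is a toy assumption.
import random


def load_data():
    """Return 100 synthetic (feature, label) rows; stands in for the DataFrame."""
    rng = random.Random(42)  # fixed seed so every run sees the same data
    rows = []
    for _ in range(100):
        x = rng.uniform(0.0, 10.0)
        rows.append((x, int(x > 5.0)))  # label is 1 when the feature exceeds 5
    return rows


def preprocess_data(df, test_size=0.2):
    """Split rows into train and test portions."""
    n_test = int(len(df) * test_size)
    return {"train": df[n_test:], "test": df[:n_test]}


def train_model(preprocessed_data, n_estimators=100):
    """'Train' a threshold classifier: the midpoint between the class means.

    n_estimators is accepted only to mirror the signature in the text;
    the toy model just records it.
    """
    train = preprocessed_data["train"]
    pos = [x for x, y in train if y == 1]
    neg = [x for x, y in train if y == 0]
    threshold = (sum(pos) / len(pos) + sum(neg) / len(neg)) / 2
    return {"threshold": threshold, "n_estimators": n_estimators}


def evaluate_model(model_data, preprocessed_data):
    """Return test-set accuracy of the threshold classifier."""
    test = preprocessed_data["test"]
    correct = sum(int(x > model_data["threshold"]) == y for x, y in test)
    return {"accuracy": correct / len(test)}
```

Because each PythonTask only needs a callable, swapping these toys for the real functions should leave the pipeline definition unchanged.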
How Data Flow Works Through Run Log
Runnable connects pipeline steps through run log parameter management:
- returns=[pickled("df")] → Run log stores parameter "df" (binary data in catalog)
- preprocess_data(df, ...) → Run log provides "df" parameter (fetches from catalog if pickled)
- train_model(preprocessed_data) → Run log provides "preprocessed_data" parameter
- evaluate_model(model_data, preprocessed_data) → Run log provides both parameters
The pattern: Function parameter names must match the names in previous tasks' returns declarations because that's how the run log maps parameters.
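To make the name-matching rule concrete, here is a toy executor that wires steps together purely by argument name using Python's inspect module. It is an illustration of the idea only, not runnable's implementation; run_by_name and the toy functions are hypothetical, and a plain dict stands in for the run log.

```python
import inspect


def run_by_name(steps, params=None):
    """HYPOTHETICAL toy: run steps in order, wiring values into arguments by name.

    steps:  list of (function, return_name) pairs.
    params: optional starting values, playing the role of pipeline parameters.
    Returns the final name -> value store.
    """
    store = dict(params or {})  # stands in for the run log's parameters
    for func, return_name in steps:
        kwargs = {}
        for name, param in inspect.signature(func).parameters.items():
            if name in store:
                kwargs[name] = store[name]  # exact name match, nothing else
            elif param.default is inspect.Parameter.empty:
                raise KeyError(
                    f"{func.__name__} needs {name!r}, but no earlier step returned it"
                )
            # otherwise omit it so the function's own default applies
        store[return_name] = func(**kwargs)
    return store


# Usage: the same naming pattern as the pipeline above, with toy functions.
def load_data():
    return [1, 2, 3, 4]


def preprocess_data(df, test_size=0.25):
    n_test = int(len(df) * test_size)
    return {"train": df[n_test:], "test": df[:n_test]}


store = run_by_name([(load_data, "df"), (preprocess_data, "preprocessed_data")])
print(store["preprocessed_data"])  # {'train': [2, 3, 4], 'test': [1]}
```

Renaming the first step's return to "dataframe" makes this toy raise KeyError, because nothing supplies an argument called df.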
What You Get with Pipelines
⚡ Step-by-Step Execution
Each step runs individually and you can see progress:
load_data: ✅ Completed in 0.1s
preprocess: ✅ Completed in 0.3s
train: ✅ Completed in 2.4s
evaluate: ✅ Completed in 0.2s
🔍 Intermediate Results Preserved
Each step's returns are saved through the run log (pickled objects go to the catalog), so you can inspect intermediate results without rerunning expensive steps.
🛠️ Better Debugging
If training fails, you don't lose your preprocessing work. You can debug just the training step.
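The reason a failed training step doesn't cost you preprocessing work is that earlier outputs are already persisted. The sketch below mimics that with plain pickle files: each step's output is written to a directory, and a rerun loads existing files instead of recomputing. The run_with_cache helper and its file layout are hypothetical; runnable's catalog does this for you and differs in detail.

```python
import pickle
import tempfile
from pathlib import Path


def run_with_cache(steps, cache_dir):
    """HYPOTHETICAL toy: run (name, function) steps, pickling each output.

    Each function receives the previous step's output (None for the first).
    A step whose cache_dir/<name>.pkl already exists is loaded instead of
    recomputed, so a rerun after a failure only redoes the failed step and
    the steps after it. Returns (final_output, names_actually_executed).
    """
    cache = Path(cache_dir)
    cache.mkdir(parents=True, exist_ok=True)
    result, executed = None, []
    for name, func in steps:
        path = cache / f"{name}.pkl"
        if path.exists():
            result = pickle.loads(path.read_bytes())  # reuse the saved output
            continue
        result = func(result)  # an exception here leaves earlier pickles intact
        path.write_bytes(pickle.dumps(result))
        executed.append(name)
    return result, executed


# Usage: simulate a training failure, then rerun.
state = {"fail": True}


def load(_):
    return list(range(10))


def preprocess(rows):
    return [r * 2 for r in rows]


def train(rows):
    if state["fail"]:
        raise RuntimeError("simulated training failure")
    return sum(rows)


steps = [("load_data", load), ("preprocess", preprocess), ("train", train)]
with tempfile.TemporaryDirectory() as cache_dir:
    try:
        run_with_cache(steps, cache_dir)
    except RuntimeError:
        pass  # load_data.pkl and preprocess.pkl are already on disk
    state["fail"] = False
    result, executed = run_with_cache(steps, cache_dir)

print(result, executed)  # 90 ['train']
```

Unlike a real catalog, this sketch never invalidates stale files, so changing an early step's code would silently reuse its old output.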
📊 Individual Step Tracking
See timing and resource usage for each step, helping identify bottlenecks.
🔗 Understanding Parameter Naming
For data flow to work correctly, follow this naming pattern:
# Step 1: load_data's return value is tracked in the run log as "df"
PythonTask(function=load_data, name="load_data", returns=[pickled("df")])

# Step 2: function parameter "df" matches run log parameter "df"
def preprocess_data(df, test_size=0.2):  # Gets "df" from run log
    preprocessed_data = ...  # split and transform df here
    return preprocessed_data

# Step 3: save the return value as "preprocessed_data" in the run log
PythonTask(function=preprocess_data, name="preprocess", returns=[pickled("preprocessed_data")])

# Step 4: parameter names match run log parameter names
def train_model(preprocessed_data, n_estimators=100):  # Gets from run log
    ...

def evaluate_model(model_data, preprocessed_data):  # Gets both from run log
    ...
Key Rule: Parameter names in later functions must exactly match the names in earlier returns declarations.
🚨 Common Parameter Issues
Problem: Parameter name doesn't match returns name
# Won't work - name mismatch!
PythonTask(function=load_data, name="load_data", returns=[pickled("dataframe")])

def preprocess_data(df, test_size=0.2):  # Run log has "dataframe", but the function expects "df"
    ...
Solution: Make parameter names match returns names
# Works - run log has "df", function expects "df"
PythonTask(function=load_data, name="load_data", returns=[pickled("df")])

def preprocess_data(df, test_size=0.2):  # Gets "df" from run log
    ...
Debug Tip: Check the run log files in .run_log_store/ to see the parameter names that were actually stored.
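A mismatch in a late step only surfaces after every earlier step has run, which can mean waiting through training first. One way to catch it up front is a pre-flight check that compares each function's required arguments with the names returned by earlier steps. check_wiring is a hypothetical helper written for this illustration, not part of runnable.

```python
import inspect


def check_wiring(steps, params=()):
    """HYPOTHETICAL helper: report arguments that no earlier step returns.

    steps:  list of (function, [return names]) in execution order.
    params: names supplied as pipeline parameters.
    Arguments with defaults (like test_size) are treated as optional.
    Returns a list of problems; an empty list means the names line up.
    """
    available = set(params)
    problems = []
    for func, returns in steps:
        for name, param in inspect.signature(func).parameters.items():
            if name not in available and param.default is inspect.Parameter.empty:
                problems.append(f"{func.__name__}: no earlier step returns {name!r}")
        available.update(returns)
    return problems


def load_data():
    ...


def preprocess_data(df, test_size=0.2):
    ...


print(check_wiring([(load_data, ["dataframe"]), (preprocess_data, ["preprocessed_data"])]))
# ["preprocess_data: no earlier step returns 'df'"]
print(check_wiring([(load_data, ["df"]), (preprocess_data, ["preprocessed_data"])]))
# []
```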
Advanced: Parameters in Pipelines
You can still use parameters, and they now reach individual steps:
from runnable import Pipeline, PythonTask, pickled
from functions import load_data, preprocess_data, train_model, evaluate_model

# Same pipeline definition as before; no per-step parameter wiring is needed
pipeline = Pipeline(steps=[
    PythonTask(function=load_data, name="load_data", returns=[pickled("df")]),
    PythonTask(function=preprocess_data, name="preprocess", returns=[pickled("preprocessed_data")]),
    PythonTask(function=train_model, name="train", returns=[pickled("model_data")]),
    PythonTask(function=evaluate_model, name="evaluate", returns=[pickled("evaluation_results")]),
])
pipeline.execute()

# Parameters still work the same way:
# RUNNABLE_PRM_test_size=0.3 uv run 04_connecting_workflow.py
Each parameter is passed to the functions whose signatures declare an argument with the same name, so test_size reaches preprocess_data and n_estimators reaches train_model.
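The routing can be sketched in two stages: collect the prefixed environment variables into named values, then give each function only the names it declares. Both helpers below are hypothetical, and decoding values with json.loads (so "0.3" becomes a float) is an assumption about how such values might be typed; runnable's own parsing may differ.

```python
import inspect
import json
import os

PREFIX = "RUNNABLE_PRM_"


def params_from_env(environ=None):
    """HYPOTHETICAL toy: collect RUNNABLE_PRM_* variables as {name: value}.

    Values are decoded as JSON when possible (ASSUMPTION), else kept as strings.
    """
    environ = os.environ if environ is None else environ
    params = {}
    for key, raw in environ.items():
        if key.startswith(PREFIX):
            name = key[len(PREFIX):]
            try:
                params[name] = json.loads(raw)
            except json.JSONDecodeError:
                params[name] = raw  # e.g. an unquoted word stays a string
    return params


def kwargs_for(func, params):
    """Keep only the parameters that func declares by name."""
    accepted = inspect.signature(func).parameters
    return {k: v for k, v in params.items() if k in accepted}


def preprocess_data(df, test_size=0.2):
    ...


def train_model(preprocessed_data, n_estimators=100):
    ...


env = {"RUNNABLE_PRM_test_size": "0.3", "RUNNABLE_PRM_n_estimators": "200", "HOME": "/home/me"}
params = params_from_env(env)
print(params)                               # {'test_size': 0.3, 'n_estimators': 200}
print(kwargs_for(preprocess_data, params))  # {'test_size': 0.3}
print(kwargs_for(train_model, params))      # {'n_estimators': 200}
```

Filtering by signature is what lets one flat set of parameters serve every step without per-task configuration.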
Compare: Monolithic vs Pipeline
Monolithic Function (Chapters 1-3):
- ❌ All-or-nothing execution
- ❌ Hard to debug failed steps
- ❌ Expensive to rerun everything
- ❌ No intermediate result visibility
Pipeline (Chapter 4):
- ✅ Step-by-step execution with progress
- ✅ Intermediate results preserved
- ✅ Resume from failed steps
- ✅ Better debugging and development
- ✅ Parameter-based data flow between steps
Your Functions Didn't Change
Notice that we're using the exact same functions from earlier:
- load_data()
- preprocess_data()
- train_model()
- evaluate_model()
No refactoring required. Runnable works with your existing functions - you just organize them into steps.
What's Next?
We have a great pipeline, but we're still dealing with everything in memory. What about large datasets that don't fit in RAM? Or sharing intermediate results with teammates?
Next chapter: We'll add efficient data management for large-scale ML workflows.
Next: Handling Large Datasets - Efficient storage and retrieval of data artifacts