Building Custom Pipeline Executors¶
Execute pipelines on any orchestration platform by understanding the two fundamental orchestration patterns.
The Two Orchestration Patterns¶
Pipeline executors handle DAG execution in two fundamentally different ways:
Pattern 1: DAG Traversal (Local, Local-Container)¶
How it works:

- Runnable traverses the DAG in Python
- For each node, it calls `trigger_node_execution()`
- Node execution happens locally or via individual job submission

Use for: Simple platforms that submit individual jobs
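The traversal loop at the heart of this pattern can be sketched as follows. This is a simplified stand-in, not runnable's actual internals: the `edges` mapping and the callable passed as `trigger_node_execution` are illustrative assumptions.

```python
# Minimal sketch of Pattern 1: the orchestrator walks the DAG in Python
# and calls trigger_node_execution() once per node, in dependency order.
from collections import deque

def topological_order(nodes, edges):
    """Kahn's algorithm: yield node names so that every node's
    dependencies come before it. `edges` maps node -> set of parents."""
    indegree = {n: len(edges.get(n, set())) for n in nodes}
    children = {n: set() for n in nodes}
    for node, parents in edges.items():
        for parent in parents:
            children[parent].add(node)
    ready = deque(n for n, d in indegree.items() if d == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(nodes):
        raise ValueError("cycle detected in DAG")
    return order

def traverse_and_execute(nodes, edges, trigger_node_execution):
    # One call per node, respecting dependency order
    for name in topological_order(nodes, edges):
        trigger_node_execution(name)

# Example: fetch -> clean -> train, with a side branch fetch -> report
executed = []
traverse_and_execute(
    nodes=["fetch", "clean", "train", "report"],
    edges={"clean": {"fetch"}, "train": {"clean"}, "report": {"fetch"}},
    trigger_node_execution=executed.append,
)
```

Each `trigger_node_execution()` call can then run the node in-process (local) or submit it as an individual job (local-container, batch platforms).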
Pattern 2: DAG Transpilation (Argo Workflows)¶
How it works:

- Runnable converts the entire DAG into the platform's native workflow format
- The platform handles DAG traversal (Argo's workflow engine, Airflow scheduler, etc.)
- Individual nodes run via the CLI: `runnable execute-single-node ...`

Use for: Platforms with native workflow capabilities (Argo, Airflow, Prefect)
Implementation Examples¶
Pattern 1: DAG Traversal (Local Executor)¶
```python
class LocalExecutor(GenericPipelineExecutor):
    service_name: str = "local"

    def trigger_node_execution(self, node, map_variable=None):
        """Called for each node - execute directly."""
        # Direct execution using the base class
        self.execute_node(node=node, map_variable=map_variable)

    def execute_node(self, node, map_variable=None):
        """Execute the node directly in the current process."""
        self._execute_node(node=node, map_variable=map_variable)
```
Key insight: Runnable traverses the DAG and calls `trigger_node_execution()` for each node, which directly executes the task.
Pattern 2: DAG Transpilation (Argo Executor)¶
```python
class ArgoExecutor(GenericPipelineExecutor):
    service_name: str = "argo"

    def execute_from_graph(self, dag, map_variable=None):
        """Convert the entire DAG to an Argo WorkflowTemplate."""
        # Transpile the runnable DAG to Argo workflow YAML
        argo_workflow = self._transpile_dag_to_argo_workflow(dag)
        # Submit to Kubernetes - Argo handles the DAG traversal
        self._submit_argo_workflow(argo_workflow)

    def trigger_node_execution(self, node, map_variable=None):
        """This runs INSIDE Argo pods via CLI commands."""
        # Called by: runnable execute-single-node <node-name>
        # Set up storage access for the pod environment
        self._setup_pod_storage_access()
        # Execute the node
        self._execute_node(node=node, map_variable=map_variable)
```
Key insight: Argo's workflow engine traverses the DAG and runs a CLI command in each pod, which invokes `trigger_node_execution()` there.
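For intuition, the transpiled output might look roughly like this. This is a simplified, hand-written sketch of an Argo `Workflow`, not the exact YAML runnable generates; the image name and node names are placeholders:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: runnable-pipeline-
spec:
  entrypoint: pipeline-dag
  templates:
    - name: pipeline-dag
      dag:
        tasks:
          - name: fetch
            template: run-node
            arguments: {parameters: [{name: node, value: fetch}]}
          - name: train
            template: run-node
            dependencies: [fetch]
            arguments: {parameters: [{name: node, value: train}]}
    - name: run-node
      inputs:
        parameters: [{name: node}]
      container:
        image: my-pipeline-image:latest
        command: [runnable, execute-single-node, "{{inputs.parameters.node}}"]
```

Argo walks `dag.tasks` using the `dependencies` lists; each pod simply runs the CLI for one node.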
The Critical Issue: Storage Access¶
Same issue as job executors: Run logs and catalog must be accessible in the execution environment.
Local Execution: Direct Access¶
```python
class LocalExecutor(GenericPipelineExecutor):
    def trigger_node_execution(self, node, map_variable=None):
        # Storage is accessible locally - proceed directly
        self._execute_node(node=node, map_variable=map_variable)
```
Remote Execution: Volume Mounting¶
Local-Container Pattern - Mount host directories into containers:
```python
class LocalContainerExecutor(GenericPipelineExecutor):
    def trigger_node_execution(self, node, map_variable=None):
        # Mount host storage into the container
        self._mount_volumes()
        # Run the CLI command in the container
        command = f"runnable execute-single-node {node.name}"
        self._run_in_container(command)

    def _mount_volumes(self):
        # Map host paths to container paths
        if self._context.run_log_store.service_name == "file-system":
            host_logs = self._context.run_log_store.log_folder
            self._volumes[host_logs] = {"bind": "/tmp/run_logs/", "mode": "rw"}
        if self._context.catalog.service_name == "file-system":
            host_catalog = self._context.catalog.catalog_location
            self._volumes[host_catalog] = {"bind": "/tmp/catalog/", "mode": "rw"}
```
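A volumes mapping shaped like the one above translates directly into `docker run -v` flags. The helper below is an illustrative sketch (the host paths and image name are made up), not part of runnable's API:

```python
# Sketch: turning a {host_path: {"bind": ..., "mode": ...}} mapping into
# `docker run -v` flags for a single-node CLI invocation.
def volume_flags(volumes):
    """volumes: {host_path: {"bind": container_path, "mode": "rw"|"ro"}}"""
    flags = []
    for host, spec in volumes.items():
        flags += ["-v", f"{host}:{spec['bind']}:{spec['mode']}"]
    return flags

volumes = {
    "/home/user/.run_logs": {"bind": "/tmp/run_logs/", "mode": "rw"},
    "/home/user/.catalog": {"bind": "/tmp/catalog/", "mode": "rw"},
}
cmd = ["docker", "run", *volume_flags(volumes),
       "my-image:latest", "runnable", "execute-single-node", "train"]
```

Inside the container, the run log store and catalog then read and write the bind-mounted paths, so results land back on the host.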
Argo/K8s Pattern - Use PersistentVolumeClaims:
```python
class ArgoExecutor(GenericPipelineExecutor):
    def _transpile_dag_to_argo_workflow(self, dag):
        # Add PVC mounts to every pod in the workflow
        workflow_spec = {
            "spec": {
                "volumes": [
                    {"name": "run-logs",
                     "persistentVolumeClaim": {"claimName": "runnable-logs-pvc"}},
                    {"name": "catalog",
                     "persistentVolumeClaim": {"claimName": "runnable-catalog-pvc"}},
                ],
                "templates": [
                    {
                        "container": {
                            "volumeMounts": [
                                {"name": "run-logs", "mountPath": "/tmp/run_logs/"},
                                {"name": "catalog", "mountPath": "/tmp/catalog/"},
                            ]
                        }
                    }
                ],
            }
        }
        return workflow_spec

    def trigger_node_execution(self, node, map_variable=None):
        # This runs in an Argo pod - update the context to use mounted paths
        self._use_mounted_storage()
        self._execute_node(node=node, map_variable=map_variable)

    def _use_mounted_storage(self):
        # Point to the PVC mount paths
        if self._context.run_log_store.service_name == "file-system":
            self._context.run_log_store.log_folder = "/tmp/run_logs/"
        if self._context.catalog.service_name == "file-system":
            self._context.catalog.catalog_location = "/tmp/catalog/"
```
The pattern: Make sure run logs and catalog are accessible in every execution environment (container, pod, remote job).
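That environment-dependent path switching can be factored into one small helper. This is a sketch under stated assumptions: the `RUNNABLE_IN_CONTAINER` environment variable and the mount paths are illustrative, not part of runnable's actual API.

```python
# Sketch: choose storage locations based on where the code is running.
import os

def resolve_storage_paths(host_logs, host_catalog, in_container=None):
    """Return (run_log_folder, catalog_location) for this environment."""
    if in_container is None:
        # Hypothetical marker set in the container/pod image or spec
        in_container = os.environ.get("RUNNABLE_IN_CONTAINER") == "1"
    if in_container:
        # Inside a container/pod, use the mounted paths
        return "/tmp/run_logs/", "/tmp/catalog/"
    # Locally, use the configured host paths
    return host_logs, host_catalog
```

An executor's `trigger_node_execution()` could call this once before executing the node, instead of hard-coding paths in each method.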
Plugin Registration¶
Create your executor and register it:
```python
from pydantic import Field

from extensions.pipeline_executor import GenericPipelineExecutor


class MyPlatformExecutor(GenericPipelineExecutor):
    service_name: str = "my-platform"

    # Your platform's config fields
    api_endpoint: str = Field(...)
    project_id: str = Field(...)

    def trigger_node_execution(self, node, map_variable=None):
        # Your platform's node execution logic
        pass

    def execute_from_graph(self, dag, map_variable=None):
        # Optional: for DAG transpilation platforms only
        pass
```
Register in `pyproject.toml`:

```toml
[project.entry-points.'pipeline_executor']
"my-platform" = "my_package.executors:MyPlatformExecutor"
```
Which Pattern to Choose?¶
DAG Traversal (`trigger_node_execution` only):

- For: Simple batch platforms (AWS Batch, SLURM, etc.)
- How: Runnable calls your method for each node
- Storage: Handle volumes/mounts in `trigger_node_execution()`
DAG Transpilation (both methods):
- For: Workflow platforms (Argo, Airflow, Prefect, etc.)
- How: Convert entire DAG to platform's native workflow format
- Storage: Handle volumes/mounts in the transpiled workflow spec
The complexity is in translating DAG semantics (parallel branches, conditionals) to your platform's workflow language.
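As a tiny illustration of that translation, parallel branches map naturally onto Argo-style `dependencies` lists. The graph shape, node names, and output structure below are illustrative, not runnable's internal representation:

```python
# Sketch: mapping a parent->children adjacency of a pipeline graph into
# Argo-style DAG tasks, where each task lists its dependencies.
def to_argo_tasks(children):
    """children: {node: [downstream nodes]} -> list of Argo-style tasks."""
    parents = {}
    for node, kids in children.items():
        parents.setdefault(node, [])
        for kid in kids:
            parents.setdefault(kid, []).append(node)
    tasks = []
    for node in sorted(parents):
        task = {"name": node, "template": "run-node"}
        if parents[node]:
            task["dependencies"] = sorted(parents[node])
        tasks.append(task)
    return tasks

# Two parallel branches fan out from "prepare" and join at "evaluate"
tasks = to_argo_tasks({
    "prepare": ["train-a", "train-b"],
    "train-a": ["evaluate"],
    "train-b": ["evaluate"],
    "evaluate": [],
})
```

Parallelism falls out for free: tasks with the same dependencies run concurrently, and a join node simply lists all of its parents. Conditionals and map/loop constructs are where the real translation effort goes.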
Integration Advantage¶
🔑 Key Benefit: Custom executors live entirely in your codebase, not in public repositories or external dependencies.
Complete Control & Privacy¶
```python
# In your private repository:
# my-company/internal-ml-platform/executors/company_executor.py
class CompanyBatchExecutor(GenericPipelineExecutor):
    service_name: str = "company-batch"

    # Your internal configuration
    internal_api_endpoint: str = Field(...)
    security_group: str = Field(...)
    compliance_tags: dict = Field(default_factory=dict)

    def trigger_node_execution(self, node, map_variable=None):
        # Your proprietary integration logic:
        # company-specific security, monitoring, cost tracking
        pass
```
Integration benefits:
- 🔒 Security: No external dependencies or public code exposure
- 🏢 Compliance: Implement organization-specific governance and audit requirements
- 💰 Cost Control: Integrate with internal cost tracking and resource management
- 🔧 Customization: Build reusable templates for your exact infrastructure
- 📊 Monitoring: Integrate with dashboards and alerting systems
Reusable Templates¶
Teams can create internal libraries of executors:
```python
# Internal package: company-runnable-executors
from company_runnable_executors import (
    ProductionK8sExecutor,   # Your Kubernetes setup
    StagingBatchExecutor,    # Your staging environment
    ComplianceExecutor,      # SOC2/HIPAA requirements
    CostOptimizedExecutor,   # Spot instances + cost tracking
)


# Teams use your standardized executors
class MLTrainingPipeline(Pipeline):
    def production_config(self):
        return ProductionK8sExecutor(
            namespace="ml-prod",
            resource_limits=self.get_approved_limits(),
            compliance_mode=True,
        )
```
Ecosystem Integration¶
```yaml
# Your company's standard configuration templates
pipeline-executor:
  type: company-batch
  config:
    internal_api_endpoint: "https://internal-batch.company.com"
    security_group: "ml-workloads-sg"
    compliance_tags:
      project: "{{PROJECT_ID}}"
      cost_center: "{{COST_CENTER}}"
      data_classification: "confidential"
    monitoring:
      dashboard_url: "https://company-monitoring.com/runnable"
      alert_channels: ["#ml-alerts", "#devops-alerts"]
```
This makes runnable a platform for building your internal ML infrastructure, not just using external services.
Need Help?¶
Custom pipeline executors are complex integrations that require deep understanding of both runnable's architecture and your target platform's orchestration model.
Get Support
We're here to help you succeed! Building custom executors involves intricate details about:
- Graph traversal and dependency management
- Step log coordination and error handling
- Parameter passing and context management
- Platform-specific workflow translation patterns
Don't hesitate to reach out:
- 📧 Contact the team for architecture guidance and implementation support
- 🤝 Collaboration opportunities - we're interested in supporting enterprise integrations
- 📖 Documentation feedback - help us improve these guides based on your experience
Better together: Complex orchestration integrations benefit from collaboration between platform experts (you) and runnable architecture experts (us).
Highly Complex Integration
These are among the most sophisticated integrations in runnable that involve:
- Deep understanding of runnable's graph execution engine and step lifecycle
- Complex orchestration platform APIs and workflow specification formats
- Distributed execution coordination, failure handling, and state management
- Advanced container orchestration, networking, and resource management patterns
Success requires significant expertise in both domains. The existing orchestration integrations (especially Argo) took substantial development effort to get right - collaboration dramatically increases your chances of success.
Your success with custom pipeline executors helps the entire runnable community!