Argo Workflows Pipeline Execution

Scale your pipelines to the cloud with distributed parallel execution using Argo Workflows - the most powerful execution environment for production ML workflows.

No Additional Dependencies

Argo Workflows execution works with the base runnable installation - no additional dependencies required:

pip install runnable

Note: The runnable[k8s] dependency is only needed for Kubernetes job executors (k8s-job, mini-k8s-job), not for Argo Workflows pipeline execution.

Distributed Parallel Execution

Cloud-scale parallelization! Argo runs your parallel and map nodes across multiple Kubernetes pods, providing distributed computing power and elastic scaling beyond what's possible on a single machine.

Getting Started

Simple Cloud Setup

Runnable generates standard Argo workflow YAML - your infrastructure team can deploy it using existing Kubernetes and Argo tooling!

Basic Configuration

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"  # Your containerized pipeline
    output_file: "workflow.yaml"  # Generated Argo workflow

Simple Example

from runnable import Pipeline, PythonTask, Parallel

def process_data_a():
    print("Processing dataset A...")
    # Your ML logic here
    return {"accuracy": 0.95}

def process_data_b():
    print("Processing dataset B...")
    # Your ML logic here
    return {"accuracy": 0.92}

def combine_results(results_a, results_b):
    print(f"A: {results_a['accuracy']}, B: {results_b['accuracy']}")
    return {"best": max(results_a['accuracy'], results_b['accuracy'])}

def main():
    # These run in parallel in Argo!
    parallel_processing = Parallel(
        name="process_datasets",
        branches={
            "dataset_a": PythonTask(function=process_data_a, name="process_a"),
            "dataset_b": PythonTask(function=process_data_b, name="process_b")
        }
    )

    combine_task = PythonTask(
        function=combine_results,
        name="combine"
    )

    pipeline = Pipeline(steps=[parallel_processing, combine_task])
    pipeline.execute()  # Generates workflow.yaml

if __name__ == "__main__":
    main()
Save the executor configuration as config.yaml:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    output_file: "workflow.yaml"

Generate and deploy the workflow:

# Generate Argo workflow
RUNNABLE_CONFIGURATION_FILE=config.yaml uv run pipeline.py

# Deploy to your Kubernetes cluster
kubectl apply -f workflow.yaml

Parallel Execution

In this example, process_data_a and process_data_b run simultaneously on different Kubernetes pods, then combine_results runs after both complete.
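The generated workflow.yaml follows Argo's standard DAG-template shape. A simplified, illustrative sketch of what the parallel structure looks like (template and task names here are hypothetical - inspect your generated file for the real ones):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - name: process-a          # no dependencies: starts immediately
            template: process-a
          - name: process-b          # no dependencies: runs alongside process-a
            template: process-b
          - name: combine            # waits for both parallel branches
            template: combine
            dependencies: [process-a, process-b]
```

Tasks with no `dependencies` entry start as soon as the workflow runs, which is how Argo schedules the two branches onto separate pods concurrently.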

Why Use Argo Workflows?

Cloud-Scale Benefits

  • ⚡ Distributed parallelization: parallel and map nodes run across multiple pods
  • 🚀 Faster pipelines: Utilize multiple machines and CPU cores across your cluster
  • 🔄 Elastic scaling: Kubernetes automatically manages resources and scaling
  • 🏗️ Production ready: Battle-tested in enterprise environments
  • 📊 Rich monitoring: Native Kubernetes and Argo UI integration

Trade-offs

  • ⚙️ Infrastructure requirement: Needs Kubernetes cluster with Argo installed
  • 🐳 Container overhead: Each task runs in separate pods
  • 🔧 Setup complexity: More moving parts than local executors

Advanced Features

Dynamic Parameters

Runtime Parameter Control

Make your workflows configurable by exposing parameters to the Argo UI:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    output_file: "workflow.yaml"
    expose_parameters_as_inputs: true  # Enable parameter inputs

Parameters now become configurable at runtime - in the Argo UI, or from the CLI (e.g. argo submit workflow.yaml -p my_param=value).

Storage and Persistence

Shared Storage Between Tasks

Use persistent volumes to share data between tasks:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    persistent_volumes:
      - name: shared-data
        mount_path: /shared
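Because each task runs in its own pod, files written to a pod's local filesystem are invisible to downstream tasks unless they land on the shared mount. A minimal sketch of that hand-off, assuming the /shared mount_path configured above (the helper functions and file name are illustrative, not part of runnable's API):

```python
import os

# "/shared" matches the mount_path configured above; SHARED_DIR is
# overridable so the sketch also runs outside the cluster.
SHARED_DIR = os.environ.get("SHARED_DIR", "/shared")

def write_features(shared_dir: str = SHARED_DIR) -> str:
    """Upstream task: persist intermediate output to the shared volume."""
    path = os.path.join(shared_dir, "features.csv")
    with open(path, "w") as f:
        f.write("id,value\n1,0.5\n")
    return path

def read_features(shared_dir: str = SHARED_DIR) -> int:
    """Downstream task (possibly on another pod): read the same file back."""
    path = os.path.join(shared_dir, "features.csv")
    with open(path) as f:
        return len(f.readlines())
```

The same pattern is what makes the file-system catalog and run log store work across pods: every pod sees the same mount point.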

Kubernetes Secrets

Secure Credential Management

Access cluster secrets in your pipeline tasks:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    secrets_from_k8s:
      - environment_variable: DB_CONNECTION
        secret_name: database-credentials
        secret_key: connection_string
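Inside the task, the secret arrives as an ordinary environment variable whose name matches the environment_variable field above. A hedged sketch of consuming it (the helper function is illustrative):

```python
import os

def get_db_connection() -> str:
    """Read the database credential injected by the executor.

    DB_CONNECTION matches the environment_variable configured above;
    failing fast here gives a clearer error than a connection timeout later.
    """
    conn = os.environ.get("DB_CONNECTION")
    if not conn:
        raise RuntimeError("DB_CONNECTION is not set - check secrets_from_k8s")
    return conn
```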

Resource Management

Custom Resource Requirements

Different tasks can have different compute requirements:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    # Default resources
    resources:
      requests:
        memory: "1Gi"
        cpu: "250m"
      limits:
        memory: "2Gi"
        cpu: "500m"
    # Step-specific overrides
    overrides:
      gpu_training:
        resources:
          requests:
            memory: "8Gi"
            cpu: "2"
            nvidia.com/gpu: "1"
          limits:
            memory: "16Gi"
            cpu: "4"
            nvidia.com/gpu: "1"

Then use the override in your pipeline:

gpu_task = PythonTask(
    function=train_model,
    name="train_with_gpu",
    overrides={"argo": "gpu_training"}
)

Parallelism Control

Manage Resource Usage

Control how many tasks run simultaneously to avoid overwhelming your cluster:

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    parallelism: 5  # Max 5 tasks running at once
    overrides:
      sequential_processing:
        parallelism: 1  # Force sequential execution

Node Selection

Target Specific Nodes

Run tasks on specific node types (e.g., GPU nodes):

pipeline-executor:
  type: argo
  config:
    image: "my-pipeline:latest"
    overrides:
      gpu_nodes:
        node_selector:
          accelerator: "nvidia-tesla-k80"
        tolerations:
          - key: "gpu"
            operator: "Equal"
            value: "true"
            effect: "NoSchedule"

Configuration Reference

Executes the pipeline using Argo Workflows.

The defaults configuration is kept similar to the Argo Workflow spec.

Configuration:

pipeline-executor:
  type: argo
  config:
    pvc_for_runnable: "my-pvc"
    custom_volumes:
      - mount_path: "/tmp"
        persistent_volume_claim:
          claim_name: "my-pvc"
          read_only: true/false
    expose_parameters_as_inputs: true/false
    secrets_from_k8s:
      - environment_variable: ENV_VAR_NAME
        secret_name: my-secret
        secret_key: my-key
    output_file: "argo-pipeline.yaml"
    log_level: "DEBUG"/"INFO"/"WARNING"/"ERROR"/"CRITICAL"
    defaults:
      image: "my-image"
      activeDeadlineSeconds: 86400
      failFast: true
      nodeSelector:
        label: value
      parallelism: 1
      retryStrategy:
        backoff:
          duration: "2m"
          factor: 2
          maxDuration: "1h"
        limit: 0
        retryPolicy: "Always"
      timeout: "1h"
      tolerations:
      imagePullPolicy: "Always"/"IfNotPresent"/"Never"
      resources:
        limits:
          memory: "1Gi"
          cpu: "250m"
          gpu: 0
        requests:
          memory: "1Gi"
          cpu: "250m"
      env:
        - name: "MY_ENV"
          value: "my-value"
        - name: secret_env
          secretName: "my-secret"
          secretKey: "my-key"
    overrides:
      key1:
        ... similar structure to defaults

    argoWorkflow:
      metadata:
        annotations:
          key1: value1
          key2: value2
        generateName: "my-workflow"
        labels:
          key1: value1

As of now, runnable needs a PVC to store the logs and the catalog, provided by pvc_for_runnable.

  • custom_volumes can be used to mount additional volumes to the container.
  • expose_parameters_as_inputs can be used to expose the initial parameters as inputs to the workflow.
  • secrets_from_k8s can be used to expose the secrets from the k8s secret store.
  • output_file is the file where the argo pipeline will be dumped.
  • log_level is the log level for the containers.
  • defaults is the default configuration for all the containers.

Production Considerations

Infrastructure Requirements

Before using Argo: Argo Workflows is Kubernetes-native - ensure your cluster has Argo Workflows installed and configured before deploying generated workflows.

Service Compatibility

Storage: Use shared storage (persistent volumes) for catalog and run_log_store - the buffered run log store won't work across pods

Secrets: Use Kubernetes secrets via secrets_from_k8s rather than .env files

Complete Production Example

Full Configuration with Best Practices

# production-argo-config.yaml
pipeline-executor:
  type: argo
  config:
    # Core settings
    image: "my-pipeline:v1.2.3"
    output_file: "workflow.yaml"

    # Runtime parameters
    expose_parameters_as_inputs: true

    # Storage
    persistent_volumes:
      - name: shared-storage
        mount_path: /shared

    # Security
    secrets_from_k8s:
      - environment_variable: DB_CONNECTION
        secret_name: database-credentials
        secret_key: connection_string

    # Resource management
    parallelism: 10
    resources:
      requests:
        memory: "2Gi"
        cpu: "500m"
      limits:
        memory: "4Gi"
        cpu: "1"

    # Task-specific overrides
    overrides:
      gpu_training:
        resources:
          requests:
            nvidia.com/gpu: "1"
            memory: "8Gi"
            cpu: "2"
          limits:
            nvidia.com/gpu: "1"
            memory: "16Gi"
            cpu: "4"
        node_selector:
          accelerator: "nvidia-tesla-v100"

      lightweight_tasks:
        resources:
          requests:
            memory: "512Mi"
            cpu: "100m"
          limits:
            memory: "1Gi"
            cpu: "200m"

# Supporting services
run_log_store:
  type: file-system
  config:
    log_folder: /shared/logs

catalog_handler:
  type: file-system
  config:
    catalog_location: /shared/data

Deployment Workflow

  1. Generate workflow: RUNNABLE_CONFIGURATION_FILE=production-argo-config.yaml uv run my_pipeline.py
  2. Review generated YAML: Check workflow.yaml for correctness
  3. Deploy to cluster: kubectl apply -f workflow.yaml
  4. Monitor execution: Use Argo UI or kubectl to track progress

CI/CD Integration

In production, integrate this into your CI/CD pipeline to automatically generate and deploy workflows when your pipeline code changes.
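As a sketch, a CI job covering the generate-and-deploy steps might look like the following (GitHub Actions syntax; file names and the Python version are illustrative assumptions, and cluster authentication setup is omitted):

```yaml
name: deploy-pipeline
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install runnable uv
      # Generate the Argo workflow from the pipeline definition
      - run: RUNNABLE_CONFIGURATION_FILE=production-argo-config.yaml uv run my_pipeline.py
      # Deploy to the cluster (cluster auth setup omitted for brevity)
      - run: kubectl apply -f workflow.yaml
```

Pinning the image tag in the config (my-pipeline:v1.2.3 rather than latest) keeps the deployed workflow reproducible across CI runs.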

When to Use Argo Workflows

Choose Argo When

  • Need distributed parallel execution across multiple machines
  • Running production ML workloads at scale
  • Want elastic resource management (auto-scaling)
  • Have Kubernetes infrastructure available
  • Need specialized compute for different tasks (CPUs, GPUs, memory)
  • Require more parallelism than a single machine can provide

Use Local Container When

  • Testing container-based pipelines before cloud deployment
  • Single-machine parallel execution is sufficient
  • Want simpler setup and debugging
  • Don't need distributed computing resources

Use Local Executor When

  • Development and experimentation
  • All tasks use the same environment
  • Want fastest possible development iteration

Advanced: Complex Nested Workflows

Nested Pipeline Support

Runnable supports deeply nested workflows with Map inside Parallel inside Map structures. Argo handles the complexity automatically - you just write simple Python pipeline code and Runnable generates the appropriate workflow DAGs.