# Async & Streaming Execution
Execute async functions and stream results in real-time with runnable's native async capabilities.
## The Core Insight
Runnable natively supports both batch and streaming workflows in a single framework. While traditional orchestrators like Kedro, Metaflow, and Airflow excel at batch processing, runnable uniquely combines:
- Batch Processing: Traditional data pipelines with full reproducibility and orchestration
- Real-Time Streaming: Native AsyncGenerator support for LLM inference, APIs, and streaming data
`AsyncPipeline` and `AsyncPythonTask` bring this streaming capability to runnable, enabling real-time workflows while keeping all of runnable's core features: parameter management, the catalog system, reproducibility, and multi-environment execution.
!!! warning "Local Execution Only"
    Async capabilities are currently only supported for local pipeline execution. `AsyncPipeline` and `AsyncPythonTask` cannot be used with containerized (`local-container`) or Kubernetes (`argo`) pipeline executors.
## When to Use Async Execution
- LLM and AI model inference with streaming responses
- Real-time data processing that produces intermediate results
- Long-running async operations that benefit from streaming feedback
- FastAPI integration with Server-Sent Events (SSE)
- WebSocket streaming and event-driven architectures
## AsyncPythonTask

Execute async functions with native `await` support and optional streaming:

### Basic Async Function

```python
from runnable import AsyncPythonTask
import asyncio


async def fetch_data():
    await asyncio.sleep(2)  # Simulate async operation
    return {"status": "complete", "data": [1, 2, 3]}


async def main():
    task = AsyncPythonTask(
        name="fetch_task",
        function=fetch_data,
        returns=["result"],
    )

    # Convert to pipeline and execute with streaming
    pipeline = task.as_async_pipeline()

    # Stream events from the async pipeline
    async for event in pipeline.execute_streaming():
        print(f"Event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
### Streaming with AsyncGenerator

For real-time streaming, use `AsyncGenerator` functions that yield events:

```python
from runnable import AsyncPythonTask
import asyncio
from typing import AsyncGenerator


async def mock_llm_stream(prompt: str) -> AsyncGenerator[dict, None]:
    """Stream LLM response token by token."""
    # Initial status
    yield {"type": "status", "status": "thinking"}
    await asyncio.sleep(0.3)

    # Simulate streaming response
    response = f"Response to: {prompt}"
    words = response.split()

    yield {"type": "status", "status": "generating"}

    # Stream word by word
    for word in words:
        yield {"type": "chunk", "text": word + " "}
        await asyncio.sleep(0.05)

    # Final event with complete response
    yield {"type": "done", "full_text": response}


async def main():
    task = AsyncPythonTask(
        name="llm_stream",
        function=mock_llm_stream,
        returns=["full_text"],
        stream_end_type="done",  # Which event contains the final values
    )
    pipeline = task.as_async_pipeline()

    # Stream events in real-time
    async for event in pipeline.execute_streaming():
        print(f"Streaming event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
## AsyncPipeline

Execute sequences of async tasks with streaming support:

### Multi-Step Async Pipeline

```python
from runnable import AsyncPipeline, AsyncPythonTask
import asyncio


async def process_input(text: str):
    await asyncio.sleep(0.5)
    return text.upper()


async def analyze_text(processed_text: str):
    await asyncio.sleep(0.3)
    word_count = len(processed_text.split())
    return {"word_count": word_count, "processed": processed_text}


async def main():
    pipeline = AsyncPipeline(
        name="text_processing",
        steps=[
            AsyncPythonTask(
                name="process",
                function=process_input,
                returns=["processed_text"],
            ),
            AsyncPythonTask(
                name="analyze",
                function=analyze_text,
                returns=["analysis"],
            ),
        ],
    )

    # Stream events from the multi-step pipeline
    async for event in pipeline.execute_streaming():
        print(f"Pipeline event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
### Streaming Execution

Access events from `AsyncGenerator` functions in real-time:

```python
from runnable import AsyncPipeline, AsyncPythonTask
import asyncio
from typing import AsyncGenerator


async def streaming_analysis(data: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Starting analysis"}

    # Simulate processing steps
    steps = ["tokenizing", "parsing", "analyzing", "summarizing"]
    for step in steps:
        yield {"type": "progress", "step": step}
        await asyncio.sleep(0.2)

    yield {"type": "done", "summary": f"Analyzed: {data[:50]}..."}


async def main():
    pipeline = AsyncPipeline(
        steps=[
            AsyncPythonTask(
                name="stream_analysis",
                function=streaming_analysis,
                returns=["summary"],
            )
        ]
    )

    # Stream real-time events from the AsyncGenerator function
    async for event in pipeline.execute_streaming():
        print(f"Real-time event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
## FastAPI Integration

Create streaming APIs with Server-Sent Events (SSE):

### Basic FastAPI Setup

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from runnable import AsyncPythonTask
import json
import os

app = FastAPI()


class ChatRequest(BaseModel):
    prompt: str


@app.post("/chat")
async def chat_stream(request: ChatRequest):
    """Stream LLM response via Server-Sent Events."""

    async def event_stream():
        # Pass the prompt as a pipeline parameter via environment variable
        os.environ["RUNNABLE_PRM_prompt"] = request.prompt
        try:
            # Your async pipeline from the previous examples
            pipeline = create_chat_pipeline()

            # Stream events directly to the client
            async for event in pipeline.execute_streaming():
                yield f"data: {json.dumps(event)}\n\n"
        finally:
            os.environ.pop("RUNNABLE_PRM_prompt", None)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


def create_chat_pipeline():
    # mock_llm_stream is defined in the streaming example above
    return AsyncPythonTask(
        name="llm_response",
        function=mock_llm_stream,
        returns=["full_text"],
    ).as_async_pipeline()
```
## Advanced Patterns

### Parallel Async Execution

Combine async tasks with parallel execution:

```python
from runnable import AsyncPipeline, AsyncPythonTask, Parallel
import asyncio
from typing import AsyncGenerator


async def translate_spanish(text: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Translating to Spanish"}
    await asyncio.sleep(0.5)
    yield {"type": "done", "spanish_translation": f"ES: {text}"}


async def translate_french(text: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Translating to French"}
    await asyncio.sleep(0.4)
    yield {"type": "done", "french_translation": f"FR: {text}"}


async def main():
    pipeline = AsyncPipeline(
        steps=[
            # First generate text (mock_llm_stream is defined in the
            # streaming example above)
            AsyncPythonTask(
                name="generate",
                function=mock_llm_stream,
                returns=["full_text"],
            ),
            # Then translate in parallel
            Parallel(
                name="translate_parallel",
                branches={
                    "spanish": AsyncPythonTask(
                        name="spanish",
                        function=translate_spanish,
                        returns=["spanish_translation"],
                    ).as_async_pipeline(),
                    "french": AsyncPythonTask(
                        name="french",
                        function=translate_french,
                        returns=["french_translation"],
                    ).as_async_pipeline(),
                },
            ),
        ]
    )

    # Stream events from LLM generation and the parallel translations
    async for event in pipeline.execute_streaming():
        print(f"Parallel execution event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
### Map with Async Streaming

Process multiple items with async streaming:

```python
from runnable import AsyncPipeline, AsyncPythonTask, Map
import asyncio
from typing import AsyncGenerator


async def process_item(text: str, language: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": f"Processing {language}"}
    await asyncio.sleep(0.3)
    yield {"type": "done", "result": f"[{language.upper()}] {text}"}


async def main():
    # Branch that processes one language
    process_branch = AsyncPythonTask(
        name="process_language",
        function=process_item,
        returns=["result"],
    ).as_async_pipeline()

    pipeline = AsyncPipeline(
        steps=[
            AsyncPythonTask(
                name="generate_text",
                function=mock_llm_stream,  # defined in the streaming example above
                returns=["full_text"],
            ),
            Map(
                name="process_languages",
                iterate_on="languages",  # Parameter: ["spanish", "french", "german"]
                iterate_as="language",  # Current language in the loop
                branch=process_branch,
            ),
        ]
    )

    # Stream events from text generation and map processing
    async for event in pipeline.execute_streaming():
        print(f"Map streaming event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
## Event Types and Stream Configuration

### Standard Event Types

AsyncGenerator functions should yield events with a `type` field:

- `status`: Processing status updates
- `progress`: Step-by-step progress information
- `chunk`: Incremental data (text tokens, partial results)
- `done`: Final completion with return values
- `error`: Error information
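Concretely, the event payloads used throughout the examples on this page take shapes like the following; only the `type` key is significant to the framework, the remaining keys are free-form:

```python
# Representative event payloads from the examples on this page.
# Only "type" is interpreted by the framework; other keys are free-form.
{"type": "status", "status": "thinking"}
{"type": "progress", "step": "parsing", "completed": 1, "total": 4}
{"type": "chunk", "text": "partial "}
{"type": "done", "full_text": "the complete response"}
{"type": "error", "error": "description of what went wrong"}
```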
### Stream End Configuration

Control which event contains the final return values:

```python
AsyncPythonTask(
    name="custom_stream",
    function=my_async_generator,
    returns=["result"],
    stream_end_type="complete",  # Look for type="complete" events
)
```

The framework extracts return values from the specified event type, using all keys except `type` as the returned data.
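To make that concrete, here is a minimal sketch of a generator that pairs with the configuration above (`my_async_generator` is illustrative, not a library function):

```python
import asyncio
from typing import AsyncGenerator


async def my_async_generator(data: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "status": "working"}
    await asyncio.sleep(0.1)
    # Because stream_end_type="complete", runnable reads the return values
    # from this event: every key except "type" is returned, so the
    # declared "result" is captured here.
    yield {"type": "complete", "result": data.upper()}
```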
## Best Practices

### AsyncGenerator Patterns

```python
import asyncio
from typing import AsyncGenerator


async def well_structured_stream(input_data: str) -> AsyncGenerator[dict, None]:
    """Best-practice streaming function structure."""
    try:
        # Always start with a status event
        yield {"type": "status", "status": "starting", "input": input_data}

        # Processing with progress updates
        steps = ["step1", "step2", "step3"]
        for i, step in enumerate(steps):
            yield {"type": "progress", "step": step, "completed": i, "total": len(steps)}
            await asyncio.sleep(0.1)  # Your actual work here

        # Stream incremental results if applicable
        result_parts = ["part1", "part2", "part3"]
        full_result = []
        for part in result_parts:
            yield {"type": "chunk", "data": part}
            full_result.append(part)
            await asyncio.sleep(0.05)

        # Always end with the complete data
        yield {
            "type": "done",
            "final_result": " ".join(full_result),
            "metadata": {"processing_time": "estimated"},
        }
    except Exception as e:
        yield {"type": "error", "error": str(e)}
        raise
```
### Error Handling

Handle errors gracefully in async pipelines:

```python
import asyncio
from typing import AsyncGenerator


async def safe_async_operation(data: str) -> AsyncGenerator[dict, None]:
    try:
        yield {"type": "status", "status": "processing"}

        if not data:
            raise ValueError("Empty input data")

        # Your processing
        await asyncio.sleep(0.5)
        result = data.upper()

        yield {"type": "done", "result": result}
    except Exception as e:
        yield {"type": "error", "error": str(e), "failed_input": data}
        # Re-raise to fail the pipeline step
        raise
```
### FastAPI Best Practices

```python
import json
import os
from datetime import datetime

# Builds on the FastAPI app, request model, and pipeline factory
# from the basic setup above.


@app.post("/stream-endpoint")
async def stream_endpoint(request: RequestModel):
    async def event_stream():
        # Always use try/finally for cleanup
        os.environ["RUNNABLE_PRM_input"] = request.input
        try:
            pipeline = create_streaming_pipeline()
            async for event in pipeline.execute_streaming(
                run_id=f"req-{datetime.now().isoformat()}"
            ):
                # Validate events before sending
                if isinstance(event, dict) and "type" in event:
                    yield f"data: {json.dumps(event)}\n\n"
        except Exception as e:
            # Send an error event to the client
            error_event = {"type": "error", "error": str(e)}
            yield f"data: {json.dumps(error_event)}\n\n"
        finally:
            # Always clean up parameters
            os.environ.pop("RUNNABLE_PRM_input", None)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )
```
## Complete Example

See the FastAPI LLM streaming example for a complete working implementation with:

- Multiple async pipeline patterns
- FastAPI SSE integration
- Parallel and map async execution
- Conditional async workflows
- Production-ready error handling

```bash
# Run the example
cd examples/fastapi_llm
uv run uvicorn main:app --reload

# Test streaming
curl -N -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, how are you?"}'
```
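If you prefer to consume the SSE stream from Python instead of curl, a minimal client sketch is shown below. It assumes the `httpx` library and the `/chat` endpoint from the FastAPI example above; it is illustrative, not part of the shipped example:

```python
import asyncio
import json

import httpx


async def consume_chat_stream() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat",
            json={"prompt": "Hello, how are you?"},
        ) as response:
            # SSE frames arrive as "data: <json>" lines separated by blank lines
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event)


if __name__ == "__main__":
    asyncio.run(consume_chat_stream())
```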
## Key Differences from Sync Execution

| Feature | Sync Pipeline | AsyncPipeline |
|---|---|---|
| Execution | `pipeline.execute()` | `async for event in pipeline.execute_streaming()` |
| Streaming | Not available | ✅ Native streaming support |
| Task Type | `PythonTask` | `AsyncPythonTask` |
| Functions | Regular functions | `async def` or `AsyncGenerator` |
| Use Cases | Batch processing | Real-time streaming, LLMs, APIs |
The async capabilities enable entirely new patterns like real-time LLM streaming, progressive data processing, and seamless FastAPI integration while maintaining runnable's core principles of reproducibility and configuration management.