🧪 Async & Streaming Execution (Experimental - Proof of Concept)¶
Execute async functions and stream results in real-time with runnable's experimental async capabilities.
Not Recommended for Production Agentic Frameworks
These async capabilities are experimental and serve primarily as a proof of concept. They are NOT recommended for production agentic frameworks or complex LLM applications.
For production agentic applications, use mature, purpose-built frameworks:
- Pydantic AI - Production-ready async agent framework with robust LLM integration
- LangChain - Comprehensive framework for LLM applications with extensive tooling
- CrewAI - Multi-agent orchestration with advanced coordination patterns
- AutoGen - Multi-agent conversation framework by Microsoft
Runnable's async features are:
- ❌ Limited to local execution only (no containers, no Kubernetes)
- ❌ Experimental and subject to breaking changes
- ❌ Not optimized for complex agent workflows
- ❌ Lacking advanced LLM integration patterns
Runnable excels at data pipeline orchestration, not agentic frameworks.
Experimental Feature Details
Async and streaming capabilities are experimental and under active development. Features may change, and stability is not guaranteed for production use.
Overview¶
Runnable provides experimental async and streaming support for specialized use cases requiring real-time processing:
- Batch Processing: Core production-ready data pipelines with full reproducibility and orchestration
- Experimental Streaming: AsyncGenerator support for LLM inference, APIs, and streaming data
AsyncPipeline and AsyncPythonTask are experimental features that enable streaming workflows while maintaining runnable's core features: parameter management, catalog system, reproducibility, and multi-environment execution.
Local Execution Only
Async capabilities are currently only supported for local pipeline execution. AsyncPipeline and AsyncPythonTask cannot be used with containerized (local-container) or Kubernetes (argo) pipeline executors.
When to Use Async Execution¶
- LLM and AI model inference with streaming responses
- Real-time data processing that produces intermediate results
- Long-running async operations that benefit from streaming feedback
- FastAPI integration with Server-Sent Events (SSE)
- WebSocket streaming and event-driven architectures
AsyncPythonTask¶
Execute async functions with native await support and optional streaming:
Basic Async Function¶
```python
from runnable import AsyncPythonTask
import asyncio


async def fetch_data():
    await asyncio.sleep(2)  # Simulate async operation
    return {"status": "complete", "data": [1, 2, 3]}


async def main():
    task = AsyncPythonTask(
        name="fetch_task",
        function=fetch_data,
        returns=["result"]
    )

    # Convert to pipeline and execute with streaming
    pipeline = task.as_async_pipeline()

    # Stream events from the async pipeline
    async for event in pipeline.execute_streaming():
        print(f"Event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
Streaming with AsyncGenerator¶
For real-time streaming, use AsyncGenerator functions that yield events:
```python
from runnable import AsyncPythonTask
import asyncio
from typing import AsyncGenerator


async def mock_llm_stream(prompt: str) -> AsyncGenerator[dict, None]:
    """Stream LLM response token by token."""
    # Initial status
    yield {"type": "status", "status": "thinking"}
    await asyncio.sleep(0.3)

    # Simulate streaming response
    response = f"Response to: {prompt}"
    words = response.split()

    yield {"type": "status", "status": "generating"}

    # Stream word by word
    for word in words:
        yield {"type": "chunk", "text": word + " "}
        await asyncio.sleep(0.05)

    # Final event with complete response
    yield {"type": "done", "full_text": response}


async def main():
    task = AsyncPythonTask(
        name="llm_stream",
        function=mock_llm_stream,
        returns=["full_text"],
        stream_end_type="done"  # Which event contains final values
    )

    pipeline = task.as_async_pipeline()

    # Stream events in real-time
    async for event in pipeline.execute_streaming():
        print(f"Streaming event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
AsyncPipeline¶
Execute sequences of async tasks with streaming support:
Multi-Step Async Pipeline¶
```python
from runnable import AsyncPipeline, AsyncPythonTask
import asyncio


async def process_input(text: str):
    await asyncio.sleep(0.5)
    return text.upper()


async def analyze_text(processed_text: str):
    await asyncio.sleep(0.3)
    word_count = len(processed_text.split())
    return {"word_count": word_count, "processed": processed_text}


async def main():
    pipeline = AsyncPipeline(
        name="text_processing",
        steps=[
            AsyncPythonTask(
                name="process",
                function=process_input,
                returns=["processed_text"]
            ),
            AsyncPythonTask(
                name="analyze",
                function=analyze_text,
                returns=["analysis"]
            )
        ]
    )

    # Stream events from the multi-step pipeline
    async for event in pipeline.execute_streaming():
        print(f"Pipeline event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
Streaming Execution¶
Access events from AsyncGenerator functions in real-time:
```python
from runnable import AsyncPipeline, AsyncPythonTask
import asyncio
from typing import AsyncGenerator


async def streaming_analysis(data: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Starting analysis"}

    # Simulate processing steps
    steps = ["tokenizing", "parsing", "analyzing", "summarizing"]
    for step in steps:
        yield {"type": "progress", "step": step}
        await asyncio.sleep(0.2)

    yield {"type": "done", "summary": f"Analyzed: {data[:50]}..."}


async def main():
    pipeline = AsyncPipeline(
        steps=[
            AsyncPythonTask(
                name="stream_analysis",
                function=streaming_analysis,
                returns=["summary"]
            )
        ]
    )

    # Stream real-time events from the AsyncGenerator function
    async for event in pipeline.execute_streaming():
        print(f"Real-time event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
FastAPI Integration¶
Create streaming APIs with Server-Sent Events (SSE):
Basic FastAPI Setup¶
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
import os

from runnable import AsyncPythonTask

app = FastAPI()


class ChatRequest(BaseModel):
    prompt: str


@app.post("/chat")
async def chat_stream(request: ChatRequest):
    """Stream LLM response via Server-Sent Events."""

    async def event_stream():
        # Set pipeline parameter
        os.environ["RUNNABLE_PRM_prompt"] = request.prompt
        try:
            # Your async pipeline from previous examples
            pipeline = create_chat_pipeline()

            # Stream events directly to client
            async for event in pipeline.execute_streaming():
                yield f"data: {json.dumps(event)}\n\n"
        finally:
            os.environ.pop("RUNNABLE_PRM_prompt", None)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"
        }
    )


def create_chat_pipeline():
    # mock_llm_stream is the streaming function from the earlier example
    return AsyncPythonTask(
        name="llm_response",
        function=mock_llm_stream,
        returns=["full_text"]
    ).as_async_pipeline()
```
Advanced Patterns¶
Parallel Async Execution¶
Combine async tasks with parallel execution:
```python
from runnable import AsyncPipeline, AsyncPythonTask, Parallel
import asyncio
from typing import AsyncGenerator


async def translate_spanish(text: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Translating to Spanish"}
    await asyncio.sleep(0.5)
    yield {"type": "done", "spanish_translation": f"ES: {text}"}


async def translate_french(text: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": "Translating to French"}
    await asyncio.sleep(0.4)
    yield {"type": "done", "french_translation": f"FR: {text}"}


async def main():
    pipeline = AsyncPipeline(
        steps=[
            # First generate text (mock_llm_stream from the earlier example)
            AsyncPythonTask(
                name="generate",
                function=mock_llm_stream,
                returns=["full_text"]
            ),
            # Then translate in parallel
            Parallel(
                name="translate_parallel",
                branches={
                    "spanish": AsyncPythonTask(
                        name="spanish",
                        function=translate_spanish,
                        returns=["spanish_translation"]
                    ).as_async_pipeline(),
                    "french": AsyncPythonTask(
                        name="french",
                        function=translate_french,
                        returns=["french_translation"]
                    ).as_async_pipeline()
                }
            )
        ]
    )

    # Stream events from LLM generation and parallel translations
    async for event in pipeline.execute_streaming():
        print(f"Parallel execution event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
Map with Async Streaming¶
Process multiple items with async streaming:
```python
from runnable import AsyncPipeline, AsyncPythonTask, Map
import asyncio
from typing import AsyncGenerator


async def process_item(text: str, language: str) -> AsyncGenerator[dict, None]:
    yield {"type": "status", "message": f"Processing {language}"}
    await asyncio.sleep(0.3)
    yield {"type": "done", "result": f"[{language.upper()}] {text}"}


async def main():
    # Branch that processes one language
    process_branch = AsyncPythonTask(
        name="process_language",
        function=process_item,
        returns=["result"]
    ).as_async_pipeline()

    pipeline = AsyncPipeline(
        steps=[
            # mock_llm_stream is the streaming function from the earlier example
            AsyncPythonTask(
                name="generate_text",
                function=mock_llm_stream,
                returns=["full_text"]
            ),
            Map(
                name="process_languages",
                iterate_on="languages",  # Parameter: ["spanish", "french", "german"]
                iterate_as="language",   # Current language in loop
                branch=process_branch
            )
        ]
    )

    # Stream events from text generation and map processing
    async for event in pipeline.execute_streaming():
        print(f"Map streaming event: {event}")

    return pipeline


if __name__ == "__main__":
    asyncio.run(main())
```
Event Types and Stream Configuration¶
Standard Event Types¶
AsyncGenerator functions should yield events with a `type` field:

- `status`: Processing status updates
- `progress`: Step-by-step progress information
- `chunk`: Incremental data (text tokens, partial results)
- `done`: Final completion with return values
- `error`: Error information
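On the consumer side, these `type` values can be used to route events as they arrive. The sketch below is illustrative application code, not part of runnable's API; only `pipeline.execute_streaming()` comes from the examples above:

```python
# Illustrative consumer loop: route events by their "type" field.
async def consume(pipeline):
    chunks = []
    async for event in pipeline.execute_streaming():
        if not isinstance(event, dict):
            continue
        kind = event.get("type")
        if kind in ("status", "progress"):
            print(f"[{kind}] {event}")            # log progress updates
        elif kind == "chunk":
            chunks.append(event.get("text", ""))  # accumulate incremental output
        elif kind == "done":
            print("final:", event)                # final values arrive on this event
        elif kind == "error":
            print("failed:", event)               # surface the failure to the caller
    return "".join(chunks)
```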
Stream End Configuration¶
Control which event contains the final return values:
```python
AsyncPythonTask(
    name="custom_stream",
    function=my_async_generator,
    returns=["result"],
    stream_end_type="complete"  # Look for type="complete" events
)
```
The framework extracts return values from the specified event type, using all keys except `type` as the returned data.
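Conceptually, the extraction step behaves like the sketch below; this is an illustration of the documented behavior, not the library's actual implementation:

```python
# Illustrative only: pull return data from the configured end event.
# With stream_end_type="done", an event like
#   {"type": "done", "result": "HELLO", "elapsed": 0.5}
# produces {"result": "HELLO", "elapsed": 0.5}, from which the
# names declared in `returns` are taken.
def extract_returns(event: dict, stream_end_type: str = "done") -> dict | None:
    if not isinstance(event, dict) or event.get("type") != stream_end_type:
        return None
    return {key: value for key, value in event.items() if key != "type"}
```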
Best Practices¶
AsyncGenerator Patterns¶
```python
import asyncio
from typing import AsyncGenerator


async def well_structured_stream(input_data: str) -> AsyncGenerator[dict, None]:
    """Best practice streaming function structure."""
    try:
        # Always start with status
        yield {"type": "status", "status": "starting", "input": input_data}

        # Processing with progress updates
        steps = ["step1", "step2", "step3"]
        for i, step in enumerate(steps):
            yield {"type": "progress", "step": step, "completed": i, "total": len(steps)}
            await asyncio.sleep(0.1)  # Your actual work here

        # Stream incremental results if applicable
        result_parts = ["part1", "part2", "part3"]
        full_result = []
        for part in result_parts:
            yield {"type": "chunk", "data": part}
            full_result.append(part)
            await asyncio.sleep(0.05)

        # Always end with complete data
        yield {
            "type": "done",
            "final_result": " ".join(full_result),
            "metadata": {"processing_time": "estimated"}
        }
    except Exception as e:
        yield {"type": "error", "error": str(e)}
        raise
```
Error Handling¶
Handle errors gracefully in async pipelines:
```python
import asyncio
from typing import AsyncGenerator


async def safe_async_operation(data: str) -> AsyncGenerator[dict, None]:
    try:
        yield {"type": "status", "status": "processing"}

        if not data:
            raise ValueError("Empty input data")

        # Your processing
        await asyncio.sleep(0.5)
        result = data.upper()

        yield {"type": "done", "result": result}
    except Exception as e:
        yield {"type": "error", "error": str(e), "failed_input": data}
        # Re-raise to fail the pipeline step
        raise
```
FastAPI Best Practices¶
```python
# Builds on the FastAPI app, imports, and request models from the basic setup above
from datetime import datetime


@app.post("/stream-endpoint")
async def stream_endpoint(request: RequestModel):
    async def event_stream():
        # Always use try/finally for cleanup
        os.environ["RUNNABLE_PRM_input"] = request.input
        try:
            pipeline = create_streaming_pipeline()

            async for event in pipeline.execute_streaming(
                run_id=f"req-{datetime.now().isoformat()}"
            ):
                # Validate events before sending
                if isinstance(event, dict) and "type" in event:
                    yield f"data: {json.dumps(event)}\n\n"
        except Exception as e:
            # Send error event to client
            error_event = {"type": "error", "error": str(e)}
            yield f"data: {json.dumps(error_event)}\n\n"
        finally:
            # Always clean up parameters
            os.environ.pop("RUNNABLE_PRM_input", None)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream"
    )
```
Complete Example¶
See the FastAPI LLM streaming example for a complete working implementation with:
- Multiple async pipeline patterns
- FastAPI SSE integration
- Parallel and map async execution
- Conditional async workflows
- Production-ready error handling
```bash
# Run the example
cd examples/fastapi_llm
uv run uvicorn main:app --reload

# Test streaming
curl -N -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, how are you?"}'
```
Key Differences from Sync Execution¶
| Feature | Sync Pipeline | AsyncPipeline |
|---|---|---|
| Execution | `pipeline.execute()` | `async for event in pipeline.execute_streaming()` |
| Streaming | Not available | ✅ Native streaming support |
| Task Type | `PythonTask` | `AsyncPythonTask` |
| Functions | Regular functions | `async def` or AsyncGenerator |
| Use Cases | Batch processing | Real-time streaming, LLMs, APIs |
These experimental async capabilities enable patterns such as real-time LLM streaming, progressive data processing, and FastAPI integration while maintaining runnable's core principles of reproducibility and configuration management.