Streaming¶
Streaming is Conveyor’s key innovation: the ability to process and yield results as they become available rather than waiting for all processing to complete. This improves responsiveness, resource utilization, and user experience.
Streaming Concepts¶
Traditional vs Streaming Processing¶
Traditional Batch Processing:
# Traditional approach - wait for ALL results
async def traditional_processing(items):
    results = []
    for item in items:
        result = await process_item(item)
        results.append(result)
    return results  # All results available at once, at the end

all_results = await traditional_processing(large_dataset)
print(f"Got {len(all_results)} results")  # Long wait, then everything at once
Conveyor Streaming:
# Streaming approach - results as they're ready
pipeline = process_item_task | transform_task | validate_task

async for result in pipeline(large_dataset):
    print(f"Got result: {result}")  # Each result appears as soon as it is ready
    # Can process, display, or save each result immediately
Performance Benefits¶
Streaming Performance Timeline
Example with 8 tasks taking [0.1s, 0.2s, 0.3s, 0.4s, 0.5s, 2.0s, 0.7s, 0.8s]:
Traditional approach (all tasks run concurrently, results returned together):
Time: 2.0s → ALL results yielded at once
Conveyor streaming approach:
Time: 0.1s → yield result 1 ⚡
Time: 0.2s → yield result 2 ⚡
Time: 0.3s → yield result 3 ⚡
Time: 0.4s → yield result 4 ⚡
Time: 0.5s → yield result 5 ⚡
Time: 2.0s → yield results 6,7,8 (7,8 buffered, waiting for 6)
Result: the first 5 results are available by 0.5s instead of 2.0s, 75% sooner than with the traditional approach.
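The timeline follows from a simple rule: when input order is preserved, a result cannot be yielded before every earlier result has finished, so its yield time is the running maximum of the task durations. The following is a minimal sketch of that arithmetic, assuming all eight tasks start concurrently at t=0. The function name `ordered_yield_times` is illustrative and not part of Conveyor.

```python
from itertools import accumulate


def ordered_yield_times(durations: list[float]) -> list[float]:
    """Earliest time each result can be yielded when input order is preserved.

    Assumes every task starts at t=0 and all tasks run concurrently.
    """
    # Result i can be yielded once tasks 0..i have all finished,
    # which is the running maximum of the durations seen so far.
    return list(accumulate(durations, max))


durations = [0.1, 0.2, 0.3, 0.4, 0.5, 2.0, 0.7, 0.8]
yield_times = ordered_yield_times(durations)
batch_time = max(durations)  # a batch approach returns everything at this point
saving = 1 - yield_times[4] / batch_time  # how much sooner result 5 arrives

print(yield_times)       # [0.1, 0.2, 0.3, 0.4, 0.5, 2.0, 2.0, 2.0]
print(f"{saving:.0%}")   # 75%
```

Results 7 and 8 finish at 0.7s and 0.8s but show a yield time of 2.0s, which is the buffering behind result 6 shown in the timeline.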
Execution Modes¶
Ordered Streaming (Default)¶
Preserves input order while streaming results as early as possible:
import asyncio

from conveyor import single_task

@single_task
async def variable_delay_task(item: tuple[str, float]) -> str:
    name, delay = item
    await asyncio.sleep(delay)
    return f"Processed {name}"

async def ordered_streaming_demo():
    items = [
        ("fast", 0.1),
        ("medium", 0.5),
        ("slow", 2.0),
        ("quick", 0.2),
    ]
    pipeline = variable_delay_task
    loop = asyncio.get_running_loop()
    start = loop.time()  # loop.time() is an absolute clock, so print elapsed time
    print("Ordered streaming:")
    async for result in pipeline(items):
        print(f"  {loop.time() - start:.1f}s: {result}")

# Output order matches input order:
#   0.1s: Processed fast
#   0.5s: Processed medium
#   2.0s: Processed slow   <- blocks subsequent results
#   2.0s: Processed quick  <- immediately after slow completes
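The ordered behavior can be reproduced in plain asyncio, which may help when reasoning about why "quick" waits for "slow". This is a sketch of the general technique (start everything, await in input order), not Conveyor's actual implementation. The names `stream_ordered`, `work`, and `demo` are hypothetical, and the delays are scaled down so the example runs quickly.

```python
import asyncio


async def stream_ordered(func, items):
    """Run func on every item concurrently, yielding results in input order."""
    # Start all tasks up front so a slow item does not delay the start of later ones.
    tasks = [asyncio.create_task(func(item)) for item in items]
    try:
        for task in tasks:
            # Awaiting in input order: later tasks that already finished
            # stay buffered until every earlier task has been yielded.
            yield await task
    finally:
        for task in tasks:
            task.cancel()  # no effect on tasks that already finished


async def work(item: tuple[str, float]) -> str:
    name, delay = item
    await asyncio.sleep(delay)
    return name


async def demo() -> list[str]:
    items = [("fast", 0.02), ("medium", 0.10), ("slow", 0.30), ("quick", 0.05)]
    return [name async for name in stream_ordered(work, items)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['fast', 'medium', 'slow', 'quick']
```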
As-Completed Streaming¶
Yields results as they complete, regardless of input order:
async def as_completed_demo():
    items = [
        ("fast", 0.1),
        ("medium", 0.5),
        ("slow", 2.0),
        ("quick", 0.2),
    ]
    pipeline = variable_delay_task
    loop = asyncio.get_running_loop()
    start = loop.time()  # loop.time() is an absolute clock, so print elapsed time
    print("As-completed streaming:")
    async for result in pipeline.as_completed(items):
        print(f"  {loop.time() - start:.1f}s: {result}")

# Output in completion order:
#   0.1s: Processed fast    <- fastest first
#   0.2s: Processed quick   <- second fastest
#   0.5s: Processed medium  <- third fastest
#   2.0s: Processed slow    <- slowest last
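For comparison, completion-order streaming can be sketched with the standard library's `asyncio.as_completed`. This illustrates the concept only and is not a description of how `pipeline.as_completed` is implemented. `stream_as_completed`, `work`, and `demo` are hypothetical names, and the delays are scaled down.

```python
import asyncio


async def stream_as_completed(func, items):
    """Run func on every item concurrently, yielding results as they finish."""
    tasks = [asyncio.create_task(func(item)) for item in items]
    # as_completed hands back awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def work(item: tuple[str, float]) -> str:
    name, delay = item
    await asyncio.sleep(delay)
    return name


async def demo() -> list[str]:
    items = [("fast", 0.02), ("medium", 0.10), ("slow", 0.30), ("quick", 0.05)]
    return [name async for name in stream_as_completed(work, items)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['fast', 'quick', 'medium', 'slow']
```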
Pipeline-Level Execution Mode¶
Set execution mode for entire pipeline:
# Create pipeline with specific execution mode
fast_pipeline = pipeline.with_execution_mode("as_completed")
async for result in fast_pipeline(data):
    print(f"Result in completion order: {result}")

# Or use context
from conveyor import PipelineContext
context = PipelineContext(execution_mode="as_completed")
contextualized_pipeline = pipeline.with_context(**context.__dict__)
Streaming Patterns¶
Real-time Processing¶
Stream results for immediate processing:
import random
import time

@single_task
async def fetch_stock_price(symbol: str) -> dict:
    # Simulate API call with variable latency
    await asyncio.sleep(random.uniform(0.1, 1.0))
    return {
        "symbol": symbol,
        "price": random.uniform(50, 200),
        "timestamp": time.time(),
    }

@single_task
async def check_alerts(price_data: dict) -> dict | None:
    """Filter for prices that trigger alerts"""
    if price_data["price"] > 150:
        return {**price_data, "alert": "HIGH_PRICE"}
    return None

async def real_time_monitoring():
    symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    pipeline = fetch_stock_price | check_alerts
    print("Monitoring stock prices (streaming alerts)...")
    async for alert in pipeline(symbols):
        if alert is None:
            continue  # defensive guard in case non-alert (None) results reach the loop
        # Process alerts immediately as they're detected
        print(f"🚨 ALERT: {alert['symbol']} at ${alert['price']:.2f}")
        await send_notification(alert)
Progress Tracking¶
Show progress as items complete:
import os

@single_task
async def process_document(doc_path: str) -> dict:
    """Process a document (variable processing time)"""
    size = os.path.getsize(doc_path)
    processing_time = size / 1000000  # Simulate size-based processing
    await asyncio.sleep(processing_time)
    return {
        "path": doc_path,
        "size": size,
        "processed_at": time.time(),
    }

async def batch_document_processing():
    documents = get_document_list()  # Large list of documents
    pipeline = process_document
    total_docs = len(documents)
    processed_count = 0
    print(f"Processing {total_docs} documents...")
    async for result in pipeline(documents):
        processed_count += 1
        progress = (processed_count / total_docs) * 100
        print(f"[{progress:5.1f}%] Processed: {result['path']}")
        # Update UI, save checkpoint, etc.
        await update_progress_bar(progress)
Early Results for User Experience¶
Show results immediately for better UX:
@single_task
async def search_database(query: str) -> list[dict]:
    """Search returns results of varying relevance"""
    await asyncio.sleep(random.uniform(0.1, 2.0))  # Variable search time
    # Return multiple results
    return [
        {"query": query, "result": f"Result {i}", "relevance": random.random()}
        for i in range(random.randint(1, 5))
    ]

@single_task
async def rank_result(result: dict) -> dict:
    """Add ranking information"""
    await asyncio.sleep(0.1)  # Ranking processing
    return {**result, "ranked": True, "score": result["relevance"] * 100}

async def search_interface():
    queries = ["python", "async", "streaming", "pipeline", "batch"]
    pipeline = search_database | rank_result
    print("Search results (streaming):")
    async for result in pipeline.as_completed(queries):
        # Show results immediately as they're ready
        print(f"  {result['query']}: {result['result']} (score: {result['score']:.1f})")
        # Update UI immediately - no waiting for all searches
Collection vs Streaming¶
When to Stream¶
Use streaming when:
Results can be processed independently
User experience benefits from immediate feedback
Memory usage needs to be controlled
Early results provide value
# Good for streaming - independent results
async for user in pipeline(user_ids):
    await send_email(user)  # Each result processed independently

# Good for streaming - progress tracking
async for processed_file in pipeline(file_list):
    update_progress_display(processed_file)

# Good for streaming - real-time updates
async for stock_price in pipeline(symbols):
    update_dashboard(stock_price)
When to Collect¶
Use collection when:
All results are needed for a final computation
Results must be processed together
Order matters for final output
# Good for collection - need all results
all_prices = await pipeline(symbols).collect()
average_price = sum(p["price"] for p in all_prices) / len(all_prices)
# Good for collection - batch operations
all_users = await pipeline(user_data).collect()
await database.bulk_insert(all_users)
# Good for collection - sorting final results
results = await pipeline(queries).collect()
sorted_results = sorted(results, key=lambda x: x["score"], reverse=True)
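Conceptually, collecting is the same as draining a stream into a list before doing anything with it. The sketch below shows that equivalence in plain Python, assuming a stand-in async generator in place of a real pipeline. The helper `collect` and the generator `prices` are hypothetical and only mirror the `.collect()` call used above.

```python
import asyncio


async def prices():
    """Stand-in for a streaming pipeline: yields one price record at a time."""
    for symbol, price in [("AAPL", 150.0), ("MSFT", 250.0), ("TSLA", 200.0)]:
        await asyncio.sleep(0)  # pretend each record arrives asynchronously
        yield {"symbol": symbol, "price": price}


async def collect(stream) -> list:
    """Drain an async stream into a list; nothing is returned until it ends."""
    return [item async for item in stream]


async def demo() -> float:
    all_prices = await collect(prices())
    # The average needs every result, so collecting first is the natural fit.
    return sum(p["price"] for p in all_prices) / len(all_prices)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 200.0
```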
Advanced Streaming Patterns¶
Buffered Streaming¶
Control memory usage with buffering:
@batch_task(max_size=10, min_size=1)
async def buffered_processor(items: list[dict]) -> list[dict]:
    """Process in controlled batches"""
    print(f"Processing batch of {len(items)} items")
    await asyncio.sleep(0.5)  # Batch processing time
    return [{"processed": True, **item} for item in items]

async def controlled_streaming():
    large_dataset = range(1000)  # Large dataset
    pipeline = create_item | buffered_processor
    # Results stream in batches, controlling memory usage
    item_count = 0  # counts individual results, not batches
    async for result in pipeline(large_dataset):
        if item_count % 50 == 0:  # Log every 50th item
            print(f"Processed item {result}")
        item_count += 1
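The grouping step behind a `max_size` setting can be illustrated with a small async generator that accumulates items and emits a batch whenever it is full. This is a sketch of the general batching technique, not Conveyor's `batch_task` internals. The names `batched`, `numbers`, and `demo` are hypothetical.

```python
import asyncio


async def batched(source, max_size: int):
    """Group items from an async source into lists of at most max_size."""
    if max_size < 1:
        raise ValueError("max_size must be at least 1")
    batch = []
    async for item in source:
        batch.append(item)
        if len(batch) == max_size:
            yield batch  # only one batch is held in memory at a time
            batch = []
    if batch:
        yield batch  # final, possibly smaller, batch


async def numbers(n: int):
    for i in range(n):
        yield i


async def demo() -> list[int]:
    return [len(b) async for b in batched(numbers(25), max_size=10)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [10, 10, 5]
```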
Multi-source Streaming¶
Combine multiple input sources:
import aiohttp

@single_task
async def fetch_from_api(endpoint: str) -> list[dict]:
    """Fetch data from API endpoint"""
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint) as response:
            return await response.json()

async def multi_source_processing():
    # Multiple data sources
    endpoints = [
        "https://api1.example.com/data",
        "https://api2.example.com/data",
        "https://api3.example.com/data",
    ]
    pipeline = fetch_from_api
    # Stream results from all sources as they complete
    all_data = []
    async for data_batch in pipeline.as_completed(endpoints):
        print(f"Received {len(data_batch)} items from source")
        all_data.extend(data_batch)
        # Process immediately, don't wait for all sources
        await process_batch_immediately(data_batch)
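When each source is itself a stream rather than a single request, the same idea can be sketched by funnelling several async generators through one queue. This is a plain asyncio illustration under that assumption and not a Conveyor feature. `merge_sources`, `source`, and `demo` are hypothetical names.

```python
import asyncio

_DONE = object()  # sentinel marking the end of one source


async def merge_sources(*sources):
    """Yield items from several async sources in the order they arrive."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(src):
        try:
            async for item in src:
                await queue.put(item)
        finally:
            # The queue is unbounded, so put_nowait cannot fail here.
            # In this sketch a failing source simply ends early.
            queue.put_nowait(_DONE)

    pumps = [asyncio.create_task(pump(src)) for src in sources]
    remaining = len(pumps)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        for task in pumps:
            task.cancel()  # no effect on pumps that already finished


async def source(name: str, count: int, delay: float):
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo() -> list[str]:
    merged = merge_sources(source("a", 2, 0.05), source("b", 1, 0.01))
    return [item async for item in merged]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['b0', 'a0', 'a1']
```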
Conditional Streaming¶
Stream different outputs based on conditions:
@single_task
async def classify_and_route(item: dict) -> dict:
    """Classify item and add routing info"""
    # Classification logic
    category = await classify_item(item)
    return {**item, "category": category, "route": get_route(category)}

@single_task
async def route_processor(item: dict) -> dict:
    """Process based on routing"""
    route = item["route"]
    if route == "fast_track":
        return await fast_process(item)
    elif route == "bulk_track":
        return await bulk_process(item)
    else:
        return await default_process(item)

async def conditional_streaming():
    pipeline = classify_and_route | route_processor
    # Different items may take different processing paths
    async for result in pipeline(mixed_data):
        category = result.get("category")
        if category == "urgent":
            await handle_urgent(result)
        else:
            await handle_normal(result)
Performance Optimization¶
Execution Mode Selection¶
# For order-sensitive processing
ordered_pipeline = pipeline  # Default is ordered
# For maximum performance when order doesn't matter
fast_pipeline = pipeline.with_execution_mode("as_completed")

# Performance comparison (perf_counter is monotonic, so it suits interval timing)
start_time = time.perf_counter()
# Ordered processing
async for result in ordered_pipeline(data):
    pass
ordered_time = time.perf_counter() - start_time

start_time = time.perf_counter()
# As-completed processing
async for result in fast_pipeline(data):
    pass
as_completed_time = time.perf_counter() - start_time
print(f"Ordered: {ordered_time:.2f}s, As-completed: {as_completed_time:.2f}s")
Batch Size Tuning for Streaming¶
# Small batches: Lower latency, more streaming opportunities
@batch_task(max_size=5, min_size=1)
async def low_latency_batch(items):
    return process_small_batch(items)

# Large batches: Higher throughput, less frequent streaming
@batch_task(max_size=100, min_size=10)
async def high_throughput_batch(items):
    return process_large_batch(items)

# Choose based on requirements:
# - Real-time systems: small batches
# - Bulk processing: large batches
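The latency and throughput trade-off can be made concrete with a back-of-the-envelope model. The sketch below assumes a fixed overhead per batch plus a fixed cost per item, with batches processed one after another. The function `batch_timing` and all the numbers are illustrative assumptions, not measurements of Conveyor.

```python
import math


def batch_timing(
    n_items: int, batch_size: int, per_batch_overhead: float, per_item_cost: float
) -> tuple[float, float]:
    """Return (time to first results, total time) under a linear cost model.

    Assumes batches are processed sequentially and each batch costs
    per_batch_overhead + per_item_cost * len(batch).
    """
    n_batches = math.ceil(n_items / batch_size)
    first_results = per_batch_overhead + per_item_cost * min(batch_size, n_items)
    total = n_batches * per_batch_overhead + n_items * per_item_cost
    return first_results, total


# 1000 items, 50 ms overhead per batch, 1 ms per item (assumed values)
small_first, small_total = batch_timing(1000, 5, 0.05, 0.001)
large_first, large_total = batch_timing(1000, 100, 0.05, 0.001)

print(f"max_size=5:   first results after {small_first:.3f}s, total {small_total:.1f}s")
print(f"max_size=100: first results after {large_first:.3f}s, total {large_total:.1f}s")
```

Under these assumptions the small batches deliver first results roughly three times sooner, while the large batches finish the whole dataset several times faster because the per-batch overhead is paid 10 times instead of 200.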
Memory-Conscious Streaming¶
async def memory_efficient_processing():
    """Process large datasets without loading all into memory"""
    async def data_generator():
        """Generate data on-demand"""
        for i in range(1_000_000):  # Very large dataset
            yield {"id": i, "data": f"item_{i}"}
            if i % 10000 == 0:
                # Yield control periodically
                await asyncio.sleep(0)

    pipeline = validate_item | transform_item | save_item
    # Process streaming data without memory buildup
    processed_count = 0
    async for result in pipeline(data_generator()):
        processed_count += 1
        if processed_count % 1000 == 0:
            print(f"Processed {processed_count} items")
    # Memory usage stays bounded: each result is consumed as it is produced
    # instead of being accumulated in a list
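Memory stays bounded only if the number of items in flight is bounded too. One common way to get that in plain asyncio is a sliding window of pending tasks, sketched below. This is an illustration of the technique and makes no claim about how Conveyor limits concurrency. `bounded_map`, `demo`, and the `limit` parameter are hypothetical.

```python
import asyncio
from collections import deque


async def bounded_map(func, source, limit: int):
    """Apply func to items from an async source, keeping at most `limit`
    tasks alive at once and yielding results in input order."""
    pending: deque = deque()
    try:
        async for item in source:
            pending.append(asyncio.create_task(func(item)))
            if len(pending) >= limit:
                # Window is full: wait for the oldest task before pulling more input.
                yield await pending.popleft()
        while pending:
            yield await pending.popleft()
    finally:
        for task in pending:
            task.cancel()  # only relevant if the consumer stops early


async def demo() -> tuple[list[int], int]:
    in_flight = 0
    peak = 0

    async def work(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        in_flight -= 1
        return i * 2

    async def source():
        for i in range(50):
            yield i

    results = [r async for r in bounded_map(work, source(), limit=8)]
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(len(results), peak <= 8)  # 50 True
```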
Error Handling in Streaming¶
Graceful Degradation¶
@single_task(on_error="skip_item")
async def fault_tolerant_task(item: dict) -> dict:
    """Continue streaming even if some items fail"""
    if item.get("corrupted"):
        raise ValueError("Corrupted data")
    return {"processed": True, **item}

async def resilient_streaming():
    mixed_data = [
        {"id": 1, "valid": True},
        {"id": 2, "corrupted": True},  # Will be skipped
        {"id": 3, "valid": True},
    ]
    pipeline = fault_tolerant_task
    # Streaming continues despite errors
    async for result in pipeline(mixed_data):
        print(f"Successfully processed: {result}")
    # Output: Items 1 and 3, item 2 skipped
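The skip-on-error behavior can be approximated in plain asyncio by catching the exception around each awaited task and moving on. The sketch below shows that idea only and does not describe how `on_error="skip_item"` is implemented. `stream_skipping_failures`, `process`, and `demo` are hypothetical names.

```python
import asyncio


async def stream_skipping_failures(func, items, skipped: list):
    """Yield successful results in input order; record failed items in `skipped`."""
    items = list(items)
    tasks = [asyncio.create_task(func(item)) for item in items]
    for item, task in zip(items, tasks):
        try:
            result = await task
        except Exception as exc:
            skipped.append((item, exc))  # remember the failure, keep streaming
            continue
        yield result


async def process(item: dict) -> dict:
    if item.get("corrupted"):
        raise ValueError("Corrupted data")
    return {"processed": True, **item}


async def demo():
    data = [
        {"id": 1, "valid": True},
        {"id": 2, "corrupted": True},
        {"id": 3, "valid": True},
    ]
    skipped: list = []
    results = [r async for r in stream_skipping_failures(process, data, skipped)]
    return results, skipped


if __name__ == "__main__":
    results, skipped = asyncio.run(demo())
    print([r["id"] for r in results])          # [1, 3]
    print([item["id"] for item, _ in skipped])  # [2]
```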
Error Tracking in Streams¶
async def streaming_with_error_tracking():
    success_count = 0
    error_count = 0
    async for result in pipeline(data):
        if result.get("error"):
            error_count += 1
            print(f"Error encountered: {result['error']}")
        else:
            success_count += 1
            print(f"Success: {result}")
        # Real-time statistics
        total = success_count + error_count
        success_rate = (success_count / total) * 100 if total > 0 else 0
        print(f"Success rate: {success_rate:.1f}% ({success_count}/{total})")
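The loop above relies on failed items arriving as results that carry an "error" key. One way to produce such results, sketched here in plain asyncio, is a decorator that turns exceptions into error records. This is an assumption about how the error values might be generated, not a documented Conveyor mechanism. `capture_errors`, `process`, and `demo` are hypothetical names.

```python
import asyncio
import functools


def capture_errors(func):
    """Wrap an async function so exceptions become {"item", "error"} records."""
    @functools.wraps(func)
    async def wrapper(item):
        try:
            return await func(item)
        except Exception as exc:
            return {"item": item, "error": f"{type(exc).__name__}: {exc}"}
    return wrapper


@capture_errors
async def process(item: dict) -> dict:
    if item.get("corrupted"):
        raise ValueError("Corrupted data")
    return {"processed": True, **item}


async def demo():
    data = [{"id": 1}, {"id": 2, "corrupted": True}, {"id": 3}]
    results = await asyncio.gather(*(process(item) for item in data))
    error_count = sum(1 for r in results if r.get("error"))
    success_count = len(results) - error_count
    return success_count, error_count, results


if __name__ == "__main__":
    success_count, error_count, results = asyncio.run(demo())
    print(success_count, error_count)  # 2 1
    print(results[1]["error"])         # ValueError: Corrupted data
```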