Best Practices¶
This guide covers best practices for building efficient, maintainable, and robust streaming pipelines with Conveyor.
Pipeline Design¶
Keep Tasks Small and Focused¶
Each task should have a single responsibility:
# Good: Focused tasks
@single_task
async def validate_email(email: str) -> str:
if "@" not in email:
raise ValueError("Invalid email")
return email
@single_task
async def send_email(email: str) -> dict:
# Send email logic
return {"email": email, "sent": True}
# Better than one large task doing both
Use Appropriate Task Types¶
Choose the right task type for your use case:
# Use single_task for I/O operations
@single_task
async def fetch_user_data(user_id: int) -> dict:
return await api_client.get_user(user_id)
# Use batch_task for operations that benefit from batching
@batch_task(max_size=50)
async def bulk_insert(records: list[dict]) -> list[int]:
return await db.bulk_insert(records)
Error Handling¶
Use Defensive Programming¶
Always validate inputs and handle edge cases:
@single_task
async def safe_divide(a: float, b: float) -> float:
if b == 0:
raise ValueError("Division by zero")
if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
raise TypeError("Arguments must be numbers")
return a / b
Choose Appropriate Error Strategies¶
Select error handling strategies based on your requirements:
# Skip invalid items
@single_task(on_error="skip_item")
async def parse_json(text: str) -> dict:
return json.loads(text)
# Retry with exponential backoff
@single_task(on_error="retry", max_retries=3, retry_delay=1.0, retry_backoff=2.0)
async def api_call(url: str) -> dict:
return await http_client.get(url)
Performance Optimization¶
Optimize Batch Sizes¶
Tune batch sizes based on your specific workload:
# For database operations, larger batches are often better
@batch_task(max_size=1000)
async def bulk_database_insert(records: list[dict]) -> list[int]:
return await db.bulk_insert(records)
# For API calls with rate limits, smaller batches may be better
@batch_task(max_size=10)
async def api_batch_call(items: list[str]) -> list[dict]:
return await api_client.batch_process(items)
Use Concurrency Limits¶
Control resource usage with concurrency limits:
@single_task(concurrency_limit=5) # Max 5 concurrent requests
async def external_api_call(item: str) -> dict:
return await external_api.process(item)
Memory Management¶
Process Large Datasets¶
Use generators for large datasets to maintain constant memory usage:
async def process_large_file():
async def read_lines():
async with aiofiles.open("large_file.txt") as f:
async for line in f:
yield line.strip()
# Memory usage stays constant regardless of file size
async for result in pipeline(read_lines()):
handle_result(result)
Avoid Collecting Everything¶
Stream results instead of collecting all at once:
# Good: Stream results
async for result in pipeline(data):
await save_result(result)
# Avoid: Collecting all results in memory
# results = await pipeline(data).collect() # Uses lots of memory
Testing¶
Test Individual Tasks¶
Write unit tests for each task:
import pytest
from conveyor import single_task
@single_task
async def add_one(x: int) -> int:
return x + 1
@pytest.mark.asyncio
async def test_add_one():
result = await add_one(5)
assert result == 6
Test Pipeline Integration¶
Test complete pipelines with small datasets:
@pytest.mark.asyncio
async def test_pipeline():
pipeline = task1 | task2 | task3
results = await pipeline([1, 2, 3]).collect()
assert len(results) == 3
Monitoring and Observability¶
Add Logging¶
Include proper logging in your tasks:
import logging
logger = logging.getLogger(__name__)
@single_task
async def process_item(item: dict) -> dict:
logger.info(f"Processing item {item.get('id')}")
try:
result = await do_processing(item)
logger.info(f"Successfully processed item {item.get('id')}")
return result
except Exception as e:
logger.error(f"Failed to process item {item.get('id')}: {e}")
raise
Track Metrics¶
Monitor pipeline performance:
import time
from conveyor import single_task
@single_task
async def timed_task(item: str) -> str:
start_time = time.time()
result = await process_item(item)
duration = time.time() - start_time
metrics.record_duration("task_duration", duration)
return result
Configuration Management¶
Use Environment Variables¶
Keep configuration external:
import os
from conveyor import Context
def create_context():
return Context({
"api_key": os.getenv("API_KEY"),
"database_url": os.getenv("DATABASE_URL"),
"max_retries": int(os.getenv("MAX_RETRIES", "3")),
})
Validate Configuration¶
Validate configuration at startup:
def validate_config(context: Context):
required_keys = ["api_key", "database_url"]
for key in required_keys:
if not context.get(key):
raise ValueError(f"Missing required configuration: {key}")
Documentation¶
Document Task Behavior¶
Use clear docstrings:
@single_task
async def normalize_email(email: str) -> str:
"""
Normalize an email address to lowercase.
Args:
email: The email address to normalize
Returns:
The normalized email address
Raises:
ValueError: If the email format is invalid
"""
if "@" not in email:
raise ValueError("Invalid email format")
return email.lower().strip()
Document Pipeline Purpose¶
Document what your pipelines do:
# User registration pipeline
# Validates email -> Creates user -> Sends welcome email -> Updates analytics
user_registration_pipeline = (
validate_email |
create_user |
send_welcome_email |
update_analytics
)