Basic Usage
This section demonstrates the fundamental concepts of Conveyor Streaming with simple, practical examples.
Single Task Example
The most basic building block is a single task, which processes one item at a time:
import asyncio
from conveyor import single_task
@single_task
async def multiply_by_two(x: int) -> int:
    """Double ``x`` after a short simulated delay.

    When the decorated task is called on a whole iterable, it yields one
    doubled value per input item.
    """
    await asyncio.sleep(0.01)  # stand-in for real asynchronous work
    doubled = x * 2
    return doubled
async def main():
    """Run ``multiply_by_two`` over a small list: first streaming, then collecting."""
    numbers = [1, 2, 3, 4, 5]

    # Streaming: handle each doubled value as soon as it becomes available.
    async for value in multiply_by_two(numbers):
        print(f"Result: {value}")

    # Collecting: ``.collect()`` gathers every result at once.
    everything = await multiply_by_two(numbers).collect()
    print(f"All results: {everything}")


if __name__ == "__main__":
    asyncio.run(main())
Batch Task Example
Batch tasks process multiple items together (up to `max_size` items per call, with any leftover items forming a final, smaller batch), which can be more efficient for certain operations:
from conveyor import batch_task
@batch_task(max_size=3)
async def sum_batch(numbers: list[int]) -> int:
    """Return the total of one batch of numbers (at most three, per ``max_size``)."""
    await asyncio.sleep(0.05)  # stand-in for a slower bulk operation
    total = sum(numbers)
    return total
async def main():
    """Feed seven numbers through ``sum_batch`` and print each batch's total."""
    numbers = [1, 2, 3, 4, 5, 6, 7]

    # With max_size=3 the items are grouped as [1, 2, 3], [4, 5, 6], [7].
    async for total in sum_batch(numbers):
        print(f"Batch sum: {total}")
    # Expected output: 6, 15, 7 (the last batch holds the single leftover item)


if __name__ == "__main__":
    asyncio.run(main())
Pipeline Composition
Chain tasks together with the `|` operator; each task's output becomes the next task's input:
@single_task
async def add_one(x: int) -> int:
    """Increment ``x`` by one (the first stage of the example pipeline)."""
    incremented = x + 1
    return incremented
@single_task
async def multiply_by_three(x: int) -> int:
    """Triple ``x`` (the second stage of the example pipeline)."""
    tripled = x * 3
    return tripled
async def main():
    """Send three numbers through ``add_one | multiply_by_three`` and print them."""
    # `|` chains the tasks: every item goes through add_one, then multiply_by_three.
    pipeline = add_one | multiply_by_three

    numbers = [1, 2, 3]
    results = await pipeline(numbers).collect()
    print(results)  # [6, 9, 12]: (1+1)*3, (2+1)*3, (3+1)*3


if __name__ == "__main__":
    asyncio.run(main())
Mixed Pipeline Example
Single tasks and batch tasks can be combined in one pipeline:
@single_task
async def validate_positive(x: int) -> int:
    """Pass ``x`` through unchanged, rejecting values that are zero or negative.

    Raises:
        ValueError: if ``x <= 0``.
    """
    if x <= 0:
        message = f"Number must be positive, got {x}"
        raise ValueError(message)
    return x
@batch_task(max_size=2)
async def batch_square(numbers: list[int]) -> list[int]:
    """Return a new list with the square of every number in this batch.

    With ``max_size=2`` each call sees at most two numbers.
    """
    # One output value per input value, in the same order.
    return [value ** 2 for value in numbers]
@single_task
async def format_result(x: int) -> str:
    """Render ``x`` as a human-readable ``"Result: <x>"`` label."""
    label = f"Result: {x}"
    return label
async def main():
    """Validate, square, and format five numbers, printing each formatted line."""
    # single task -> batch task -> single task, all chained with `|`
    pipeline = validate_positive | batch_square | format_result

    numbers = [1, 2, 3, 4, 5]
    async for line in pipeline(numbers):
        print(line)


if __name__ == "__main__":
    asyncio.run(main())
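Error Handling
Handle errors gracefully in your pipelines. With `on_error="skip_item"`, an item whose processing raises an exception is dropped from the results instead of stopping the whole stream: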
@single_task(on_error="skip_item")
async def safe_divide(x: float) -> float:
    """Return ``x`` divided by a divisor picked at random from ``[0, 1, 2]``.

    A divisor of 0 deliberately raises ``ZeroDivisionError``. Because of
    ``on_error="skip_item"``, that item is expected to be skipped rather
    than failing the whole run.
    """
    import random

    divisor = random.choice([0, 1, 2])  # roughly one call in three picks 0
    if divisor != 0:
        return x / divisor
    raise ZeroDivisionError("Cannot divide by zero")
async def main():
    """Divide five values by random divisors and print the results that succeeded."""
    values = [10.0, 20.0, 30.0, 40.0, 50.0]

    # Items that hit a zero divisor are skipped, so this list may be shorter
    # than the input.
    successes = await safe_divide(values).collect()
    print(f"Successful results: {successes}")


if __name__ == "__main__":
    asyncio.run(main())
Using Context
Pass shared data to your tasks through a `Context` object:
from conveyor import Context
@single_task
async def add_prefix(text: str, context: Context) -> str:
    """Prepend the ``"prefix"`` value held in ``context`` to ``text``.

    If the context has no ``"prefix"`` entry, the ``""`` default passed to
    ``context.get`` is used instead.
    """
    leading = context.get("prefix", "")
    return f"{leading}{text}"
async def main():
    """Greet three names using a prefix supplied through a shared Context."""
    # The same Context object is handed to the task for the whole run.
    shared = Context({"prefix": "Hello, "})
    names = ["Alice", "Bob", "Charlie"]

    async for greeting in add_prefix(names, context=shared):
        print(greeting)  # "Hello, Alice", "Hello, Bob", "Hello, Charlie"


if __name__ == "__main__":
    asyncio.run(main())
Generator Input
Process data from async generators for memory efficiency; results are produced as the input items arrive, without building the whole input up front:
async def number_generator():
    """Asynchronously yield the integers 0 through 9, pausing 0.1 s before each.

    The pause simulates data trickling in over time, so a consumer receives
    the values one by one rather than all at once.
    """
    current = 0
    while current < 10:
        await asyncio.sleep(0.1)  # pretend we are waiting on an external source
        yield current
        current += 1
@single_task
async def process_number(x: int) -> str:
    """Turn ``x`` into a ``"Processed: <x>"`` message."""
    message = f"Processed: {x}"
    return message
async def main():
    """Process numbers coming from an async generator and print each result."""
    # Results appear as the generator produces data, not after all ten
    # numbers exist.
    async for line in process_number(number_generator()):
        print(line)


if __name__ == "__main__":
    asyncio.run(main())