Streams API

Stream Classes

class conveyor.stream.AsyncStream(source: AsyncIterable[T])

Bases: AsyncIterable[T]

__init__(source: AsyncIterable[T])
async collect() → List[T]
as_completed()

Return an async iterator that yields results in the order they complete rather than the order they were submitted, similar to asyncio.as_completed().

Completion-order iteration can reduce waiting when some items take much longer to process than others, because fast results are not held back behind slow ones.

Note: In the current implementation this method simply yields items in the order the source produces them; it does not reorder anything itself. Completion-order behavior is expected to be handled at the pipeline/task level via execution_mode.

Returns:

An async iterator that yields items in completion order

Return type:

AsyncIterable[T]
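
The following sketch illustrates the completion-order semantics described above using only the standard library's asyncio.as_completed(), which this method is modeled on. It does not use conveyor, and it does not reflect the current pass-through behavior mentioned in the note. The work() coroutine and its delays are hypothetical, chosen only to make the ordering visible.

```python
import asyncio
from typing import List


async def work(name: str, delay: float) -> str:
    # Hypothetical task: sleep for `delay` seconds, then return its name.
    await asyncio.sleep(delay)
    return name


async def completion_order() -> List[str]:
    # Tasks are submitted slowest-first.
    coros = [work("slow", 0.3), work("medium", 0.15), work("fast", 0.05)]
    finished: List[str] = []
    # asyncio.as_completed() yields awaitables in the order they finish.
    for next_done in asyncio.as_completed(coros):
        finished.append(await next_done)
    return finished


results = asyncio.run(completion_order())
print(results)  # shortest delay finishes first
```

With submission-order iteration, the consumer would wait for the slowest task before seeing any result; here the fastest result is available first.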

Usage Examples

Basic Streaming

from conveyor import Stream, single_task

@single_task
def process_stream_item(item):
    return item.upper()

stream = Stream()
stream.add_task(process_stream_item)

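# Process streaming data. data_generator() is a placeholder for any
# iterable of strings supplied by the caller; it is not part of conveyor.
for result in stream.process(data_generator()):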
    print(result)

Streaming with Backpressure

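import time

# batch_task is assumed to be importable from conveyor, like single_task above
from conveyor import Stream, batch_task

@batch_task(max_size=100, max_queue_size=1000)
def process_with_backpressure(items):
    # Simulate heavy processing (blocks for 100 ms per batch)
    time.sleep(0.1)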
    return [item.lower() for item in items]

stream = Stream(buffer_size=500)
stream.add_task(process_with_backpressure)
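
As a rough illustration of what a bounded queue such as max_queue_size implies, the sketch below shows backpressure with a plain asyncio.Queue from the standard library. It is not conveyor's implementation, and the producer, consumer, and run names are hypothetical. The point is only that a full queue suspends the producer until the consumer catches up.

```python
import asyncio
from typing import List, Tuple


async def producer(queue: asyncio.Queue, n: int, peak: List[int]) -> None:
    for i in range(n):
        # put() suspends while the queue is full: this is the backpressure.
        await queue.put(i)
        peak[0] = max(peak[0], queue.qsize())
    await queue.put(None)  # sentinel: no more items


async def consumer(queue: asyncio.Queue, out: List[int]) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)  # simulate slow processing
        out.append(item)


async def run(n: int = 20, max_queue_size: int = 5) -> Tuple[List[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
    out: List[int] = []
    peak = [0]  # largest queue size observed by the producer
    await asyncio.gather(producer(queue, n, peak), consumer(queue, out))
    return out, peak[0]


processed, peak_size = asyncio.run(run())
print(len(processed), peak_size)
```

The queue never grows past its configured size, so a fast producer cannot exhaust memory while a slow consumer works through the backlog.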