Pipelines API

Pipeline Classes

class conveyor.pipeline.Pipeline(context: PipelineContext | None = None)[source]

Bases: object

__init__(context: PipelineContext | None = None)[source]
add(*tasks: BaseTask) Pipeline[source]
with_context(**kwargs) Pipeline[source]

Create a new pipeline with modified context settings.

with_execution_mode(mode: Literal['ordered', 'as_completed']) Pipeline[source]

Set the execution mode for this pipeline.

async as_completed(data: Iterable[T]) AsyncIterable[T][source]

Execute the pipeline and yield results as they complete, similar to asyncio.as_completed().

Parameters:

data – Input data to process through the pipeline

Yields:

Results as they complete, without preserving input order

Usage Examples

Basic Pipeline

from conveyor import Pipeline, single_task

@single_task
def transform(item):
    return item * 2

pipeline = Pipeline()
pipeline.add_task(transform)
results = pipeline.run([1, 2, 3, 4, 5])

Complex Pipeline

@single_task
def validate(item):
    if item < 0:
        raise ValueError("Negative values not allowed")
    return item

@batch_task(max_size=10)
def process_batch(items):
    return [item ** 2 for item in items]

pipeline = Pipeline()
pipeline.add_task(validate)
pipeline.add_task(process_batch)
results = pipeline.run(range(100))