Pipelines API
Pipeline Classes
- class conveyor.pipeline.Pipeline(context: PipelineContext | None = None)
Bases: object
- __init__(context: PipelineContext | None = None)
- with_execution_mode(mode: Literal['ordered', 'as_completed']) → Pipeline
Set the execution mode for this pipeline, either 'ordered' or 'as_completed'.
- async as_completed(data: Iterable[T]) → AsyncIterable[T]
Execute the pipeline and yield results as they complete, similar to asyncio.as_completed().
- Parameters:
data – Input data to process through the pipeline
- Yields:
Results as they complete, without preserving input order
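The comparison to asyncio.as_completed() can be illustrated with the standard library alone. The sketch below does not use conveyor at all; work(), ordered(), and completed() are hypothetical helpers, and the sleep durations are chosen only so that later inputs finish first. It contrasts order-preserving collection (asyncio.gather) with completion-order collection (asyncio.as_completed), which is the behavior the two execution modes appear to correspond to.

```python
import asyncio


async def work(item: int) -> int:
    # Hypothetical task: larger items sleep less, so they finish sooner.
    await asyncio.sleep((3 - item) * 0.05)
    return item * 2


async def ordered(data: list[int]) -> list[int]:
    # gather() returns results in input order, whatever the completion order.
    return await asyncio.gather(*(work(x) for x in data))


async def completed(data: list[int]) -> list[int]:
    # as_completed() hands back awaitables in the order they finish.
    results = []
    for fut in asyncio.as_completed([work(x) for x in data]):
        results.append(await fut)
    return results


ordered_results = asyncio.run(ordered([1, 2, 3]))
completed_results = asyncio.run(completed([1, 2, 3]))
print(ordered_results)    # input order is preserved
print(completed_results)  # fastest task comes first
```

With these delays the second list comes back reversed relative to the input, which is why a consumer of as_completed-style results should not rely on positions matching the input.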
Usage Examples
Basic Pipeline
from conveyor import Pipeline, single_task

# A single task receives one item at a time.
@single_task
def transform(item):
    return item * 2

pipeline = Pipeline()
pipeline.add_task(transform)
results = pipeline.run([1, 2, 3, 4, 5])
Complex Pipeline
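# Assumes batch_task is exported from conveyor alongside single_task.
from conveyor import Pipeline, batch_task, single_task

# Reject negative inputs before they reach the batch stage.
@single_task
def validate(item):
    if item < 0:
        raise ValueError("Negative values not allowed")
    return item

# A batch task receives a list of items (here at most 10).
@batch_task(max_size=10)
def process_batch(items):
    return [item ** 2 for item in items]

pipeline = Pipeline()
pipeline.add_task(validate)
pipeline.add_task(process_batch)
results = pipeline.run(range(100))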
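To make the batching step in the example above more concrete, here is a rough standard-library sketch of what grouping with max_size=10 presumably amounts to. It is not conveyor's implementation: chunked() is a hypothetical helper, and the assumption is that a batch task is handed lists of at most max_size items, with the per-batch outputs flattened back into a single result stream.

```python
from itertools import islice
from typing import Iterable, Iterator


def chunked(items: Iterable[int], max_size: int) -> Iterator[list[int]]:
    # Hypothetical helper: yield consecutive lists of at most max_size items.
    it = iter(items)
    while batch := list(islice(it, max_size)):
        yield batch


def process_batch(items: list[int]) -> list[int]:
    # Same body as the batch task above, without the decorator.
    return [item ** 2 for item in items]


batches = list(chunked(range(100), 10))
# Flatten the per-batch outputs back into one list of results.
results = [value for batch in batches for value in process_batch(batch)]
print(len(batches), len(results))
```

Under this assumption, 100 inputs with max_size=10 produce ten calls to process_batch, each receiving ten items.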