Tasks API
Task Classes
- class conveyor.tasks.BaseTask(on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)
Bases: object
- __init__(on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)
- async process(items: AsyncIterable[T]) -> AsyncIterable[T]
- get_context() -> PipelineContext | None
Get the current pipeline context.
- class conveyor.tasks.SingleTask(func: Callable[[...], Iterable[Any] | Any | None], _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)
Bases: BaseTask
- __init__(func: Callable[[...], Iterable[Any] | Any | None], _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)
- with_inputs(*args: Any, **kwargs: Any) -> SingleTask
Returns a new SingleTask instance configured with side inputs.
- async process(items: AsyncIterable[T]) -> AsyncIterable[Any]
- class conveyor.tasks.BatchTask(func: Callable[[...], Iterable[Any] | Any | None], min_size: int = 1, max_size: int | None = None, _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)
Bases: BaseTask
- __init__(func: Callable[[...], Iterable[Any] | Any | None], min_size: int = 1, max_size: int | None = None, _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)
- with_inputs(*args: Any, **kwargs: Any) -> BatchTask
Returns a new BatchTask instance configured with side inputs.
- async process(items: AsyncIterable[T]) -> AsyncIterable[Any]
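All three classes expose the same shape: process() takes an async stream of items and returns an async stream of results. The sketch below imitates that shape with a stand-in class so the flow is easier to picture. SketchStage, to_async, and collect are hypothetical names invented here, and the handling of "fail" and "skip_item" is an assumed reading of the option names, not conveyor's actual implementation.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, Callable


async def to_async(values):
    """Hypothetical helper: expose a plain iterable as an async stream."""
    for value in values:
        yield value


class SketchStage:
    """Stand-in with the same process() shape as the task classes; not conveyor code."""

    def __init__(self, func: Callable[[Any], Any], on_error: str = "fail"):
        self.func = func
        self.on_error = on_error

    async def process(self, items: AsyncIterable[Any]) -> AsyncIterator[Any]:
        async for item in items:
            try:
                result = self.func(item)
            except Exception:
                if self.on_error == "skip_item":
                    continue  # assumed meaning: drop the failing item and keep going
                raise  # assumed meaning of "fail": propagate and end the stream
            yield result


async def collect(stream):
    """Drain an async stream into a list."""
    return [item async for item in stream]


if __name__ == "__main__":
    stage = SketchStage(str.upper, on_error="skip_item")
    # The integer 1 makes str.upper raise TypeError, so it is skipped.
    print(asyncio.run(collect(stage.process(to_async(["a", 1, "b"])))))
```

Because each stage consumes and produces an async stream, stages written this way can be chained by passing one stage's output to the next stage's process().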
Task Decorators
- conveyor.decorators.single_task(func: Callable | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_attempts: int = 1, retry_delay: float = 1.0, retry_exponential_backoff: bool = True, retry_max_delay: float = 60.0, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)
Decorator for creating single-item processing tasks.
- Parameters:
func – The function to wrap
on_error – What to do when an error occurs (“fail”, “skip_item”)
retry_attempts – Maximum number of attempts (default: 1, meaning no retry)
retry_delay – Base delay between retries in seconds
retry_exponential_backoff – Whether to use exponential backoff
retry_max_delay – Maximum delay between retries
error_handler – Custom error handler
task_name – Name for logging/debugging
concurrency_limit – Maximum number of concurrent operations (None for unlimited)
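To make the interaction of the retry parameters concrete, here is a minimal sketch of how a delay schedule could be derived from them. It assumes that retry_attempts counts the first attempt (consistent with a default of 1 meaning no retry) and that exponential backoff doubles the delay each time; both are assumptions, and conveyor's actual formula may differ. retry_delays is a hypothetical helper, not part of the library.

```python
def retry_delays(
    retry_attempts: int,
    retry_delay: float = 1.0,
    retry_exponential_backoff: bool = True,
    retry_max_delay: float = 60.0,
) -> list[float]:
    """Return the wait (in seconds) before each retry under the stated assumptions."""
    delays = []
    # Assumption: the first attempt is not a retry, so there are attempts - 1 waits.
    for retry_index in range(max(retry_attempts - 1, 0)):
        if retry_exponential_backoff:
            delay = retry_delay * (2 ** retry_index)  # assumed doubling factor
        else:
            delay = retry_delay
        delays.append(min(delay, retry_max_delay))  # never exceed the cap
    return delays


if __name__ == "__main__":
    print(retry_delays(4))                                   # [1.0, 2.0, 4.0]
    print(retry_delays(3, retry_exponential_backoff=False))  # [1.0, 1.0]
    print(retry_delays(8)[-1])                               # 60.0 (capped from 64.0)
```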
- conveyor.decorators.batch_task(min_size: int = 1, max_size: int | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_attempts: int = 1, retry_delay: float = 1.0, retry_exponential_backoff: bool = True, retry_max_delay: float = 60.0, error_handler: ErrorHandler | None = None, task_name: str | None = None)
Decorator for creating batch processing tasks.
- Parameters:
min_size – Minimum batch size
max_size – Maximum batch size
on_error – What to do when an error occurs (“fail”, “skip_item”, “skip_batch”)
retry_attempts – Maximum number of attempts (default: 1, meaning no retry)
retry_delay – Base delay between retries in seconds
retry_exponential_backoff – Whether to use exponential backoff
retry_max_delay – Maximum delay between retries
error_handler – Custom error handler
task_name – Name for logging/debugging
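The effect of max_size can be pictured as chunking an async stream into lists. The sketch below is a stand-alone illustration using only the standard library; batches and to_async are hypothetical helpers, and the grouping shown is an assumption about typical batching rather than conveyor's implementation. This reference does not describe how min_size influences grouping, so the sketch leaves it out.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, List, Optional


async def to_async(values):
    """Hypothetical helper: expose a plain iterable as an async stream."""
    for value in values:
        yield value


async def batches(
    items: AsyncIterable[Any], max_size: Optional[int] = None
) -> AsyncIterator[List[Any]]:
    """Group a stream into lists of at most max_size items (None means one batch)."""
    batch: List[Any] = []
    async for item in items:
        batch.append(item)
        if max_size is not None and len(batch) >= max_size:
            yield batch
            batch = []
    if batch:
        # Assumption: the shorter leftover batch is still emitted so no item is lost.
        yield batch


async def collect(stream):
    """Drain an async stream into a list."""
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(collect(batches(to_async(range(1, 8)), max_size=3))))
    # [[1, 2, 3], [4, 5, 6], [7]]
```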
Usage Examples
Basic Single Task Definition
from conveyor import single_task

@single_task
async def process_data(item):
    """Process a single data item."""
    return item.upper()

# Or with error handling options
@single_task(
    on_error="skip_item",
    retry_attempts=3,
    retry_delay=1.0,
)
async def robust_process(item):
    """Process with retry logic."""
    # some_api_call is a placeholder for your own coroutine; it is not defined here.
    return await some_api_call(item)
Batch Processing
from conveyor import batch_task

@batch_task(max_size=100)
async def process_batch(items):
    """Process items in batches."""
    return [item.upper() for item in items]

# With minimum batch size
@batch_task(min_size=5, max_size=20)
async def efficient_batch(items):
    """Process with minimum batch requirements."""
    # bulk_operation is a placeholder for your own coroutine; it is not defined here.
    return await bulk_operation(items)