Tasks API

Task Classes

class conveyor.tasks.BaseTask(on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)

Bases: object

__init__(on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)

async process(items: AsyncIterable[T]) -> AsyncIterable[T]

get_context() -> PipelineContext | None

Get the current pipeline context.

async as_completed(data)

Execute the task and yield results as they complete, similar to asyncio.as_completed().
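
Since the description points at asyncio.as_completed(), a standard-library sketch of that behavior may help: results arrive in completion order, not submission order. The `work` and `collect_in_completion_order` names are illustrative only, and the code does not call conveyor's own as_completed().

```python
import asyncio


async def work(name: str, delay: float) -> str:
    """Sleep for `delay` seconds, then return `name`."""
    await asyncio.sleep(delay)
    return name


async def collect_in_completion_order() -> list[str]:
    # Jobs are submitted slowest-first on purpose.
    jobs = [work("slow", 0.15), work("medium", 0.08), work("fast", 0.01)]
    finished = []
    # as_completed yields awaitables in the order they finish.
    for next_done in asyncio.as_completed(jobs):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(collect_in_completion_order()))
    # → ['fast', 'medium', 'slow']
```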

class conveyor.tasks.SingleTask(func: Callable[[...], Iterable[Any] | Any | None], _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)

Bases: BaseTask

__init__(func: Callable[[...], Iterable[Any] | Any | None], _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)

with_inputs(*args: Any, **kwargs: Any) -> SingleTask

Returns a new SingleTask instance configured with side inputs.
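
The "returns a new instance" wording suggests a copy-on-configure pattern. The sketch below illustrates that pattern with a hypothetical `SideInputTask` class; it is not conveyor's SingleTask, and the way side inputs reach the function (appended after the streamed item) is an assumption made for the example.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable


@dataclass(frozen=True)
class SideInputTask:
    """Hypothetical stand-in for a task; not conveyor's SingleTask."""

    func: Callable[..., Any]
    side_args: tuple = ()
    side_kwargs: dict = field(default_factory=dict)

    def with_inputs(self, *args: Any, **kwargs: Any) -> "SideInputTask":
        # Return a configured copy and leave the original untouched.
        return replace(self, side_args=args, side_kwargs=dict(kwargs))

    def run(self, item: Any) -> Any:
        # Assumption: side inputs are passed after the streamed item.
        return self.func(item, *self.side_args, **self.side_kwargs)


def scale(item: float, factor: float = 1.0, offset: float = 0.0) -> float:
    """Example function that takes one item plus two side inputs."""
    return item * factor + offset


base = SideInputTask(scale)
configured = base.with_inputs(2.0, offset=1.0)

if __name__ == "__main__":
    print(base.run(3.0))        # → 3.0
    print(configured.run(3.0))  # → 7.0
```

Because the original object is never mutated, one base task can be reused with different side inputs in different pipelines.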

async process(items: AsyncIterable[T]) -> AsyncIterable[Any]

class conveyor.tasks.BatchTask(func: Callable[[...], Iterable[Any] | Any | None], min_size: int = 1, max_size: int | None = None, _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)

Bases: BaseTask

__init__(func: Callable[[...], Iterable[Any] | Any | None], min_size: int = 1, max_size: int | None = None, _side_args: List[Any] | None = None, _side_kwargs: dict[str, Any] | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_config: RetryConfig | None = None, error_handler: ErrorHandler | None = None, task_name: str | None = None)

with_inputs(*args: Any, **kwargs: Any) -> BatchTask

Returns a new BatchTask instance configured with side inputs.

async process(items: AsyncIterable[T]) -> AsyncIterable[Any]

Task Decorators

conveyor.decorators.single_task(func: Callable | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_attempts: int = 1, retry_delay: float = 1.0, retry_exponential_backoff: bool = True, retry_max_delay: float = 60.0, error_handler: ErrorHandler | None = None, task_name: str | None = None, concurrency_limit: int | None = None)

Decorator for creating single-item processing tasks.

Parameters:
  • func – The function to wrap

  • on_error – What to do when an error occurs (“fail”, “skip_item”)

  • retry_attempts – Maximum number of attempts, including the first (default: 1, meaning no retry)

  • retry_delay – Base delay between retries in seconds

  • retry_exponential_backoff – Whether to use exponential backoff

  • retry_max_delay – Maximum delay between retries in seconds

  • error_handler – Custom error handler

  • task_name – Name for logging/debugging

  • concurrency_limit – Maximum number of concurrent operations (None for unlimited)
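
One common way to cap concurrent operations is an asyncio.Semaphore. The sketch below shows that general technique with only the standard library; `run_limited` and `demo` are hypothetical helpers, and nothing here is confirmed to match how conveyor enforces concurrency_limit internally.

```python
import asyncio


async def run_limited(items, worker, concurrency_limit=None):
    """Apply `worker` to every item, at most `concurrency_limit` at a time."""
    # None means unlimited, mirroring the parameter description.
    semaphore = (
        asyncio.Semaphore(concurrency_limit)
        if concurrency_limit is not None
        else None
    )

    async def guarded(item):
        if semaphore is None:
            return await worker(item)
        async with semaphore:
            return await worker(item)

    # gather returns results in input order.
    return await asyncio.gather(*(guarded(item) for item in items))


async def demo():
    active = 0
    peak = 0

    async def worker(item):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # Track the highest observed concurrency.
        await asyncio.sleep(0.01)
        active -= 1
        return item * 2

    results = await run_limited(range(6), worker, concurrency_limit=2)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # → ([0, 2, 4, 6, 8, 10], 2)
```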

conveyor.decorators.batch_task(min_size: int = 1, max_size: int | None = None, on_error: Literal['fail', 'skip_item', 'skip_batch'] = 'fail', retry_attempts: int = 1, retry_delay: float = 1.0, retry_exponential_backoff: bool = True, retry_max_delay: float = 60.0, error_handler: ErrorHandler | None = None, task_name: str | None = None)

Decorator for creating batch processing tasks.

Parameters:
  • min_size – Minimum batch size

  • max_size – Maximum batch size

  • on_error – What to do when an error occurs (“fail”, “skip_item”, “skip_batch”)

  • retry_attempts – Maximum number of attempts, including the first (default: 1, meaning no retry)

  • retry_delay – Base delay between retries in seconds

  • retry_exponential_backoff – Whether to use exponential backoff

  • retry_max_delay – Maximum delay between retries in seconds

  • error_handler – Custom error handler

  • task_name – Name for logging/debugging
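
To see how the four retry parameters could interact, here is a minimal sketch of a delay schedule. It rests on two assumptions the reference does not confirm: that retry_attempts counts the first attempt, and that exponential backoff doubles the delay on each retry. `retry_delays` is a hypothetical helper, not part of conveyor.

```python
def retry_delays(
    retry_attempts=1,
    retry_delay=1.0,
    retry_exponential_backoff=True,
    retry_max_delay=60.0,
):
    """Return the wait in seconds before each retry.

    Assumption: retry_attempts includes the first attempt, so there are
    retry_attempts - 1 retries.
    """
    delays = []
    for retry_index in range(retry_attempts - 1):
        if retry_exponential_backoff:
            # Assumption: the delay doubles on every retry.
            delay = retry_delay * (2 ** retry_index)
        else:
            delay = retry_delay
        # Never wait longer than the configured ceiling.
        delays.append(min(delay, retry_max_delay))
    return delays


if __name__ == "__main__":
    print(retry_delays())  # → []
    print(retry_delays(retry_attempts=5, retry_max_delay=5.0))
    # → [1.0, 2.0, 4.0, 5.0]
    print(retry_delays(3, 0.5, retry_exponential_backoff=False))
    # → [0.5, 0.5]
```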

Usage Examples

Basic Single Task Definition

from conveyor import single_task

@single_task
async def process_data(item):
    """Process a single data item."""
    return item.upper()

# Or with error handling options
@single_task(
    on_error="skip_item",
    retry_attempts=3,
    retry_delay=1.0
)
async def robust_process(item):
    """Process with retry logic."""
    # some_api_call is a placeholder for your own coroutine.
    return await some_api_call(item)

Batch Processing

from conveyor import batch_task

@batch_task(max_size=100)
async def process_batch(items):
    """Process items in batches."""
    return [item.upper() for item in items]

# With minimum batch size
@batch_task(min_size=5, max_size=20)
async def efficient_batch(items):
    """Process with minimum batch requirements."""
    # bulk_operation is a placeholder for your own coroutine.
    return await bulk_operation(items)
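
To picture what a max_size bound means for the functions above, the following standard-library sketch groups an async stream into lists of at most max_size items. `batched`, `numbers`, and `demo` are hypothetical names, the code is not conveyor's batching logic, and min_size is deliberately left out because the reference does not say how undersized batches are handled.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Optional, TypeVar

T = TypeVar("T")


async def batched(
    items: AsyncIterable[T], max_size: Optional[int] = None
) -> AsyncIterator[List[T]]:
    """Yield lists of at most `max_size` items from an async stream."""
    batch: List[T] = []
    async for item in items:
        batch.append(item)
        if max_size is not None and len(batch) >= max_size:
            yield batch
            batch = []
    # Assumption: a trailing partial batch is still emitted.
    if batch:
        yield batch


async def numbers(count: int) -> AsyncIterator[int]:
    """Small async source used only for the demonstration."""
    for value in range(count):
        yield value


async def demo() -> List[List[int]]:
    return [batch async for batch in batched(numbers(7), max_size=3)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # → [[0, 1, 2], [3, 4, 5], [6]]
```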