Error Handling API

Error Handling Classes

class conveyor.error_handling.RetryConfig(max_attempts: int = 3, base_delay: float = 1.0, exponential_backoff: bool = True, max_delay: float = 60.0, backoff_multiplier: float = 2.0)[source]

Bases: object

Configuration for retry behavior.

max_attempts: int = 3
base_delay: float = 1.0
exponential_backoff: bool = True
max_delay: float = 60.0
backoff_multiplier: float = 2.0
__init__(max_attempts: int = 3, base_delay: float = 1.0, exponential_backoff: bool = True, max_delay: float = 60.0, backoff_multiplier: float = 2.0) → None
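
The fields above describe a delay schedule but not the formula behind it. The sketch below shows one common way such fields could combine: a geometric backoff capped at max_delay. RetryConfigSketch and delay_before_retry are hypothetical stand-ins written for illustration, and the formula is an assumption, not conveyor's confirmed behavior.

```python
from dataclasses import dataclass


@dataclass
class RetryConfigSketch:
    """Stand-in mirroring the documented RetryConfig fields (not the library class)."""
    max_attempts: int = 3
    base_delay: float = 1.0
    exponential_backoff: bool = True
    max_delay: float = 60.0
    backoff_multiplier: float = 2.0


def delay_before_retry(config: RetryConfigSketch, attempt: int) -> float:
    """Assumed delay in seconds after failed attempt number `attempt` (1-based)."""
    if not config.exponential_backoff:
        # Assumption: without backoff, every retry waits base_delay.
        return min(config.base_delay, config.max_delay)
    # Assumption: the delay grows geometrically and is capped at max_delay.
    raw = config.base_delay * config.backoff_multiplier ** (attempt - 1)
    return min(raw, config.max_delay)


cfg = RetryConfigSketch(max_attempts=5, base_delay=1.0, max_delay=5.0)
# A retry follows each failed attempt except the last one.
delays = [delay_before_retry(cfg, n) for n in range(1, cfg.max_attempts)]
print(delays)  # [1.0, 2.0, 4.0, 5.0]
```

Under these assumptions, max_delay bounds the wait no matter how many attempts are configured, which keeps a large max_attempts from producing very long pauses.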
class conveyor.error_handling.ErrorHandler[source]

Bases: object

Base class for custom error handlers.

async handle_error(error: Exception, item: Any, task_name: str, attempt: int) → tuple[bool, Any][source]

Handle an error and return (should_continue, value).

Returns:

(should_continue, value)
  • should_continue: True to continue pipeline, False to raise

  • value: Value to use (only relevant if should_continue is True)

Return type:

tuple
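
To make the (should_continue, value) contract concrete, here is a minimal sketch of a handler and of a caller that interprets the returned tuple. ErrorHandlerSketch, FallbackOnValueError, and run_with_handler are hypothetical names used only for illustration; how conveyor itself invokes handlers is assumed, not confirmed by this page.

```python
import asyncio
from typing import Any


class ErrorHandlerSketch:
    """Stand-in for the documented base class (not the library class)."""

    async def handle_error(self, error: Exception, item: Any,
                           task_name: str, attempt: int) -> tuple[bool, Any]:
        return False, None  # default in this sketch: do not continue


class FallbackOnValueError(ErrorHandlerSketch):
    async def handle_error(self, error, item, task_name, attempt):
        if isinstance(error, ValueError):
            return True, "fallback"  # continue with a substitute value
        return False, None  # signal that the error should be raised


async def run_with_handler(func, item, handler, task_name="demo"):
    """Hypothetical caller showing how the returned tuple could be interpreted."""
    try:
        return func(item)
    except Exception as error:
        should_continue, value = await handler.handle_error(
            error, item, task_name, attempt=1
        )
        if should_continue:
            return value
        raise  # should_continue is False: propagate the original error


def parse_int(text):
    return int(text)


handler = FallbackOnValueError()
ok = asyncio.run(run_with_handler(parse_int, "42", handler))
recovered = asyncio.run(run_with_handler(parse_int, "not a number", handler))
print(ok, recovered)  # 42 fallback
```

Note that the handler signals failure through the returned flag rather than by raising, which matches the documented return value.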

class conveyor.error_handling.LoggingErrorHandler(logger: Logger | None = None, replacement_value: Any | None = None)[source]

Bases: ErrorHandler

Error handler that logs errors and continues.

__init__(logger: Logger | None = None, replacement_value: Any | None = None)[source]
async handle_error(error: Exception, item: Any, task_name: str, attempt: int) → tuple[bool, Any][source]

Handle an error and return (should_continue, value).

Returns:

(should_continue, value)
  • should_continue: True to continue pipeline, False to raise

  • value: Value to use (only relevant if should_continue is True)

Return type:

tuple
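
As a rough illustration of "logs errors and continues", the sketch below shows what a handler with this constructor could do. LoggingHandlerSketch is a hypothetical stand-in; returning (True, replacement_value) on every error is an assumption based on the class description, not the library's verified implementation.

```python
import asyncio
import logging
from typing import Any, Optional


class LoggingHandlerSketch:
    """Illustrative stand-in; the real LoggingErrorHandler may differ."""

    def __init__(self, logger: Optional[logging.Logger] = None,
                 replacement_value: Any = None):
        self.logger = logger or logging.getLogger(__name__)
        self.replacement_value = replacement_value

    async def handle_error(self, error, item, task_name, attempt):
        self.logger.error("Task %s failed on attempt %d for item %r: %s",
                          task_name, attempt, item, error)
        # Assumption: always continue, substituting the replacement value.
        return True, self.replacement_value


handler = LoggingHandlerSketch(replacement_value="N/A")
result = asyncio.run(
    handler.handle_error(ValueError("bad input"), "x", "clean", 1)
)
print(result)  # (True, 'N/A')
```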

Usage Examples

Basic Error Handling

import random

from conveyor import single_task, Pipeline
from conveyor.error_handling import RetryConfig

# Keyword names follow the RetryConfig signature documented above.
@single_task(retry_config=RetryConfig(max_attempts=3, backoff_multiplier=2.0))
def unreliable_task(item):
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError("Random failure")
    return item.upper()

pipeline = Pipeline()
pipeline.add_task(unreliable_task)

Custom Error Handling

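import logging

from conveyor import single_task
from conveyor.error_handling import ErrorHandler

logger = logging.getLogger(__name__)

class CustomErrorHandler(ErrorHandler):
    async def handle_error(self, error, item, task_name, attempt):
        if isinstance(error, ValueError):
            # Log and skip: continue the pipeline with None in place of the result
            logger.warning(f"Skipping item in {task_name} due to error: {error}")
            return True, None
        # Any other error: should_continue=False tells the pipeline to raise
        return False, None

@single_task(error_handler=CustomErrorHandler())
def process_with_custom_handling(item):
    # validate_and_process is an application-defined function, not part of conveyor
    return validate_and_process(item)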