Error Handling API¶
Error Handling Classes¶
- class conveyor.error_handling.RetryConfig(max_attempts: int = 3, base_delay: float = 1.0, exponential_backoff: bool = True, max_delay: float = 60.0, backoff_multiplier: float = 2.0)[source]¶
Bases:
objectConfiguration for retry behavior.
- class conveyor.error_handling.ErrorHandler[source]¶
Bases:
objectBase class for custom error handlers.
- async handle_error(error: Exception, item: Any, task_name: str, attempt: int) tuple[bool, Any][source]¶
Handle an error and return (should_continue, value).
- Returns:
- (should_continue, value)
should_continue: True to continue pipeline, False to raise
value: Value to use (only relevant if should_continue is True)
- Return type:
- class conveyor.error_handling.LoggingErrorHandler(logger: Logger | None = None, replacement_value: Any | None = None)[source]¶
Bases:
ErrorHandlerError handler that logs errors and continues.
- async handle_error(error: Exception, item: Any, task_name: str, attempt: int) tuple[bool, Any][source]¶
Handle an error and return (should_continue, value).
- Returns:
- (should_continue, value)
should_continue: True to continue pipeline, False to raise
value: Value to use (only relevant if should_continue is True)
- Return type:
Usage Examples¶
Basic Error Handling¶
from conveyor import single_task, Pipeline
from conveyor.error_handling import RetryConfig
@single_task(retry_config=RetryConfig(max_retries=3, backoff_factor=2.0))
def unreliable_task(item):
if random.random() < 0.3: # 30% failure rate
raise ValueError("Random failure")
return item.upper()
pipeline = Pipeline()
pipeline.add_task(unreliable_task)
Custom Error Handling¶
from conveyor.error_handling import ErrorHandler
class CustomErrorHandler(ErrorHandler):
def handle_error(self, error, item, context):
if isinstance(error, ValueError):
# Log and skip
logger.warning(f"Skipping item due to error: {error}")
return None
else:
# Re-raise for other errors
raise error
@single_task(error_handler=CustomErrorHandler())
def process_with_custom_handling(item):
return validate_and_process(item)