Context API
Context Classes
- class conveyor.context.PipelineContext(execution_mode: Literal['ordered', 'as_completed'] = 'ordered', max_parallelism: int | None = None, pipeline_id: str | None = None, data: Dict[str, Any] = <factory>)
  Bases: object
  Configuration and state for pipeline execution.
  - copy() → PipelineContext
    Create a copy of the context.
- conveyor.context.get_current_context() → PipelineContext | None
  Get the current pipeline context, or None if no context is set.
- conveyor.context.set_current_context(context: PipelineContext) → None
  Set the current pipeline context.
- class conveyor.context.ContextManager(context: PipelineContext)
  Bases: object
  Context manager for pipeline execution.
  - __init__(context: PipelineContext)
- async conveyor.context.with_context(context: PipelineContext, coro)
  Run a coroutine with a specific pipeline context.
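This reference does not say how the "current" context is tracked. In asyncio code, APIs of this shape are commonly built on the standard `contextvars` module, because each asyncio Task runs in its own copy of the context variables. The following is a minimal sketch under that assumption only: `SketchContext`, `SketchContextManager`, `_current` and all function bodies are hypothetical and are not conveyor's source. It also assumes that `copy()` gives the new context its own `data` dict.

```python
import asyncio
import contextvars
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Awaitable, Dict, Literal, Optional, TypeVar

T = TypeVar("T")


@dataclass
class SketchContext:
    """Hypothetical stand-in with the same fields as PipelineContext."""

    execution_mode: Literal["ordered", "as_completed"] = "ordered"
    max_parallelism: Optional[int] = None
    pipeline_id: Optional[str] = None
    data: Dict[str, Any] = field(default_factory=dict)

    def copy(self) -> "SketchContext":
        # Assumption: the copy gets its own (shallow) copy of the data dict.
        return dataclasses.replace(self, data=dict(self.data))


# One ContextVar tracks the active context for the current asyncio task.
_current: "contextvars.ContextVar[Optional[SketchContext]]" = contextvars.ContextVar(
    "pipeline_context", default=None
)


def get_current_context() -> Optional[SketchContext]:
    return _current.get()


def set_current_context(context: SketchContext) -> None:
    _current.set(context)


class SketchContextManager:
    """Hypothetical `with` block that installs a context and restores the old one."""

    def __init__(self, context: SketchContext) -> None:
        self.context = context
        self._token: Optional[contextvars.Token] = None

    def __enter__(self) -> SketchContext:
        self._token = _current.set(self.context)
        return self.context

    def __exit__(self, *exc_info: object) -> None:
        # Restore whatever context was active before __enter__.
        _current.reset(self._token)


async def with_context(context: SketchContext, coro: Awaitable[T]) -> T:
    # A coroutine's body only starts running at `await`, so it sees `context`.
    with SketchContextManager(context):
        return await coro


async def read_batch_id() -> Optional[str]:
    ctx = get_current_context()
    return ctx.data.get("batch_id") if ctx else None


async def main():
    ctx = SketchContext(data={"batch_id": "batch_001"})
    inside = await with_context(ctx, read_batch_id())
    return inside, get_current_context()


print(asyncio.run(main()))  # ('batch_001', None)
```

Restoring through the `Token` returned by `set()` (rather than setting `None` back) means nested `with_context` calls unwind correctly, and the `with` block restores the previous context even if the coroutine raises.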
Usage Examples
Data Sharing Between Tasks
The primary purpose of PipelineContext is to share data between tasks during a pipeline execution, through its data dictionary.
```python
import asyncio

from conveyor import PipelineContext, get_current_context, single_task


@single_task
async def collect_metadata(item):
    """First task that collects metadata in shared context."""
    context = get_current_context()
    if context:
        # Initialize or update shared statistics
        stats = context.data.get('stats', {'total_items': 0, 'processed_chars': 0})
        stats['total_items'] += 1
        stats['processed_chars'] += len(str(item))
        context.data['stats'] = stats
    return item


@single_task
async def process_with_stats(item):
    """Second task that uses shared context data."""
    context = get_current_context()
    processed_item = {
        'original': item,
        'processed': str(item).upper(),
    }
    if context:
        stats = context.data.get('stats', {})
        processed_item['pipeline_stats'] = {
            'total_processed': stats.get('total_items', 0),
            'total_chars': stats.get('processed_chars', 0),
            'pipeline_id': context.pipeline_id,
        }
    return processed_item


# Create pipeline
pipeline = collect_metadata | process_with_stats

# Run with initial context data
custom_context = PipelineContext(
    data={'batch_id': 'batch_001', 'user_id': 'user_123'}
)
pipeline_with_context = pipeline.with_context(**custom_context.__dict__)


async def main():
    # `async for` must run inside a coroutine
    data = ["hello", "world", "conveyor", "pipeline"]
    async for result in pipeline_with_context(data):
        print(f"Processed: {result}")


asyncio.run(main())
```
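One consequence worth keeping in mind: process_with_stats reads the statistics as they stand when each item reaches it, so pipeline_stats reports running totals rather than final ones. The sketch below reproduces this outside conveyor, assuming items are handed from stage to stage one at a time (conveyor's actual scheduling, especially in 'as_completed' mode, may interleave differently). `source`, `collect_stage`, `stats_stage` and `run` are hypothetical names, and a plain dict stands in for `context.data`.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Iterable, List


async def source(items: Iterable[Any]) -> AsyncIterator[Any]:
    # Feed the input one item at a time.
    for item in items:
        yield item


async def collect_stage(items: AsyncIterator[Any], shared: Dict[str, Any]) -> AsyncIterator[Any]:
    # Mirrors collect_metadata: update running totals in the shared dict.
    async for item in items:
        stats = shared.setdefault("stats", {"total_items": 0, "processed_chars": 0})
        stats["total_items"] += 1
        stats["processed_chars"] += len(str(item))
        yield item


async def stats_stage(items: AsyncIterator[Any], shared: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]:
    # Mirrors process_with_stats: read whatever totals exist right now.
    async for item in items:
        stats = shared.get("stats", {})
        yield {"original": item, "total_processed": stats.get("total_items", 0)}


async def run(items: Iterable[Any]) -> List[Dict[str, Any]]:
    shared: Dict[str, Any] = {}  # plays the role of context.data
    stream = stats_stage(collect_stage(source(items), shared), shared)
    return [result async for result in stream]


results = asyncio.run(run(["hello", "world", "conveyor", "pipeline"]))
print([r["total_processed"] for r in results])  # [1, 2, 3, 4]
```

If a task needs the final totals, it has to run after every item has passed the collecting stage, for example in a separate step once the pipeline has finished.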
Configuration and Dependencies
Share configuration values and external dependencies:
```python
from conveyor import PipelineContext, get_current_context, single_task


@single_task
async def process_with_config(item):
    context = get_current_context()
    if context:
        api_key = context.data.get('api_key', 'default')
        timeout = context.data.get('timeout', 30)
        # Use configuration in processing; only a key prefix is shown, never the full secret
        return f"Processed {item} with key={api_key[:4]}... timeout={timeout}s"
    return f"Processed {item} (no config)"


# Set up pipeline with configuration
config_context = PipelineContext(
    data={
        'api_key': 'your-secret-key',
        'timeout': 60,
        'environment': 'production',
    }
)
pipeline = process_with_config.with_context(**config_context.__dict__)
```
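The heading also mentions external dependencies, which the configuration example does not show. A plausible pattern (an assumption, not something this reference documents) is to place a shared object such as an API client in `data` so every task call reuses the same instance. The sketch runs without conveyor: `FakeClient`, `process_with_client` and `_current_data` are hypothetical, and a `contextvars.ContextVar` stands in for `get_current_context()`.

```python
import asyncio
import contextvars
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical stand-in for get_current_context(): holds the shared data dict.
_current_data: "contextvars.ContextVar[Optional[Dict[str, Any]]]" = contextvars.ContextVar(
    "current_data", default=None
)


class FakeClient:
    """Hypothetical external dependency that counts how often it is called."""

    def __init__(self, timeout: int) -> None:
        self.timeout = timeout
        self.calls = 0

    async def fetch(self, item: str) -> str:
        self.calls += 1
        await asyncio.sleep(0)  # placeholder for real network I/O
        return f"{item} (timeout={self.timeout}s)"


async def process_with_client(item: str) -> str:
    data = _current_data.get()
    client = data.get("client") if data else None
    if client is None:
        return f"Processed {item} (no client)"
    return await client.fetch(item)


async def main() -> Tuple[List[str], int]:
    client = FakeClient(timeout=60)
    # Store the dependency once; every task call receives the same instance.
    _current_data.set({"client": client, "environment": "production"})
    results = [await process_with_client(item) for item in ["a", "b", "c"]]
    return results, client.calls


results, calls = asyncio.run(main())
print(results, calls)  # ['a (timeout=60s)', 'b (timeout=60s)', 'c (timeout=60s)'] 3
```

Because the object is stored by reference, any state it holds (connection pools, caches, counters) is shared by every task in the execution.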
Key Concepts
- PipelineContext.data: a dictionary for sharing custom data between tasks within the same pipeline execution.
- get_current_context(): returns the current pipeline context from within a task, or None when no context is set.
- pipeline_id: a unique identifier automatically assigned to each pipeline execution.
- Data persistence: values written to context.data by one task remain visible to later tasks in the same execution, which can read or modify them.
- Task isolation: each pipeline execution gets its own context instance, so data written during one execution is not visible to another.