@dataclass
class PipelineContext:
    """Configuration and state for pipeline execution.

    Attributes:
        execution_mode: Either ``"ordered"`` (the default) or
            ``"as_completed"``.
        max_parallelism: Limit on parallelism; ``None`` means unlimited.
        pipeline_id: Optional identifier of the pipeline (``None`` by default).
        data: Free-form dictionary for sharing values between tasks. Each
            instance gets its own empty dict unless one is passed in.
    """

    # Execution behavior
    execution_mode: ExecutionMode = "ordered"  # "ordered" or "as_completed"
    max_parallelism: Optional[int] = None  # None means unlimited

    # Pipeline state
    pipeline_id: Optional[str] = None

    # Custom data storage for sharing between tasks
    data: Dict[str, Any] = field(default_factory=dict)

    def copy(self) -> 'PipelineContext':
        """Create a copy of the context.

        The copy is built with ``dataclasses.replace`` so that every dataclass
        field is carried over and the result has the same class as ``self``
        (subclasses are preserved instead of being downgraded to
        ``PipelineContext``).

        Returns:
            A new context equal to this one. ``data`` is a shallow copy:
            adding or removing keys on the copy does not affect the original,
            but the stored values themselves are shared.
        """
        # Imported locally so this method does not depend on the module's
        # top-level import list.
        from dataclasses import replace

        return replace(self, data=self.data.copy())
# Context variable for thread-local pipeline context_pipeline_context:ContextVar[Optional[PipelineContext]]=ContextVar('pipeline_context',default=None)
def get_current_context() -> Optional[PipelineContext]:
    """Return the pipeline context bound to the current execution context.

    Returns:
        The value currently stored in ``_pipeline_context``, or ``None`` when
        no context has been set.
    """
    active = _pipeline_context.get()
    return active
def set_current_context(context: PipelineContext) -> None:
    """Bind ``context`` as the current pipeline context.

    Args:
        context: The pipeline context to store in ``_pipeline_context``.

    Returns:
        None. The reset token produced by ``ContextVar.set`` is discarded, so
        the previous value is not restored by this function.
    """
    _pipeline_context.set(context)
    return None
class ContextManager:
    """Context manager for pipeline execution.

    Entering the manager stores the given pipeline context in
    ``_pipeline_context``; leaving it restores whatever value was there
    before, including when the body raises.

    Args:
        context: The pipeline context to make current inside the ``with``
            block.
    """

    def __init__(self, context: 'PipelineContext') -> None:
        self.context = context
        # One reset token per active ``with`` entry, so the same instance can
        # be nested without losing the value to restore.
        self._tokens = []

    def __enter__(self) -> 'PipelineContext':
        """Make ``self.context`` current and return it."""
        self._tokens.append(_pipeline_context.set(self.context))
        return self.context

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        """Restore the previous pipeline context; never suppress exceptions."""
        if self._tokens:
            _pipeline_context.reset(self._tokens.pop())
        return False
async def with_context(context: PipelineContext, coro):
    """Await ``coro`` while ``context`` is the active pipeline context.

    Args:
        context: Pipeline context handed to ``ContextManager`` for the
            duration of the await.
        coro: The awaitable to run.

    Returns:
        Whatever ``coro`` evaluates to when awaited.
    """
    scope = ContextManager(context)
    with scope:
        outcome = await coro
        return outcome