# Standard library.
import uuid
from re import U  # NOTE(review): unused in the visible code — confirm before removing.
from typing import (
    Any,
    AsyncIterable,
    Iterable,
    List,
    Optional,
    TypeVar,
    Union,
)

# Package-local.
from .stream import AsyncStream
from .tasks import BaseTask, UNDEFINED_VALUE
from .context import (
    PipelineContext,
    ContextManager,
    ExecutionMode,
    set_current_context,
)

# Generic type variable for the items flowing into and out of a pipeline.
T = TypeVar("T")

# UNDEFINED_VALUE (imported from .tasks) is the sentinel used to represent an
# undefined argument, e.g. the default ``data`` of a pipeline call.
def with_context(self, **kwargs) -> "Pipeline":
    """Return a new pipeline whose context carries the given overrides.

    The current context is copied (via ``self.context.copy()``) rather than
    modified. Each keyword naming an existing attribute of the copy replaces
    that attribute; every other keyword is stored in the copy's ``data``
    mapping.

    Args:
        **kwargs: Context overrides.

    Returns:
        A new ``Pipeline`` built on the modified context, holding a copy of
        this pipeline's stage container (the stage objects themselves are
        shared, presumably a list — TODO confirm).
    """
    context = self.context.copy()
    for name, override in kwargs.items():
        if not hasattr(context, name):
            # Unknown keys go into the free-form data mapping.
            context.data[name] = override
            continue
        setattr(context, name, override)

    derived = Pipeline(context)
    derived.stages = self.stages.copy()
    return derived
def with_execution_mode(self, mode: ExecutionMode) -> "Pipeline":
    """Return a new pipeline configured to run with ``mode``.

    Convenience wrapper equivalent to ``self.with_context(execution_mode=mode)``.
    """
    overrides = {"execution_mode": mode}
    return self.with_context(**overrides)
def __or__(self, other: Union[BaseTask, 'Pipeline']) -> 'Pipeline':
    """Compose this pipeline with a task or another pipeline via ``|``.

    The result is a new ``Pipeline`` on a copy of this pipeline's context,
    containing this pipeline's stages followed by ``other`` (a task) or
    ``other``'s stages (a pipeline). ``other``'s context is not used.

    Raises:
        TypeError: If ``other`` is neither a ``BaseTask`` nor a ``Pipeline``.
    """
    if isinstance(other, BaseTask):
        return Pipeline(self.context.copy()).add(*self.stages).add(other)
    if isinstance(other, Pipeline):
        # The left-hand context wins; the right-hand pipeline's context is dropped.
        return Pipeline(self.context.copy()).add(*self.stages, *other.stages)
    raise TypeError(f"Cannot pipe Pipeline to {type(other)}")


def __call__(self, data: Iterable[T] = UNDEFINED_VALUE, *args, **kwargs) -> AsyncStream[T]:
    """Run the pipeline and return its output as an ``AsyncStream``.

    ``data`` is used as the pipeline input when it is a sync or async
    iterable other than ``str`` / ``tuple``. Any other value (a scalar, a
    string, a tuple) is instead prepended to ``args`` as the first
    positional argument for the first stage, and ``UNDEFINED_VALUE`` is
    passed to ``_run_pipeline`` as the data.

    Args:
        data: Input items, or the first stage's initial argument (see above).
        *args: Extra positional arguments for the first stage.
        **kwargs: Keyword arguments for the first stage.

    Returns:
        An ``AsyncStream`` wrapping the generator from ``_run_pipeline``.
    """
    # Async iterables count as stream input too; otherwise they would be
    # misrouted into the first stage's args and never reach the
    # AsyncIterable handling in _make_input_async.
    streamable = (
        isinstance(data, Iterable) or isinstance(data, AsyncIterable)
    ) and not isinstance(data, (str, tuple))
    if not streamable:
        if args:
            args = (data, *args)
        elif data is not UNDEFINED_VALUE:
            # Identity check: ``!=`` may be overloaded (or raise) on user objects.
            args = (data,)
        data = UNDEFINED_VALUE
    # The execution mode lives in self.context, which _run_pipeline copies
    # for the run; presumably stages read it from there — nothing to do here.
    return AsyncStream(self._run_pipeline(data, *args, **kwargs))
async def as_completed(self, data: Iterable[T], *args, **kwargs) -> AsyncIterable[T]:
    """Execute the pipeline and yield results as they complete.

    Similar to ``asyncio.as_completed()``: a copy of this pipeline is
    switched to the ``"as_completed"`` execution mode, so results are
    expected to arrive in completion order rather than input order
    (assuming the stages honor that mode).

    Args:
        data: Input to process, interpreted exactly as by a normal pipeline call.
        *args: Extra positional arguments forwarded to the pipeline call.
        **kwargs: Keyword arguments forwarded to the pipeline call.

    Yields:
        Pipeline results, without preserving input order.
    """
    # NOTE(review): the mode is passed as a plain string; confirm ExecutionMode
    # accepts/compares equal to "as_completed" (e.g. a Literal or str enum).
    as_completed_pipeline = self.with_execution_mode("as_completed")
    async for item in as_completed_pipeline(data, *args, **kwargs):
        yield item
def _run_pipeline(self, data: Iterable[T] = UNDEFINED_VALUE, *args, **kwargs) -> AsyncIterable[T]:
    """Build the async generator that runs every stage over ``data``.

    No work happens until the returned generator is iterated. Then it:
    copies ``self.context``, gives the copy a fresh ``pipeline_id`` (a UUID4
    string), enters ``ContextManager`` with it, turns ``data`` into a stream
    via ``_make_input_async``, chains ``await stage.process(stream)`` for
    each stage in order, and re-yields the final stream's items.

    Positional ``args`` replace the first stage's ``_side_args`` and
    ``kwargs`` are merged into its ``_side_kwargs`` for this run only; the
    previous values are restored when the run finishes, fails, or the
    generator is closed, so they no longer leak into later calls.

    Args:
        data: Pipeline input (see ``_make_input_async``).
        *args: Positional overrides for the first stage.
        **kwargs: Keyword overrides for the first stage.

    Returns:
        An async generator yielding the last stage's output.
    """
    async def gen():
        execution_context = self.context.copy()
        execution_context.pipeline_id = str(uuid.uuid4())

        unset = object()  # marks "stage had no _side_args attribute"
        override_stage = None
        args_overridden = False
        previous_args = unset
        previous_kwargs = None

        with ContextManager(execution_context):
            try:
                current_stream: AsyncIterable[Any] = self._make_input_async(data)
                for stage_index, stage in enumerate(self.stages):
                    # Re-publish this run's context before each stage is wired up.
                    set_current_context(execution_context)
                    if stage_index == 0 and (args or kwargs):
                        override_stage = stage
                        if args:
                            previous_args = getattr(stage, "_side_args", unset)
                            # Call-time args override any side args defined on the stage.
                            stage._side_args = args
                            args_overridden = True
                        if kwargs:
                            previous_kwargs = dict(stage._side_kwargs)
                            stage._side_kwargs.update(**kwargs)
                    # process() is awaited and returns the next AsyncIterable.
                    current_stream = await stage.process(current_stream)
                async for item in current_stream:
                    yield item
            finally:
                # Undo the per-call overrides on the (shared) first stage object.
                # NOTE: overlapping runs that share this stage still race on these
                # attributes; a full fix needs process() to take per-call arguments.
                if args_overridden:
                    if previous_args is unset:
                        del override_stage._side_args
                    else:
                        override_stage._side_args = previous_args
                if previous_kwargs is not None:
                    override_stage._side_kwargs.clear()
                    override_stage._side_kwargs.update(previous_kwargs)

    return gen()


def _make_input_async(self, data: Iterable[T] = UNDEFINED_VALUE) -> AsyncIterable[T]:
    """Wrap ``data`` in an async generator used as the first stage's input.

    - an ``AsyncIterable`` is re-yielded item by item;
    - a ``list`` is yielded item by item;
    - anything else (including ``UNDEFINED_VALUE``) is yielded once, as a
      single item.
    """
    async def _gen():
        if isinstance(data, AsyncIterable):
            async for item in data:
                yield item
        elif isinstance(data, list):
            for item in data:
                yield item
        else:
            # NOTE(review): non-list sync iterables (generators, sets, range,
            # dict) are yielded here as ONE item rather than iterated, although
            # the pipeline call treats them as stream input — confirm intended.
            yield data

    return _gen()