Pipeline#
Pipeline is the main element of the Chatsky framework.
It is responsible for managing and executing the various components (PipelineComponent), including Actor.
- class PipelineServiceGroup(**data)[source]#
Bases: ServiceGroup
A service group that allows an Actor inside.
- components: List[Union[Actor, Service, ServiceGroup]]#
A ServiceGroup object that will be added to the group.
- class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, models=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, parallelize_processing=None)[source]#
Bases: BaseModel
Class that automates service execution and creates a service pipeline.
- pre_services: ServiceGroup#
ServiceGroup that will be executed before Actor.
- post_services: ServiceGroup#
ServiceGroup that will be executed after Actor.
- start_label: AbsoluteNodeLabel#
(required) The first node of every context.
- fallback_label: AbsoluteNodeLabel#
Node that is used if Actor cannot find the next node. This most commonly happens when there are no suitable transitions.
Defaults to start_label.
- default_priority: float#
Default priority value for Transition.
Defaults to 1.0.
- slots: GroupSlot#
Slots configuration.
- models: Dict[str, LLM_API]#
LLM models to be made available in custom functions.
- messenger_interface: MessengerInterface#
A MessengerInterface instance for this pipeline.
It handles connections to interfaces that provide user requests and accept bot responses.
- context_storage: DBContextStorage#
A DBContextStorage instance for this pipeline, or a dict to store dialog Context.
- before_handler: BeforeHandler#
BeforeHandler to add to the pipeline service.
- after_handler: AfterHandler#
AfterHandler to add to the pipeline service.
- timeout: Optional[float]#
Timeout to add to the pipeline's root service group.
- parallelize_processing: bool#
This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.
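As a rough illustration of what such a flag changes, the sketch below runs a group of processing functions either one after another or concurrently with `asyncio.gather`. The names `run_processing`, `tag_language` and `tag_intent` are hypothetical and not part of Chatsky, and the context is a plain dict. The framework's own scheduling may differ.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

ProcessingFunction = Callable[[Dict[str, str]], Awaitable[None]]


async def tag_language(ctx: Dict[str, str]) -> None:
    # Hypothetical processing function: writes one key into the context.
    await asyncio.sleep(0.01)
    ctx["language"] = "en"


async def tag_intent(ctx: Dict[str, str]) -> None:
    # Hypothetical processing function: writes a different key.
    await asyncio.sleep(0.01)
    ctx["intent"] = "greeting"


async def run_processing(
    functions: List[ProcessingFunction], ctx: Dict[str, str], parallelize: bool
) -> None:
    """Run processing functions sequentially or concurrently (illustrative only)."""
    if parallelize:
        # All functions start together, so they must not rely on each other's results.
        await asyncio.gather(*(func(ctx) for func in functions))
    else:
        # Each function finishes before the next one starts.
        for func in functions:
            await func(ctx)


sequential_ctx: Dict[str, str] = {}
asyncio.run(run_processing([tag_language, tag_intent], sequential_ctx, parallelize=False))

parallel_ctx: Dict[str, str] = {}
asyncio.run(run_processing([tag_language, tag_intent], parallel_ctx, parallelize=True))
```

Both runs end with the same context here because the two functions write independent keys. Functions that read each other's output would only be safe in the sequential mode.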
- classmethod from_file(file, custom_dir='custom', **overrides)[source]#
Create Pipeline by importing it from a file. A file (json or yaml) should contain a dictionary with keys being a subset of pipeline init parameters.
See JSONImporter.import_pipeline_file() for more information.
- Parameters:
file (Union[str, Path]) – Path to a file containing pipeline init parameters.
custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If file does not use custom code, this parameter will not have any effect.
overrides – You can pass init parameters to override those imported from the file.
- Return type:
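The way overrides interact with the file contents can be pictured as a dictionary merge. The sketch below is an assumption-laden stand-in, not the library's loader: `load_init_params` is a hypothetical helper, it reads JSON only, and it does not model the custom-code resolution performed by the real importer.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Union


def load_init_params(file: Union[str, Path], **overrides: Any) -> Dict[str, Any]:
    """Hypothetical helper: read a JSON dict of init parameters and apply overrides."""
    params = json.loads(Path(file).read_text(encoding="utf-8"))
    if not isinstance(params, dict):
        raise ValueError("the file must contain a dictionary of init parameters")
    # Keyword overrides take precedence over values read from the file.
    return {**params, **overrides}


with tempfile.TemporaryDirectory() as tmp:
    config_path = Path(tmp) / "pipeline.json"
    config_path.write_text(
        json.dumps({"start_label": ["greeting_flow", "start_node"], "timeout": 5.0}),
        encoding="utf-8",
    )
    # The file says timeout=5.0, the caller overrides it with 10.0.
    merged = load_init_params(config_path, timeout=10.0)
```

In this sketch `merged` keeps `start_label` from the file and takes `timeout` from the keyword argument.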
- property services_pipeline: PipelineServiceGroup#
A group containing Pipeline.pre_services, Actor and Pipeline.post_services. It has Pipeline.before_handler and Pipeline.after_handler applied to it.
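The composition described here can be sketched with plain callables. This is a toy model, not Chatsky code: `make_step` and `run_wrapped_group` are hypothetical, and the sketch assumes that a before-handler fires before the group starts and an after-handler fires once it finishes.

```python
from typing import Callable, List

trace: List[str] = []


def make_step(name: str) -> Callable[[], None]:
    """Build a hypothetical component that only records its own name."""
    def step() -> None:
        trace.append(name)
    return step


def run_wrapped_group(
    steps: List[Callable[[], None]],
    before: Callable[[], None],
    after: Callable[[], None],
) -> None:
    # Assumed ordering: handler before the group, components in order, handler after.
    before()
    for step in steps:
        step()
    after()


run_wrapped_group(
    steps=[make_step("pre_services"), make_step("actor"), make_step("post_services")],
    before=make_step("before_handler"),
    after=make_step("after_handler"),
)
```

After the call, `trace` lists the handler and component names in the order they ran.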
- validate_start_label()[source]#
Validate that start_label is in script.
- validate_fallback_label()[source]#
Validate that fallback_label is in script.
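A minimal sketch of this kind of check, assuming the script can be viewed as a nested mapping of flow names to node names and a label as a `(flow, node)` pair. `check_label` and `resolve_labels` are hypothetical helpers. The real validators operate on the framework's own script and label types.

```python
from typing import Dict, Optional, Tuple

Label = Tuple[str, str]  # (flow name, node name), a stand-in for AbsoluteNodeLabel
Script = Dict[str, Dict[str, object]]


def check_label(script: Script, label: Label, field: str) -> Label:
    """Raise ValueError if the label does not point at a node of the script."""
    flow, node = label
    if flow not in script or node not in script[flow]:
        raise ValueError(f"{field} {label!r} is not in the script")
    return label


def resolve_labels(
    script: Script, start_label: Label, fallback_label: Optional[Label] = None
) -> Tuple[Label, Label]:
    start = check_label(script, start_label, "start_label")
    # fallback_label defaults to start_label when it is not given.
    fallback = check_label(script, fallback_label or start, "fallback_label")
    return start, fallback


toy_script: Script = {"greeting_flow": {"start_node": {}, "fallback_node": {}}}
labels = resolve_labels(toy_script, ("greeting_flow", "start_node"))
```

Passing a label such as `("greeting_flow", "missing_node")` makes `resolve_labels` raise `ValueError` in this sketch.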
- async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)[source]#
Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.
This method does:
1. Retrieve from context_storage or initialize context ctx_id.
2. Update Context.misc with update_ctx_misc.
3. Set up Context.framework_data fields.
4. Add request to the context.
5. Execute services_pipeline. This includes Actor (read Actor.run_component() for more information).
6. Save context in the context_storage.
- Return type:
- Returns:
Modified context ctx_id.
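The steps listed for this method can be mirrored in a small, self-contained sketch. Everything here is a simplification under stated assumptions: the context is a plain dict, the storage is an in-memory dict, `run_once` and `echo_services` are hypothetical names, and `ctx_id` is required rather than optional. It is not the library's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Context = Dict[str, Any]  # toy stand-in for the framework's Context object


async def run_once(
    storage: Dict[str, Context],
    services: Callable[[Context], Awaitable[None]],
    request: str,
    ctx_id: str,
    update_ctx_misc: Optional[Dict[str, Any]] = None,
) -> Context:
    # 1. Retrieve the context from storage or initialize a new one.
    ctx = storage.get(ctx_id) or {
        "requests": [], "responses": [], "misc": {}, "framework_data": {},
    }
    # 2. Update the misc field with caller-supplied values.
    ctx["misc"].update(update_ctx_misc or {})
    # 3. Set up framework data (only the context id in this sketch).
    ctx["framework_data"]["ctx_id"] = ctx_id
    # 4. Add the request to the context.
    ctx["requests"].append(request)
    # 5. Execute the services, which would include the actor.
    await services(ctx)
    # 6. Save the context back to storage.
    storage[ctx_id] = ctx
    return ctx


async def echo_services(ctx: Context) -> None:
    # Hypothetical service group: answers by echoing the last request.
    ctx["responses"].append(f"echo: {ctx['requests'][-1]}")


toy_storage: Dict[str, Context] = {}
asyncio.run(run_once(toy_storage, echo_services, "hi", "user-1", {"lang": "en"}))
final_ctx = asyncio.run(run_once(toy_storage, echo_services, "bye", "user-1"))
```

The second call finds the context saved by the first one, so `final_ctx` accumulates both requests and both responses.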
- run()[source]#
Method that starts a pipeline and connects to messenger_interface. It also connects to the context_storage (if it’s not already connected).
It passes _run_pipeline() to messenger_interface as a callback, so every time a user request is received, _run_pipeline() will be called.
This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface. Messenger interfaces that run in a loop block the current thread.
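The callback hand-off can be pictured with a toy interface. `ListInterface` and `handle_request` are hypothetical and synchronous for brevity. They are not Chatsky classes, and real messenger interfaces define their own connection method and may run an event loop.

```python
from typing import Callable, List


class ListInterface:
    """Hypothetical, non-blocking interface that replays a fixed list of requests."""

    def __init__(self, requests: List[str]) -> None:
        self.requests = requests
        self.responses: List[str] = []

    def connect(self, callback: Callable[[str], str]) -> None:
        # The callback is invoked once per received request.
        for request in self.requests:
            self.responses.append(callback(request))


def handle_request(request: str) -> str:
    # Stand-in for the pipeline runner: produces a response for one request.
    return request.upper()


interface = ListInterface(["hello", "bye"])
interface.connect(handle_request)
```

This interface returns as soon as its list is exhausted. An interface that polls for new messages in a loop would keep `connect` from returning, which is the blocking case described above.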
- __call__(request, ctx_id=None, update_ctx_misc=None)[source]#
Method that executes the pipeline once. Basically, it is a shortcut for _run_pipeline().
NB! When the pipeline is executed this way, messenger_interface won’t be initiated nor connected. Still, it connects to the context_storage (if it’s not already connected) to avoid sync issues.
This method has the same signature as PipelineRunnerFunction.
- Return type: