Pipeline#
Pipeline is the main element of the Chatsky framework.
Pipeline is responsible for managing and executing the various components (PipelineComponent), including Actor.
- class PipelineServiceGroup(**data)[source]#
Bases: ServiceGroup
A service group that allows an Actor inside.
- components: List[Union[Actor, Service, ServiceGroup]]#
A list of components (Actor, Service, or ServiceGroup objects) that will be added to the group.
- class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, models=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, parallelize_processing=None)[source]#
Bases: BaseModel
Class that automates service execution and creates a service pipeline.
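A minimal construction sketch (following the quickstart-style script format with RESPONSE and TRANSITIONS keys; the flow and node names below are hypothetical):

```python
from chatsky import Pipeline, RESPONSE, TRANSITIONS, Transition as Tr, conditions as cnd

# Hypothetical two-node script with a fallback node.
script = {
    "greeting_flow": {
        "start_node": {
            TRANSITIONS: [Tr(dst="greeting_node", cnd=cnd.ExactMatch("/start"))],
        },
        "greeting_node": {
            RESPONSE: "Hi! How can I help?",
        },
        "fallback_node": {
            RESPONSE: "Please repeat the request.",
        },
    },
}

pipeline = Pipeline(
    script=script,
    start_label=("greeting_flow", "start_node"),
    fallback_label=("greeting_flow", "fallback_node"),
)
```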
- pre_services: ServiceGroup#
ServiceGroup that will be executed before Actor.
- post_services: ServiceGroup#
ServiceGroup that will be executed after Actor.
- start_label: AbsoluteNodeLabel#
(required) The first node of every context.
- fallback_label: AbsoluteNodeLabel#
Node which is used if Actor cannot find the next node.
This most commonly happens when there are no suitable transitions.
Defaults to start_label.
- default_priority: float#
Default priority value for Transition.
Defaults to 1.0.
- slots: GroupSlot#
Slots configuration.
- models: Dict[str, LLM_API]#
LLM models to be made available in custom functions.
- messenger_interface: MessengerInterface#
A MessengerInterface instance for this pipeline.
It handles connections to interfaces that provide user requests and accept bot responses.
- context_storage: DBContextStorage#
A DBContextStorage instance for this pipeline or a dict to store dialog Contexts.
- before_handler: BeforeHandler#
BeforeHandler to add to the pipeline service.
- after_handler: AfterHandler#
AfterHandler to add to the pipeline service.
- timeout: Optional[float]#
Timeout to add to pipeline root service group.
- parallelize_processing: bool#
This flag determines whether or not the functions defined in the
PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.
- _context_lock: Dict[str, asyncio.Lock]#
Dictionary mapping context ids to asyncio locks. Used to forbid concurrent execution for the same context id.
- classmethod from_file(file, custom_dir='custom', **overrides)[source]#
Create Pipeline by importing it from a file. A file (json or yaml) should contain a dictionary with keys being a subset of pipeline init parameters.
See JSONImporter.import_pipeline_file() for more information.
- Parameters:
file (Union[str, Path]) – Path to a file containing pipeline init parameters.
custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If file does not use custom code, this parameter will not have any effect.
overrides – You can pass init parameters to override those imported from the file.
- Return type:
Pipeline
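For illustration, a sketch of importing a pipeline from a hypothetical pipeline.yaml whose top-level keys are Pipeline init parameters; keyword arguments override values read from the file:

```python
from chatsky import Pipeline

# "pipeline.yaml" and the override below are hypothetical.
pipeline = Pipeline.from_file(
    "pipeline.yaml",
    custom_dir="custom",          # directory with custom code, if the file references any
    parallelize_processing=True,  # overrides the value imported from the file
)
```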
- property services_pipeline: PipelineServiceGroup#
A group containing Pipeline.pre_services, Actor and Pipeline.post_services.
It has Pipeline.before_handler and Pipeline.after_handler applied to it.
- validate_start_label()[source]#
Validate start_label is in script.
- validate_fallback_label()[source]#
Validate fallback_label is in script.
- async _run_pipeline(request, ctx_id, update_ctx_misc=None)[source]#
Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.
This method does the following:
- Create a new context with a random ID if ctx_id is None;
- Acquire an asyncio.Lock from _context_lock to prevent concurrent execution on this ctx_id;
- If ctx_id is not None, either retrieve it from the context_storage or create a new one with that id;
- Update Context.misc with update_ctx_misc;
- Set up Context.framework_data fields;
- Add request to the context;
- Execute services_pipeline. This includes Actor (read Actor.run_component() for more information);
- Save the context in the context_storage.
- Return type:
Context
- Returns:
The modified context with the given ctx_id.
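A sketch of invoking this method directly from async code (the request string and context id are hypothetical; a plain string is assumed to be coerced to a Message):

```python
import asyncio

async def main():
    # Run one dialog turn for context id "user-1" and inspect the result.
    ctx = await pipeline._run_pipeline("/start", ctx_id="user-1")
    print(ctx.last_response)

asyncio.run(main())
```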
- run()[source]#
Method that starts a pipeline and connects to messenger_interface. It also connects to the context_storage (if it’s not already connected).
It passes _run_pipeline() to messenger_interface as a callback, so every time a user request is received, _run_pipeline() will be called.
This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface. Messenger interfaces that run in a loop block the current thread.
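A typical blocking entry point might look like this (a sketch; the pipeline is assumed to use the default command-line messenger interface):

```python
if __name__ == "__main__":
    # Blocks the current thread and serves user requests until interrupted.
    pipeline.run()
```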
- __call__(request, ctx_id, update_ctx_misc=None)[source]#
Method that executes the pipeline once. Basically, it is a shortcut for _run_pipeline().
NB! When the pipeline is executed this way, messenger_interface won’t be initiated nor connected. Still, it connects to the context_storage (if it’s not already connected) to avoid sync issues.
This method has the same signature as PipelineRunnerFunction.
- Return type:
Context
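A sketch of a one-off synchronous call (the request text and context id are hypothetical):

```python
# Shortcut for _run_pipeline(); returns the modified Context.
ctx = pipeline("/start", "user-1")
print(ctx.last_response)
```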