Pipeline#
Pipeline is the main element of the Chatsky framework.
Pipeline is responsible for managing and executing the various components (PipelineComponent), including Actor.
- class PipelineServiceGroup(**data)[source]#
Bases:
ServiceGroup
A service group that allows an Actor inside.
- components: List[Union[Actor, Service, ServiceGroup]]#
A ServiceGroup object that will be added to the group.
- class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, models=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, parallelize_processing=None)[source]#
Bases:
BaseModel
Class that automates service execution and creates a service pipeline.
- pre_services: ServiceGroup#
ServiceGroup that will be executed before Actor.
- post_services: ServiceGroup#
ServiceGroup that will be executed after Actor.
- start_label: AbsoluteNodeLabel#
(required) The first node of every context.
- fallback_label: AbsoluteNodeLabel#
Node that is used if Actor cannot find the next node. This most commonly happens when there are no suitable transitions.
Defaults to start_label.
- default_priority: float#
Default priority value for Transition. Defaults to 1.0.
- slots: GroupSlot#
Slots configuration.
- models: Dict[str, LLM_API]#
LLM models to be made available in custom functions.
- messenger_interface: MessengerInterface#
A MessengerInterface instance for this pipeline.
It handles connections to interfaces that provide user requests and accept bot responses.
- context_storage: DBContextStorage#
A DBContextStorage instance for this pipeline, or a dict to store dialog Context.
- before_handler: BeforeHandler#
BeforeHandler
to add to the pipeline service.
- after_handler: AfterHandler#
AfterHandler
to add to the pipeline service.
- timeout: Optional[float]#
Timeout to add to pipeline root service group.
- parallelize_processing: bool#
This flag determines whether the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over their respective groups.
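The difference between the two modes can be illustrated with plain asyncio. This is a minimal sketch, not Chatsky's implementation: `run_processing`, `make_step`, and the list used as a context are hypothetical stand-ins, and how Chatsky actually forms the "respective groups" is not modeled here.

```python
import asyncio


async def run_processing(functions, ctx, parallelize):
    """Run processing functions concurrently or one after another (toy model)."""
    if parallelize:
        # All functions are scheduled together and may interleave at awaits.
        await asyncio.gather(*(func(ctx) for func in functions))
    else:
        # Each function finishes before the next one starts.
        for func in functions:
            await func(ctx)


def make_step(name, delay):
    """Build a hypothetical processing function that logs its start and end."""
    async def step(ctx):
        ctx.append(f"{name}:start")
        await asyncio.sleep(delay)
        ctx.append(f"{name}:end")
    return step


steps = [make_step("a", 0.05), make_step("b", 0.01)]

sequential_log = []
asyncio.run(run_processing(steps, sequential_log, parallelize=False))

parallel_log = []
asyncio.run(run_processing(steps, parallel_log, parallelize=True))
```

In the sequential run, "a" completes before "b" starts; in the parallel run, both start before either finishes. Processing functions that depend on each other's side effects are therefore only safe in the sequential mode.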
- classmethod from_file(file, custom_dir='custom', **overrides)[source]#
Create Pipeline by importing it from a file. The file (JSON or YAML) should contain a dictionary whose keys are a subset of the pipeline init parameters.
See JSONImporter.import_pipeline_file() for more information.
- Parameters:
file (Union[str, Path]) – Path to a file containing pipeline init parameters.
custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If file does not use custom code, this parameter has no effect.
overrides – Init parameters that override those imported from the file.
- Return type:
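The way `overrides` take precedence over values read by from_file can be sketched with the standard library alone. This is a simplified stand-in under stated assumptions, not the JSONImporter logic: `load_init_params` is a hypothetical helper, only JSON is handled, and custom code resolution via `custom_dir` is not modeled.

```python
import json
import tempfile
from pathlib import Path


def load_init_params(file, **overrides):
    """Read a JSON dictionary of init parameters and apply keyword overrides."""
    params = json.loads(Path(file).read_text(encoding="utf-8"))
    if not isinstance(params, dict):
        raise ValueError("file must contain a dictionary of init parameters")
    # Keyword overrides win over values imported from the file.
    return {**params, **overrides}


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "pipeline.json"
    path.write_text(
        json.dumps({"start_label": ["flow", "start"], "timeout": 5.0}),
        encoding="utf-8",
    )
    # "timeout" comes from the override, "start_label" from the file.
    merged = load_init_params(path, timeout=10.0)
```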
- property services_pipeline: PipelineServiceGroup#
A group containing Pipeline.pre_services, Actor and Pipeline.post_services. It has Pipeline.before_handler and Pipeline.after_handler applied to it.
- validate_start_label()[source]#
Validate that start_label is in script.
- validate_fallback_label()[source]#
Validate that fallback_label is in script.
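The two label checks, together with the documented default of fallback_label to start_label, can be sketched as follows. This is an illustration only: the script is modeled as a nested dict of flows and nodes, labels as `(flow, node)` tuples, and `resolve_labels` is a hypothetical helper rather than part of Chatsky.

```python
def resolve_labels(script, start_label, fallback_label=None):
    """Return (start_label, fallback_label) after checking both exist in the script."""

    def check(label, name):
        flow, node = label
        if flow not in script or node not in script[flow]:
            raise ValueError(f"{name} {label!r} is not in the script")

    check(start_label, "start_label")
    if fallback_label is None:
        # Documented default: fall back to the start node.
        fallback_label = start_label
    check(fallback_label, "fallback_label")
    return start_label, fallback_label


toy_script = {"greeting": {"start": {}, "fallback": {}}}
labels = resolve_labels(toy_script, ("greeting", "start"))
```

Passing a label that names a missing flow or node, such as `("greeting", "missing")`, raises `ValueError` in this sketch.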
- async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)[source]#
Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.
This method does the following:
1. Retrieve context ctx_id from context_storage or initialize it.
2. Update Context.misc with update_ctx_misc.
3. Set up Context.framework_data fields.
4. Add request to the context.
5. Execute services_pipeline. This includes Actor (read Actor.run_component() for more information).
6. Save the context in the context_storage.
- Return type:
- Returns:
Modified context ctx_id.
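The sequence performed by _run_pipeline() can be sketched with a dict standing in for the context storage. Everything below is a hypothetical toy model, not the Chatsky implementation: the context is a plain dict, the `turn` field in `framework_data` is invented for illustration, and `echo_actor` plays the role of the services pipeline.

```python
import asyncio
import uuid


async def run_once(storage, services, request, ctx_id=None, update_ctx_misc=None):
    """Toy model of one pipeline turn, following the six documented steps."""
    if ctx_id is None:
        ctx_id = str(uuid.uuid4())
    # 1. Retrieve the context from storage or initialize it.
    ctx = storage.get(ctx_id)
    if ctx is None:
        ctx = {"id": ctx_id, "misc": {}, "framework_data": {},
               "requests": [], "responses": []}
    # 2. Update misc with the provided values.
    if update_ctx_misc is not None:
        ctx["misc"].update(update_ctx_misc)
    # 3. Set up framework data (the "turn" field is a made-up example).
    ctx["framework_data"]["turn"] = len(ctx["requests"]) + 1
    # 4. Add the request to the context.
    ctx["requests"].append(request)
    # 5. Execute the services in order.
    for service in services:
        await service(ctx)
    # 6. Save the context back to storage.
    storage[ctx_id] = ctx
    return ctx


async def echo_actor(ctx):
    """Hypothetical actor stand-in: answers by echoing the last request."""
    ctx["responses"].append(f"echo: {ctx['requests'][-1]}")


toy_storage = {}
first = asyncio.run(run_once(toy_storage, [echo_actor], "hi",
                             ctx_id="user-1", update_ctx_misc={"lang": "en"}))
second = asyncio.run(run_once(toy_storage, [echo_actor], "bye", ctx_id="user-1"))
```

Because the second call reuses `ctx_id="user-1"`, it picks up the stored context, so the history and the `misc` values from the first turn are still present.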
- run()[source]#
Method that starts a pipeline and connects to messenger_interface. It also connects to the context_storage (if it’s not already connected).
It passes _run_pipeline() to messenger_interface as a callback, so every time a user request is received, _run_pipeline() will be called.
This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface. Messenger interfaces that run in a loop block the current thread.
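The callback arrangement described for run() can be illustrated with a toy interface. This is a sketch under stated assumptions: `ListInterface`, `toy_runner`, and this `run` function are hypothetical stand-ins, and the real MessengerInterface API may differ. The toy interface processes a fixed list of requests and then returns, so it does not block indefinitely the way a polling loop would.

```python
import asyncio


class ListInterface:
    """Toy interface: feeds a fixed list of requests to the callback, then returns."""

    def __init__(self, requests):
        self.requests = requests
        self.responses = []

    async def connect(self, callback):
        # Call the runner once per incoming (ctx_id, request) pair.
        for ctx_id, request in self.requests:
            self.responses.append(await callback(request, ctx_id))


async def toy_runner(request, ctx_id=None, update_ctx_misc=None):
    """Stand-in with the same parameter names as _run_pipeline()."""
    return f"[{ctx_id}] got {request!r}"


def run(interface, runner):
    """Hand the runner to the interface as a callback and wait until it finishes."""
    asyncio.run(interface.connect(runner))


interface = ListInterface([("u1", "hello"), ("u2", "ping")])
run(interface, toy_runner)
```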
- __call__(request, ctx_id=None, update_ctx_misc=None)[source]#
Method that executes the pipeline once. Basically, it is a shortcut for _run_pipeline(). NB! When the pipeline is executed this way, messenger_interface won’t be initiated or connected. Still, it connects to the context_storage (if it’s not already connected) to avoid sync issues.
This method has the same signature as PipelineRunnerFunction.
- Return type: