Pipeline
Pipeline is the main element of the Chatsky framework.
Pipeline is responsible for managing and executing the various components (PipelineComponent), including Actor.
- class PipelineServiceGroup(**data)
Bases: ServiceGroup
A service group that allows an actor inside.
- components: List[Union[Actor, Service, ServiceGroup]]
A ServiceGroup object that will be added to the group.
- class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, optimization_warnings=None, parallelize_processing=None)
Bases: BaseModel
Class that automates service execution and creates a service pipeline.
- pre_services: ServiceGroup
ServiceGroup that will be executed before Actor.
- post_services: ServiceGroup
ServiceGroup that will be executed after Actor.
- start_label: AbsoluteNodeLabel
(required) The first node of every context.
- fallback_label: AbsoluteNodeLabel
Node which is used if Actor cannot find the next node. This most commonly happens when there are no suitable transitions. Defaults to start_label.
- default_priority: float
Default priority value for Transition. Defaults to 1.0.
- messenger_interface: MessengerInterface
A MessengerInterface instance for this pipeline. It handles connections to interfaces that provide user requests and accept bot responses.
- context_storage: Union[DBContextStorage, Dict]
A DBContextStorage instance for this pipeline or a dict to store dialog Context.
- before_handler: BeforeHandler
BeforeHandler to add to the pipeline service.
- after_handler: AfterHandler
AfterHandler to add to the pipeline service.
- timeout: Optional[float]
Timeout to add to the pipeline root service group.
- optimization_warnings: bool
Asynchronous pipeline optimization check request flag; warnings will be sent to logs. Additionally, it has some calculated fields:
services_pipeline is the pipeline root ServiceGroup object,
actor is the pipeline actor, found among services.
- parallelize_processing: bool
This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.
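The effect of a flag like parallelize_processing can be illustrated with plain asyncio. The following is a minimal sketch, not Chatsky's implementation: run_processing and make_step are hypothetical names, and the context is modeled as a plain dict.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

SketchCtx = Dict[str, List[str]]
ProcessingFn = Callable[[SketchCtx], Awaitable[None]]


def make_step(name: str, delay: float) -> ProcessingFn:
    """Hypothetical processing function that records when it finishes."""

    async def step(ctx: SketchCtx) -> None:
        await asyncio.sleep(delay)
        ctx["order"].append(name)

    return step


async def run_processing(
    functions: List[ProcessingFn], ctx: SketchCtx, parallelize: bool
) -> None:
    """Run a group of processing functions concurrently or one by one."""
    if parallelize:
        # All functions start together; completion order depends on their delays.
        await asyncio.gather(*(fn(ctx) for fn in functions))
    else:
        # Each function finishes before the next one starts.
        for fn in functions:
            await fn(ctx)
```

With parallelization enabled, functions in the same group must not rely on each other's side effects, because their completion order is no longer the definition order.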
- classmethod from_file(file, custom_dir='custom', **overrides)
Create Pipeline by importing it from a file. The file (JSON or YAML) should contain a dictionary whose keys are a subset of the pipeline init parameters.
See JSONImporter.import_pipeline_file() for more information.
- Parameters:
file (Union[str, Path]) – Path to a file containing pipeline init parameters.
custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If file does not use custom code, this parameter will not have any effect.
overrides – You can pass init parameters to override those imported from the file.
- Return type:
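The override behavior described for from_file can be modeled as a dictionary merge. This is a minimal sketch under stated assumptions: load_init_params is a hypothetical helper, it handles JSON only, and it does not resolve custom code the way JSONImporter does.

```python
import json
from pathlib import Path
from typing import Any, Dict, Union


def load_init_params(file: Union[str, Path], **overrides: Any) -> Dict[str, Any]:
    """Hypothetical helper: read init parameters from a JSON file, then apply overrides."""
    params = json.loads(Path(file).read_text(encoding="utf-8"))
    if not isinstance(params, dict):
        raise ValueError("The file should contain a dictionary of init parameters.")
    # Keyword overrides take precedence over values imported from the file.
    return {**params, **overrides}
```

Calling load_init_params("pipeline.json", timeout=1.0) on a file that also sets timeout would keep every other key from the file and replace only timeout.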
- property services_pipeline: PipelineServiceGroup
A group containing Pipeline.pre_services, Actor and Pipeline.post_services. It has Pipeline.before_handler and Pipeline.after_handler applied to it.
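The composition described for services_pipeline can be pictured as one callable that runs the components in order between the two handlers. This is a simplified synchronous sketch; build_services_pipeline and the list-based context are hypothetical and do not reflect how Chatsky schedules components.

```python
from typing import Callable, List, Optional

SketchCtx = List[str]  # hypothetical context: a log of executed steps
Component = Callable[[SketchCtx], None]


def build_services_pipeline(
    pre_services: List[Component],
    actor: Component,
    post_services: List[Component],
    before_handler: Optional[Component] = None,
    after_handler: Optional[Component] = None,
) -> Component:
    """Combine pre-services, the actor and post-services into one group."""
    components = [*pre_services, actor, *post_services]

    def run_group(ctx: SketchCtx) -> None:
        if before_handler is not None:
            before_handler(ctx)
        for component in components:
            component(ctx)
        if after_handler is not None:
            after_handler(ctx)

    return run_group
```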
- validate_start_label()
Validate that start_label is in script.
- validate_fallback_label()
Validate that fallback_label is in script.
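The two checks above, together with the documented default of fallback_label, can be sketched over a nested dictionary. The assumptions here are that a script maps flow names to node names and that a label is a (flow, node) tuple; resolve_labels and label_in_script are hypothetical names, not Chatsky validators.

```python
from typing import Any, Dict, Optional, Tuple

Label = Tuple[str, str]  # (flow name, node name); assumed stand-in for AbsoluteNodeLabel
Script = Dict[str, Dict[str, Any]]


def label_in_script(script: Script, label: Label) -> bool:
    """Return True if the label points at an existing node."""
    flow, node = label
    return node in script.get(flow, {})


def resolve_labels(
    script: Script, start_label: Label, fallback_label: Optional[Label] = None
) -> Tuple[Label, Label]:
    """Apply the start_label default to fallback_label and validate both."""
    if fallback_label is None:
        fallback_label = start_label
    for name, label in (("start_label", start_label), ("fallback_label", fallback_label)):
        if not label_in_script(script, label):
            raise ValueError(f"{name} {label!r} is not in the script")
    return start_label, fallback_label
```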
- add_global_handler(global_handler_type, extra_handler, whitelist=None, blacklist=None)
Method for adding global wrappers to the pipeline. Different types of global wrappers are called before/after pipeline execution or before/after each pipeline component. They can be used for pipeline statistics collection or other functionality extensions. NB! Global wrappers are still wrappers; they shouldn’t be used for time-consuming tasks (see chatsky.core.service.extra).
- Parameters:
global_handler_type (GlobalExtraHandlerType) – (required) Indicates where the wrapper function should be executed.
extra_handler (ExtraHandlerFunction) – (required) The wrapper function itself.
whitelist (Optional[List[str]]) – A list of services to only add this wrapper to.
blacklist (Optional[List[str]]) – A list of services to not add this wrapper to.
- Returns:
None
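One plausible reading of the whitelist and blacklist parameters is a simple membership filter. The sketch below assumes that a missing whitelist allows every service and that the blacklist always excludes; should_wrap is a hypothetical helper and the real selection logic may differ.

```python
from typing import List, Optional


def should_wrap(
    service: str,
    whitelist: Optional[List[str]] = None,
    blacklist: Optional[List[str]] = None,
) -> bool:
    """Decide whether a global wrapper applies to the named service."""
    allowed = whitelist is None or service in whitelist
    banned = blacklist is not None and service in blacklist
    return allowed and not banned
```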
- property info_dict: dict
Property for retrieving an info dictionary about this pipeline. Returns an info dict containing the most important public fields of the component as well as its type. All complex or unserializable fields are replaced with ‘Instance of [type]’.
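The replacement rule for unserializable fields can be sketched as follows. This assumes "unserializable" means "not JSON-serializable" and that the placeholder uses the bare class name; to_info_dict is a hypothetical function, not the property's actual code.

```python
import json
from typing import Any, Dict


def to_info_dict(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Keep JSON-serializable values and replace the rest with a type placeholder."""
    info: Dict[str, Any] = {}
    for name, value in fields.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            # TypeError: unsupported type; ValueError: e.g. circular reference.
            info[name] = f"Instance of {type(value).__name__}"
        else:
            info[name] = value
    return info
```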
- async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)
Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.
This method does the following:
1. Retrieve the context ctx_id from context_storage or initialize it.
2. Update Context.misc with update_ctx_misc.
3. Set up Context.framework_data fields.
4. Add request to the context.
5. Execute services_pipeline. This includes Actor (read Actor.run_component() for more information).
6. Save the context in the context_storage.
- Return type:
- Returns:
Modified context ctx_id.
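The sequence performed by _run_pipeline can be sketched with a dict as the context storage. Everything here is a simplified model: SketchContext, run_pipeline_sketch, the "turn" key in framework_data and the UUID fallback for a missing ctx_id are assumptions for illustration, not Chatsky internals.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class SketchContext:
    """Hypothetical stand-in for Context; only the fields named above are modeled."""

    id: str
    misc: Dict[str, Any] = field(default_factory=dict)
    framework_data: Dict[str, Any] = field(default_factory=dict)
    requests: List[Any] = field(default_factory=list)
    responses: List[Any] = field(default_factory=list)


ServiceFn = Callable[[SketchContext], Awaitable[None]]


async def run_pipeline_sketch(
    request: Any,
    storage: Dict[str, SketchContext],
    services: List[ServiceFn],
    ctx_id: Optional[str] = None,
    update_ctx_misc: Optional[Dict[str, Any]] = None,
) -> SketchContext:
    # 1. Retrieve the context from storage or initialize it.
    if ctx_id is None:
        ctx_id = str(uuid.uuid4())  # assumption: a fresh id is generated
    ctx = storage.get(ctx_id)
    if ctx is None:
        ctx = SketchContext(id=ctx_id)
    # 2. Update misc with the provided values.
    if update_ctx_misc is not None:
        ctx.misc.update(update_ctx_misc)
    # 3. Set up framework data ("turn" is a placeholder key).
    ctx.framework_data["turn"] = ctx.framework_data.get("turn", 0) + 1
    # 4. Add the request to the context.
    ctx.requests.append(request)
    # 5. Execute the services (the actor would be one of them).
    for service in services:
        await service(ctx)
    # 6. Save the context in the storage.
    storage[ctx.id] = ctx
    return ctx
```

Because the context is saved at the end of every call, a second call with the same ctx_id continues the same dialog instead of starting a new one.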
- run()
Method that starts a pipeline and connects to messenger_interface.
It passes _run_pipeline() to messenger_interface as a callback, so every time a user request is received, _run_pipeline() will be called.
This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface. Messenger interfaces that run in a loop block the current thread.
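The callback relationship between run() and the messenger interface can be sketched with a toy interface that replays a fixed list of requests. ListInterface, handle and run_sketch are hypothetical; the runner signature is reduced to (request, ctx_id) for brevity and is not the real PipelineRunnerFunction.

```python
import asyncio
from typing import Awaitable, Callable, List

RunnerFn = Callable[[str, str], Awaitable[str]]  # assumed simplification


class ListInterface:
    """Hypothetical messenger interface that replays a fixed list of requests."""

    def __init__(self, requests: List[str]) -> None:
        self.requests = requests
        self.sent: List[str] = []

    async def connect(self, runner: RunnerFn) -> None:
        # Loops over incoming requests and calls the runner for each one,
        # so the caller is blocked until the loop ends.
        for request in self.requests:
            self.sent.append(await runner(request, "user-1"))


async def handle(request: str, ctx_id: str) -> str:
    """Stand-in for the pipeline runner passed as a callback."""
    return f"{ctx_id}: {request.upper()}"


def run_sketch(interface: ListInterface) -> None:
    """Start the interface and hand it the runner as a callback."""
    asyncio.run(interface.connect(handle))
```

An interface that only registers the callback and returns immediately would make the same call non-blocking, which is the distinction the description above draws.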
- __call__(request, ctx_id=None, update_ctx_misc=None)
Method that executes the pipeline once. Basically, it is a shortcut for _run_pipeline(). NB! When the pipeline is executed this way, messenger_interface won’t be initiated or connected.
This method has the same signature as PipelineRunnerFunction.
- Return type: