Pipeline#

Pipeline is the main element of the Chatsky framework.

Pipeline is responsible for managing and executing the various components (PipelineComponent) including Actor.

class PipelineServiceGroup(**data)[source]#

Bases: ServiceGroup

A service group that may contain an Actor among its components.

components: List[Union[Actor, Service, ServiceGroup]]#

The Actor, Service, and ServiceGroup objects that make up the group.

class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, optimization_warnings=None, parallelize_processing=None)[source]#

Bases: BaseModel

Class that automates service execution and creates the service pipeline.

pre_services: ServiceGroup#

ServiceGroup that will be executed before Actor.

post_services: ServiceGroup#

ServiceGroup that will be executed after Actor.

script: Script#

(required) A Script instance (object or dict).

start_label: AbsoluteNodeLabel#

(required) The first node of every context.

fallback_label: AbsoluteNodeLabel#

Node that is used if Actor cannot find the next node.

This most commonly happens when there are no suitable transitions.

Defaults to start_label.
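The fallback rule can be pictured with a small plain-Python model. It does not use Chatsky: `next_label`, the transition table, and the label tuples are hypothetical stand-ins. Only the rule itself (use fallback_label when no transition is suitable, and default fallback_label to start_label) comes from the description above.

```python
from typing import Callable, Dict, List, Optional, Tuple

# (flow name, node name): a stand-in for AbsoluteNodeLabel.
Label = Tuple[str, str]
# For each node: (condition, destination) pairs, checked in order.
Transitions = Dict[Label, List[Tuple[Callable[[str], bool], Label]]]


def next_label(
    transitions: Transitions,
    current: Label,
    request: str,
    start_label: Label,
    fallback_label: Optional[Label] = None,
) -> Label:
    """Return the first destination whose condition accepts the request."""
    if fallback_label is None:
        # Mirrors the documented default: fallback_label defaults to start_label.
        fallback_label = start_label
    for condition, destination in transitions.get(current, []):
        if condition(request):
            return destination
    # No suitable transition was found, so the fallback node is used.
    return fallback_label


START = ("greeting", "start")
HELLO = ("greeting", "hello")
OOPS = ("greeting", "oops")

table: Transitions = {START: [(lambda text: text == "Hi", HELLO)]}

matched = next_label(table, START, "Hi", START, OOPS)
unmatched = next_label(table, START, "???", START, OOPS)
defaulted = next_label(table, START, "???", START)
```

Here `matched` is the `hello` node, `unmatched` is the explicit fallback node, and `defaulted` is the start node because no fallback was given.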

default_priority: float#

Default priority value for Transition.

Defaults to 1.0.

slots: GroupSlot#

Slots configuration.

messenger_interface: MessengerInterface#

A MessengerInterface instance for this pipeline.

It handles connections to interfaces that provide user requests and accept bot responses.

context_storage: Union[DBContextStorage, Dict]#

A DBContextStorage instance for this pipeline or a dict to store dialog Context.

before_handler: BeforeHandler#

BeforeHandler to add to the pipeline service.

after_handler: AfterHandler#

AfterHandler to add to the pipeline service.

timeout: Optional[float]#

Timeout to add to pipeline root service group.

optimization_warnings: bool#

Flag that requests an asynchronous pipeline optimization check; warnings are written to the logs.

Additionally, Pipeline has some calculated fields:

  • services_pipeline is the pipeline's root ServiceGroup object,

  • actor is the pipeline's actor, found among the services.

parallelize_processing: bool#

This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.
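The effect of such a flag can be sketched with the standard library alone. This is an illustration under assumptions, not Chatsky's implementation: `make_processing` and `run_processing` are hypothetical names, `asyncio.gather` is a stand-in for whatever mechanism the framework uses, and the grouping of functions by script section is not modelled.

```python
import asyncio
from typing import Awaitable, Callable, List

ProcessingFunction = Callable[[List[str]], Awaitable[None]]


def make_processing(name: str, delay: float) -> ProcessingFunction:
    """Build a hypothetical processing function that records when it finishes."""

    async def processing(finished: List[str]) -> None:
        await asyncio.sleep(delay)
        finished.append(name)

    return processing


async def run_processing(
    functions: List[ProcessingFunction], parallelize: bool
) -> List[str]:
    """Run the functions one by one or concurrently; return the finish order."""
    finished: List[str] = []
    if parallelize:
        # All functions start together, so the fastest one finishes first.
        await asyncio.gather(*(function(finished) for function in functions))
    else:
        # Each function starts only after the previous one has finished.
        for function in functions:
            await function(finished)
    return finished


functions = [make_processing("slow", 0.05), make_processing("fast", 0.0)]
sequential_order = asyncio.run(run_processing(functions, parallelize=False))
parallel_order = asyncio.run(run_processing(functions, parallelize=True))
```

In the sequential run the functions finish in definition order; in the concurrent run the faster function finishes first, which is why processing functions that depend on each other's results should not be parallelized.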

classmethod from_file(file, custom_dir='custom', **overrides)[source]#

Create a Pipeline by importing it from a file. The file (JSON or YAML) should contain a dictionary whose keys are a subset of the Pipeline init parameters.

See JSONImporter.import_pipeline_file() for more information.

Parameters:
  • file (Union[str, Path]) – Path to a file containing pipeline init parameters.

  • custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If the file does not use custom code, this parameter has no effect.

  • overrides – You can pass init parameters to override those imported from the file.

Return type:

Pipeline
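How overrides take precedence over file contents can be sketched as a dictionary merge. This is a simplified model that uses only the standard library: `load_init_params` is a hypothetical helper, the keys and values in the file are placeholders, and the real from_file() additionally resolves custom code and constructs the Pipeline (see JSONImporter.import_pipeline_file()).

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def load_init_params(file: Path, **overrides: Any) -> Dict[str, Any]:
    """Read init parameters from a JSON file and apply keyword overrides."""
    params = json.loads(file.read_text(encoding="utf-8"))
    # Keyword arguments win over values imported from the file.
    params.update(overrides)
    return params


with tempfile.TemporaryDirectory() as tmp:
    config = Path(tmp) / "pipeline.json"
    config.write_text(
        json.dumps({"start_label": ["flow", "start"], "timeout": 5.0}),
        encoding="utf-8",
    )
    params = load_init_params(config, timeout=10.0)
```

After the call, `params["timeout"]` holds the overriding value while `start_label` is kept as read from the file.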

property actor: Actor#

An actor instance of the pipeline.

property services_pipeline: PipelineServiceGroup#

A group containing Pipeline.pre_services, Actor and Pipeline.post_services. It has Pipeline.before_handler and Pipeline.after_handler applied to it.
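The resulting execution order can be sketched as follows. The model is deliberately minimal and makes several assumptions: components are plain synchronous callables, there is exactly one handler of each kind, and `make_step` and `run_group` are hypothetical helpers rather than Chatsky functions.

```python
from typing import Callable, List

Step = Callable[[List[str]], None]


def make_step(name: str) -> Step:
    """Build a step that records its own name when it runs."""

    def step(log: List[str]) -> None:
        log.append(name)

    return step


def run_group(log: List[str], before: Step, components: List[Step], after: Step) -> None:
    """Run the before handler, every component in order, then the after handler."""
    before(log)
    for component in components:
        component(log)
    after(log)


pre_services = [make_step("pre_service")]
actor = make_step("actor")
post_services = [make_step("post_service")]

log: List[str] = []
run_group(
    log,
    make_step("before_handler"),
    [*pre_services, actor, *post_services],
    make_step("after_handler"),
)
```

The log ends up listing the before handler, the pre-service, the actor, the post-service, and the after handler, in that order.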

validate_start_label()[source]#

Validate that start_label is in script.

validate_fallback_label()[source]#

Validate that fallback_label is in script.

add_global_handler(global_handler_type, extra_handler, whitelist=None, blacklist=None)[source]#

Add global wrappers to the pipeline. Different types of global wrappers are called before or after pipeline execution, or before or after each pipeline component. They can be used for collecting pipeline statistics or for other functionality extensions. Note that global wrappers are still wrappers: they should not be used for time-consuming tasks (see chatsky.core.service.extra).

Parameters:
  • global_handler_type (GlobalExtraHandlerType) – (required) indication where the wrapper function should be executed.

  • extra_handler (ExtraHandlerFunction) – (required) wrapper function itself.

  • whitelist (Optional[List[str]]) – a list of services to only add this wrapper to.

  • blacklist (Optional[List[str]]) – a list of services to not add this wrapper to.

Returns:

None
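The interplay of whitelist and blacklist can be sketched with a toy registry. This model is an assumption about how the two filters combine (a service receives the wrapper if it passes the whitelist and is not on the blacklist); `add_handler`, `log_call`, and the service names are hypothetical and are not part of Chatsky.

```python
from typing import Callable, Dict, List, Optional

Handler = Callable[[str], None]


def add_handler(
    services: Dict[str, List[Handler]],
    handler: Handler,
    whitelist: Optional[List[str]] = None,
    blacklist: Optional[List[str]] = None,
) -> None:
    """Attach the handler to every service that passes both filters."""
    for name, handlers in services.items():
        # Without a whitelist every service is allowed.
        allowed = whitelist is None or name in whitelist
        # Without a blacklist no service is blocked.
        blocked = blacklist is not None and name in blacklist
        if allowed and not blocked:
            handlers.append(handler)


def log_call(service_name: str) -> None:
    print(f"running {service_name}")


services: Dict[str, List[Handler]] = {"pre": [], "actor": [], "post": []}
add_handler(services, log_call, whitelist=["pre", "actor"], blacklist=["actor"])
attached_to = [name for name, handlers in services.items() if handlers]
```

Only `pre` receives the handler: `post` is not whitelisted, and `actor` is whitelisted but also blacklisted.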

property info_dict: dict#

Info dictionary for this pipeline. It contains the most important public fields of the component as well as its type. All complex or unserializable fields are replaced with ‘Instance of [type]’.

async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)[source]#

Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.

This method performs the following steps:

  1. Retrieve the context ctx_id from context_storage, or initialize it.

  2. Update Context.misc with update_ctx_misc.

  3. Set up Context.framework_data fields.

  4. Add request to the context.

  5. Execute services_pipeline. This includes Actor (read Actor.run_component() for more information).

  6. Save context in the context_storage.

Return type:

Context

Returns:

Modified context ctx_id.
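The six steps can be mirrored in a plain-Python sketch. It is a model, not Chatsky code: the context is an ordinary dict, the storage is an in-memory dict, `echo_service` stands in for the whole services_pipeline (including Actor), and the `framework_data` entry is a made-up placeholder.

```python
import asyncio
import uuid
from typing import Any, Dict, Hashable, Optional

Context = Dict[str, Any]
storage: Dict[Hashable, Context] = {}


async def echo_service(ctx: Context) -> None:
    """Hypothetical stand-in for executing services_pipeline."""
    ctx["responses"].append(f"echo: {ctx['requests'][-1]}")


async def run_once(
    request: str,
    ctx_id: Optional[Hashable] = None,
    update_ctx_misc: Optional[dict] = None,
) -> Context:
    if ctx_id is None:
        ctx_id = uuid.uuid4()
    # 1. Retrieve the context from storage or initialize it.
    if ctx_id in storage:
        ctx = storage[ctx_id]
    else:
        ctx = {"requests": [], "responses": [], "misc": {}, "framework_data": {}}
    # 2. Update misc with the caller-supplied values.
    if update_ctx_misc is not None:
        ctx["misc"].update(update_ctx_misc)
    # 3. Set up framework data (placeholder value).
    ctx["framework_data"]["pipeline"] = "toy pipeline"
    # 4. Add the request to the context.
    ctx["requests"].append(request)
    # 5. Execute the services.
    await echo_service(ctx)
    # 6. Save the context in the storage.
    storage[ctx_id] = ctx
    return ctx


first = asyncio.run(run_once("hi", "user-1", {"lang": "en"}))
second = asyncio.run(run_once("bye", "user-1"))
```

Because both calls use the same ctx_id, the second call continues the stored dialog: its context holds both requests, both responses, and the misc value set during the first call.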

run()[source]#

Method that starts a pipeline and connects to messenger_interface.

It passes _run_pipeline() to messenger_interface as a callback, so every time a user request is received, _run_pipeline() is called.

This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface. Messenger interfaces that run in a loop block the current thread.
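The callback arrangement can be sketched with a toy interface. Everything here is hypothetical and synchronous for brevity: `ListInterface`, its `connect` method, and `run_pipeline` are illustrative names and do not reproduce the MessengerInterface API.

```python
from typing import Callable, Iterable, List


class ListInterface:
    """Hypothetical interface that reads requests from a list and then stops."""

    def __init__(self, requests: Iterable[str]) -> None:
        self.requests = list(requests)
        self.responses: List[str] = []

    def connect(self, callback: Callable[[str], str]) -> None:
        # The loop keeps control until the input is exhausted, so the
        # caller is blocked for as long as requests keep arriving.
        for request in self.requests:
            self.responses.append(callback(request))


def run_pipeline(request: str) -> str:
    """Stand-in for _run_pipeline(): turn one request into one response."""
    return request.upper()


def run(interface: ListInterface) -> None:
    """Hand the runner to the interface, which calls it for every request."""
    interface.connect(run_pipeline)


interface = ListInterface(["hi", "bye"])
run(interface)
```

An interface whose `connect` only registers the callback and returns immediately would make `run` non-blocking instead.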

__call__(request, ctx_id=None, update_ctx_misc=None)[source]#

Method that executes the pipeline once. It is a shortcut for _run_pipeline(). Note that when the pipeline is executed this way, messenger_interface is neither initiated nor connected.

This method has the same signature as PipelineRunnerFunction.

Return type:

Context