Pipeline#

Pipeline is the main element of the Chatsky framework.

Pipeline is responsible for managing and executing the various components (PipelineComponent) including Actor.

class PipelineServiceGroup(**data)[source]#

Bases: ServiceGroup

A service group that allows actor inside.

components: List[Union[Actor, Service, ServiceGroup]]#

A ServiceGroup object, that will be added to the group.

class Pipeline(script, start_label, fallback_label=None, *, default_priority=None, slots=None, messenger_interface=None, context_storage=None, pre_services=None, post_services=None, before_handler=None, after_handler=None, timeout=None, parallelize_processing=None)[source]#

Bases: BaseModel

Class that automates service execution and creates service pipeline.

pre_services: ServiceGroup#

ServiceGroup that will be executed before Actor.

post_services: ServiceGroup#

ServiceGroup that will be executed after Actor.

script: Script#

(required) A Script instance (object or dict).

start_label: AbsoluteNodeLabel#

(required) The first node of every context.

fallback_label: AbsoluteNodeLabel#

Node which will is used if Actor cannot find the next node.

This most commonly happens when there are not suitable transitions.

Defaults to start_label.

default_priority: float#

Default priority value for Transition.

Defaults to 1.0.

slots: GroupSlot#

Slots configuration.

messenger_interface: MessengerInterface#

A MessengerInterface instance for this pipeline.

It handles connections to interfaces that provide user requests and accept bot responses.

context_storage: Union[DBContextStorage, Dict]#

A DBContextStorage instance for this pipeline or a dict to store dialog Context.

before_handler: BeforeHandler#

BeforeHandler to add to the pipeline service.

after_handler: AfterHandler#

AfterHandler to add to the pipeline service.

timeout: Optional[float]#

Timeout to add to pipeline root service group.

parallelize_processing: bool#

This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.

classmethod from_file(file, custom_dir='custom', **overrides)[source]#

Create Pipeline by importing it from a file. A file (json or yaml) should contain a dictionary with keys being a subset of pipeline init parameters.

See JSONImporter.import_pipeline_file() for more information.

Parameters:
  • file (Union[str, Path]) – Path to a file containing pipeline init parameters.

  • custom_dir (Union[str, Path]) – Path to a directory containing custom code. Defaults to “./custom”. If file does not use custom code, this parameter will not have any effect.

  • overrides – You can pass init parameters to override those imported from the file.

Return type:

Pipeline

property actor: Actor#

An actor instance of the pipeline.

property services_pipeline: PipelineServiceGroup#

A group containing Pipeline.pre_services, Actor and Pipeline.post_services. It has Pipeline.before_handler and Pipeline.after_handler applied to it.

validate_start_label()[source]#

Validate start_label is in script.

validate_fallback_label()[source]#

Validate fallback_label is in script.

async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)[source]#

Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.

This method does:

  1. Retrieve from context_storage or initialize context ctx_id.

  2. Update Context.misc with update_ctx_misc.

  3. Set up Context.framework_data fields.

  4. Add request to the context.

  5. Execute services_pipeline. This includes Actor (read Actor.run_component() for more information).

  6. Save context in the context_storage.

Return type:

Context

Returns:

Modified context ctx_id.

run()[source]#

Method that starts a pipeline and connects to messenger_interface.

It passes _run_pipeline() to messenger_interface as a callback, so every time user request is received, _run_pipeline() will be called.

This method can be both blocking and non-blocking. It depends on current messenger_interface nature. Message interfaces that run in a loop block current thread.

__call__(request, ctx_id=None, update_ctx_misc=None)[source]#

Method that executes pipeline once. Basically, it is a shortcut for _run_pipeline(). NB! When pipeline is executed this way, messenger_interface won’t be initiated nor connected.

This method has the same signature as PipelineRunnerFunction.

Return type:

Context