Pipeline

The Pipeline module contains the Pipeline class, which is a fundamental element of the DFF. The Pipeline class manages and executes the components (PipelineComponent) that process messages to and from users, and it provides a way to organize and structure the message processing flow. The class is designed to be customizable and configurable: developers can add, remove, or modify the components that make up that flow.

The Pipeline class is designed to be used in conjunction with the PipelineComponent class, which is defined in the Component module. Together, these classes provide a flexible way to structure and manage the message processing flow.
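The idea of an ordered chain of components can be sketched in plain Python. The sketch below does not use the DFF API: `ToyPipeline`, `preprocess`, and `respond` are hypothetical stand-ins, and the dialog context is modeled as a plain dict.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in: a component is any callable that mutates a context dict.
Component = Callable[[Dict], None]


class ToyPipeline:
    """Illustrative only; this is not the DFF Pipeline class."""

    def __init__(self, components: List[Component]):
        self.components = list(components)

    def __call__(self, request: str) -> Dict:
        # Each request gets a fresh context that every component sees in turn.
        ctx = {"request": request, "trace": []}
        for component in self.components:
            component(ctx)
        return ctx


def preprocess(ctx: Dict) -> None:
    # Normalize the incoming request before it reaches the responder.
    ctx["request"] = ctx["request"].strip().lower()
    ctx["trace"].append("preprocess")


def respond(ctx: Dict) -> None:
    # Produce a response from the (already normalized) request.
    ctx["response"] = f"echo: {ctx['request']}"
    ctx["trace"].append("respond")


pipeline = ToyPipeline([preprocess, respond])
result = pipeline("  Hello ")
```

Adding, removing, or reordering entries in the component list changes the processing flow without touching the components themselves.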

class Pipeline(components, script, start_label, fallback_label=None, label_priority=1.0, validation_stage=None, condition_handler=None, verbose=True, handlers=None, messenger_interface=None, context_storage=None, before_handler=None, after_handler=None, timeout=None, optimization_warnings=False, parallelize_processing=False)

Bases: object

Class that automates service execution and creates a service pipeline. It accepts the following constructor parameters:

Parameters:
  • script (Union[Script, Dict]) – (required) A Script instance (object or dict).

  • start_label (Tuple[str, str]) – (required) Actor start label.

  • fallback_label (Optional[Tuple[str, str]]) – Actor fallback label.

  • label_priority (float) – Default priority value for all actor labels where there is no priority. Defaults to 1.0.

  • validation_stage (Optional[bool]) – This flag sets whether the validation stage is executed after actor creation. It is executed by default. Defaults to None.

  • condition_handler (Optional[Callable]) – Handler that processes a call of actor condition functions. Defaults to None.

  • verbose (bool) – If it is True, logging is used in actor. Defaults to True.

  • handlers (Optional[Dict[ActorStage, List[Callable]]]) –

    External handlers that are called at particular stages of the Actor's work.

    • key: ActorStage - Stage in which the handler is called.

    • value: List[Callable] - The list of called handlers for each stage. Defaults to an empty dict.

  • messenger_interface (Optional[MessengerInterface]) – A MessengerInterface instance for this pipeline.

  • context_storage (Union[DBContextStorage, Dict, None]) – A DBContextStorage instance for this pipeline or a dict to store dialog Context.

  • services – (required) A ServiceGroupBuilder object that will be transformed into the root service group. It must include Actor exactly once (an exception is raised otherwise). The root service group is always named pipeline.

  • wrappers – List of wrappers to add to the pipeline root service group.

  • timeout (Optional[float]) – Timeout to add to the pipeline root service group.

  • optimization_warnings (bool) –

    Flag that requests an asynchronous pipeline optimization check; warnings are written to the logs. Additionally, the pipeline has some calculated fields:

    • _services_pipeline is the pipeline root ServiceGroup object,

    • actor is the pipeline actor, found among the services.

  • parallelize_processing (bool) – This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.
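The shape of the handlers parameter (a dict mapping a stage to a list of callables) can be illustrated with a small self-contained sketch. `ToyStage` and its two members are hypothetical and are not the real ActorStage values; `run_turn` is a stand-in for a single dialog turn, not a DFF function.

```python
from enum import Enum
from typing import Callable, Dict, List, Optional


class ToyStage(Enum):
    # Hypothetical stage names, chosen for illustration only.
    BEFORE_RESPONSE = "before_response"
    AFTER_RESPONSE = "after_response"


Handler = Callable[[ToyStage, Dict], None]


def run_turn(request: str,
             handlers: Optional[Dict[ToyStage, List[Handler]]] = None) -> Dict:
    # A missing argument behaves like an empty dict: no handlers are called.
    handlers = handlers or {}
    ctx: Dict = {"request": request, "events": []}

    def fire(stage: ToyStage) -> None:
        # Call every handler registered for this stage, in list order.
        for handler in handlers.get(stage, []):
            handler(stage, ctx)

    fire(ToyStage.BEFORE_RESPONSE)
    ctx["response"] = f"echo: {request}"
    fire(ToyStage.AFTER_RESPONSE)
    return ctx


def log_stage(stage: ToyStage, ctx: Dict) -> None:
    # Record the stage name and whether a response exists yet.
    ctx["events"].append((stage.name, "response" in ctx))


ctx = run_turn("hi", {
    ToyStage.BEFORE_RESPONSE: [log_stage],
    ToyStage.AFTER_RESPONSE: [log_stage],
})
```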

add_global_handler(global_handler_type, extra_handler, whitelist=None, blacklist=None)

Method for adding global wrappers to the pipeline. Different types of global wrappers are called before/after pipeline execution or before/after each pipeline component. They can be used for collecting pipeline statistics or for other functionality extensions. NB! Global wrappers are still wrappers, so they shouldn’t be used for time-consuming tasks (see ../service/wrapper.py).

Parameters:
  • global_handler_type (GlobalExtraHandlerType) – (required) indicates where the wrapper function should be executed.

  • extra_handler (ExtraHandlerFunction) – (required) the wrapper function itself.

  • whitelist (Optional[List[str]]) – a list of services; the wrapper is added only to these.

  • blacklist (Optional[List[str]]) – a list of services; the wrapper is not added to these.

Returns:

None
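One plausible reading of the whitelist and blacklist parameters is sketched below in plain Python. The helper `should_attach` and the service names are hypothetical. How DFF treats a service named in both lists is not stated here, so this sketch assumes the blacklist wins.

```python
from typing import List, Optional


def should_attach(service_name: str,
                  whitelist: Optional[List[str]] = None,
                  blacklist: Optional[List[str]] = None) -> bool:
    # Assumption: None means "no restriction" for either list.
    if whitelist is not None and service_name not in whitelist:
        return False
    # Assumption: a blacklisted service is skipped even if it is whitelisted.
    if blacklist is not None and service_name in blacklist:
        return False
    return True


services = ["pre", "actor", "post"]

only_actor = [s for s in services if should_attach(s, whitelist=["actor"])]
all_but_post = [s for s in services if should_attach(s, blacklist=["post"])]
everything = [s for s in services if should_attach(s)]
```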

property info_dict: dict

Property for retrieving an info dictionary about this pipeline. Returns an info dict containing the most important public fields of the component, as well as its type. All complex or unserializable fields are replaced with ‘Instance of [type]’.
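The replacement of unserializable fields can be sketched as follows. This is an illustration and not the DFF implementation: `to_info_value` and `Storage` are hypothetical, JSON serializability is used as the test for "simple" values, and the exact text DFF substitutes for `[type]` is an assumption (the class name is used here).

```python
import json
from typing import Any, Dict


def to_info_value(value: Any) -> Any:
    # Keep values that JSON can represent; replace everything else with a label.
    try:
        json.dumps(value)
        return value
    except (TypeError, ValueError):
        return f"Instance of {type(value).__name__}"


class Storage:
    """Hypothetical stand-in for a field that cannot be serialized."""


fields: Dict[str, Any] = {
    "type": "Pipeline",
    "timeout": None,
    "context_storage": Storage(),
}

info = {name: to_info_value(value) for name, value in fields.items()}
```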

pretty_format(show_extra_handlers=False, indent=4)

Method for receiving a pretty-formatted string description of the pipeline. The structure of the resulting string is somewhat similar to YAML. It should be used for debugging/logging purposes and should not be parsed.

Parameters:
  • show_extra_handlers – Whether to include extra handlers (wrappers) or not (there could be many of them and/or they could be generated).

  • indent (int) – Offset from new line to add before component children.

Return type:

str
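A YAML-like rendering with a configurable indent can be sketched in a few lines. The `pretty` function below is hypothetical and only illustrates the role of the indent argument; the actual output format of pretty_format may differ.

```python
from typing import Any


def pretty(value: Any, indent: int = 4, level: int = 0) -> str:
    # Children are offset by `indent` spaces per nesting level.
    pad = " " * (indent * level)
    if isinstance(value, dict):
        lines = []
        for key, child in value.items():
            if isinstance(child, (dict, list)):
                lines.append(f"{pad}{key}:")
                lines.append(pretty(child, indent, level + 1))
            else:
                lines.append(f"{pad}{key}: {child}")
        return "\n".join(lines)
    if isinstance(value, list):
        lines = []
        for item in value:
            if isinstance(item, (dict, list)):
                lines.append(f"{pad}-")
                lines.append(pretty(item, indent, level + 1))
            else:
                lines.append(f"{pad}- {item}")
        return "\n".join(lines)
    return f"{pad}{value}"


description = {"type": "Pipeline", "components": ["pre", "actor"]}
text = pretty(description, indent=2)
```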

classmethod from_script(script, start_label, fallback_label=None, label_priority=1.0, validation_stage=None, condition_handler=None, verbose=True, parallelize_processing=False, handlers=None, context_storage=None, messenger_interface=None, pre_services=None, post_services=None)

Pipeline script-based constructor. It creates an Actor object and wraps it with a pipeline. NB! It is generally not designed for projects with a complex structure: Service and ServiceGroup customization is less obvious with it than it could be. It should be preferred for simple workflows with Actor auto-execution.

Parameters:
  • script (Union[Script, Dict]) – (required) A Script instance (object or dict).

  • start_label (Tuple[str, str]) – (required) Actor start label.

  • fallback_label (Optional[Tuple[str, str]]) – Actor fallback label.

  • label_priority (float) – Default priority value for all actor labels where there is no priority. Defaults to 1.0.

  • validation_stage (Optional[bool]) – This flag sets whether the validation stage is executed after actor creation. It is executed by default. Defaults to None.

  • condition_handler (Optional[Callable]) – Handler that processes a call of actor condition functions. Defaults to None.

  • verbose (bool) – If it is True, logging is used in actor. Defaults to True.

  • parallelize_processing (bool) – This flag determines whether or not the functions defined in the PRE_RESPONSE_PROCESSING and PRE_TRANSITIONS_PROCESSING sections of the script should be parallelized over respective groups.

  • handlers (Optional[Dict[ActorStage, List[Callable]]]) –

    External handlers that are called at particular stages of the Actor's work.

    • key: ActorStage - Stage in which the handler is called.

    • value: List[Callable] - The list of called handlers for each stage. Defaults to an empty dict.

  • context_storage (Union[DBContextStorage, Dict, None]) – A DBContextStorage instance for this pipeline or a dict to store dialog Context.

  • messenger_interface (Optional[MessengerInterface]) – A MessengerInterface instance for this pipeline.

  • pre_services (Optional[List[Union[ServiceBuilder, ServiceGroupBuilder]]]) – List of ServiceBuilder or ServiceGroupBuilder that will be executed before Actor.

  • post_services (Optional[List[Union[ServiceBuilder, ServiceGroupBuilder]]]) – List of ServiceBuilder or ServiceGroupBuilder that will be executed after Actor. The root service group is constructed by merging pre_services + actor + post_services.

Return type:

Pipeline
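The merge order pre_services + actor + post_services can be illustrated with plain callables. Nothing below is DFF code: `build_components` is a hypothetical helper, and the three service functions are stand-ins that only record the order in which they run.

```python
from typing import Callable, Dict, List, Optional

Service = Callable[[Dict], None]


def build_components(actor: Service,
                     pre_services: Optional[List[Service]] = None,
                     post_services: Optional[List[Service]] = None) -> List[Service]:
    # None is treated as an empty list, mirroring the None defaults in the signature.
    return [*(pre_services or []), actor, *(post_services or [])]


def detect_language(ctx: Dict) -> None:
    ctx["order"].append("detect_language")


def toy_actor(ctx: Dict) -> None:
    ctx["order"].append("actor")


def log_response(ctx: Dict) -> None:
    ctx["order"].append("log_response")


components = build_components(
    toy_actor,
    pre_services=[detect_language],
    post_services=[log_response],
)

ctx: Dict = {"order": []}
for component in components:
    component(ctx)
```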

set_actor(script, start_label, fallback_label=None, label_priority=1.0, validation_stage=None, condition_handler=None, verbose=True, handlers=None)

Sets the actor for the current pipeline and conducts the necessary checks. Resets the actor to the previous one if any errors are found.

Parameters:
  • script (Union[Script, Dict]) – (required) A Script instance (object or dict).

  • start_label (Tuple[str, str]) – (required) Actor start label. The start node of Script. The execution begins with it.

  • fallback_label (Optional[Tuple[str, str]]) – Actor fallback label. The label of Script. The dialog moves to this label if all other transitions fail or if an error occurs while executing the scenario.

  • label_priority (float) – Default priority value for all actor labels where there is no priority. Defaults to 1.0.

  • validation_stage (Optional[bool]) – This flag sets whether the validation stage is executed in actor. It is executed by default. Defaults to None.

  • condition_handler (Optional[Callable]) – Handler that processes a call of actor condition functions. Defaults to None.

  • verbose (bool) – If it is True, logging is used in actor. Defaults to True.

  • handlers (Optional[Dict[ActorStage, List[Callable]]]) –

    External handlers that are called at particular stages of the Actor's work.

    • key: ActorStage - Stage in which the handler is called.

    • value: List[Callable] - The list of called handlers for each stage. Defaults to an empty dict.
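The "set, check, and restore the previous actor on error" behavior can be sketched as below. `ToyHolder` is hypothetical, the actor is modeled as a dict, and the only check is that the start label exists in the script. Whether DFF re-raises the error after restoring the previous actor is an assumption made for this sketch.

```python
from typing import Dict, Optional, Tuple


class ToyHolder:
    """Illustrative only; not the DFF Pipeline class."""

    def __init__(self) -> None:
        self.actor: Optional[Dict] = None

    def set_actor(self, script: Dict, start_label: Tuple[str, str]) -> None:
        previous = self.actor
        try:
            self.actor = {"script": script, "start_label": start_label}
            self._validate()
        except ValueError:
            # Roll back to the previous actor, then propagate the error (assumption).
            self.actor = previous
            raise

    def _validate(self) -> None:
        flow, node = self.actor["start_label"]
        if node not in self.actor["script"].get(flow, {}):
            raise ValueError(f"start label {(flow, node)} is not in the script")


script = {"greeting": {"start": "Hi!", "fallback": "Sorry?"}}

holder = ToyHolder()
holder.set_actor(script, ("greeting", "start"))

try:
    holder.set_actor(script, ("greeting", "missing"))
    failed = False
except ValueError:
    failed = True
```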

classmethod from_dict(dictionary)

Pipeline dictionary-based constructor. The dictionary should have the fields defined in the main Pipeline constructor; it is unpacked and passed to that constructor as **kwargs.

Return type:

Pipeline
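The dictionary-based constructor pattern is ordinary keyword unpacking. The sketch below uses a hypothetical `ToyPipeline` with a reduced set of fields; it is not the DFF class.

```python
from typing import Any, Dict, List, Optional


class ToyPipeline:
    """Illustrative only; the real constructor takes more parameters."""

    def __init__(self, components: List[Any],
                 timeout: Optional[float] = None,
                 verbose: bool = True) -> None:
        self.components = components
        self.timeout = timeout
        self.verbose = verbose

    @classmethod
    def from_dict(cls, dictionary: Dict[str, Any]) -> "ToyPipeline":
        # Keys must match constructor parameter names; others raise TypeError.
        return cls(**dictionary)


pipeline = ToyPipeline.from_dict({"components": ["actor"], "timeout": 2.5})

try:
    ToyPipeline.from_dict({"components": [], "unknown_field": 1})
    rejected = False
except TypeError:
    rejected = True
```

Omitted keys fall back to the constructor defaults, and a key that is not a constructor parameter fails immediately, which keeps the dictionary form and the constructor in sync.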

async _run_pipeline(request, ctx_id=None, update_ctx_misc=None)

Method that should be invoked on user input. This method has the same signature as PipelineRunnerFunction.

Return type:

Context
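The roles of ctx_id and update_ctx_misc suggested by the signature can be sketched with a dict-backed store. This is an assumption-laden illustration: `run_turn` and `storage` are hypothetical, the context is a plain dict, and the way DFF handles a ctx_id of None is not modeled beyond using None as a key.

```python
import asyncio
from typing import Any, Dict, Optional

# Hypothetical in-memory context storage keyed by context id.
storage: Dict[Optional[str], Dict[str, Any]] = {}


async def run_turn(request: str,
                   ctx_id: Optional[str] = None,
                   update_ctx_misc: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # Load the context for this id, creating it on the first request.
    ctx = storage.setdefault(ctx_id, {"requests": [], "misc": {}})
    # Merge caller-supplied extras into the context before processing.
    if update_ctx_misc is not None:
        ctx["misc"].update(update_ctx_misc)
    ctx["requests"].append(request)
    return ctx


asyncio.run(run_turn("hi", ctx_id="user-1", update_ctx_misc={"lang": "en"}))
last_ctx = asyncio.run(run_turn("bye", ctx_id="user-1"))
```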

run()

Method that starts the pipeline and connects to messenger_interface. It passes _run_pipeline to messenger_interface as a callback, so every time a user request is received, _run_pipeline is called. This method can be either blocking or non-blocking, depending on the nature of the current messenger_interface: messenger interfaces that run in a loop block the current thread.
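The callback hand-off described above can be sketched with asyncio. Both classes are hypothetical: `ListInterface` replays a fixed list of requests and then returns, so `run` returns as well; an interface that polls in an endless loop would keep `run` blocked instead.

```python
import asyncio
from typing import Awaitable, Callable, List

Runner = Callable[[str], Awaitable[str]]


class ListInterface:
    """Hypothetical messenger interface that replays fixed requests."""

    def __init__(self, requests: List[str]) -> None:
        self.requests = list(requests)
        self.responses: List[str] = []

    async def connect(self, runner: Runner) -> None:
        # Call the pipeline callback once for every received request.
        for request in self.requests:
            self.responses.append(await runner(request))


class ToyPipeline:
    """Illustrative only; not the DFF Pipeline class."""

    def __init__(self, interface: ListInterface) -> None:
        self.interface = interface

    async def _run_pipeline(self, request: str) -> str:
        return request.upper()

    def run(self) -> None:
        # Hand the bound coroutine function to the interface as a callback.
        asyncio.run(self.interface.connect(self._run_pipeline))


interface = ListInterface(["hi", "bye"])
ToyPipeline(interface).run()
```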

property script: Script