Source code for chatsky.core.service.component

"""
Component
---------
The Component module defines a :py:class:`.PipelineComponent` class.

This is a base class for pipeline processing and is responsible for performing a specific task.
"""

from __future__ import annotations

import logging
import abc
import asyncio
from typing import Optional, TYPE_CHECKING
from pydantic import BaseModel, Field, field_validator

from chatsky.core.service.extra import BeforeHandler, AfterHandler
from chatsky.core.script_function import AnyCondition
from chatsky.core.service.types import (
    ComponentExecutionState,
    ExtraHandlerType,
    ExtraHandlerFunction,
)

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from chatsky.core.context import Context


[docs] class PipelineComponent(abc.ABC, BaseModel, extra="forbid", arbitrary_types_allowed=True): """ Base class for a single task processed by :py:class:`.Pipeline`. """ before_handler: BeforeHandler = Field(default_factory=BeforeHandler) """ :py:class:`~.BeforeHandler`, associated with this component. """ after_handler: AfterHandler = Field(default_factory=AfterHandler) """ :py:class:`~.AfterHandler`, associated with this component. """ timeout: Optional[float] = None """ Maximum component execution time (in seconds), if it exceeds this time, it is interrupted. """ concurrent: bool = False """ Optional flag that indicates whether this component should be executed concurrently with adjacent concurrent components. """ start_condition: AnyCondition = Field(default=True, validate_default=True) """ :py:data:`~.AnyCondition` that is invoked before each component execution; component is executed only if it returns ``True``. """ name: Optional[str] = None """ Name of the component. Defaults to :py:attr:`.computed_name` potentially modified by :py:func:`~chatsky.core.utils.rename_component_incrementing`. See :py:meth:`.validate_name` for rules. """ path: Optional[str] = None """ Separated by dots path to component, is universally unique. """
[docs] @field_validator("name") @classmethod def validate_name(cls, name: str): """ Validate this component's name: Name cannot be empty or contain "." or "#". :raises ValueError: If name failed validation. """ if name is not None: if name == "": raise ValueError("Name cannot be blank.") if "." in name: raise ValueError(f"Name cannot contain '.': {name!r}.") if "#" in name: raise ValueError(f"Name cannot contain '#': {name!r}.") return name
[docs] def _set_state(self, ctx: Context, value: ComponentExecutionState): """ Method for component runtime state setting, state is preserved in :py:attr:`.Context.framework_data`. :param ctx: :py:class:`.Context` to keep state in. :param value: State to set. """ ctx.framework_data.service_states[self.path].execution_status = value
[docs] def get_state(self, ctx: Context) -> ComponentExecutionState: """ Method for component runtime state getting, state is preserved in :py:attr:`.Context.framework_data`. :param ctx: :py:class:`~.Context` to get state from. :return: :py:class:`.ComponentExecutionState` of this service. """ return ctx.framework_data.service_states[self.path].execution_status
[docs] @abc.abstractmethod async def run_component(self, ctx: Context) -> Optional[ComponentExecutionState]: """ Run this component. :param ctx: Current dialog :py:class:`~.Context`. """ raise NotImplementedError
@property def computed_name(self) -> str: """ Default name that is used if :py:attr:`~.PipelineComponent.name` is not defined. In case two components in a :py:class:`~chatsky.core.service.group.ServiceGroup` have the same :py:attr:`.computed_name` an incrementing number is appended to the name. """ return "noname_service"
[docs] async def _run(self, ctx: Context) -> None: """ A method for running a pipeline component. Executes extra handlers before and after execution, launches :py:meth:`.run_component` method. This method is run after the component's timeout is set (if needed). :param ctx: Current dialog :py:class:`~.Context`. """ async def _inner_run(): if await self.start_condition(ctx): await self.before_handler(ctx, self) self._set_state(ctx, ComponentExecutionState.RUNNING) logger.debug(f"Running component {self.path!r}") result = await self.run_component(ctx) if isinstance(result, ComponentExecutionState): self._set_state(ctx, result) else: self._set_state(ctx, ComponentExecutionState.FINISHED) await self.after_handler(ctx, self) else: self._set_state(ctx, ComponentExecutionState.NOT_RUN) try: await asyncio.wait_for(_inner_run(), timeout=self.timeout) except Exception as exc: self._set_state(ctx, ComponentExecutionState.FAILED) logger.error(f"Service '{self.name}' execution failed!", exc_info=exc) finally: ctx.framework_data.service_states[self.path].finished_event.set()
[docs] async def __call__(self, ctx: Context) -> None: """ A method for calling pipeline components. It sets up timeout and executes it using :py:meth:`_run` method. :param ctx: Current dialog :py:class:`~.Context`. :return: ``None`` """ await self._run(ctx)
[docs] def add_extra_handler(self, extra_handler_type: ExtraHandlerType, extra_handler: ExtraHandlerFunction): """ Add extra handler to this component. :param extra_handler_type: A type of extra handler to add (before or after). :param extra_handler: Function to add to the component as an extra handler. """ if extra_handler_type == ExtraHandlerType.BEFORE: target = self.before_handler elif extra_handler_type == ExtraHandlerType.AFTER: target = self.after_handler else: raise ValueError(f"Unrecognized ExtraHandlerType: {extra_handler_type}") target.functions.append(extra_handler)