Types#

The Types module defines the enums, runtime-information models, and type aliases that are used throughout the DFF Pipeline, mainly for type hinting.

class PipelineRunnerFunction(*args, **kwargs)[source]#

Bases: Protocol

Protocol for pipeline running.

_is_protocol = True#
class ComponentExecutionState(value)[source]#

Bases: str, Enum

Enum representing the execution state of a pipeline component. These states are stored in ctx.framework_keys[PIPELINE_STATE_KEY] and should always be requested with NOT_RUN as the default fallback. The following states are supported:

  • NOT_RUN: component has not been executed yet (the default one),

  • RUNNING: component is currently being executed,

  • FINISHED: component executed successfully,

  • FAILED: component execution failed.
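
The lookup-with-fallback rule described above can be sketched as follows. This is a minimal illustration, not the library's code: the enum is a local stand-in that mirrors the documented members, and get_state, pipeline_states, and the component paths are hypothetical names.

```python
from enum import Enum
from typing import Dict


class ComponentExecutionState(str, Enum):
    # Local stand-in that mirrors the members documented above.
    NOT_RUN = "NOT_RUN"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


def get_state(states: Dict[str, str], path: str) -> ComponentExecutionState:
    """Return the stored state for `path`, falling back to NOT_RUN."""
    raw = states.get(path, ComponentExecutionState.NOT_RUN)
    return ComponentExecutionState(raw)


# Hypothetical contents of the state storage, keyed by component path.
pipeline_states = {"pipeline.pre": "FINISHED", "pipeline.actor": "RUNNING"}

finished = get_state(pipeline_states, "pipeline.pre")
missing = get_state(pipeline_states, "pipeline.post")  # never stored
```

Because the enum also derives from str, a member compares equal to its plain string value, which keeps stored states easy to serialize.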

NOT_RUN = 'NOT_RUN'#
RUNNING = 'RUNNING'#
FINISHED = 'FINISHED'#
FAILED = 'FAILED'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

  • name: the name of the member

  • start: the initial start value or None

  • count: the number of existing members

  • last_value: the last value assigned or None

_member_names_ = ['NOT_RUN', 'RUNNING', 'FINISHED', 'FAILED']#
_member_map_ = {'FAILED': ComponentExecutionState.FAILED, 'FINISHED': ComponentExecutionState.FINISHED, 'NOT_RUN': ComponentExecutionState.NOT_RUN, 'RUNNING': ComponentExecutionState.RUNNING}#
_member_type_#

alias of str

_value2member_map_ = {'FAILED': ComponentExecutionState.FAILED, 'FINISHED': ComponentExecutionState.FINISHED, 'NOT_RUN': ComponentExecutionState.NOT_RUN, 'RUNNING': ComponentExecutionState.RUNNING}#
class GlobalExtraHandlerType(value)[source]#

Bases: str, Enum

Enum representing the types of global wrappers that can be applied to a pipeline. The following types are supported:

  • BEFORE_ALL: function called before each pipeline call,

  • BEFORE: function called before each component,

  • AFTER: function called after each component,

  • AFTER_ALL: function called after each pipeline call.
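
The order in which the four stages fire can be illustrated with a toy runner. This is a sketch under stated assumptions, not the pipeline's actual execution loop: run_once, the hooks dictionary, and the component names are hypothetical, and the stage names are plain strings that mirror the enum values.

```python
from typing import Callable, Dict, List

# Stage names mirror the GlobalExtraHandlerType members documented above.
BEFORE_ALL, BEFORE, AFTER, AFTER_ALL = "BEFORE_ALL", "BEFORE", "AFTER", "AFTER_ALL"


def run_once(components: List[str], hooks: Dict[str, Callable[[str], None]]) -> None:
    """Toy runner (hypothetical) that fires hooks in the documented order."""
    hooks[BEFORE_ALL]("pipeline")
    for name in components:
        hooks[BEFORE](name)
        # The component itself would run here.
        hooks[AFTER](name)
    hooks[AFTER_ALL]("pipeline")


calls: List[str] = []
# Each hook records its stage and target; `stage=stage` binds the loop value.
hooks = {
    stage: (lambda target, stage=stage: calls.append(f"{stage}:{target}"))
    for stage in (BEFORE_ALL, BEFORE, AFTER, AFTER_ALL)
}
run_once(["pre", "actor"], hooks)
```

After the call, calls lists one BEFORE_ALL entry, a BEFORE/AFTER pair per component, and one AFTER_ALL entry.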

BEFORE_ALL = 'BEFORE_ALL'#
BEFORE = 'BEFORE'#
AFTER = 'AFTER'#
AFTER_ALL = 'AFTER_ALL'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

  • name: the name of the member

  • start: the initial start value or None

  • count: the number of existing members

  • last_value: the last value assigned or None

_member_names_ = ['BEFORE_ALL', 'BEFORE', 'AFTER', 'AFTER_ALL']#
_member_map_ = {'AFTER': GlobalExtraHandlerType.AFTER, 'AFTER_ALL': GlobalExtraHandlerType.AFTER_ALL, 'BEFORE': GlobalExtraHandlerType.BEFORE, 'BEFORE_ALL': GlobalExtraHandlerType.BEFORE_ALL}#
_member_type_#

alias of str

_value2member_map_ = {'AFTER': GlobalExtraHandlerType.AFTER, 'AFTER_ALL': GlobalExtraHandlerType.AFTER_ALL, 'BEFORE': GlobalExtraHandlerType.BEFORE, 'BEFORE_ALL': GlobalExtraHandlerType.BEFORE_ALL}#
class ExtraHandlerType(value)[source]#

Bases: str, Enum

Enum representing the wrapper execution stage: before or after the wrapped function. The following types are supported:

  • UNDEFINED: wrapper function with undetermined execution stage,

  • BEFORE: wrapper function called before component,

  • AFTER: wrapper function called after component.

UNDEFINED = 'UNDEFINED'#
BEFORE = 'BEFORE'#
AFTER = 'AFTER'#
_generate_next_value_(start, count, last_values)#

Generate the next value when not given.

  • name: the name of the member

  • start: the initial start value or None

  • count: the number of existing members

  • last_value: the last value assigned or None

_member_names_ = ['UNDEFINED', 'BEFORE', 'AFTER']#
_member_map_ = {'AFTER': ExtraHandlerType.AFTER, 'BEFORE': ExtraHandlerType.BEFORE, 'UNDEFINED': ExtraHandlerType.UNDEFINED}#
_member_type_#

alias of str

_value2member_map_ = {'AFTER': ExtraHandlerType.AFTER, 'BEFORE': ExtraHandlerType.BEFORE, 'UNDEFINED': ExtraHandlerType.UNDEFINED}#
PIPELINE_STATE_KEY = 'PIPELINE'#

PIPELINE: storage for the execution status of services and groups. Should be used as the key in ctx.framework_keys[PIPELINE_STATE_KEY].

StartConditionCheckerFunction#

A function type for component start_conditions. Accepts the context and the pipeline; returns a boolean (whether the service can be launched).

alias of Callable[[Context, Pipeline], bool]
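
A function of this shape might look like the sketch below. Context and Pipeline here are hypothetical stand-ins so that the example runs on its own, and the "verified" flag stored in misc is an invented illustration rather than anything the library defines.

```python
from typing import Any, Callable, Dict


class Context:
    """Hypothetical stand-in for the dialog context."""

    def __init__(self, misc: Dict[str, Any]) -> None:
        self.misc = misc


class Pipeline:
    """Hypothetical stand-in for the pipeline object."""


StartConditionCheckerFunction = Callable[[Context, Pipeline], bool]


def user_is_verified(ctx: Context, pipeline: Pipeline) -> bool:
    """Allow the component to start only when the context is marked verified."""
    return bool(ctx.misc.get("verified", False))


# The function satisfies the alias, so it can be annotated with it.
condition: StartConditionCheckerFunction = user_is_verified
```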

StartConditionCheckerAggregationFunction#

A function type for creating aggregation start_conditions for components. As the alias shows, it accepts an iterable of booleans (the results of the start_conditions being aggregated) and returns a boolean (whether the service can be launched).

alias of Callable[[Iterable[bool]], bool]
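
Any callable that reduces an iterable of booleans to one boolean fits this alias. The sketch below shows that the built-ins all and any already have the right shape, plus one custom reducer; at_least_two, require_all, and require_any are hypothetical names used only for illustration.

```python
from typing import Callable, Iterable

StartConditionCheckerAggregationFunction = Callable[[Iterable[bool]], bool]

# The built-ins `all` and `any` already match the alias.
require_all: StartConditionCheckerAggregationFunction = all
require_any: StartConditionCheckerAggregationFunction = any


def at_least_two(results: Iterable[bool]) -> bool:
    """Custom aggregation: true when two or more conditions hold."""
    return sum(1 for ok in results if ok) >= 2
```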

ExtraHandlerConditionFunction#

A function type used during global wrapper initialization to determine whether a wrapper should be applied to the component with a given path. Checks that the component path is in the whitelist (if defined) and not in the blacklist (if defined). Accepts a str (component path); returns a boolean (whether the wrapper should be applied).

alias of Callable[[str], bool]
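
The whitelist and blacklist check described above can be sketched as a small factory. make_path_condition and the example paths are hypothetical; the library may construct its condition differently.

```python
from typing import Callable, Container, Optional

ExtraHandlerConditionFunction = Callable[[str], bool]


def make_path_condition(
    whitelist: Optional[Container[str]] = None,
    blacklist: Optional[Container[str]] = None,
) -> ExtraHandlerConditionFunction:
    """Hypothetical helper that builds a condition from optional path lists."""

    def condition(path: str) -> bool:
        # An undefined whitelist allows every path.
        allowed = whitelist is None or path in whitelist
        # An undefined blacklist bans nothing.
        banned = blacklist is not None and path in blacklist
        return allowed and not banned

    return condition


only_pre = make_path_condition(whitelist={"pipeline.pre"})
not_actor = make_path_condition(blacklist={"pipeline.actor"})
```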

class ServiceRuntimeInfo(**data)[source]#

Bases: BaseModel

The type of object that is passed to components at runtime. Contains information about the current component (name, path, timeout, asynchronous) and execution_state, a dictionary that maps the paths of other components to their execution states.

name: str#
path: str#
timeout: Optional[float]#
asynchronous: bool#
execution_state: Dict[str, ComponentExecutionState]#
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

ExtraHandlerFunction#

A function type for creating wrappers (before and after functions). Can accept current dialog context, pipeline, and current wrapper info.

alias of Union[Callable[[Context], Any], Callable[[Context, Pipeline], Any], Callable[[Context, Pipeline, ExtraHandlerRuntimeInfo], Any]]
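
The alias admits handlers with one, two, or three positional parameters. One way a caller could support all three is to inspect the handler's signature and pass only the arguments it declares. The sketch below assumes that approach; call_with_accepted_args is a hypothetical helper, not a documented library function, and plain strings stand in for the context, pipeline, and runtime-info objects.

```python
import inspect
from typing import Any, Callable


def call_with_accepted_args(
    func: Callable[..., Any], ctx: Any, pipeline: Any, info: Any
) -> Any:
    """Pass as many of (ctx, pipeline, info) as `func` declares."""
    arity = len(inspect.signature(func).parameters)
    if not 1 <= arity <= 3:
        raise TypeError(f"expected 1 to 3 parameters, got {arity}")
    return func(*(ctx, pipeline, info)[:arity])


def only_ctx(ctx):
    return ("ctx",)


def ctx_and_pipeline(ctx, pipeline):
    return ("ctx", "pipeline")


def full(ctx, pipeline, info):
    return (ctx, pipeline, info)


results = [
    call_with_accepted_args(handler, "CTX", "PIPELINE", "INFO")
    for handler in (only_ctx, ctx_and_pipeline, full)
]
```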

class ExtraHandlerRuntimeInfo(**data)[source]#

Bases: BaseModel

func: ExtraHandlerFunction#
stage: ExtraHandlerType#
component: ServiceRuntimeInfo#
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

ServiceFunction#

A function type for creating service handlers. Can accept current dialog context, pipeline, and current service info. Can be both synchronous and asynchronous.

alias of Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]]]
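
Since a service handler may be either synchronous or asynchronous, a caller has to handle both results. The sketch below shows one common pattern: call the handler and await the result only when it is awaitable. Context and Pipeline are hypothetical stand-ins, and call_service is an illustrative helper rather than the library's own dispatcher.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class Context:
    """Hypothetical stand-in for the dialog context."""

    def __init__(self) -> None:
        self.misc: Dict[str, Any] = {}


class Pipeline:
    """Hypothetical stand-in for the pipeline object."""


def sync_service(ctx: Context) -> None:
    ctx.misc["sync"] = True


async def async_service(ctx: Context, pipeline: Pipeline) -> None:
    await asyncio.sleep(0)  # yield control once, as real I/O would
    ctx.misc["async"] = True


async def call_service(func: Callable[..., Any], *args: Any) -> None:
    """Call a handler and await the result only if it is awaitable."""
    result = func(*args)
    if inspect.isawaitable(result):
        await result


async def main() -> Context:
    ctx, pipeline = Context(), Pipeline()
    await call_service(sync_service, ctx)
    await call_service(async_service, ctx, pipeline)
    return ctx


ctx = asyncio.run(main())
```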

ExtraHandlerBuilder#

A type representing anything that can be transformed into ExtraHandlers. It can be:

  • ExtraHandlerFunction object

  • Dictionary, containing keys timeout, asynchronous, functions

alias of Union[_ComponentExtraHandler, WrapperDict, List[Union[Callable[[Context], Any], Callable[[Context, Pipeline], Any], Callable[[Context, Pipeline, ExtraHandlerRuntimeInfo], Any]]]]

ServiceBuilder#

A type representing anything that can be transformed into a service. It can be:

  • ServiceFunction (will become handler)

  • Service object (will be spread and recreated)

  • String 'ACTOR' - the pipeline Actor will be placed there

  • Dictionary, containing keys that are present in Service constructor parameters

alias of Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]], Service, str, ServiceDict]
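
To make the accepted forms concrete, the sketch below classifies a value by which documented form it takes. describe_builder is purely illustrative and says nothing about how the library performs the conversion; Service objects are left out so that the example stays self-contained, and the dictionary keys "handler" and "timeout" are assumed examples of Service constructor parameters.

```python
from typing import Any


def describe_builder(builder: Any) -> str:
    """Name which documented ServiceBuilder form a value takes (illustrative)."""
    if callable(builder):
        return "handler function"
    if isinstance(builder, str):
        if builder == "ACTOR":
            return "actor placeholder"
        raise ValueError(f"unsupported string: {builder!r}")
    if isinstance(builder, dict):
        return "constructor keyword dictionary"
    raise TypeError(f"not a recognised builder: {type(builder).__name__}")


def log_request(ctx) -> None:
    """Hypothetical handler that does nothing."""


forms = [
    describe_builder(log_request),
    describe_builder("ACTOR"),
    # Key names below are assumptions, not confirmed by this page.
    describe_builder({"handler": log_request, "timeout": 5}),
]
```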

ServiceGroupBuilder#

A type representing anything that can be transformed into a service group. It can be:

  • List of ServiceBuilders, ServiceGroup objects and lists (recursive)

  • ServiceGroup object (will be spread and recreated)

alias of Union[List[Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]], Service, str, ServiceDict, List[Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]], Service, str, ServiceDict]], ServiceGroup]], ServiceGroup]

class PipelineBuilder#

Bases: TypedDict

A type representing anything that can be transformed into a pipeline: a dictionary containing keys that are present in the Pipeline constructor parameters.

messenger_interface: NotRequired[Optional[MessengerInterface]]#
context_storage: NotRequired[Union[DBContextStorage, Dict, None]]#
components: Union[List[Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]], Service, str, ServiceDict, List[Union[Callable[[Context], None], Callable[[Context], Awaitable[None]], Callable[[Context, Pipeline], None], Callable[[Context, Pipeline], Awaitable[None]], Callable[[Context, Pipeline, ServiceRuntimeInfo], None], Callable[[Context, Pipeline, ServiceRuntimeInfo], Awaitable[None]], Service, str, ServiceDict]], ServiceGroup]], ServiceGroup]#
before_handler: NotRequired[Union[_ComponentExtraHandler, WrapperDict, List[Union[Callable[[Context], Any], Callable[[Context, Pipeline], Any], Callable[[Context, Pipeline, ExtraHandlerRuntimeInfo], Any]]], None]]#
after_handler: NotRequired[Union[_ComponentExtraHandler, WrapperDict, List[Union[Callable[[Context], Any], Callable[[Context, Pipeline], Any], Callable[[Context, Pipeline, ExtraHandlerRuntimeInfo], Any]]], None]]#
optimization_warnings: NotRequired[bool]#
parallelize_processing: NotRequired[bool]#
script: Union[Script, Dict]#
start_label: Tuple[str, str]#
fallback_label: NotRequired[Optional[Tuple[str, str]]]#
label_priority: NotRequired[float]#
validation_stage: NotRequired[Optional[bool]]#
condition_handler: NotRequired[Optional[Callable]]#
verbose: NotRequired[bool]#
handlers: NotRequired[Optional[Dict[ActorStage, List[Callable]]]]#
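
In the field list above, only components, script, and start_label are not wrapped in NotRequired, so a builder dictionary needs at least those three keys. The sketch below checks a plain dictionary for them; missing_keys and the label values are hypothetical, and the empty script is a placeholder rather than a valid dialog script.

```python
from typing import Any, Dict, List

# Fields listed above without NotRequired.
REQUIRED_KEYS = ("components", "script", "start_label")


def missing_keys(builder: Dict[str, Any]) -> List[str]:
    """Return the required keys that `builder` does not provide."""
    return [key for key in REQUIRED_KEYS if key not in builder]


builder = {
    "script": {},  # placeholder only
    "start_label": ("greeting_flow", "start_node"),  # hypothetical label
    "components": ["ACTOR"],
    "verbose": False,  # optional key
}

incomplete = {"script": {}}
```

A check like this only covers key presence; it does not validate the values against the annotated types.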