4. Groups and conditions (basic)#
The following example shows pipeline
service group usage and start conditions.
Here, Services and ServiceGroups are shown for advanced data pre- and postprocessing based on conditions.
[1]:
# installing dependencies
%pip install -q dff
Note: you may need to restart the kernel to use updated packages.
[2]:
import json
import logging
from dff.pipeline import (
Service,
Pipeline,
not_condition,
service_successful_condition,
ServiceRuntimeInfo,
ACTOR,
)
from dff.utils.testing.common import (
check_happy_path,
is_interactive_mode,
run_interactive_mode,
)
from dff.utils.testing.toy_script import HAPPY_PATH, TOY_SCRIPT
logger = logging.getLogger(__name__)
Pipeline can contain not only single services, but also service groups. Service groups can be defined as ServiceGroupBuilder
objects: lists of ServiceBuilders
and ServiceGroupBuilders
or objects. The objects should contain services
- a ServiceBuilder and ServiceGroupBuilder object list.
To receive serialized information about service, service group or pipeline a property info_dict
can be used, it returns important object properties as a dict.
Services and service groups can be executed conditionally. Conditions are functions passed to start_condition
argument. These functions should have following signature:
(ctx: Context, pipeline: Pipeline) -> bool.
Service is only executed if its start_condition returned True
. By default all the services start unconditionally. There are number of built-in condition functions. Built-in condition functions check other service states. These are most important built-in condition functions:
always_start_condition
- Default condition function, always starts service.service_successful_condition(path)
- Function that checks, whether service with givenpath
executed successfully.not_condition(function)
- Function that returns result opposite from the one returned by thefunction
(condition function) argument.
Here there is a conditionally executed service named never_running_service
is always executed. It is executed only if always_running_service
is not finished, that should never happen. The service named context_printing_service
prints pipeline runtime information, that contains execution state of all previously run services.
[3]:
def always_running_service(_, __, info: ServiceRuntimeInfo):
logger.info(f"Service '{info.name}' is running...")
def never_running_service(_, __, info: ServiceRuntimeInfo):
raise Exception(f"Oh no! The '{info.name}' service is running!")
def runtime_info_printing_service(_, __, info: ServiceRuntimeInfo):
logger.info(
f"Service '{info.name}' runtime execution info:"
f"{json.dumps(info, indent=4, default=str)}"
)
[4]:
pipeline_dict = {
"script": TOY_SCRIPT,
"start_label": ("greeting_flow", "start_node"),
"fallback_label": ("greeting_flow", "fallback_node"),
"components": [
Service(
handler=always_running_service,
name="always_running_service",
),
ACTOR,
Service(
handler=never_running_service,
start_condition=not_condition(
service_successful_condition(".pipeline.always_running_service")
),
),
Service(
handler=runtime_info_printing_service,
name="runtime_info_printing_service",
),
],
}
[5]:
pipeline = Pipeline.from_dict(pipeline_dict)
if __name__ == "__main__":
check_happy_path(pipeline, HAPPY_PATH)
if is_interactive_mode():
run_interactive_mode(pipeline)
(user) >>> text='Hi'
(bot) <<< text='Hi, how are you?'
(user) >>> text='i'm fine, how are you?'
(bot) <<< text='Good. What do you want to talk about?'
(user) >>> text='Let's talk about music.'
(bot) <<< text='Sorry, I can not talk about music now.'
(user) >>> text='Ok, goodbye.'
(bot) <<< text='bye'
(user) >>> text='Hi'
(bot) <<< text='Hi, how are you?'