5. Extra handlers#
This tutorial shows usage of extra handlers: functions that run before/after components.
For API ref, see:
[1]:
# installing dependencies
%pip install -q chatsky
Note: you may need to restart the kernel to use updated packages.
[2]:
import asyncio
import json
import random
import logging
import sys
from importlib import reload
from datetime import datetime
from chatsky.core.service import (
ServiceGroup,
ExtraHandlerRuntimeInfo,
)
from chatsky import Context, Pipeline
from chatsky.utils.testing.common import (
check_happy_path,
is_interactive_mode,
)
from chatsky.utils.testing.toy_script import HAPPY_PATH, TOY_SCRIPT_KWARGS
reload(logging)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="")
logger = logging.getLogger(__name__)
random.seed(0)
Intro#
Extra handlers are additional function lists (before-functions and/or after-functions) that can be added to any pipeline components (service and service groups).
Extra handlers main purpose should be statistics collection.
Usage#
Extra handlers can be attached to pipeline component using before_handler
and after_handler
constructor parameter.
Extra handler callable signature can be one of the following: [ctx]
or [ctx, info]
, where:
ctx
-Context
of the current dialog.info
- Dictionary, containing information about current extra handler and the pipeline component that called this.For example,
info.stage
will tell you if this Extra Handler is a BeforeHandler or AfterHandler;info.component.name
will give you the component’s name;info.component.get_state(ctx)
will return the component’s execution state (which isNOT_RUN
for before handlers andFINISHED
for after handlers).
Extra Handler configuration#
Instead of passing a list of functions as extra handler you can pass an instance of either BeforeHandler or AfterHandler.
This allows changing the timeout
and concurrent
options to change the way extra handlers are executed.
Mass extra handler addition#
You can use add_extra_handler to add a function as an extra handler to a service group and if you pass a condition
function it will also add the extra handler to all its subcomponents that satisfy the condition function.
Code explanation#
Here 5 heavy_service
s are run in a single concurrent service group. Each of them sleeps for random a amount of seconds (between 0 and 0.05).
To each of them (as well as the group) time measurement extra handler is attached, that writes execution time to ctx.misc
.
In the end ctx.misc
is logged to info channel.
[3]:
def collect_timestamp_before(ctx: Context, info: ExtraHandlerRuntimeInfo):
ctx.misc.update({f"{info.component.path}": datetime.now()})
def collect_timestamp_after(ctx: Context, info: ExtraHandlerRuntimeInfo):
ctx.misc.update(
{
f"{info.component.path}": datetime.now()
- ctx.misc[f"{info.component.path}"]
}
)
async def heavy_service(_):
await asyncio.sleep(random.randint(0, 5) / 100)
def logging_service(ctx: Context):
logger.info(f"Context misc: {json.dumps(ctx.misc, indent=4, default=str)}")
[4]:
pipeline = Pipeline(
**TOY_SCRIPT_KWARGS,
pre_services=ServiceGroup(
before_handler=[collect_timestamp_before],
after_handler=[collect_timestamp_after],
components=[
{
"handler": heavy_service,
"before_handler": [collect_timestamp_before],
"after_handler": [collect_timestamp_after],
}
]
* 5,
),
post_services=[logging_service],
)
[5]:
if __name__ == "__main__":
check_happy_path(pipeline, HAPPY_PATH[:1], printout=True)
if is_interactive_mode():
pipeline.run()
USER: text='Hi'
Context misc: {
".pre": "0:00:00.121890",
".pre.heavy_service#0": "0:00:00.030262",
".pre.heavy_service#1": "0:00:00.030243",
".pre.heavy_service#2": "0:00:00.000111",
".pre.heavy_service#3": "0:00:00.020226",
".pre.heavy_service#4": "0:00:00.040351"
}
BOT : text='Hi, how are you?'