chatsky_ui.services package

chatsky_ui.services.index module

Index service

The Index service is responsible for indexing the user bot’s conditions, responses, and services. By indexing the project, the Index service creates an in-memory representation that can be quickly accessed when needed.

class chatsky_ui.services.index.Index[source]

Bases: object

get_services() dict[source]

Get indexed services.

Example:
>>> get_services()
{
    "is_upper_case": {"type": "condition", "lineno": 3},
    "say_hi": {"type": "response", "lineno": 5}
}
async indexit(service_name: str, type_: str, lineno: int) None[source]

Add service info to the index using indexit_all method.

async indexit_all(services_names: List[str], types: List[str], linenos: List[int]) None[source]

Index multiple services.

The index is added to the index in the form: {service_name: {“type”: type_, “lineno”: lineno}}.

Args:

services_names: list of service names types: list of service types (“condition”, “response”, “service”) linenos: list of service starting line numbers according to its place in the file.

Raises:

FileNotFoundError: if the index file doesn’t exist

Example:
>>> services_names = ["is_upper_case", "say_hi"]
>>> types = ["condition", "response"]
>>> linenos = [3, 5]
>>> await indexit_all(services_names, types, linenos)
{
    "is_upper_case": {"type": "condition", "lineno": 3},
    "say_hi": {"type": "response", "lineno": 5}
}
Returns:

None

async load() None[source]

Load index and services into memory.

property logger
property path
async search_service(service_name: str) List[str] | None[source]

Get the body code of a service based on its indexed info (type, lineno).

Example:
>>> search_service("is_upper_case")
["def is_upper_case(name):

“, “ return name.isupper()”]

set_logger()[source]
set_path(path: Path)[source]

chatsky_ui.services.json_converter module

chatsky_ui.services.process module

Process classes.

Classes for build and run processes.

class chatsky_ui.services.process.BuildProcess(id_: int, preset_end_status: str = '')[source]

Bases: Process

Process for converting a frontned graph to a DFF script.

async get_full_info(attributes: list | None = None) Dict[str, Any][source]

Get the values of the attributes mentioned in the list.

Args:

attributes (list): A list of attributes to get the values of.

Returns:

dict: A dictionary containing the values of the attributes mentioned in the list.

async update_db_info() None[source]
class chatsky_ui.services.process.Process(id_: int, preset_end_status: str = '')[source]

Bases: ABC

Base for build and run processes.

async check_status() Status[source]

Returns the process current status.

Returns:
  • Status.NULL: When a process is initiated but not started yet. This condition is unusual and typically

    indicates incorrect usage or a process misuse in backend logic.

  • Status.ALIVE: process is alive and ready to communicate

  • Status.RUNNING: process is still trying to get alive. no communication

  • Status.COMPLETED: returncode is 0

  • Status.FAILED: returncode is 1

  • Status.STOPPED: returncode is -15

  • Status.FAILED_WITH_UNEXPECTED_CODE: failed with other returncode

async get_full_info(attributes: list) Dict[str, Any][source]

Get the values of the attributes mentioned in the list.

Args:

attributes (list): A list of attributes to get the values of.

Returns:

dict: A dictionary containing the values of the attributes mentioned in the list.

async is_alive() bool[source]

Checks if the process is alive by writing to stdin andreading its stdout.

async periodically_check_status() None[source]

Periodically checks the process status and updates the database.

async read_stdout() bytes[source]

Reads the stdout of the process for communication.

async start(cmd_to_run: str) None[source]

Starts an asyncronous process with the given command.

async stop() None[source]

Stops the process.

Raises:

ProcessLookupError: If the process doesn’t exist or already exited. RuntimeError: If the process has not started yet.

abstract async update_db_info()[source]
async write_stdin(message: bytes) None[source]

Writes a message to the stdin of the process for communication.

class chatsky_ui.services.process.RunProcess(id_: int, build_id: int, preset_end_status: str = '')[source]

Bases: Process

Process for running a DFF pipeline.

async get_full_info(attributes: list | None = None) Dict[str, Any][source]

Get the values of the attributes mentioned in the list.

Args:

attributes (list): A list of attributes to get the values of.

Returns:

dict: A dictionary containing the values of the attributes mentioned in the list.

async update_db_info() None[source]

chatsky_ui.services.process_manager module

Process manager

Process managers are used to manage run and build processes. They are responsible for starting, stopping, updating, and checking status of processes. Processes themselves are stored in the processes dictionary of process managers.

class chatsky_ui.services.process_manager.BuildManager[source]

Bases: ProcessManager

Process manager for converting a frontned graph to a DFF script.

async check_status(id_, index, *args, **kwargs)[source]

Checks the build “id_” process status by calling the periodically_check_status method of the process.

This updates the process status in the database every 2 seconds. The index is refreshed after the build is done/failed.

async fetch_build_logs(build_id: int, offset: int, limit: int) List[str] | None[source]

Returns the logs of one build according to its id.

Number of loglines returned is based on offset as the start line and limited by limit lines.

async get_build_info(id_: int, run_manager: RunManager) Dict[str, Any] | None[source]

Returns metadata of a specific build process identified by its unique ID.

Args:

id_ (int): the id of the build run_manager (RunManager): the run manager to use for getting all runs of this build

async get_full_info(offset: int, limit: int, path: Path | None = None) List[Dict[str, Any]][source]

Returns metadata of limit number of processes, starting from the offset process.

async get_full_info_with_runs_info(run_manager: RunManager, offset: int, limit: int) List[Dict[str, Any]][source]

Returns metadata of limit number of processes, starting from the ``offset``th process.

Args:

run_manager (RunManager): the run manager to use for getting all runs of this build

async start(preset: Preset) int[source]

Starts a new build process.

Increases the maximum existing id by 1 and assigns it to the new process. Starts the process and appends it to the processes list.

Args:

preset (Preset): the preset to use among (“success”, “failure”, “loop”)

Returns:

int: the id of the new started process

class chatsky_ui.services.process_manager.ProcessManager[source]

Bases: object

Base for build and run process managers.

async check_status(id_: int, *args, **kwargs) None[source]

Checks the status of the process with the given id by calling the periodically_check_status method of the process.

This updates the process status in the database every 2 seconds.

async fetch_process_logs(id_: int, offset: int, limit: int, path: Path) List[str] | None[source]

Returns the logs of one process according to its id. If the process is not found, returns None.

async get_full_info(offset: int, limit: int, path: Path) List[Dict[str, Any]][source]

Returns metadata of limit number of processes, starting from the ``offset``th process.

get_last_id()[source]

Gets the maximum id among processes of type BuildProcess or RunProcess.

async get_process_info(id_: int, path: Path) Dict[str, Any] | None[source]

Returns metadata of a specific process identified by its unique ID.

async get_status(id_: int) Status[source]

Checks the status of the process with the given id by calling the check_status method of the process.

property logger
set_logger()[source]
async stop(id_: int) None[source]

Stops the process with the given id.

raises:

ProcessLookupError: If the process with the given id is not found. RuntimeError: If the process has not started yet.

class chatsky_ui.services.process_manager.RunManager[source]

Bases: ProcessManager

Process manager for running a DFF pipeline.

async fetch_run_logs(run_id: int, offset: int, limit: int) List[str] | None[source]

Returns the logs of one run according to its id.

Number of loglines returned is based on offset as the start line and limited by limit lines.

async get_full_info(offset: int, limit: int, path: Path | None = None) List[Dict[str, Any]][source]

Returns metadata of limit number of run processes, starting from the ``offset``th process.

async get_run_info(id_: int) Dict[str, Any] | None[source]

Returns metadata of a specific run process identified by its unique ID.

async start(build_id: int, preset: Preset) int[source]

Starts a new run process.

Increases the maximum existing id by 1 and assigns it to the new process. Starts the process and appends it to the processes list.

Args:

build_id (int): the build id to run preset (Preset): the preset to use among (“success”, “failure”, “loop”)

Returns:

int: the id of the new started process

chatsky_ui.services.websocket_manager module

Websocket class for controling websocket operations.

class chatsky_ui.services.websocket_manager.WebSocketManager[source]

Bases: object

Controls websocket operations connect, disconnect, check status, and communicate.

check_status(websocket: WebSocket)[source]
async connect(websocket: WebSocket)[source]

Accepts the websocket connection and marks it as active connection.

disconnect(websocket: WebSocket)[source]

Cancels pending tasks of the open websocket process and removes it from active connections.

async forward_websocket_messages_to_process(run_id: int, process_manager: ProcessManager, websocket: WebSocket)[source]

Listens for messages from the websocket and sends them to the process.

property logger
async send_process_output_to_websocket(run_id: int, process_manager: ProcessManager, websocket: WebSocket)[source]

Reads and forwards process output to the websocket client.

set_logger()[source]