chatsky_ui.services package
chatsky_ui.services.index module
Index service
The Index service is responsible for indexing the user bot’s conditions, responses, and services. By indexing the project, the Index service creates an in-memory representation that can be quickly accessed when needed.
- class chatsky_ui.services.index.Index[source]
Bases:
object
- get_services() dict [source]
Get indexed services.
- Example:
>>> get_services() { "is_upper_case": {"type": "condition", "lineno": 3}, "say_hi": {"type": "response", "lineno": 5} }
- async indexit(service_name: str, type_: str, lineno: int) None [source]
Add service info to the index using indexit_all method.
- async indexit_all(services_names: List[str], types: List[str], linenos: List[int]) None [source]
Index multiple services.
The index is added to the index in the form: {service_name: {“type”:
type_
, “lineno”: lineno}}.- Args:
services_names: list of service names types: list of service types (“condition”, “response”, “service”) linenos: list of service starting line numbers according to its place in the file.
- Raises:
FileNotFoundError: if the index file doesn’t exist
- Example:
>>> services_names = ["is_upper_case", "say_hi"] >>> types = ["condition", "response"] >>> linenos = [3, 5] >>> await indexit_all(services_names, types, linenos) { "is_upper_case": {"type": "condition", "lineno": 3}, "say_hi": {"type": "response", "lineno": 5} }
- Returns:
None
- property logger
- property path
chatsky_ui.services.json_converter module
chatsky_ui.services.process module
Process classes.
Classes for build and run processes.
- class chatsky_ui.services.process.BuildProcess(id_: int, preset_end_status: str = '')[source]
Bases:
Process
Process for converting a frontned graph to a DFF script.
- class chatsky_ui.services.process.Process(id_: int, preset_end_status: str = '')[source]
Bases:
ABC
Base for build and run processes.
- async check_status() Status [source]
Returns the process current status.
- Returns:
- Status.NULL: When a process is initiated but not started yet. This condition is unusual and typically
indicates incorrect usage or a process misuse in backend logic.
Status.ALIVE: process is alive and ready to communicate
Status.RUNNING: process is still trying to get alive. no communication
Status.COMPLETED: returncode is 0
Status.FAILED: returncode is 1
Status.STOPPED: returncode is -15
Status.FAILED_WITH_UNEXPECTED_CODE: failed with other returncode
- async get_full_info(attributes: list) Dict[str, Any] [source]
Get the values of the attributes mentioned in the list.
- Args:
attributes (list): A list of attributes to get the values of.
- Returns:
dict: A dictionary containing the values of the attributes mentioned in the list.
- async is_alive() bool [source]
Checks if the process is alive by writing to stdin andreading its stdout.
- async periodically_check_status() None [source]
Periodically checks the process status and updates the database.
- class chatsky_ui.services.process.RunProcess(id_: int, build_id: int, preset_end_status: str = '')[source]
Bases:
Process
Process for running a DFF pipeline.
chatsky_ui.services.process_manager module
Process manager
Process managers are used to manage run and build processes. They are responsible for starting, stopping, updating, and checking status of processes. Processes themselves are stored in the processes dictionary of process managers.
- class chatsky_ui.services.process_manager.BuildManager[source]
Bases:
ProcessManager
Process manager for converting a frontned graph to a DFF script.
- async check_status(id_, index, *args, **kwargs)[source]
Checks the build “id_” process status by calling the periodically_check_status method of the process.
This updates the process status in the database every 2 seconds. The index is refreshed after the build is done/failed.
- async fetch_build_logs(build_id: int, offset: int, limit: int) List[str] | None [source]
Returns the logs of one build according to its id.
Number of loglines returned is based on offset as the start line and limited by limit lines.
- async get_build_info(id_: int, run_manager: RunManager) Dict[str, Any] | None [source]
Returns metadata of a specific build process identified by its unique ID.
- Args:
id_
(int): the id of the buildrun_manager
(RunManager): the run manager to use for getting all runs of this build
- async get_full_info(offset: int, limit: int, path: Path | None = None) List[Dict[str, Any]] [source]
Returns metadata of
limit
number of processes, starting from theoffset
process.
- async get_full_info_with_runs_info(run_manager: RunManager, offset: int, limit: int) List[Dict[str, Any]] [source]
Returns metadata of
limit
number of processes, starting from the ``offset``th process.- Args:
run_manager (RunManager): the run manager to use for getting all runs of this build
- async start(preset: Preset) int [source]
Starts a new build process.
Increases the maximum existing id by 1 and assigns it to the new process. Starts the process and appends it to the processes list.
- Args:
preset (Preset): the preset to use among (“success”, “failure”, “loop”)
- Returns:
int: the id of the new started process
- class chatsky_ui.services.process_manager.ProcessManager[source]
Bases:
object
Base for build and run process managers.
- async check_status(id_: int, *args, **kwargs) None [source]
Checks the status of the process with the given id by calling the periodically_check_status method of the process.
This updates the process status in the database every 2 seconds.
- async fetch_process_logs(id_: int, offset: int, limit: int, path: Path) List[str] | None [source]
Returns the logs of one process according to its id. If the process is not found, returns None.
- async get_full_info(offset: int, limit: int, path: Path) List[Dict[str, Any]] [source]
Returns metadata of
limit
number of processes, starting from the ``offset``th process.
- async get_process_info(id_: int, path: Path) Dict[str, Any] | None [source]
Returns metadata of a specific process identified by its unique ID.
- async get_status(id_: int) Status [source]
Checks the status of the process with the given id by calling the check_status method of the process.
- property logger
- class chatsky_ui.services.process_manager.RunManager[source]
Bases:
ProcessManager
Process manager for running a DFF pipeline.
- async fetch_run_logs(run_id: int, offset: int, limit: int) List[str] | None [source]
Returns the logs of one run according to its id.
Number of loglines returned is based on offset as the start line and limited by limit lines.
- async get_full_info(offset: int, limit: int, path: Path | None = None) List[Dict[str, Any]] [source]
Returns metadata of
limit
number of run processes, starting from the ``offset``th process.
- async get_run_info(id_: int) Dict[str, Any] | None [source]
Returns metadata of a specific run process identified by its unique ID.
- async start(build_id: int, preset: Preset) int [source]
Starts a new run process.
Increases the maximum existing id by 1 and assigns it to the new process. Starts the process and appends it to the processes list.
- Args:
build_id (int): the build id to run preset (Preset): the preset to use among (“success”, “failure”, “loop”)
- Returns:
int: the id of the new started process
chatsky_ui.services.websocket_manager module
Websocket class for controling websocket operations.
- class chatsky_ui.services.websocket_manager.WebSocketManager[source]
Bases:
object
Controls websocket operations connect, disconnect, check status, and communicate.
- async connect(websocket: WebSocket)[source]
Accepts the websocket connection and marks it as active connection.
- disconnect(websocket: WebSocket)[source]
Cancels pending tasks of the open websocket process and removes it from active connections.
- async forward_websocket_messages_to_process(run_id: int, process_manager: ProcessManager, websocket: WebSocket)[source]
Listens for messages from the websocket and sends them to the process.
- property logger
- async send_process_output_to_websocket(run_id: int, process_manager: ProcessManager, websocket: WebSocket)[source]
Reads and forwards process output to the websocket client.