Source code for chatsky_ui.services.process

"""
Process classes.
-----------------

Classes for build and run processes.
"""
import asyncio
import logging
import os
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from chatsky_ui.core.config import settings
from chatsky_ui.core.logger_config import get_logger, setup_logging
from chatsky_ui.db.base import read_conf, write_conf
from chatsky_ui.schemas.process_status import Status

load_dotenv()

GRACEFUL_TERMINATION_TIMEOUT = float(os.getenv("GRACEFUL_TERMINATION_TIMEOUT", 2))
PING_PONG_TIMEOUT = float(os.getenv("PING_PONG_TIMEOUT", 0.5))


def _map_to_str(params: Dict[str, Any]):
    for k, v in params.items():
        if isinstance(v, datetime):
            params[k] = v.strftime("%Y-%m-%dT%H:%M:%S")
        elif isinstance(v, Path):
            params[k] = str(v)


[docs]class Process(ABC): """Base for build and run processes.""" def __init__(self, id_: int, preset_end_status: str = ""): self.id: int = id_ self.preset_end_status: str = preset_end_status self.status: Status = Status.NULL self.timestamp: datetime = datetime.now() self.log_path: Path self.lock: asyncio.Lock = asyncio.Lock() self.process: asyncio.subprocess.Process # pylint: disable=no-member #TODO: is naming ok? self.logger: logging.Logger
[docs] async def start(self, cmd_to_run: str) -> None: """Starts an asyncronous process with the given command.""" self.process = await asyncio.create_subprocess_exec( *cmd_to_run.split(), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, )
[docs] async def get_full_info(self, attributes: list) -> Dict[str, Any]: """ Get the values of the attributes mentioned in the list. Args: attributes (list): A list of attributes to get the values of. Returns: dict: A dictionary containing the values of the attributes mentioned in the list. """ await self.check_status() info = {key: getattr(self, key) for key in self.__dict__ if key in attributes} if "status" in attributes: info["status"] = self.status.value return info
[docs] @abstractmethod async def update_db_info(self): raise NotImplementedError
[docs] async def periodically_check_status(self) -> None: """Periodically checks the process status and updates the database.""" while True: await self.update_db_info() # check status and update db self.logger.info("Status of process '%s': %s", self.id, self.status) if self.status in [Status.STOPPED, Status.COMPLETED, Status.FAILED]: break await asyncio.sleep(2) # TODO: ?sleep time shouldn't be constant
[docs] async def check_status(self) -> Status: """Returns the process current status. Returns: - Status.NULL: When a process is initiated but not started yet. This condition is unusual and typically indicates incorrect usage or a process misuse in backend logic. - Status.ALIVE: process is alive and ready to communicate - Status.RUNNING: process is still trying to get alive. no communication - Status.COMPLETED: returncode is 0 - Status.FAILED: returncode is 1 - Status.STOPPED: returncode is -15 - Status.FAILED_WITH_UNEXPECTED_CODE: failed with other returncode """ if self.process is None: self.status = Status.NULL # if process is already alive, don't interrupt potential open channels by checking status periodically. elif self.process.returncode is None: if self.status == Status.ALIVE: self.status = Status.ALIVE else: if await self.is_alive(): self.status = Status.ALIVE else: self.status = Status.RUNNING elif self.process.returncode == 0: self.status = Status.COMPLETED elif self.process.returncode == 1: self.status = Status.FAILED elif self.process.returncode == -15: self.status = Status.STOPPED else: self.logger.error( "Unexpected code was returned: '%s'. A non-zero return code indicates an error.", self.process.returncode, ) self.status = Status.FAILED_WITH_UNEXPECTED_CODE if self.status not in [Status.NULL, Status.RUNNING, Status.ALIVE, Status.STOPPED]: stdout, stderr = await self.process.communicate() if stdout: self.logger.info(f"[stdout]\n{stdout.decode()}") if stderr: self.logger.error(f"[stderr]\n{stderr.decode()}") return self.status
[docs] async def stop(self) -> None: """Stops the process. Raises: ProcessLookupError: If the process doesn't exist or already exited. RuntimeError: If the process has not started yet. """ if self.process is None: # Check if a process has been started self.logger.error("Cannot stop a process '%s' that has not started yet.", self.id) raise RuntimeError try: self.logger.debug("Terminating process '%s'", self.id) self.process.terminate() try: await asyncio.wait_for(self.process.wait(), timeout=GRACEFUL_TERMINATION_TIMEOUT) self.logger.debug("Process '%s' was gracefully terminated.", self.id) except asyncio.TimeoutError: self.process.kill() await self.process.wait() self.logger.debug("Process '%s' was forcefully killed.", self.id) self.logger.debug("Process returencode '%s' ", self.process.returncode) except ProcessLookupError as exc: self.logger.error("Process '%s' not found. It may have already exited.", self.id) raise ProcessLookupError from exc
[docs] async def read_stdout(self) -> bytes: """Reads the stdout of the process for communication.""" async with self.lock: if self.process is None: self.logger.error("Cannot read stdout from a process '%s' that has not started yet.", self.id) raise RuntimeError if self.process.stdout is None: raise RuntimeError(f"The process '{self.id}' stdout is None. It might be still running.") return await self.process.stdout.readline()
[docs] async def write_stdin(self, message: bytes) -> None: """Writes a message to the stdin of the process for communication.""" if self.process is None: self.logger.error("Cannot write into stdin of a process '%s' that has not started yet.", self.id) raise RuntimeError if self.process.stdin is None: raise RuntimeError(f"The process '{self.id}' stdin is None. It might be still running.") self.process.stdin.write(message) await self.process.stdin.drain()
[docs] async def is_alive(self) -> bool: """Checks if the process is alive by writing to stdin andreading its stdout.""" message = b"Hi\n" try: # Attempt to write and read from the process with a timeout. await self.write_stdin(message) output = await asyncio.wait_for(self.read_stdout(), timeout=PING_PONG_TIMEOUT) if not output: return False self.logger.debug("Process is alive and output afer communication is: %s", output.decode()) return True except asyncio.exceptions.TimeoutError: self.logger.debug("Process is still running.") return False
[docs]class RunProcess(Process): """Process for running a DFF pipeline.""" def __init__(self, id_: int, build_id: int, preset_end_status: str = ""): super().__init__(id_, preset_end_status) self.build_id: int = build_id self.log_path: Path = setup_logging("runs", self.id, self.timestamp) self.logger = get_logger(str(id_), self.log_path)
[docs] async def get_full_info(self, attributes: Optional[list] = None) -> Dict[str, Any]: if attributes is None: attributes = ["id", "preset_end_status", "status", "timestamp", "log_path", "build_id"] return await super().get_full_info(attributes)
[docs] async def update_db_info(self) -> None: # save current run info into runs_path self.logger.debug("Updating db run info") runs_conf = await read_conf(settings.runs_path) run_params = await self.get_full_info() _map_to_str(run_params) for run in runs_conf: if run.id == run_params["id"]: # type: ignore for key, value in run_params.items(): setattr(run, key, value) break else: runs_conf.append(run_params) await write_conf(runs_conf, settings.runs_path) # save current run id into the correspoinding build in builds_path builds_conf = await read_conf(settings.builds_path) for build in builds_conf: if build.id == run_params["build_id"]: # type: ignore if run_params["id"] not in build.run_ids: # type: ignore build.run_ids.append(run_params["id"]) # type: ignore break await write_conf(builds_conf, settings.builds_path)
[docs]class BuildProcess(Process): """Process for converting a frontned graph to a DFF script.""" def __init__(self, id_: int, preset_end_status: str = ""): super().__init__(id_, preset_end_status) self.run_ids: List[int] = [] self.log_path: Path = setup_logging("builds", self.id, self.timestamp) self.logger = get_logger(str(id_), self.log_path)
[docs] async def get_full_info(self, attributes: Optional[list] = None) -> Dict[str, Any]: if attributes is None: attributes = ["id", "preset_end_status", "status", "timestamp", "log_path", "run_ids"] return await super().get_full_info(attributes)
[docs] async def update_db_info(self) -> None: # save current build info into builds_path builds_conf = await read_conf(settings.builds_path) build_params = await self.get_full_info() _map_to_str(build_params) for build in builds_conf: if build.id == build_params["id"]: # type: ignore for key, value in build_params.items(): setattr(build, key, value) break else: builds_conf.append(build_params) await write_conf(builds_conf, settings.builds_path)