import asyncio
from typing import Any, Dict, List, Optional, Union
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, WebSocket, WebSocketException, status
from chatsky_ui.api import deps
from chatsky_ui.schemas.pagination import Pagination
from chatsky_ui.schemas.preset import Preset
from chatsky_ui.services.index import Index
from chatsky_ui.services.process_manager import BuildManager, ProcessManager, RunManager
from chatsky_ui.services.websocket_manager import WebSocketManager
router = APIRouter()
async def _stop_process(id_: int, process_manager: ProcessManager, process="run") -> Dict[str, str]:
"""Stops a `build` or `run` process with the given id."""
try:
await process_manager.stop(id_)
except (RuntimeError, ProcessLookupError) as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Process not found. It may have already exited or not started yet. Please check logs.",
) from e
process_manager.logger.info("%s process '%s' has stopped", process.capitalize(), id_)
return {"status": "ok"}
async def _check_process_status(id_: int, process_manager: ProcessManager) -> Dict[str, str]:
"""Checks the status of a `build` or `run` process with the given id."""
if id_ not in process_manager.processes:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Process not found. It may have already exited.",
)
process_status = await process_manager.get_status(id_)
return {"status": process_status.value}
[docs]@router.post("/build/start", status_code=201)
async def start_build(
preset: Preset,
background_tasks: BackgroundTasks,
build_manager: BuildManager = Depends(deps.get_build_manager),
index: Index = Depends(deps.get_index),
) -> Dict[str, Union[str, int]]:
"""Starts a `build` process with the given preset.
This runs a background task to check the status of the process every 2 seconds.
Args:
preset (Preset): The preset to set the build process for. Must be among ("success", "failure", "loop")
Returns:
{"status": "ok", "build_id": build_id}: in case of **starting** the build process successfully.
"""
await asyncio.sleep(preset.wait_time)
build_id = await build_manager.start(preset)
background_tasks.add_task(build_manager.check_status, build_id, index)
build_manager.logger.info("Build process '%s' has started", build_id)
return {"status": "ok", "build_id": build_id}
[docs]@router.get("/build/stop/{build_id}", status_code=200)
async def stop_build(*, build_id: int, build_manager: BuildManager = Depends(deps.get_build_manager)) -> Dict[str, str]:
"""Stops a `build` process with the given id.
Args:
build_id (int): The id of the process to stop.
build_id (BuildManager): The process manager dependency to stop the process with.
Raises:
HTTPException: With status code 404 if the process is not found.
Returns:
{"status": "ok"}: in case of stopping a process successfully.
"""
return await _stop_process(build_id, build_manager, process="build")
[docs]@router.get("/build/status/{build_id}", status_code=200)
async def check_build_status(
*, build_id: int, build_manager: BuildManager = Depends(deps.get_build_manager)
) -> Dict[str, str]:
"""Checks the status of a `build` process with the given id.
Args:
build_id (int): The id of the process to check.
build_manager (BuildManager): The process manager dependency to check the process with.
Raises:
HTTPException: With status code 404 if the process is not found.
Returns:
{"status": "completed"}: in case of a successfully completed process.
{"status": "running"}: in case of a still running process.
{"status": "stopped"}: in case of a stopped process.
{"status": "failed"}: in case of a failed-to-run process.
"""
return await _check_process_status(build_id, build_manager)
[docs]@router.get("/builds", response_model=Optional[Union[list, dict]], status_code=200)
async def check_build_processes(
build_id: Optional[int] = None,
build_manager: BuildManager = Depends(deps.get_build_manager),
run_manager: RunManager = Depends(deps.get_run_manager),
pagination: Pagination = Depends(),
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""Checks the status of all `build` processes and returns them along with their runs info.
The offset and limit parameters can be used to paginate the results.
Args:
build_id (Optional[int]): The id of the process to check. If not specified, all processes will be returned.
"""
if build_id is not None:
return await build_manager.get_build_info(build_id, run_manager)
else:
return await build_manager.get_full_info_with_runs_info(
run_manager, offset=pagination.offset(), limit=pagination.limit
)
[docs]@router.get("/builds/logs/{build_id}", response_model=Optional[list], status_code=200)
async def get_build_logs(
build_id: int, build_manager: BuildManager = Depends(deps.get_build_manager), pagination: Pagination = Depends()
) -> Optional[List[str]]:
"""Gets the logs of a specific `build` process.
The offset and limit parameters can be used to paginate the results.
"""
if build_id is not None:
return await build_manager.fetch_build_logs(build_id, pagination.offset(), pagination.limit)
[docs]@router.post("/run/start/{build_id}", status_code=201)
async def start_run(
*,
build_id: int,
preset: Preset,
background_tasks: BackgroundTasks,
run_manager: RunManager = Depends(deps.get_run_manager)
) -> Dict[str, Union[str, int]]:
"""Starts a `run` process with the given preset.
This runs a background task to check the status of the process every 2 seconds.
Args:
build_id (int): The id of the build process to start running.
preset (Preset): The preset to set the build process for. Must be among ("success", "failure", "loop")
Returns:
{"status": "ok", "build_id": run_id}: in case of **starting** the run process successfully.
"""
await asyncio.sleep(preset.wait_time)
run_id = await run_manager.start(build_id, preset)
background_tasks.add_task(run_manager.check_status, run_id)
run_manager.logger.info("Run process '%s' has started", run_id)
return {"status": "ok", "run_id": run_id}
[docs]@router.get("/run/stop/{run_id}", status_code=200)
async def stop_run(*, run_id: int, run_manager: RunManager = Depends(deps.get_run_manager)) -> Dict[str, str]:
"""Stops a `run` process with the given id.
Args:
run_id (int): The id of the process to stop.
run_manager (RunManager): The process manager dependency to stop the process with.
Raises:
HTTPException: With status code 404 if the process is not found.
Returns:
{"status": "ok"}: in case of stopping a process successfully.
"""
return await _stop_process(run_id, run_manager, process="run")
[docs]@router.get("/run/status/{run_id}", status_code=200)
async def check_run_status(*, run_id: int, run_manager: RunManager = Depends(deps.get_run_manager)) -> Dict[str, Any]:
"""Checks the status of a `run` process with the given id.
Args:
build_id (int): The id of the process to check.
run_manager (RunManager): The process manager dependency to check the process with.
Raises:
HTTPException: With status code 404 if the process is not found.
Returns:
{"status": "alive"}: in case of a successfully run process. Now it is able to communicate.
{"status": "running"}: in case of a still running process.
{"status": "stopped"}: in case of a stopped process.
{"status": "failed"}: in case of a failed-to-run process.
"""
return await _check_process_status(run_id, run_manager)
[docs]@router.get("/runs", response_model=Optional[Union[list, dict]], status_code=200)
async def check_run_processes(
run_id: Optional[int] = None,
run_manager: RunManager = Depends(deps.get_run_manager),
pagination: Pagination = Depends(),
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""Checks the status of all `run` processes and returns them.
The offset and limit parameters can be used to paginate the results.
Args:
run_id (Optional[int]): The id of the process to check. If not specified, all processes will be returned.
"""
if run_id is not None:
return await run_manager.get_run_info(run_id)
else:
return await run_manager.get_full_info(offset=pagination.offset(), limit=pagination.limit)
[docs]@router.get("/runs/logs/{run_id}", response_model=Optional[list], status_code=200)
async def get_run_logs(
run_id: int, run_manager: RunManager = Depends(deps.get_run_manager), pagination: Pagination = Depends()
) -> Optional[List[str]]:
"""Gets the logs of a specific `run` process.
The offset and limit parameters can be used to paginate the results.
"""
if run_id is not None:
return await run_manager.fetch_run_logs(run_id, pagination.offset(), pagination.limit)
[docs]@router.websocket("/run/connect")
async def connect(
websocket: WebSocket,
websocket_manager: WebSocketManager = Depends(deps.get_websocket_manager),
run_manager: RunManager = Depends(deps.get_run_manager),
) -> None:
"""Establishes a WebSocket connection to communicate with an alive run process identified by its 'run_id'.
The WebSocket URL should adhere to the format: /bot/run/connect?run_id=<run_id>.
"""
run_manager.logger.debug("Connecting to websocket")
run_id = websocket.query_params.get("run_id")
# Validate run_id
if run_id is None:
run_manager.logger.error("No run_id provided")
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
if not run_id.isdigit():
run_manager.logger.error("A non-digit run run_id provided")
raise WebSocketException(code=status.WS_1003_UNSUPPORTED_DATA)
run_id = int(run_id)
if run_id not in run_manager.processes:
run_manager.logger.error("process with run_id '%s' exited or never existed", run_id)
raise WebSocketException(code=status.WS_1014_BAD_GATEWAY)
await websocket_manager.connect(websocket)
run_manager.logger.info("Websocket for run process '%s' has been opened", run_id)
await websocket.send_text("Start chatting")
output_task = asyncio.create_task(
websocket_manager.send_process_output_to_websocket(run_id, run_manager, websocket)
)
input_task = asyncio.create_task(
websocket_manager.forward_websocket_messages_to_process(run_id, run_manager, websocket)
)
# Wait for either task to finish
_, websocket_manager.pending_tasks[websocket] = await asyncio.wait(
[output_task, input_task],
return_when=asyncio.FIRST_COMPLETED,
)
websocket_manager.disconnect(websocket)