Source code for chatsky.context_storages.redis

"""
Redis
-----
The Redis module provides a Redis-based version of the :py:class:`.DBContextStorage` class.
This class is used to store and retrieve context data in a Redis.
It allows Chatsky to easily store and retrieve context data in a format that is highly scalable
and easy to work with.

Redis is an open-source, in-memory data structure store that is known for its
high performance and scalability. It stores data in key-value pairs and supports a variety of data
structures such as strings, hashes, lists, sets, and more.
Additionally, Redis can be used as a cache, message broker, and database, making it a versatile
and powerful choice for data storage and management.
"""

import json
from typing import Hashable

try:
    from redis.asyncio import Redis

    redis_available = True
except ImportError:
    redis_available = False

from chatsky.core import Context

from .database import DBContextStorage, threadsafe_method
from .protocol import get_protocol_install_suggestion


[docs]class RedisContextStorage(DBContextStorage): """ Implements :py:class:`.DBContextStorage` with `redis` as the database backend. :param path: Database URI string. Example: `redis://user:password@host:port`. """ def __init__(self, path: str): DBContextStorage.__init__(self, path) if not redis_available: install_suggestion = get_protocol_install_suggestion("redis") raise ImportError("`redis` package is missing.\n" + install_suggestion) self._redis = Redis.from_url(self.full_path)
[docs] @threadsafe_method async def contains_async(self, key: Hashable) -> bool: return bool(await self._redis.exists(str(key)))
[docs] @threadsafe_method async def set_item_async(self, key: Hashable, value: Context): value = Context.model_validate(value) await self._redis.set(str(key), value.model_dump_json())
[docs] @threadsafe_method async def get_item_async(self, key: Hashable) -> Context: result = await self._redis.get(str(key)) if result: result_dict = json.loads(result.decode("utf-8")) return Context.model_validate(result_dict) raise KeyError(f"No entry for key {key}.")
[docs] @threadsafe_method async def del_item_async(self, key: Hashable): await self._redis.delete(str(key))
[docs] @threadsafe_method async def len_async(self) -> int: return await self._redis.dbsize()
[docs] @threadsafe_method async def clear_async(self): await self._redis.flushdb()