Skip to content

Commit

Permalink
Distributed cache feature using Redis (#518)
Browse files Browse the repository at this point in the history
* [add] initial draft for distributed caching

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] Simplify Model schema for distributed cache management.

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] update expiry time for keys when accessed or saved

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] NoOpEviction mechanism to avoid redundant calls to redis

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] unit tests for distributed cache

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] temporary directory for sqlite

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] skip adding scalar ids to cache if NoOpEviction is used

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] documentation, example code and refactoring

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [refactor] added docstrings, consistent quotes, removed literal comparison

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [refactor] removed unused imports

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [fix] add lazy import for redis in distributed_cache.py

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] `pylint: disable=wrong-import-position`

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [refactor] grouped import statements for distributed cache

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [refactor] revert `get_data_manager` and `manager_factory` signatures to include memory cache config

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] add default values to memory cache params

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] add param to set `maxmemory-samples` config for redis cache

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [mod] updated cache config for testing lru cache

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [del] removed `test-lru-cache`

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] description for `maxmemory_samples` param

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] unit test for validating ttl configuration

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] unit tests for validating cache configuration, ttl access

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

* [add] unit test to validate str based access for eviction base in `get_data_manager`

Signed-off-by: Anurag Wagh <a9raag@gmail.com>

---------

Signed-off-by: Anurag Wagh <a9raag@gmail.com>
  • Loading branch information
a9raag authored Aug 17, 2023
1 parent d6d484d commit c2deaec
Show file tree
Hide file tree
Showing 9 changed files with 658 additions and 151 deletions.
73 changes: 73 additions & 0 deletions examples/eviction/distributed_eviction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from gptcache import Cache
from gptcache.embedding import Onnx

from gptcache.manager.eviction import EvictionBase

from gptcache.manager import get_data_manager, CacheBase, VectorBase, manager_factory


def get_data_manager_example():
    """Build a data manager with MongoDB scalar storage, a Faiss vector
    index, and Redis as the eviction base.

    Keeping eviction keys in the Redis key-value store instead of process
    memory lets GPTCache scale horizontally: multiple instances share one
    eviction state, and Redis applies its own eviction policy to the keys.
    """
    embedder = Onnx()
    scalar = CacheBase("mongo", url="mongodb://localhost:27017/")
    vectors = VectorBase("faiss", dimension=embedder.dimension)
    eviction = EvictionBase("redis", maxmemory="100mb", policy="allkeys-lru", ttl=100)
    manager = get_data_manager(cache_base=scalar,
                               vector_base=vectors,
                               eviction_base=eviction)

    gpt_cache = Cache()
    gpt_cache.init(data_manager=manager)
    question = "What is github?"
    answer = "Online platform for version control and code collaboration."
    gpt_cache.import_data([question], [answer], [embedder.to_embeddings(question)])


def get_manager_example_redis_only():
    """Build a data manager that uses Redis both as scalar storage and,
    implicitly, as the eviction mechanism.

    ``RedisScalarStorage`` can handle key TTLs and eviction internally, so
    ``no_op_eviction`` is passed as the eviction base: it never registers
    keys or refreshes TTLs. As with the Mongo example, keys live in Redis
    rather than in memory, which allows horizontal scaling, and eviction
    follows the Redis eviction policy.
    """
    embedder = Onnx()
    scalar = CacheBase("redis", maxmemory="100mb", policy="allkeys-lru", ttl=100)
    vectors = VectorBase("faiss", dimension=embedder.dimension)
    manager = get_data_manager(cache_base=scalar,
                               vector_base=vectors,
                               eviction_base=EvictionBase("no_op_eviction"))

    gpt_cache = Cache()
    gpt_cache.init(data_manager=manager)
    question = "What is github?"
    answer = "Online platform for version control and code collaboration."
    gpt_cache.import_data([question], [answer], [embedder.to_embeddings(question)])


def manager_factory_example():
    """Build the same Redis-backed setup through ``manager_factory``.

    The ``"redis,faiss"`` spec selects Redis scalar storage plus a Faiss
    vector store, and ``eviction_manager="redis"`` hands key eviction to
    Redis with the given memory ceiling, policy, and TTL.
    """
    embedder = Onnx()
    manager = manager_factory(
        "redis,faiss",
        eviction_manager="redis",
        scalar_params={"url": "redis://localhost:6379"},
        vector_params={"dimension": embedder.dimension},
        eviction_params={"maxmemory": "100mb",
                         "policy": "allkeys-lru",
                         "ttl": 1},
    )

    gpt_cache = Cache()
    gpt_cache.init(data_manager=manager)
    question = "What is github?"
    answer = "Online platform for version control and code collaboration."
    gpt_cache.import_data([question], [answer], [embedder.to_embeddings(question)])
45 changes: 23 additions & 22 deletions gptcache/manager/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import requests

from gptcache.manager.eviction import EvictionBase
from gptcache.manager.eviction.distributed_cache import NoOpEviction
from gptcache.manager.eviction_manager import EvictionManager
from gptcache.manager.object_data.base import ObjectBase
from gptcache.manager.scalar_data.base import (
Expand All @@ -30,11 +31,11 @@ def save(self, question, answer, embedding_data, **kwargs):

@abstractmethod
def import_data(
self,
questions: List[Any],
answers: List[Any],
embedding_datas: List[Any],
session_ids: List[Optional[str]],
self,
questions: List[Any],
answers: List[Any],
embedding_datas: List[Any],
session_ids: List[Optional[str]],
):
pass

Expand Down Expand Up @@ -213,37 +214,37 @@ class SSDataManager(DataManager):
:type s: CacheStorage
:param v: VectorBase to manager the vector data, it can be generated with :meth:`gptcache.manager.VectorBase`.
:type v: VectorBase
:param max_size: the max size for the cache, defaults to 1000.
:type max_size: int
:param clean_size: the size to clean up, defaults to `max_size * 0.2`.
:type clean_size: int
:param eviction: The eviction policy, it is support "LRU" and "FIFO" now, and defaults to "LRU".
:type eviction: str
:param o: ObjectBase to manager the object data, it can be generated with :meth:`gptcache.manager.ObjectBase`.
:type o: ObjectBase
:param e: EvictionBase to manager the eviction data, it can be generated with :meth:`gptcache.manager.EvictionBase`.
:type e: EvictionBase
"""

def __init__(
self,
s: CacheStorage,
v: VectorBase,
o: Optional[ObjectBase],
e: Optional[EvictionBase],
max_size,
clean_size,
policy="LRU",
policy="LRU"
):
self.max_size = max_size
self.clean_size = clean_size
self.s = s
self.v = v
self.o = o
self.eviction_manager = EvictionManager(self.s, self.v)
self.eviction_base = EvictionBase(
name="memory",
policy=policy,
maxsize=max_size,
clean_size=clean_size,
on_evict=self._clear,
)
self.eviction_base.put(self.s.get_ids(deleted=False))
if e is None:
e = EvictionBase(name="memory",
maxsize=max_size,
clean_size=clean_size,
policy=policy,
on_evict=self._clear)
self.eviction_base = e

if not isinstance(self.eviction_base, NoOpEviction):
# if eviction manager is no op redis, we don't need to put data into eviction base
self.eviction_base.put(self.s.get_ids(deleted=False))

def _clear(self, marked_keys):
self.eviction_manager.soft_evict(marked_keys)
Expand Down
121 changes: 121 additions & 0 deletions gptcache/manager/eviction/distributed_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# pylint: disable=wrong-import-position
from abc import ABC, abstractmethod
from typing import List

from gptcache.utils import import_redis
from gptcache.manager.eviction.base import EvictionBase

import_redis()
import redis
from redis_om import get_redis_connection


class DistributedEviction(EvictionBase, ABC):
    """Abstract contract for eviction strategies backed by an external,
    shared store (e.g. Redis) rather than process-local memory.
    """

    @property
    @abstractmethod
    def policy(self) -> str:
        """Name of the active eviction policy."""

    @abstractmethod
    def put(self, objs: List[str]):
        """Register the given keys with the distributed store."""

    @abstractmethod
    def get(self, obj: str):
        """Look up a single key in the distributed store."""


class RedisCacheEviction(DistributedEviction, ABC):
    """eviction: Distributed Cache Eviction Strategy using Redis.

    Cache ids are mirrored into Redis under a common prefix; Redis itself
    evicts them according to the configured ``maxmemory-policy``, so several
    GPTCache instances can share one eviction state.

    :param host: the host of redis
    :type host: str
    :param port: the port of redis
    :type port: int
    :param maxmemory: the maxmemory of redis
    :type maxmemory: str
    :param policy: eviction strategy policy of redis such as allkeys-lru, volatile-lru, allkeys-random, volatile-random, etc.
    refer https://redis.io/docs/reference/eviction/ for more information.
    :type policy: str
    :param global_key_prefix: the global key prefix
    :type global_key_prefix: str
    :param ttl: the ttl of the cache data
    :type ttl: int
    :param maxmemory_samples: Number of keys to sample when evicting keys
    :type maxmemory_samples: int
    :param kwargs: extra arguments forwarded to ``get_redis_connection``
    :type kwargs: Any
    """

    def __init__(self,
                 host="localhost",
                 port=6379,
                 maxmemory: str = None,
                 policy: str = None,
                 global_key_prefix="gptcache",
                 ttl: int = None,
                 maxmemory_samples: int = None,
                 **kwargs):
        self._redis = get_redis_connection(host=host, port=port, **kwargs)
        if maxmemory:
            self._redis.config_set("maxmemory", maxmemory)
        if maxmemory_samples:
            self._redis.config_set("maxmemory-samples", maxmemory_samples)
        if policy:
            self._redis.config_set("maxmemory-policy", policy)
        # Bug fix: assign unconditionally. The original only set `_policy`
        # inside `if policy:`, so the `policy` property raised
        # AttributeError whenever no policy was configured.
        self._policy = policy.lower() if policy else None

        self._global_key_prefix = global_key_prefix
        self._ttl = ttl

    def _create_key(self, key: str) -> str:
        """Namespace a cache id under the global prefix."""
        return f"{self._global_key_prefix}:evict:{key}"

    def put(self, objs: List[str], expire=False):
        """Store the given ids in Redis.

        :param objs: cache ids to register.
        :param expire: when True, apply the configured ttl so the keys
            expire; when False the keys are persistent.
        """
        ttl = self._ttl if expire else None
        for key in objs:
            self._redis.set(self._create_key(key), "True", ex=ttl)

    def get(self, obj: str):
        """Fetch a single id, refreshing its ttl on access.

        Returns the stored value, or None when the key is missing or a
        Redis error occurs (logged best-effort, not raised).
        """
        key = self._create_key(obj)  # compute once instead of twice
        try:
            value = self._redis.get(key)
            # sliding expiration: touching a key renews its lifetime
            if self._ttl:
                self._redis.expire(key, self._ttl)
            return value
        except redis.RedisError:
            print(f"Error getting key {obj} from cache")
            return None

    @property
    def policy(self) -> str:
        """The configured Redis eviction policy (lower-cased), or None
        when Redis keeps its server-side default."""
        return self._policy


class NoOpEviction(EvictionBase):
    """eviction: a do-nothing strategy for backends (such as Redis or
    memcached) that manage key expiry and eviction themselves, so GPTCache
    must not duplicate that bookkeeping.
    """

    def __init__(self, **kwargs):
        # Accept and ignore any configuration; there is no state to set up.
        pass

    @property
    def policy(self) -> str:
        # No policy is applied locally; the backing store decides.
        return ""

    def put(self, objs: List[str]):
        pass  # intentionally a no-op

    def get(self, obj: str):
        pass  # intentionally a no-op: always yields None
17 changes: 14 additions & 3 deletions gptcache/manager/eviction/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def __init__(self):
@staticmethod
def get(
name: str,
policy: str,
maxsize: int,
policy: str = "LRU",
maxsize: int = 1000,
clean_size: int = 0,
on_evict: Callable[[List[Any]], None] = None,
**kwargs
Expand All @@ -32,6 +32,17 @@ def get(
eviction_base = MemoryCacheEviction(
policy, maxsize, clean_size, on_evict, **kwargs
)
return eviction_base
if name == "redis":
from gptcache.manager.eviction.distributed_cache import RedisCacheEviction
if policy == "LRU":
policy = None
eviction_base = RedisCacheEviction(policy=policy, **kwargs)
return eviction_base
if name == "no_op_eviction":
from gptcache. manager.eviction.distributed_cache import NoOpEviction
eviction_base = NoOpEviction()
return eviction_base

else:
raise NotFoundError("eviction base", name)
return eviction_base
12 changes: 6 additions & 6 deletions gptcache/manager/eviction/memory_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ class MemoryCacheEviction(EvictionBase):
"""

def __init__(
self,
policy: str,
maxsize: int,
clean_size: int = 0,
on_evict: Callable[[List[Any]], None] = None,
**kwargs,
self,
policy: str = "LRU",
maxsize: int = 1000,
clean_size: int = 0,
on_evict: Callable[[List[Any]], None] = None,
**kwargs,
):
self._policy = policy.upper()
if self._policy == "LRU":
Expand Down
Loading

0 comments on commit c2deaec

Please sign in to comment.