Commit

Scenario Cache init

SimFG committed Mar 24, 2023
1 parent f8713f8 commit 00f18e7
Showing 26 changed files with 518 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -127,3 +127,8 @@ dmypy.json

# Pyre type checker
.pyre/

.idea
**/data_map.txt
**/faiss.index
**/sqlite.db
38 changes: 38 additions & 0 deletions README-CN.md
@@ -0,0 +1,38 @@
# ChatGPTCache

[English](README.md) | 中文

The ChatGPT cache system is mainly used to cache the question-answer data that users generate while using ChatGPT. It brings two benefits:

1. Faster responses to user requests: compared with large-model inference, looking data up in the cache has much lower latency, so user requests are answered more quickly.
2. Lower service costs: most ChatGPT services currently charge by the number of requests. When a user request hits the cache, fewer requests are sent to the service, which lowers its cost.

## 🧐 System flow

![ChatGPTCache Flow](design/ChatGPTCache.png)

The core flow of the system is shown in the diagram above:

1. A user submits a question. The system first converts it into a vector with the Embedding operation and uses that vector as the input of the Search operation to query the vector database.
2. If a matching result is found, the associated data is returned to the user. Otherwise, the system moves on to the next step.
3. The user request is forwarded to the ChatGPT service, and the returned answer is sent back to the user.
4. At the same time, the question-answer pair is embedded and the resulting vector is inserted into the vector database so that the next similar query can be answered quickly.

## 🤔 Is a cache necessary?

I believe it is, for the following reasons:

- In domain-specific services built on ChatGPT, many questions and answers are fairly similar to one another.
- For a single user, the series of questions asked of ChatGPT follows certain patterns tied to their occupation, lifestyle, personality, and so on. For example, how a programmer uses a ChatGPT service is largely shaped by their work.
- If your ChatGPT service targets a large user base and you divide it into categories, related questions from users in the same category have a good chance of hitting the cache, which lowers service costs.

## 😵‍💫 System challenges

1. How to embed the cached data
   This involves two issues: where the initialization data comes from and how long the data conversion takes.
   - Data differs greatly across scenarios. Using a single data source for all of them would sharply reduce the cache hit rate. Two approaches are possible: collect data before enabling the cache, or, during the system's initial phase, only insert data into the cache system for embedding training.
   - The time spent on data conversion is also an important metric. On a cache hit, the total time should be lower than a single large-model inference; otherwise the system loses part of its advantage and hurts the user experience.
2. How to manage the cached data
   The core flow of cache-data management consists of writing, searching, and cleaning. This requires the underlying system to support incremental indexing, for example Milvus; a lightweight HNSW index is also an option. Data cleaning keeps the cached data from growing without bound while preserving query efficiency.
3. How to evaluate cached results
   After the candidate results are retrieved from the cache, a model needs to measure the question-answer similarity between them and the query. If the similarity exceeds a given threshold, the cached answer is returned to the user directly; otherwise the request is forwarded to ChatGPT.
38 changes: 38 additions & 0 deletions README.md
@@ -0,0 +1,38 @@
# ChatGPT Cache

English | [中文](README-CN.md)

The ChatGPT Cache system is mainly used to cache the question-answer data that users generate while using ChatGPT. It brings two benefits:

1. Faster responses to user requests: compared with large-model inference, looking data up in the cache has much lower latency, so user requests are answered more quickly.
2. Lower service costs: most ChatGPT services currently charge by the number of requests. When a user request hits the cache, fewer requests are sent to the service, which lowers its cost.

## 🧐 System flow

![ChatGPTCache Flow](design/ChatGPTCache.png)

The core process of the system is shown in the diagram above and sketched in code below:

1. The user sends a question to the system. The question is first converted into a vector by the Embedding operation, and that vector is used as the input of the Search operation to query the vector database.
2. If a matching result exists, the relevant data is returned to the user. Otherwise, the system proceeds to the next step.
3. The user request is forwarded to the ChatGPT service, and the returned answer is sent back to the user.
4. At the same time, the question-answer pair is processed by the Embedding operation and the resulting vector is inserted into the vector database for fast responses to future user queries.
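
A minimal, self-contained sketch of these four steps (the in-memory lists, the fake `embed`, and `ask_chatgpt` below are illustrative stand-ins, not this repository's actual API):

```python
import numpy as np

# Toy stand-ins for the real components: a list-based "vector database",
# a deterministic fake embedding, and a fake ChatGPT call.
cache_vectors, cache_answers = [], []

def embed(question: str) -> np.ndarray:
    rng = np.random.default_rng(abs(hash(question)) % (2 ** 32))
    return rng.random(8).astype("float32")

def ask_chatgpt(question: str) -> str:
    return f"(answer from ChatGPT for: {question})"

def answer(question: str, threshold: float = 0.95) -> str:
    vec = embed(question)                                    # 1. Embedding
    for cached_vec, cached_answer in zip(cache_vectors, cache_answers):
        sim = float(vec @ cached_vec
                    / (np.linalg.norm(vec) * np.linalg.norm(cached_vec)))
        if sim >= threshold:                                 # 2. Search: cache hit
            return cached_answer
    reply = ask_chatgpt(question)                            # 3. Miss: call the service
    cache_vectors.append(vec)                                # 4. Insert the new pair
    cache_answers.append(reply)
    return reply

print(answer("what do you think about chatgpt"))   # miss, goes to the service
print(answer("what do you think about chatgpt"))   # identical question hits the cache
```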

## 🤔 Is a cache necessary?

I believe it is necessary for the following reasons:

- Many question-answer pairs in domain-specific services built on ChatGPT are fairly similar to one another.
- For a single user, the series of questions asked of ChatGPT follows certain patterns tied to their occupation, lifestyle, personality, and so on. For example, how a programmer uses a ChatGPT service is largely shaped by their work.
- If your ChatGPT service targets a large user group, dividing users into categories means that related questions from users in the same category are more likely to hit the cache, thus reducing service costs.

## 😵‍💫 System Challenges

1. How to perform embedding operations on cached data
   This part involves two issues: the source of the initialization data and the time the data conversion takes.
   - Data can vary greatly between scenarios. If the same data source is used for all of them, the cache hit rate will be greatly reduced. There are two possible solutions: collecting data before using the cache, or only inserting data into the cache system for embedding training during the system's initialization phase.
   - The time required for data conversion is also an important indicator. If the cache is hit, the overall time should be lower than the inference time of a large model. Otherwise, the system loses part of its advantage and degrades the user experience.
2. How to manage cached data
   The core process of managing cached data includes data writing, searching, and cleaning. This requires the integrated storage system to support incremental indexing, such as Milvus; a lightweight HNSW index can also meet the requirement. Data cleaning ensures that the cached data does not grow indefinitely while keeping cache queries efficient.
3. How to evaluate cached results
   After the candidate result list is retrieved from the cache, a model needs to measure the question-answer similarity between the results and the query, as in the sketch after this list. If the similarity reaches a certain threshold, the answer is returned directly to the user. Otherwise, the request is forwarded to ChatGPT.
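
A minimal sketch of such a threshold check; the function below is an illustrative stand-in (not this repository's `faiss_evaluation`), assuming the score is either a distance, where smaller is better, or a similarity, where larger is better:

```python
from typing import Optional


def pick_cached_answer(score: float, cached_answer: str,
                       threshold: float, positive: bool = False) -> Optional[str]:
    """Return the cached answer when it is judged similar enough, else None.

    With positive=False the score is a distance (smaller is better);
    with positive=True it is a similarity (larger is better).
    """
    good_enough = score >= threshold if positive else score <= threshold
    return cached_answer if good_enough else None


# A close candidate (distance 21) is accepted, a far one (distance 7700) is not;
# the threshold of 100 is an illustrative value.
print(pick_cached_answer(21, "chatgpt is a good application", threshold=100))
print(pick_cached_answer(7700, "chatgpt is a good application", threshold=100))
```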
Binary file added design/ChatGPTCache.png
32 changes: 32 additions & 0 deletions example/map/map_manager.py
@@ -0,0 +1,32 @@
import os

from scenario_cache.cache.data_manager import MapDataManager
from scenario_cache.view import openai
from scenario_cache.core import cache


def run():
    dirname, _ = os.path.split(os.path.abspath(__file__))
    cache.init(data_manager=MapDataManager(dirname + "/data_map.txt"))
    mock_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "foo"}
    ]

    # seed the cache with a mock answer so the request below can hit it (only needed on the first run)
    cache.data_manager.save("receiver the foo", cache.embedding_func({"messages": mock_messages}))
    answer = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=mock_messages,
        cache_context={
            "search": {
                "user": "foo"
            }
        },
    )
    print(answer)
    cache.data_manager.close()


if __name__ == '__main__':
    run()
43 changes: 43 additions & 0 deletions example/sf_mock/sf_manager.py
@@ -0,0 +1,43 @@
from scenario_cache.view import openai
from scenario_cache.core import cache
from scenario_cache.cache.data_manager import SFDataManager
from scenario_cache.similarity_evaluation.faiss import faiss_evaluation
import numpy as np


d = 8


def mock_embeddings(data, **kwargs):
    return np.random.random((1, d)).astype('float32')


def run():
    cache.init(embedding_func=mock_embeddings,
               data_manager=SFDataManager("sqlite.db", "faiss.index", d),
               evaluation_func=faiss_evaluation,
               similarity_threshold=10000,
               similarity_positive=False)

    mock_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "foo"}
    ]
    # uncomment the line below on the first run to seed the cache
    # cache.data_manager.save("receiver the foo", cache.embedding_func({"messages": mock_messages}))

    answer = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=mock_messages,
        cache_context={
            "search": {
                "user": "foo"
            }
        },
    )
    print(answer)
    cache.data_manager.close()


if __name__ == '__main__':
    run()
56 changes: 56 additions & 0 deletions example/sf_towhee/sf_manager.py
@@ -0,0 +1,56 @@
import time

from scenario_cache.view import openai
from scenario_cache.core import cache
from scenario_cache.cache.data_manager import SFDataManager
from scenario_cache.similarity_evaluation.faiss import faiss_evaluation
from scenario_cache.embedding.towhee import to_embeddings as towhee_embedding

d = 768


def run():
    cache.init(embedding_func=towhee_embedding,
               data_manager=SFDataManager("sqlite.db", "faiss.index", d),
               evaluation_func=faiss_evaluation,
               similarity_threshold=10000,
               similarity_positive=False)

    # uncomment the block below on the first run to seed the cache
    # source_messages = [
    #     {"role": "system", "content": "You are a helpful assistant."},
    #     {"role": "user", "content": "what do you think about chatgpt"}
    # ]
    # cache.data_manager.save("chatgpt is a good application", cache.embedding_func({"messages": source_messages}))

    # distance 77
    mock_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "what do you feel like chatgpt"}
    ]

    # distance 21
    # mock_messages = [
    #     {"role": "system", "content": "You are a helpful assistant."},
    #     {"role": "user", "content": "what do you think chatgpt"}
    # ]

    start_time = time.time()
    answer = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=mock_messages,
        cache_context={
            "search": {
                "user": "foo"
            }
        },
    )
    end_time = time.time()
    print("cache hit, time consumed: {:.2f}s".format(end_time - start_time))

    print(answer)
    cache.data_manager.close()


if __name__ == '__main__':
    run()
4 changes: 4 additions & 0 deletions requirements.txt
@@ -0,0 +1,4 @@
openai
numpy
faiss-cpu
towhee
Empty file added scenario_cache/__init__.py
Empty file.
Empty file.
94 changes: 94 additions & 0 deletions scenario_cache/cache/data_manager.py
@@ -0,0 +1,94 @@
import hashlib
from abc import abstractmethod, ABCMeta
import pickle
from .scalar_data.sqllite3 import SQLite
from .vector_data.faiss import Faiss


class DataManager(metaclass=ABCMeta):
    @abstractmethod
    def init(self): pass

    @abstractmethod
    def save(self, data, embedding_data, **kwargs): pass

    @abstractmethod
    def get_scalar_data(self, vector_data, **kwargs): pass

    @abstractmethod
    def search(self, embedding_data, **kwargs): pass

    @abstractmethod
    def close(self): pass


class MapDataManager(DataManager):
    def __init__(self, data_path):
        self.data = {}
        self.data_path = data_path

    def init(self):
        try:
            f = open(self.data_path, 'rb')
            self.data = pickle.load(f)
            f.close()
        except FileNotFoundError:
            print(f'File <{self.data_path}> not found.')
        except PermissionError:
            print(f"You don't have permission to access this file <{self.data_path}>.")

    def save(self, data, embedding_data, **kwargs):
        self.data[embedding_data] = (embedding_data, data)

    def get_scalar_data(self, vector_data, **kwargs):
        return vector_data[1]

    def search(self, embedding_data, **kwargs):
        return self.data[embedding_data]

    def close(self):
        try:
            f = open(self.data_path, 'wb')
            pickle.dump(self.data, f)
            f.close()
        except PermissionError:
            print(f"You don't have permission to access this file <{self.data_path}>.")


def sha_data(data):
    m = hashlib.sha1()
    m.update(data.tobytes())
    return m.hexdigest()


# SFDataManager: sqlite3 for scalar data + faiss for vector data
class SFDataManager(DataManager):
    s: SQLite
    f: Faiss

    def __init__(self, sqlite_path, index_path, dimension):
        self.sqlite_path = sqlite_path
        self.index_path = index_path
        self.dimension = dimension

    def init(self):
        self.s = SQLite(self.sqlite_path)
        self.f = Faiss(self.index_path, self.dimension)

    def save(self, data, embedding_data, **kwargs):
        # the vector's hash is the scalar key, so the answer can be looked up
        # again from the vector returned by a later search
        key = sha_data(embedding_data)
        self.s.insert(key, data)
        self.f.add(embedding_data)

    def get_scalar_data(self, search_data, **kwargs):
        # search_data is the (distance, vector) pair returned by search()
        distance, vector_data = search_data
        key = sha_data(vector_data)
        return self.s.select(key)

    def search(self, embedding_data, **kwargs):
        return self.f.search(embedding_data)

    def close(self):
        self.s.close()
        self.f.close()

Empty file.
36 changes: 36 additions & 0 deletions scenario_cache/cache/scalar_data/sqllite3.py
@@ -0,0 +1,36 @@
import sqlite3


class SQLite:

    def __init__(self, db_path):
        self.con = sqlite3.connect(db_path)
        self.cur = self.con.cursor()
        create_tb_cmd = '''
            CREATE TABLE IF NOT EXISTS cache_data
            (id TEXT PRIMARY KEY,
             data TEXT);
        '''
        self.cur.execute(create_tb_cmd)

    def insert(self, key, data):
        self.cur.execute("INSERT INTO cache_data VALUES(?, ?)", (key, data))
        self.con.commit()

    # datas format
    # datas = [
    #     ("1", "Monty Python Live at the Hollywood Bowl"),
    #     ("2", "Monty Python's The Meaning of Life"),
    # ]
    def mult_insert(self, datas):
        self.cur.executemany("INSERT INTO cache_data VALUES(?, ?)", datas)
        self.con.commit()

    def select(self, key):
        res = self.cur.execute("SELECT data FROM cache_data WHERE id=?", (key, ))
        values = res.fetchone()
        return values[0] if values is not None else None

    def close(self):
        self.cur.close()
        self.con.close()
Empty file.
33 changes: 33 additions & 0 deletions scenario_cache/cache/vector_data/faiss.py
@@ -0,0 +1,33 @@
import os

import faiss
from faiss import IndexHNSWFlat, Index
import numpy as np


class Faiss:
    index: Index

    def __init__(self, index_file_path, dimension):
        self.index_file_path = index_file_path
        self.index = IndexHNSWFlat(dimension, 32)
        if os.path.isfile(index_file_path):
            self.index = faiss.read_index(index_file_path)

    def add(self, data):
        np_data = np.array(data).astype('float32')
        self.index.add(np_data)

    def mult_add(self, datas):
        np_data = np.array(datas).astype('float32')
        self.index.add(np_data)

    def search(self, data):
        np_data = np.array(data).astype('float32')
        D, I = self.index.search(np_data, 1)
        # scale the raw distance of the nearest neighbour to an integer score
        distance = int(D[0, 0] * 100)
        # return the stored vector itself so its hash can serve as the scalar key
        vector_data = self.index.reconstruct(int(I[0, 0])).reshape(1, -1)
        return distance, vector_data

    def close(self):
        faiss.write_index(self.index, self.index_file_path)
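
A small usage sketch of this wrapper, assuming `faiss-cpu` and `numpy` from requirements.txt are installed and the `scenario_cache` package is importable; the index file name and dimension are placeholders:

```python
import numpy as np

from scenario_cache.cache.vector_data.faiss import Faiss

d = 8
index = Faiss("example_faiss.index", d)

vec = np.random.random((1, d)).astype("float32")
index.add(vec)                           # insert one vector into the HNSW index
distance, nearest = index.search(vec)    # query with the same vector
print(distance, nearest.shape)           # distance is ~0, nearest has shape (1, d)

index.close()                            # persists the index to example_faiss.index
```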
