
Deepmemory fixes #2675

Merged · 11 commits · Nov 4, 2023
Changes from 6 commits
7 changes: 7 additions & 0 deletions deeplake/constants.py
@@ -335,3 +335,10 @@
    "REMOVE": 2,
    "UPDATE": 3,
}


DEFAULT_RATE_LIMITER_KEY_TO_VALUE = {
    "enabled": False,
    "bytes_per_minute": MAX_BYTES_PER_MINUTE,
    "batch_byte_size": TARGET_BYTE_SIZE,
}
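
These defaults configure upload rate limiting. For orientation, a bytes-per-minute limiter of this shape typically paces batches roughly like the sketch below; this is an illustrative sketch under assumed semantics, not deeplake's actual implementation, and every name in it is hypothetical.

import time

def paced_batches(batches, bytes_per_minute):
    # Yield batches while keeping the bytes sent in any one-minute
    # window under bytes_per_minute (illustrative pacing loop).
    window_start = time.time()
    sent_in_window = 0
    for batch in batches:
        batch_bytes = sum(len(item) for item in batch)
        if sent_in_window + batch_bytes > bytes_per_minute:
            elapsed = time.time() - window_start
            if elapsed < 60:
                time.sleep(60 - elapsed)  # wait out the rest of the window
            window_start = time.time()
            sent_in_window = 0
        yield batch
        sent_in_window += batch_bytes
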
125 changes: 63 additions & 62 deletions deeplake/core/vectorstore/deep_memory.py
@@ -1,3 +1,4 @@
import logging
import uuid
from collections import defaultdict
from typing import Any, Dict, Optional, List, Union, Callable, Tuple
@@ -32,6 +33,7 @@ def __init__(
        self,
        dataset: Dataset,
        client: DeepMemoryBackendClient,
        logger: logging.Logger,
        embedding_function: Optional[Any] = None,
        token: Optional[str] = None,
        creds: Optional[Dict[str, Any]] = None,
@@ -41,6 +43,7 @@ def __init__(
        Args:
            dataset (Dataset): deeplake dataset object.
            client (DeepMemoryBackendClient): Client to interact with the DeepMemory managed service. Defaults to None.
            logger (logging.Logger): Logger object.
            embedding_function (Optional[Any], optional): Embedding function class used to convert queries/documents to embeddings. Defaults to None.
            token (Optional[str], optional): API token for the DeepMemory managed service. Defaults to None.
            creds (Optional[Dict[str, Any]], optional): Credentials to access the dataset. Defaults to None.
@@ -63,6 +66,7 @@ def __init__(
        self.embedding_function = embedding_function
        self.client = client
        self.creds = creds or {}
        self.logger = logger
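
In practice `DeepMemoryVectorStore` passes its own `self.logger` (see deepmemory_vectorstore.py below); constructing one manually would look roughly like this sketch, where `ds` and `client` are placeholders.

import logging

logger = logging.getLogger("deeplake.deep_memory")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# hypothetical manual wiring; ds and client are assumed to exist
deep_memory = DeepMemory(dataset=ds, client=client, logger=logger)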

    def train(
        self,
@@ -94,6 +98,7 @@ def train(
        Raises:
            ValueError: if embedding_function is not specified either during initialization or during training.
        """
        self.logger.info("Starting DeepMemory training job")
        feature_report_path(
            path=self.dataset.path,
            feature_name="dm.train",
@@ -126,8 +131,10 @@ def train(
            runtime=runtime,
            token=token or self.token,
            creds=self.creds,
            verbose=False,
        )

        self.logger.info("Preparing training data for deepmemory:")
        queries_vs.add(
            text=[query for query in queries],
            metadata=[
@@ -143,7 +150,9 @@ def train(
            queries_path=queries_path,
        )

        print(f"DeepMemory training job started. Job ID: {response['job_id']}")
        self.logger.info(
            f"DeepMemory training job started. Job ID: {response['job_id']}"
        )
        return response["job_id"]
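
For reference, a call into this training flow might look as follows; a sketch assuming `train` accepts `queries`, `relevance`, and an optional `embedding_function`, mirroring the `evaluate` signature shown below, with `vectorstore` and `openai_embedding` as placeholders.

queries = ["What is the capital of India?", "What is the capital of France?"]
relevance = [[("doc_id_1", 1)], [("doc_id_2", 1)]]  # ids from the corpus dataset's `id` tensor

job_id = vectorstore.deep_memory.train(
    queries=queries,
    relevance=relevance,
    embedding_function=openai_embedding.embed_documents,
)
# job_id can later be passed to cancel(job_id)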

    def cancel(self, job_id: str):
@@ -304,75 +313,67 @@ def evaluate(
        top_k: List[int] = [1, 3, 5, 10, 50, 100],
        qvs_params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Dict[str, float]]:
"""Evaluate a model on DeepMemory managed service.
"""
Evaluate a model using the DeepMemory managed service.

Examples:
>>> #1. Evaluate a model with embedding function
>>> relevance: List[List[Tuple[str, int]]] = [[("doc_id_1", 1), ("doc_id_2", 1)], [("doc_id_3", 1)]]
>>> # doc_id_1, doc_id_2, doc_id_3 are the ids of the documents in the corpus dataset that is relevant to the queries. It is stored in the `id` tensor of the corpus dataset.
>>> queries: List[str] = ["What is the capital of India?", "What is the capital of France?"]
>>> embedding_function: Callable[..., List[np.ndarray] = openai_embedding.embed_documents
>>> vectorstore.deep_memory.evaluate(
... relevance=relevance,
... queries=queries,
... embedding_function=embedding_function,
... )
>>> #2. Evaluate a model with precomputed embeddings
>>> relevance: List[List[Tuple[str, int]]] = [[("doc_id_1", 1), ("doc_id_2", 1)], [("doc_id_3", 1)]]
>>> # doc_id_1, doc_id_2, doc_id_3 are the ids of the documents in the corpus dataset that is relevant to the queries. It is stored in the `id` tensor of the corpus dataset.
>>> queries: List[str] = ["What is the capital of India?", "What is the capital of France?"]
>>> embedding: Union[List[np.ndarray[Any, Any]], List[List[float]] = [[-1.2, 12, ...], ...]
>>> vectorstore.deep_memory.evaluate(
... relevance=relevance,
... queries=queries,
... embedding=embedding,
... )
>>> #3. Evaluate a model with precomputed embeddings and log queries
>>> relevance: List[List[Tuple[str, int]]] = [[("doc_id_1", 1), ("doc_id_2", 1)], [("doc_id_3", 1)]]
>>> # doc_id_1, doc_id_2, doc_id_3 are the ids of the documents in the corpus dataset that is relevant to the queries. It is stored in the `id` tensor of the corpus dataset.
>>> queries: List[str] = ["What is the capital of India?", "What is the capital of France?"]
>>> embedding: Union[List[np.ndarray[Any, Any]], List[List[float]] = [[-1.2, 12, ...], ...]
>>> vectorstore.deep_memory.evaluate(
... relevance=relevance,
... queries=queries,
... embedding=embedding,
... qvs_params={
... "log_queries": True,
... }
... )
>>> #4. Evaluate a model with precomputed embeddings and log queries, and custom branch
>>> relevance: List[List[Tuple[str, int]]] = [[("doc_id_1", 1), ("doc_id_2", 1)], [("doc_id_3", 1)]]
>>> # doc_id_1, doc_id_2, doc_id_3 are the ids of the documents in the corpus dataset that is relevant to the queries. It is stored in the `id` tensor of the corpus dataset.
>>> queries: List[str] = ["What is the capital of India?", "What is the capital of France?"]
>>> embedding: Union[List[np.ndarray[Any, Any]], List[List[float]] = [[-1.2, 12, ...], ...]
>>> vectorstore.deep_memory.evaluate(
... relevance=relevance,
... queries=queries,
... embedding=embedding,
... qvs_params={
... "log_queries": True,
... "branch": "queries",
... }
... )
            # 1. Evaluate a model using an embedding function:
            relevance = [[("doc_id_1", 1), ("doc_id_2", 1)], [("doc_id_3", 1)]]
            queries = ["What is the capital of India?", "What is the capital of France?"]
            embedding_function = openai_embedding.embed_documents
            vectorstore.deep_memory.evaluate(
                relevance=relevance,
                queries=queries,
                embedding_function=embedding_function,
            )

            # 2. Evaluate a model with precomputed embeddings:
            embeddings = [[-1.2, 12, ...], ...]
            vectorstore.deep_memory.evaluate(
                relevance=relevance,
                queries=queries,
                embedding=embeddings,
            )

            # 3. Evaluate a model with precomputed embeddings and log queries:
            vectorstore.deep_memory.evaluate(
                relevance=relevance,
                queries=queries,
                embedding=embeddings,
                qvs_params={"log_queries": True},
            )

            # 4. Evaluate with precomputed embeddings, log queries, and a custom branch:
            vectorstore.deep_memory.evaluate(
                relevance=relevance,
                queries=queries,
                embedding=embeddings,
                qvs_params={
                    "log_queries": True,
                    "branch": "queries",
                },
            )

        Args:
            queries (List[str]): List of queries to evaluate the model on.
            relevance (List[List[Tuple[str, int]]]): List of relevant documents for each query with their respective relevance score.
                The outer list corresponds to the queries and the inner list corresponds to the doc_id, relevence_score pair for each query.
                doc_id is the document id in the corpus dataset. It is stored in the `id` tensor of the corpus dataset.
                relevence_score is the relevance score of the document for the query. The range is between 0 and 1, where 0 stands for not relevant and 1 stands for relevant.
            embedding (Optional[np.ndarray], optional): Embedding of the queries. Defaults to None.
            embedding_function (Optional[Callable[..., List[np.ndarray]]], optional): Embedding funtion used to convert queries to embeddings. Defaults to None.
            top_k (List[int], optional): List of top_k values to evaluate the model on. Defaults to [1, 3, 5, 10, 50, 100].
            qvs_params (Optional[Dict], optional): Parameters to initialize the queries vectorstore. Defaults to None.
            queries (List[str]): Queries for model evaluation.
            relevance (List[List[Tuple[str, int]]]): Relevant documents and scores for each query.
                - Outer list: matches the queries.
                - Inner list: pairs of doc_id and relevance score.
                - doc_id: Document ID from the corpus dataset, found in the `id` tensor.
                - relevance_score: Between 0 (not relevant) and 1 (relevant).
            embedding (Optional[np.ndarray], optional): Query embeddings. Defaults to None.
            embedding_function (Optional[Callable[..., List[np.ndarray]]], optional): Function to convert queries into embeddings. Defaults to None.
            top_k (List[int], optional): Ranks for model evaluation. Defaults to [1, 3, 5, 10, 50, 100].
            qvs_params (Optional[Dict], optional): Parameters to initialize the queries vectorstore. When specified, creates a new vectorstore to track evaluation queries, the Deep Memory response, and the naive vector search results. Defaults to None.

        Returns:
            Dict[str, Dict[str, float]]: Dictionary of recalls for each top_k value.
            Dict[str, Dict[str, float]]: Recalls for each rank.

        Raises:
            ImportError: if indra is not installed
            ValueError: if embedding_function is not specified either during initialization or during evaluation.
            ImportError: If `indra` is not installed.
            ValueError: If no embedding_function is provided either during initialization or evaluation.
        """

        feature_report_path(
            path=self.dataset.path,
            feature_name="dm.evaluate",
@@ -439,7 +440,7 @@ def evaluate(
            (True, "deepmemory_distance"),
        ]:
            eval_type = "with" if use_model else "without"
            print(f"---- Evaluating {eval_type} model ---- ")
            print(f"---- Evaluating {eval_type} Deep Memory ---- ")
            avg_recalls, queries_dict = recall_at_k(
                indra_dataset,
                relevance,
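
`recall_at_k` itself is not part of this diff; as a reference point, recall at rank k for a single query reduces to the following minimal sketch, assuming binary relevance labels.

def recall_at_k_single(retrieved_ids, relevant_ids, k):
    # Fraction of the relevant documents that appear among the top-k retrieved ids.
    if not relevant_ids:
        return 0.0
    hits = len(set(retrieved_ids[:k]) & set(relevant_ids))
    return hits / len(relevant_ids)

print(recall_at_k_single(["d7", "d1", "d9"], ["d1", "d2"], k=3))  # 1 of 2 relevant found -> 0.5
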
12 changes: 1 addition & 11 deletions deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -8,7 +8,6 @@
import deeplake
from deeplake.core import index_maintenance
from deeplake.core.distance_type import DistanceType
from deeplake.util.dataset import try_flushing
from deeplake.util.path import convert_pathlib_to_string_if_needed

from deeplake.api import dataset
@@ -338,10 +337,9 @@ def add(
            embedding_data=embedding_data,
            embedding_tensor=embedding_tensor,
            rate_limiter=rate_limiter,
            logger=self.logger,
        )

        try_flushing(self.dataset)

        if self.verbose:
            self.dataset.summary()
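
With the rate limiter now threaded through `add`, enabling it from the caller might look like the sketch below; `texts` and `embedding_function` are placeholders, and the `rate_limiter` keys follow `populate_rate_limiter` further down.

vectorstore.add(
    text=texts,
    embedding_data=texts,
    embedding_tensor="embedding",
    embedding_function=embedding_function,
    rate_limiter={"enabled": True, "bytes_per_minute": 1_000_000},
)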

@@ -445,8 +443,6 @@ def search(
            username=self.username,
        )

        try_flushing(self.dataset)

        if exec_option is None and self.exec_option != "python" and callable(filter):
            self.logger.warning(
                'Switching exec_option to "python" (runs on client) because filter is specified as a function. '
@@ -603,8 +599,6 @@ def delete(

        self.dataset.pop_multiple(row_ids)

        try_flushing(self.dataset)

        return True

    def update_embedding(
@@ -677,8 +671,6 @@ def update_embedding(
            username=self.username,
        )

        try_flushing(self.dataset)

        (
            embedding_function,
            embedding_source_tensor,
@@ -711,8 +703,6 @@

        self.dataset[row_ids].update(embedding_tensor_data)

        try_flushing(self.dataset)

    @staticmethod
    def delete_by_path(
        path: Union[str, pathlib.Path],
1 change: 1 addition & 0 deletions deeplake/core/vectorstore/deepmemory_vectorstore.py
@@ -17,6 +17,7 @@ def __init__(self, client, *arg, **kwargs):
            embedding_function=self.embedding_function,
            client=client,
            creds=self.creds,
            logger=self.logger,
        )

    def search(
@@ -11,4 +11,5 @@
    convert_id_to_row_id,
    search_row_ids,
    extend,
    populate_rate_limiter,
)
41 changes: 38 additions & 3 deletions deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -18,6 +18,7 @@
    MAX_BYTES_PER_MINUTE,
    TARGET_BYTE_SIZE,
    VECTORSTORE_EXTEND_BATCH_SIZE,
    DEFAULT_RATE_LIMITER_KEY_TO_VALUE,
)
from deeplake.util.exceptions import IncorrectEmbeddingShapeError

@@ -460,6 +461,7 @@ def extend(
    dataset: deeplake.core.dataset.Dataset,
    rate_limiter: Dict,
    _extend_batch_size: int = VECTORSTORE_EXTEND_BATCH_SIZE,
    logger=None,
):
    """
    Function to extend the dataset with new data.
@@ -468,8 +470,15 @@
        embedding_data = [embedding_data]

    if embedding_function:
        number_of_batches = ceil(len(embedding_data[0]) / _extend_batch_size)
        progressbar_str = (
            f"Creating {len(embedding_data[0])} embeddings in "
            f"{number_of_batches} batches of size {min(_extend_batch_size, len(embedding_data[0]))}:"
        )

        for idx in tqdm(
            range(0, len(embedding_data[0]), _extend_batch_size), "creating embeddings"
            range(0, len(embedding_data[0]), _extend_batch_size),
            progressbar_str,
        ):
            batch_start, batch_end = idx, idx + _extend_batch_size

@@ -488,9 +497,32 @@

            batched_processed_tensors = {**batched_embeddings, **batched_tensors}

            dataset.extend(batched_processed_tensors)
            dataset.extend(batched_processed_tensors, progressbar=False)
    else:
        dataset.extend(processed_tensors)
        logger.info("Uploading data to deeplake dataset.")
        dataset.extend(processed_tensors, progressbar=True)
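
To make the progress-bar arithmetic above concrete, here is the same computation on illustrative numbers (example values, not from the PR):

from math import ceil

num_items = 250                     # stands in for len(embedding_data[0])
_extend_batch_size = 100
number_of_batches = ceil(num_items / _extend_batch_size)   # 3
batch_size = min(_extend_batch_size, num_items)            # 100
print(f"Creating {num_items} embeddings in {number_of_batches} batches of size {batch_size}:")
# Creating 250 embeddings in 3 batches of size 100: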


def populate_rate_limiter(rate_limiter):
    if rate_limiter is None or rate_limiter == {}:
        return {
            "enabled": False,
            "bytes_per_minute": MAX_BYTES_PER_MINUTE,
            "batch_byte_size": TARGET_BYTE_SIZE,
        }
    else:
        rate_limiter_keys = ["enabled", "bytes_per_minute", "batch_byte_size"]

        for key in rate_limiter_keys:
            if key not in rate_limiter:
                rate_limiter[key] = DEFAULT_RATE_LIMITER_KEY_TO_VALUE[key]

        for item in rate_limiter:
            if item not in rate_limiter_keys:
                raise ValueError(
                    f"Invalid rate_limiter key: {item}. Valid keys are: 'enabled', 'bytes_per_minute', 'batch_byte_size'."
                )
        return rate_limiter
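
Behavior, as exercised by the new test below: missing keys are filled from the defaults, and unknown keys raise.

populate_rate_limiter(None)
# -> {"enabled": False, "bytes_per_minute": MAX_BYTES_PER_MINUTE, "batch_byte_size": TARGET_BYTE_SIZE}

populate_rate_limiter({"enabled": True})
# -> {"enabled": True, "bytes_per_minute": MAX_BYTES_PER_MINUTE, "batch_byte_size": TARGET_BYTE_SIZE}

populate_rate_limiter({"bytes_per_second": 1000})
# -> ValueError: Invalid rate_limiter key: bytes_per_second. ...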


def extend_or_ingest_dataset(
@@ -500,7 +532,9 @@ def extend_or_ingest_dataset(
    embedding_tensor,
    embedding_data,
    rate_limiter,
    logger,
):
    rate_limiter = populate_rate_limiter(rate_limiter)
    # TODO: Add back the old logic with checkpointing after indexing is fixed
    extend(
        embedding_function,
@@ -509,6 +543,7 @@
        processed_tensors,
        dataset,
        rate_limiter,
        logger=logger,
    )


20 changes: 20 additions & 0 deletions deeplake/core/vectorstore/vector_search/dataset/test_dataset.py
@@ -428,3 +428,23 @@ def mock_embedding_function(text):
    assert (
        abs(elapsed_minutes - expected_time) <= tolerance
    ), "Rate limiting did not work as expected!"


def test_populate_rate_limiter():
    rate_limiter = {
        "enabled": True,
    }

    rate_limiter_parsed = dataset_utils.populate_rate_limiter(rate_limiter)
    assert rate_limiter_parsed == {
        "enabled": True,
        "bytes_per_minute": MAX_BYTES_PER_MINUTE,
        "batch_byte_size": TARGET_BYTE_SIZE,
    }

    rate_limiter = {
        "enabled": True,
        "bytes_per_second": 1000,
    }
    with pytest.raises(ValueError):
        rate_limiter_parsed = dataset_utils.populate_rate_limiter(rate_limiter)