Added retry logic
adolkhan committed May 12, 2023
1 parent 8f5f539 commit bb2f658
Showing 12 changed files with 85 additions and 59 deletions.
2 changes: 1 addition & 1 deletion deeplake/client/client.py
@@ -499,7 +499,7 @@ def remote_query(self, org_id: str, ds_name: str, query_string: str) -> List[int
query_string (str): The query string.
Returns:
dict: The indices matching the query.
Tuple[Any, Any]: The indices and scores matching the query.
"""
response = self.request(
"POST",
1 change: 1 addition & 0 deletions deeplake/constants.py
@@ -192,3 +192,4 @@

DEFAULT_DEEPLAKE_PATH = "./deeplake_vector_store"
MAX_RETRY_ATTEMPTS = 5
MAX_CHECKPOINTING_INTERVAL = 100000
20 changes: 11 additions & 9 deletions deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -40,16 +40,18 @@ def __init__(
"""DeepLakeVectorStore initialization
Args:
dataset_path (str, optional): path to the deeplake dataset. Defaults to DEFAULT_DEEPLAKE_PATH.
dataset_path (str): path to the deeplake dataset. Defaults to DEFAULT_DEEPLAKE_PATH.
token (str, optional): Activeloop token, used for fetching credentials for Deep Lake datasets. This is Optional, tokens are normally autogenerated. Defaults to None.
embedding_function (Optional[callable], optional): Function that converts query into embedding. Defaults to None.
read_only (bool, optional): Opens dataset in read-only mode if this is passed as True. Defaults to False.
ingestion_batch_size (int, optional): The batch size to use during ingestion. Defaults to 1024.
num_workers (int, optional): The number of workers to use for ingesting data in parallel. Defaults to 0.
exec_option (str, optional): Type of query execution. It could be either "python", "compute_engine" or "db_engine". Defaults to "python".
ingestion_batch_size (int): The batch size to use during ingestion. Defaults to 1024.
num_workers (int): The number of workers to use for ingesting data in parallel. Defaults to 0.
exec_option (str): Type of query execution. It could be either "python", "compute_engine" or "db_engine". Defaults to "python".
- `python` - runs on the client and can be used for any data stored anywhere. WARNING: using this option with big datasets is discouraged, because it can lead to some memory issues.
- `compute_engine` - runs on the client and can be used for any data stored in or connected to Deep Lake.
- `db_engine` - runs on the Deep Lake Managed Database and can be used for any data stored in the Deep Lake Managed Database.
verbose (bool): Whether to print summary of the dataset created. Defaults to False.
**kwargs (Any): Additional keyword arguments.
"""
self.ingestion_batch_size = ingestion_batch_size
self.num_workers = num_workers
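For orientation, here is a minimal usage sketch of the constructor parameters documented above; the import path, the zero-vector embedding stub, and the concrete argument values are assumptions, not part of this diff.

```python
import numpy as np

# Import path assumed from the file location shown above.
from deeplake.core.vectorstore.deeplake_vectorstore import DeepLakeVectorStore


def embedding_function(texts):
    # Placeholder embedding function: one 1536-dim zero vector per input text.
    return np.zeros((len(texts), 1536), dtype=np.float32)


vector_store = DeepLakeVectorStore(
    dataset_path="./deeplake_vector_store",  # DEFAULT_DEEPLAKE_PATH
    embedding_function=embedding_function,
    ingestion_batch_size=1024,
    num_workers=0,
    exec_option="python",  # or "compute_engine" / "db_engine"
    verbose=True,
)
```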
@@ -68,16 +70,16 @@ def add(
ids: Optional[List[str]] = None,
embeddings: Optional[np.ndarray] = None,
total_samples_processed: Optional[Any] = None,
) -> List[str]:
) -> Optional[List[str]]:
"""Adding elements to deeplake vector store
Args:
texts (Iterable[str]): texts to add to deeplake vector store
metadatas (List[dict], optional): List of metadatas. Defaults to None.
ids (List[str], optional): List of document IDs. Defaults to None.
embeddings (Optional[np.ndarray): embedding of texts. Defaults to None.
embeddings (np.ndarray, optional): embedding of texts. Defaults to None.
Returns:
ids (List[str]): List of document IDs
ids (List[str], optional): List of document IDs
"""
elements = dataset_utils.create_elements(ids, texts, metadatas, embeddings)
ingest_data.run_data_ingestion(
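The new `total_samples_processed` argument is what the retry logic below reports when ingestion keeps failing; a hedged sketch of how a caller would resume (variable names and the sample count are hypothetical):

```python
# First attempt: ingest everything from scratch.
ids = vector_store.add(texts=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)

# If MAX_RETRY_ATTEMPTS is exceeded, the raised error message reports how many
# samples were checkpointed; pass that value back to continue from the checkpoint.
vector_store.add(
    texts=texts,
    metadatas=metadatas,
    ids=ids,
    embeddings=embeddings,
    total_samples_processed=20000,  # hypothetical value taken from the error message
)
```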
@@ -145,15 +147,15 @@ def _search(
embedding: Optional[Union[List[float], np.ndarray]] = None,
query: Optional[str] = None,
k: int = 4,
distance_metric: Optional[str] = "L2",
distance_metric: str = "L2",
):
"""Internal DeepLakeVectorStore search method
Args:
query (Optional[str], optional): String representation of the query to run. Defaults to None.
embedding (Optional[Union[List[float], np.ndarray]], optional): Embedding representation of the query to run. Defaults to None.
k (int): Number of elements to return after running query. Defaults to 4.
distance_metric (str, optional): Type of distance metric to use for sorting the data. Available options are: "L1", "L2", "COS", "MAX". Defaults to "L2".
distance_metric (str): Type of distance metric to use for sorting the data. Available options are: "L1", "L2", "COS", "MAX". Defaults to "L2".
filter (Optional[Any], optional): Metadata dictionary for exact search. Defaults to None.
exec_option (str, optional): Type of query execution. It could be either "python", "compute_engine" or "db_engine". Defaults to "python".
- `python` - runs on the client and can be used for any data stored anywhere. WARNING: using this option with big datasets is discouraged, because it can lead to some memory issues.
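The distance metrics above are easiest to see in a query call; a hedged sketch, assuming the public `search` wrapper exposes the same parameters as the internal `_search` shown here:

```python
# Either a raw query string or a precomputed embedding can be supplied.
results = vector_store.search(
    query="which documents mention retry logic?",
    k=4,                    # number of elements to return
    distance_metric="COS",  # one of "L1", "L2", "COS", "MAX"
    exec_option="python",
)
```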
10 changes: 5 additions & 5 deletions deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -5,7 +5,7 @@
from deeplake.core import tensor as tensor_utils

try:
from indra import api
from indra import api # type: ignore

_INDRA_INSTALLED = True # pragma: no cover
except ImportError: # pragma: no cover
@@ -14,7 +14,7 @@
import numpy as np

import uuid
from typing import Iterable, List, Union
from typing import Iterable, List, Union, Optional

from deeplake.constants import DEFAULT_DEEPLAKE_PATH
from deeplake.util.warnings import always_warn
@@ -173,10 +173,10 @@ def preprocess_tensors(ids, texts, metadatas, embeddings):


def create_elements(
ids: List[str],
texts: Iterable[str],
metadatas: List[dict],
embeddings: Union[List[float], np.ndarray],
ids: Optional[List[str]] = None,
metadatas: Optional[List[dict]] = None,
embeddings: Optional[np.ndarray] = None,
):
processed_tensors = preprocess_tensors(ids, texts, metadatas, embeddings)
utils.check_length_of_each_tensor(processed_tensors)
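After the signature change above, only `texts` is required; a small sketch, assuming the omitted tensors are filled in by `preprocess_tensors` (ids presumably via `uuid`):

```python
# Import path assumed from the file location shown above.
from deeplake.core.vectorstore.vector_search.dataset.dataset import create_elements

# ids, metadatas and embeddings may now be omitted; they default to None.
elements = create_elements(texts=["first document", "second document"])
```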
@@ -33,7 +33,6 @@ def test_create_or_load_dataset(caplog, hub_cloud_dev_token):
# dataset loading
dataset = dataset_utils.create_or_load_dataset(
dataset_path="hub://activeloop-test/deeplake_vectorstore-test1",
token=None,
creds={},
logger=logger,
exec_option="python",
@@ -7,7 +7,7 @@


def vector_search(
query_embedding: Optional[np.ndarray],
query_embedding: Union[List[float], np.ndarray[Any, Any]],
distance_metric: str,
deeplake_dataset: DeepLakeDataset,
k: int,
4 changes: 2 additions & 2 deletions deeplake/core/vectorstore/vector_search/indra/query.py
@@ -44,7 +44,7 @@ def create_query(
return query


def convert_tensor_to_str(query_embedding: Union[List[float], np.ndarray]):
def convert_tensor_to_str(query_embedding: np.ndarray):
"""Function for converting a query embedding to a string
We need to convert tensor to a string to be able to use tql
@@ -68,7 +68,7 @@ def convert_tensor_to_str(query_embedding: Union[List[float], np.ndarray]):
def parse_query(
distance_metric: str,
limit: int,
query_embedding: Union[List[float], np.ndarray],
query_embedding: np.ndarray,
embedding_tensor: str,
):
"""Function for converting query_embedding into tql query.
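For intuition, a rough sketch of what the ndarray-only conversion above might produce; the exact string format and the TQL template built by `parse_query` are not visible in this diff, so the details below are illustrative only:

```python
import numpy as np


def convert_tensor_to_str_sketch(query_embedding: np.ndarray) -> str:
    # Join the embedding values into a comma-separated literal so that
    # parse_query can splice them into the TQL text (formatting assumed).
    return ", ".join(str(float(v)) for v in query_embedding)


emb_str = convert_tensor_to_str_sketch(np.array([0.1, 0.2, 0.3], dtype=np.float32))
# emb_str is now a comma-separated string of the three values (up to float32 rounding).
```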
65 changes: 36 additions & 29 deletions deeplake/core/vectorstore/vector_search/ingestion/data_ingestion.py
@@ -1,24 +1,24 @@
from typing import Dict, List, Any, Callable
from typing import Dict, List, Any, Callable, Optional

import numpy as np

import deeplake
from deeplake.core.dataset import Dataset as DeepLakeDataset
from deeplake.core.vectorstore.vector_search import utils
from deeplake.util.exceptions import TransformError
from deeplake.constants import MAX_RETRY_ATTEMPTS
from deeplake.constants import MAX_RETRY_ATTEMPTS, MAX_CHECKPOINTING_INTERVAL


class DataIngestion:
def __init__(
self,
elements,
dataset,
elements: List[Dict[str, Any]],
dataset: DeepLakeDataset,
embedding_function: Callable,
ingestion_batch_size: int,
num_workers: int,
retry_attempt: int,
total_samples_processed=None,
total_samples_processed: int,
):
self.elements = elements
self.dataset = dataset
@@ -28,11 +28,13 @@ def __init__(
self.retry_attempt = retry_attempt
self.total_samples_processed = total_samples_processed

def collect_batched_data(self):
batch_size = min(self.ingestion_batch_size, len(self.elements))
def collect_batched_data(self, ingestion_batch_size=None):
ingestion_batch_size = ingestion_batch_size or self.ingestion_batch_size
batch_size = min(ingestion_batch_size, len(self.elements))
if batch_size == 0:
raise ValueError("batch_size must be a positive number greater than zero.")

elements = self.elements
if self.total_samples_processed:
if self.total_samples_processed * batch_size >= len(self.elements):
return []
@@ -47,21 +49,26 @@ def collect_batched_data(self):
def get_num_workers(self, batched):
return min(self.num_workers, len(batched) // max(self.num_workers, 1))

def get_checkpoint_interval(self, batched):
def get_checkpoint_interval_and_batched_data(self, batched, num_workers):
checkpoint_interval = max(
int(
(0.1 * len(batched) // max(self.num_workers, 1))
* max(self.num_workers, 1),
(0.1 * len(batched) // max(num_workers, 1)) * max(num_workers, 1),
),
self.num_workers,
num_workers,
1,
)

if checkpoint_interval * self.ingestion_batch_size > MAX_CHECKPOINTING_INTERVAL:
checkpoint_interval = 100

return checkpoint_interval

def run(self):
batched_data = self.collect_batched_data()
num_workers = self.get_num_workers(batched_data)
checkpoint_interval = self.get_checkpoint_interval(batched_data)
checkpoint_interval = self.get_checkpoint_interval_and_batched_data(
batched_data, num_workers=num_workers
)

self._ingest(
batched=batched_data,
@@ -84,23 +91,23 @@ def _ingest(
)
except Exception as e:
self.retry_attempt += 1
last_checkpoint = self.dataset.version_state["commit_node"].parent
self.total_samples_processed += last_checkpoint.total_samples_processed

if self.retry_attempt > MAX_RETRY_ATTEMPTS:
raise Exception(
f"""Maximum retry attempts exceeded. You can resume ingestion, from the latest saved checkpoint.
To do that you should run:
```
deeplake_vector_store.add(
texts=texts,
metadatas=metadatas,
ids=ids,
embeddings=embeddings,
total_samples_processed={self.total_samples_processed},
)
```
"""
f"Maximum retry attempts exceeded. You can resume ingestion, from the latest saved checkpoint.\n"
"To do that you should run:\n"
"```\n"
"deeplake_vector_store.add(\n"
" texts=texts,\n"
" metadatas=metadatas,\n"
" ids=ids,\n"
" embeddings=embeddings,\n"
f" total_samples_processed={self.total_samples_processed},\n"
")\n"
"```"
)
last_checkpoint = self.dataset.version_state["commit_node"].parent
self.total_samples_processed += last_checkpoint.total_samples_processed

data_ingestion = DataIngestion(
elements=self.elements,
@@ -118,7 +125,7 @@ def _ingest(
def ingest(sample_in: list, sample_out: list, embedding_function) -> None:
text_list = [s["text"] for s in sample_in]

embeds = [None] * len(text_list)
embeds: List[Optional[np.ndarray]] = [None] * len(text_list)
if embedding_function is not None:
try:
embeddings = embedding_function(text_list)
@@ -128,8 +135,8 @@ def ingest(sample_in: list, sample_out: list, embedding_function) -> None:
)
embeds = [np.array(e, dtype=np.float32) for e in embeddings]

for s, e in zip(sample_in, embeds):
embedding = e if embedding_function else s["embedding"]
for s, emb in zip(sample_in, embeds):
embedding = emb if embedding_function else s["embedding"]
sample_out.append(
{
"text": s["text"],
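To make the new checkpointing math concrete, here is a small worked sketch of the interval computation introduced above; the helper name and the example numbers are hypothetical:

```python
from deeplake.constants import MAX_CHECKPOINTING_INTERVAL  # 100000


def checkpoint_interval_sketch(num_batches: int, num_workers: int, ingestion_batch_size: int) -> int:
    # Mirrors get_checkpoint_interval_and_batched_data: checkpoint roughly every
    # 10% of the batches, rounded down to a multiple of num_workers, but never
    # less than num_workers (or 1).
    interval = max(
        int((0.1 * num_batches // max(num_workers, 1)) * max(num_workers, 1)),
        num_workers,
        1,
    )
    # Cap the interval so one checkpoint never covers more than
    # MAX_CHECKPOINTING_INTERVAL samples.
    if interval * ingestion_batch_size > MAX_CHECKPOINTING_INTERVAL:
        interval = 100
    return interval


# 196 batches of 1024 samples with 2 workers -> checkpoint every 18 batches.
print(checkpoint_interval_sketch(196, 2, 1024))
```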
@@ -1,4 +1,4 @@
from typing import Dict, List, Any, Callable
from typing import Dict, List, Any, Callable, Optional

from deeplake.core.dataset import Dataset as DeepLakeDataset
from deeplake.core.vectorstore.vector_search.ingestion.data_ingestion import (
@@ -9,11 +9,11 @@
def run_data_ingestion(
elements: List[Dict[str, Any]],
dataset: DeepLakeDataset,
embedding_function: Callable,
ingestion_batch_size: int,
num_workers: int,
embedding_function: Optional[Callable] = None,
retry_attempt: int = 0,
total_samples_processed=None,
total_samples_processed=0,
):
"""Running data ingestion into deeplake dataset.
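A minimal call against the reordered signature, mirroring the test below; the import path and the `elements`/`dataset` placeholders are assumptions:

```python
# Import path assumed from the file locations shown in this commit.
from deeplake.core.vectorstore.vector_search.ingestion import ingest_data

ingest_data.run_data_ingestion(
    elements=elements,  # list of dicts, e.g. as built by create_elements
    dataset=dataset,    # a DeepLakeDataset
    ingestion_batch_size=1024,
    num_workers=2,
    # embedding_function now defaults to None, retry_attempt to 0,
    # and total_samples_processed to 0.
)
```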
@@ -2,6 +2,8 @@

import pytest
import random
from functools import partial


import deeplake
from deeplake.constants import MB
@@ -10,9 +12,9 @@
random.seed(1)


def corrupted_embedding_function(emb):
def corrupted_embedding_function(emb, threshold):
p = random.uniform(0, 1)
if p > 0.9:
if p > threshold:
raise Exception("CorruptedEmbeddingFunction")
return np.zeros((len(emb), 1536), dtype=np.float32)

@@ -89,12 +91,26 @@ def test_ingest_data():
)

assert len(dataset) == 4
extended_data = data * 10000
extended_data = data * 5000
embedding_function = partial(corrupted_embedding_function, threshold=0.9)

ingest_data.run_data_ingestion(
dataset=dataset,
elements=extended_data,
embedding_function=embedding_function,
ingestion_batch_size=1024,
num_workers=2,
)

assert len(dataset) == 20004

extended_data = extended_data * 10
embedding_function = partial(corrupted_embedding_function, threshold=0.95)
with pytest.raises(Exception):
ingest_data.run_data_ingestion(
dataset=dataset,
elements=extended_data,
embedding_function=corrupted_embedding_function,
embedding_function=embedding_function,
ingestion_batch_size=1024,
num_workers=2,
)
5 changes: 3 additions & 2 deletions deeplake/core/vectorstore/vector_search/vector_search.py
@@ -1,12 +1,13 @@
import numpy as np

from typing import Optional, Any, Dict, Callable, Union, List
from typing import Any, Callable, Dict, List, Union

import deeplake
from deeplake.core.dataset import Dataset as DeepLakeDataset
from deeplake.core import vectorstore

EXEC_OPTION_TO_SEARCH_TYPE = {

EXEC_OPTION_TO_SEARCH_TYPE: Dict[str, Callable] = {
"compute_engine": vectorstore.indra_vector_search,
"python": vectorstore.python_vector_search,
"db_engine": vectorstore.remote_engine_vector_search,
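The typed mapping above is a dispatch table keyed by `exec_option`; a hedged sketch of how such a lookup is consumed (the surrounding call site is an assumption, argument names follow the python search signature shown earlier):

```python
# Pick the search backend for the requested execution mode and invoke it.
search_fn = EXEC_OPTION_TO_SEARCH_TYPE["python"]  # or "compute_engine" / "db_engine"
result = search_fn(
    query_embedding=query_embedding,  # list of floats or np.ndarray (placeholder)
    distance_metric="L2",
    deeplake_dataset=dataset,         # placeholder DeepLakeDataset
    k=4,
)
```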
4 changes: 2 additions & 2 deletions deeplake/core/version_control/commit_node.py
@@ -6,7 +6,7 @@
class CommitNode:
"""Contains all the Version Control information about a particular commit."""

def __init__(self, branch: str, commit_id: str):
def __init__(self, branch: str, commit_id: str, total_samples_processed: int = 0):
self.commit_id = commit_id
self.branch = branch
self.children: List["CommitNode"] = []
@@ -17,7 +17,7 @@ def __init__(self, branch: str, commit_id: str):
self.merge_parent: Optional["CommitNode"] = None
self._info_updated: bool = False
self.is_checkpoint: bool = False
self.total_samples_processed: int = 0
self.total_samples_processed: int = total_samples_processed

def add_child(self, node: "CommitNode"):
"""Adds a child to the node, used for branching."""
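With the new constructor parameter, checkpoint progress can be seeded directly when a commit node is created; a small sketch (the calling context and values are hypothetical):

```python
from deeplake.core.version_control.commit_node import CommitNode

# Carry ingestion progress on the version-control node so a later retry can read
# it back via dataset.version_state["commit_node"].parent.total_samples_processed.
node = CommitNode(branch="main", commit_id="some_commit_id", total_samples_processed=20000)
assert node.total_samples_processed == 20000
```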
