Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Variable Embedding Dim for Vespa #985

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ COPY ./alembic /app/alembic
COPY ./alembic.ini /app/alembic.ini
COPY supervisord.conf /usr/etc/supervisord.conf

# Create Vespa app zip
WORKDIR /app/danswer/document_index/vespa/app_config
RUN zip -r /app/danswer/vespa-app.zip .
WORKDIR /app

ENV PYTHONPATH /app

# Default command which does nothing
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/document_index/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from danswer.access.models import DocumentAccess
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.indexing.models import InferenceChunk
from danswer.search.models import IndexFilters
Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(self, index_name: str, *args: Any, **kwargs: Any) -> None:
self.index_name = index_name

@abc.abstractmethod
def ensure_indices_exist(self) -> None:
def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
raise NotImplementedError


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ schema danswer_chunk {
summary: dynamic
}
# Title embedding (x1)
field title_embedding type tensor<float>(x[384]) {
field title_embedding type tensor<float>(x[VARIABLE_DIM]) {
indexing: attribute
attribute {
distance-metric: angular
}
}
# Content embeddings (chunk + optional mini chunks embeddings)
# "t" and "x" are arbitrary names, not special keywords
field embeddings type tensor<float>(t{},x[384]) {
field embeddings type tensor<float>(t{},x[VARIABLE_DIM]) {
indexing: attribute
attribute {
distance-metric: angular
Expand Down Expand Up @@ -143,7 +143,7 @@ schema danswer_chunk {

rank-profile hybrid_search inherits default, default_rank {
inputs {
query(query_embedding) tensor<float>(x[384])
query(query_embedding) tensor<float>(x[VARIABLE_DIM])
}

# This must be separate function for normalize_linear to work
Expand Down Expand Up @@ -224,7 +224,7 @@ schema danswer_chunk {

rank-profile semantic_search inherits default, default_rank {
inputs {
query(query_embedding) tensor<float>(x[384])
query(query_embedding) tensor<float>(x[VARIABLE_DIM])
}

first-phase {
Expand Down
51 changes: 43 additions & 8 deletions backend/danswer/document_index/vespa/index.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import concurrent.futures
import io
import json
import os
import string
import time
import zipfile
from collections.abc import Callable
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import BinaryIO
from typing import cast

import httpx
Expand Down Expand Up @@ -49,6 +53,7 @@
from danswer.configs.constants import TITLE
from danswer.configs.constants import TITLE_EMBEDDING
from danswer.configs.constants import TITLE_SEPARATOR
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
Expand All @@ -70,7 +75,7 @@

logger = setup_logger()


VESPA_DIM_REPLACEMENT_PAT = "VARIABLE_DIM"
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_HOST}:{VESPA_TENANT_PORT}"
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
Expand Down Expand Up @@ -566,6 +571,15 @@ def _inference_chunk_by_vespa_id(vespa_id: str) -> InferenceChunk:
return _vespa_hit_to_inference_chunk(res.json())


def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
for filename, content in file_contents.items():
zipf.writestr(filename, content)
zip_buffer.seek(0)
return zip_buffer


class VespaIndex(DocumentIndex):
yql_base = (
f"select "
Expand Down Expand Up @@ -593,21 +607,42 @@ def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
# to be updated + zipped + deployed, not supporting the option for simplicity
self.deployment_zip = deployment_zip

def ensure_indices_exist(self) -> None:
def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
"""Verifying indices is more involved as there is no good way to
verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
the index is up-to-date with the expected schema and this does not erase the existing index.
If the changes cannot be applied without conflict with existing data, it will fail with a non 200
"""
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.debug(f"Sending Vespa zip to {deploy_url}")

vespa_schema_path = os.path.join(
os.getcwd(), "danswer", "document_index", "vespa", "app_config"
)
schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
services_file = os.path.join(vespa_schema_path, "services.xml")

with open(schema_file, "r") as schema_f:
schema = schema_f.read()
schema = schema.replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
schema_bytes = schema.encode("utf-8")

with open(services_file, "rb") as services_f:
services_bytes = services_f.read()

zip_dict = {
"schemas/danswer_chunk.sd": schema_bytes,
"services.xml": services_bytes,
}

zip_file = in_memory_zip_from_file_bytes(zip_dict)

headers = {"Content-Type": "application/zip"}
with open(self.deployment_zip, "rb") as f:
response = requests.post(deploy_url, headers=headers, data=f)
if response.status_code != 200:
raise RuntimeError(
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
)
response = requests.post(deploy_url, headers=headers, data=zip_file)
if response.status_code != 200:
raise RuntimeError(
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
)

def index(
self,
Expand Down