Fix document counts (onyx-dot-app#3671)
* Various fixes/improvements to document counting

* Add new column + index

* Avoid double scan

* comment fixes

* Fix revision history

* Fix IT

* Fix IT

* Fix migration

* Rebase
Weves authored Jan 19, 2025
1 parent b25668c commit 342bb9f
Showing 6 changed files with 184 additions and 49 deletions.
48 changes: 48 additions & 0 deletions backend/alembic/versions/c7bf5721733e_add_has_been_indexed_to_.py
@@ -0,0 +1,48 @@
"""Add has_been_indexed to DocumentByConnectorCredentialPair
Revision ID: c7bf5721733e
Revises: fec3db967bf7
Create Date: 2025-01-13 12:39:05.831693
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "c7bf5721733e"
down_revision = "027381bce97c"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # assume all existing rows have been indexed, no better approach
    op.add_column(
        "document_by_connector_credential_pair",
        sa.Column("has_been_indexed", sa.Boolean(), nullable=True),
    )
    op.execute(
        "UPDATE document_by_connector_credential_pair SET has_been_indexed = TRUE"
    )
    op.alter_column(
        "document_by_connector_credential_pair",
        "has_been_indexed",
        nullable=False,
    )

    # Add index to optimize get_document_counts_for_cc_pairs query pattern
    op.create_index(
        "idx_document_cc_pair_counts",
        "document_by_connector_credential_pair",
        ["connector_id", "credential_id", "has_been_indexed"],
        unique=False,
    )


def downgrade() -> None:
    # Remove the index first before removing the column
    op.drop_index(
        "idx_document_cc_pair_counts",
        table_name="document_by_connector_credential_pair",
    )
    op.drop_column("document_by_connector_credential_pair", "has_been_indexed")
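Note: the single UPDATE above backfills the whole table in one transaction, which is fine at moderate scale. On very large tables a batched backfill keeps transactions short. The sketch below is hypothetical (not part of this commit); it assumes Postgres ctid addressing, and the batch size is arbitrary:

# Hypothetical batched backfill, NOT part of this migration; assumes Postgres
# and that has_been_indexed is still nullable when it runs.
import sqlalchemy as sa
from alembic import op


def _backfill_has_been_indexed(batch_size: int = 10_000) -> None:
    conn = op.get_bind()
    while True:
        result = conn.execute(
            sa.text(
                """
                UPDATE document_by_connector_credential_pair
                SET has_been_indexed = TRUE
                WHERE ctid IN (
                    SELECT ctid
                    FROM document_by_connector_credential_pair
                    WHERE has_been_indexed IS NULL
                    LIMIT :batch
                )
                """
            ),
            {"batch": batch_size},
        )
        if result.rowcount == 0:  # nothing left to backfill
            break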
27 changes: 17 additions & 10 deletions backend/onyx/background/indexing/run_indexing.py
@@ -101,10 +101,17 @@ def strip_null_characters(doc_batch: list[Document]) -> list[Document]:
    for doc in doc_batch:
        cleaned_doc = doc.model_copy()

        # Postgres cannot handle NUL characters in text fields
        if "\x00" in cleaned_doc.id:
            logger.warning(f"NUL characters found in document ID: {cleaned_doc.id}")
            cleaned_doc.id = cleaned_doc.id.replace("\x00", "")

        if cleaned_doc.title and "\x00" in cleaned_doc.title:
            logger.warning(
                f"NUL characters found in document title: {cleaned_doc.title}"
            )
            cleaned_doc.title = cleaned_doc.title.replace("\x00", "")

        if "\x00" in cleaned_doc.semantic_identifier:
            logger.warning(
                f"NUL characters found in document semantic identifier: {cleaned_doc.semantic_identifier}"
@@ -120,6 +127,9 @@ def strip_null_characters(doc_batch: list[Document]) -> list[Document]:
            )
            section.link = section.link.replace("\x00", "")

+            # since text can be longer, just replace to avoid double scan
+            section.text = section.text.replace("\x00", "")

        cleaned_batch.append(cleaned_doc)

    return cleaned_batch
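The "avoid double scan" comment above is worth spelling out: an `in` check followed by `replace` walks long section text twice when a NUL is present, while an unconditional `replace` walks it once and is a harmless no-op otherwise. A standalone illustration (not code from this diff):

# Membership test + replace: two passes over the string when a NUL exists.
long_text = "section body " * 10_000 + "\x00tail"
if "\x00" in long_text:
    long_text = long_text.replace("\x00", "")

# Unconditional replace: one pass, and a no-op when no NUL is present.
cleaned = ("section body " * 10_000 + "\x00tail").replace("\x00", "")
assert "\x00" not in cleaned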
@@ -277,8 +287,6 @@ def _run_indexing(
        tenant_id=tenant_id,
    )

-    all_connector_doc_ids: set[str] = set()
-
    tracer_counter = 0
    if INDEXING_TRACER_INTERVAL > 0:
        tracer.snap()
@@ -347,16 +355,15 @@
            index_attempt_md.batch_num = batch_num + 1  # use 1-index for this

            # real work happens here!
-            new_docs, total_batch_chunks = indexing_pipeline(
+            index_pipeline_result = indexing_pipeline(
                document_batch=doc_batch_cleaned,
                index_attempt_metadata=index_attempt_md,
            )

            batch_num += 1
-            net_doc_change += new_docs
-            chunk_count += total_batch_chunks
-            document_count += len(doc_batch_cleaned)
-            all_connector_doc_ids.update(doc.id for doc in doc_batch_cleaned)
+            net_doc_change += index_pipeline_result.new_docs
+            chunk_count += index_pipeline_result.total_chunks
+            document_count += index_pipeline_result.total_docs

            # commit transaction so that the `update` below begins
            # with a brand new transaction. Postgres uses the start
@@ -365,9 +372,6 @@
            # be inaccurate
            db_session.commit()

-            if callback:
-                callback.progress("_run_indexing", len(doc_batch_cleaned))
-
            # This new value is updated every batch, so UI can refresh per batch update
            with get_session_with_tenant(tenant_id) as db_session_temp:
                update_docs_indexed(
@@ -378,6 +382,9 @@
                    docs_removed_from_index=0,
                )

+            if callback:
+                callback.progress("_run_indexing", len(doc_batch_cleaned))
+
            tracer_counter += 1
            if (
                INDEXING_TRACER_INTERVAL > 0
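The accumulation changes above replace three loose return values with a single result object. Its definition is not among the changed files; judging by the attribute names used in this diff, it is shaped roughly as follows (an assumption, not the repository's actual class):

# Assumed shape of indexing_pipeline's return value; field names come from
# this diff, everything else is a sketch.
from dataclasses import dataclass


@dataclass
class IndexingPipelineResult:
    new_docs: int      # documents added to the index for the first time
    total_docs: int    # documents processed in the batch
    total_chunks: int  # chunks produced across the batch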
39 changes: 33 additions & 6 deletions backend/onyx/db/document.py
@@ -1,6 +1,7 @@
import contextlib
import time
from collections.abc import Generator
+from collections.abc import Iterable
from collections.abc import Sequence
from datetime import datetime
from datetime import timezone
@@ -13,6 +14,7 @@
from sqlalchemy import Select
from sqlalchemy import select
from sqlalchemy import tuple_
+from sqlalchemy import update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine.util import TransactionalContext
from sqlalchemy.exc import OperationalError
@@ -226,10 +228,13 @@ def get_document_counts_for_cc_pairs(
            func.count(),
        )
        .where(
-            tuple_(
-                DocumentByConnectorCredentialPair.connector_id,
-                DocumentByConnectorCredentialPair.credential_id,
-            ).in_(cc_ids)
+            and_(
+                tuple_(
+                    DocumentByConnectorCredentialPair.connector_id,
+                    DocumentByConnectorCredentialPair.credential_id,
+                ).in_(cc_ids),
+                DocumentByConnectorCredentialPair.has_been_indexed.is_(True),
+            )
        )
        .group_by(
            DocumentByConnectorCredentialPair.connector_id,
@@ -382,18 +387,40 @@ def upsert_document_by_connector_credential_pair(
                    id=doc_id,
                    connector_id=connector_id,
                    credential_id=credential_id,
+                    has_been_indexed=False,
                )
            )
            for doc_id in document_ids
        ]
    )
-    # for now, there are no columns to update. If more metadata is added, then this
-    # needs to change to an `on_conflict_do_update`
+    # this must be `on_conflict_do_nothing` rather than `on_conflict_do_update`
+    # since we don't want to update the `has_been_indexed` field for documents
+    # that already exist
    on_conflict_stmt = insert_stmt.on_conflict_do_nothing()
    db_session.execute(on_conflict_stmt)
    db_session.commit()


+def mark_document_as_indexed_for_cc_pair__no_commit(
+    db_session: Session,
+    connector_id: int,
+    credential_id: int,
+    document_ids: Iterable[str],
+) -> None:
+    """Should be called only after a successful index operation for a batch."""
+    db_session.execute(
+        update(DocumentByConnectorCredentialPair)
+        .where(
+            and_(
+                DocumentByConnectorCredentialPair.connector_id == connector_id,
+                DocumentByConnectorCredentialPair.credential_id == credential_id,
+                DocumentByConnectorCredentialPair.id.in_(document_ids),
+            )
+        )
+        .values(has_been_indexed=True)
+    )


def update_docs_updated_at__no_commit(
    ids_to_new_updated_at: dict[str, datetime],
    db_session: Session,
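Taken together, the two helpers above form a two-phase protocol: rows are created with has_been_indexed=False and flipped only after a successful index pass, so eagerly created rows (for example, from permission syncing) never inflate counts. A sketch of the intended call sequence follows; the real call sites live in the indexing pipeline outside these changed files, and the keyword arguments are assumptions:

# Hypothetical per-batch flow; not code from this commit.
doc_ids = [doc.id for doc in doc_batch_cleaned]

# 1. Create rows up front with has_been_indexed=False (on conflict, existing
#    rows keep their current flag).
upsert_document_by_connector_credential_pair(
    db_session=db_session,
    connector_id=connector_id,
    credential_id=credential_id,
    document_ids=doc_ids,
)

# 2. ... the batch is actually indexed here ...

# 3. Flip the flag only after the index operation succeeds, making the rows
#    visible to get_document_counts_for_cc_pairs.
mark_document_as_indexed_for_cc_pair__no_commit(
    db_session=db_session,
    connector_id=connector_id,
    credential_id=credential_id,
    document_ids=doc_ids,
)
db_session.commit()  # the __no_commit suffix leaves the commit to the caller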
14 changes: 14 additions & 0 deletions backend/onyx/db/models.py
@@ -941,6 +941,12 @@ class DocumentByConnectorCredentialPair(Base):
ForeignKey("credential.id"), primary_key=True
)

# used to better keep track of document counts at a connector level
# e.g. if a document is added as part of permission syncing, it should
# not be counted as part of the connector's document count until
# the actual indexing is complete
has_been_indexed: Mapped[bool] = mapped_column(Boolean)

connector: Mapped[Connector] = relationship(
"Connector", back_populates="documents_by_connector"
)
@@ -955,6 +961,14 @@
"credential_id",
unique=False,
),
# Index to optimize get_document_counts_for_cc_pairs query pattern
Index(
"idx_document_cc_pair_counts",
"connector_id",
"credential_id",
"has_been_indexed",
unique=False,
),
)
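As a design note, the column order (connector_id, credential_id, has_been_indexed) mirrors the count query: equality or membership on the first two columns, a filter on the third, and a GROUP BY the same index can satisfy. A hypothetical way to confirm the plan locally, assuming a migrated database and a placeholder connection string:

# Hypothetical plan check; connection string and ids are placeholders.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/onyx")
with engine.connect() as conn:
    plan = conn.execute(
        text(
            """
            EXPLAIN
            SELECT connector_id, credential_id, COUNT(*)
            FROM document_by_connector_credential_pair
            WHERE (connector_id, credential_id) IN ((1, 2))
              AND has_been_indexed IS TRUE
            GROUP BY connector_id, credential_id
            """
        )
    )
    for line in plan:
        # With fresh statistics this should reference idx_document_cc_pair_counts
        # (often an index-only scan, depending on the visibility map).
        print(line[0])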

