Feature/log despam #2022

Merged: 2 commits, Aug 2, 2024
20 changes: 10 additions & 10 deletions backend/danswer/background/indexing/run_indexing.py
@@ -277,11 +277,9 @@ def _run_indexing(
             run_dt=run_end_dt,
         )

+    elapsed_time = time.time() - start_time
     logger.info(
-        f"Indexed or refreshed {document_count} total documents for a total of {chunk_count} indexed chunks"
-    )
-    logger.info(
-        f"Connector successfully finished, elapsed time: {time.time() - start_time} seconds"
+        f"Connector succeeded: docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
     )


@@ -330,17 +328,19 @@ def run_indexing_entrypoint(index_attempt_id: int, is_ee: bool = False) -> None:
             attempt = _prepare_index_attempt(db_session, index_attempt_id)

             logger.info(
-                f"Running indexing attempt for connector: '{attempt.connector_credential_pair.connector.name}', "
-                f"with config: '{attempt.connector_credential_pair.connector.connector_specific_config}', and "
-                f"with credentials: '{attempt.connector_credential_pair.connector_id}'"
+                f"Indexing starting: "
+                f"connector='{attempt.connector_credential_pair.connector.name}' "
+                f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
+                f"credentials='{attempt.connector_credential_pair.connector_id}'"
             )

             _run_indexing(db_session, attempt)

             logger.info(
-                f"Completed indexing attempt for connector: '{attempt.connector_credential_pair.connector.name}', "
-                f"with config: '{attempt.connector_credential_pair.connector.connector_specific_config}', and "
-                f"with credentials: '{attempt.connector_credential_pair.connector_id}'"
+                f"Indexing finished: "
+                f"connector='{attempt.connector_credential_pair.connector.name}' "
+                f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
+                f"credentials='{attempt.connector_credential_pair.connector_id}'"
             )
     except Exception as e:
         logger.exception(f"Indexing job with ID '{index_attempt_id}' failed due to {e}")
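The changes above collapse the old pair of completion messages into a single key=value summary line, computing the elapsed time once up front. A minimal, self-contained sketch of that pattern follows; the loop and counters are placeholders, not the PR's actual indexing logic.

    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("indexing_sketch")


    def index_documents() -> None:
        start_time = time.time()
        document_count, chunk_count = 0, 0

        for _ in range(3):  # placeholder for the real batch loop
            document_count += 10
            chunk_count += 50

        # Compute elapsed time once, then emit one structured summary line
        # instead of several separate info messages.
        elapsed_time = time.time() - start_time
        logger.info(
            f"Connector succeeded: docs={document_count} "
            f"chunks={chunk_count} elapsed={elapsed_time:.2f}s"
        )


    index_documents()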
36 changes: 26 additions & 10 deletions backend/danswer/background/update.py
@@ -283,11 +283,13 @@ def kickoff_indexing_jobs(
         if attempt.id not in existing_jobs
     ]

-    logger.info(f"Found {len(new_indexing_attempts)} new indexing tasks.")
+    logger.debug(f"Found {len(new_indexing_attempts)} new indexing task(s).")

     if not new_indexing_attempts:
         return existing_jobs

+    indexing_attempt_count = 0
+
     for attempt, embedding_model in new_indexing_attempts:
         use_secondary_index = (
             embedding_model.status == IndexModelStatus.FUTURE
@@ -329,15 +331,29 @@
             )

         if run:
-            secondary_str = "(secondary index) " if use_secondary_index else ""
+            if indexing_attempt_count == 0:
+                logger.info(
+                    f"Indexing dispatch starts: pending={len(new_indexing_attempts)}"
+                )
+
+            indexing_attempt_count += 1
+            secondary_str = " (secondary index)" if use_secondary_index else ""
             logger.info(
-                f"Kicked off {secondary_str}"
-                f"indexing attempt for connector: '{attempt.connector_credential_pair.connector.name}', "
-                f"with config: '{attempt.connector_credential_pair.connector.connector_specific_config}', and "
-                f"with credentials: '{attempt.connector_credential_pair.credential_id}'"
+                f"Indexing dispatched{secondary_str}: "
+                f"connector='{attempt.connector_credential_pair.connector.name}' "
+                f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
+                f"credentials='{attempt.connector_credential_pair.credential_id}'"
             )
             existing_jobs_copy[attempt.id] = run

+    if indexing_attempt_count > 0:
+        logger.info(
+            f"Indexing dispatch results: "
+            f"initial_pending={len(new_indexing_attempts)} "
+            f"started={indexing_attempt_count} "
+            f"remaining={len(new_indexing_attempts) - indexing_attempt_count}"
+        )
+
     return existing_jobs_copy
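The hunk above guards dispatch logging with a counter: a "dispatch starts" line appears only when at least one attempt is actually kicked off, and a single results line summarizes the batch, so idle scheduler iterations stay quiet. A rough standalone sketch of that pattern, with a plain list standing in for the real attempts and worker pool:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("dispatch_sketch")


    def kickoff_jobs(pending: list[str]) -> None:
        started = 0
        for name in pending:
            run = True  # placeholder for "the worker pool accepted the job"
            if run:
                if started == 0:
                    # Emitted once per batch, and only if something is dispatched.
                    logger.info(f"Indexing dispatch starts: pending={len(pending)}")
                started += 1
                logger.info(f"Indexing dispatched: connector='{name}'")

        if started > 0:
            # Single summary line at the end of the batch.
            logger.info(
                f"Indexing dispatch results: initial_pending={len(pending)} "
                f"started={started} remaining={len(pending) - started}"
            )


    kickoff_jobs(["web", "github"])  # start line, two dispatch lines, one summary
    kickoff_jobs([])                 # logs nothing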


@@ -355,7 +371,7 @@ def update_loop(
     # batch of documents indexed

     if db_embedding_model.cloud_provider_id is None:
-        logger.info("Running a first inference to warm up embedding model")
+        logger.debug("Running a first inference to warm up embedding model")
         warm_up_encoders(
             embedding_model=db_embedding_model,
             model_server_host=INDEXING_MODEL_SERVER_HOST,
@@ -392,11 +408,11 @@ def update_loop(
     while True:
         start = time.time()
         start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
-        logger.info(f"Running update, current UTC time: {start_time_utc}")
+        logger.debug(f"Running update, current UTC time: {start_time_utc}")

         if existing_jobs:
             # TODO: make this debug level once the "no jobs are being scheduled" issue is resolved
-            logger.info(
+            logger.debug(
                 "Found existing indexing jobs: "
                 f"{[(attempt_id, job.status) for attempt_id, job in existing_jobs.items()]}"
             )
@@ -422,7 +438,7 @@ def update__main() -> None:
     set_is_ee_based_on_env_variable()
     init_sqlalchemy_engine(POSTGRES_INDEXER_APP_NAME)

-    logger.info("Starting Indexing Loop")
+    logger.info("Starting indexing service")
     update_loop()


@@ -212,7 +212,7 @@ def warm_up_encoders(
     )

     # May not be the exact same tokenizer used for the indexing flow
-    logger.info(f"Warming up encoder model: {model_name}")
+    logger.debug(f"Warming up encoder model: {model_name}")
     get_tokenizer(model_name=model_name, provider_type=provider_type).encode(
         warm_up_str
     )
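The remaining hunks demote routine messages (scheduler heartbeats, model warm-up notices) from info to debug rather than deleting them. A small sketch of how such messages stay recoverable by lowering the logger level; this uses the standard logging module, and Danswer's own logger setup may differ. The model name shown is an arbitrary example, not the project's default.

    import logging

    logging.basicConfig(format="%(levelname)s %(message)s")
    logger = logging.getLogger("warmup_sketch")

    logger.setLevel(logging.INFO)
    logger.debug("Warming up encoder model: example-model")  # suppressed at INFO

    logger.setLevel(logging.DEBUG)
    logger.debug("Warming up encoder model: example-model")  # visible at DEBUG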