Skip to content

Commit

Permalink
CI RAG evals improvements, add new dataset (#7965)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 56df5196716d0e419ea9357ade03fd2e0ff03951
  • Loading branch information
berkecanrizai authored and Manul from Pathway committed Jan 9, 2025
1 parent 56140d1 commit 1f86733
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 35 deletions.
2 changes: 2 additions & 0 deletions integration_tests/rag_evals/.example.env
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
OPENAI_API_KEY=sk-...
RUN_MODE=LOCAL
# RUN_MODE=CI
11 changes: 10 additions & 1 deletion integration_tests/rag_evals/app.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
$sources:
- !pw.io.gdrive.read
object_id: "1ErwN5WajWsEdIRMBIjyBfncNUmkxRfRy" # large: "1ErwN5WajWsEdIRMBIjyBfncNUmkxRfRy" # small: "1f73nnDnZ0j3URkYEG5VoLhFs_ThI2hve"
service_user_credentials_file: /integration_tests/rag_evals/gdrive_indexer.json # ./gdrive_indexer.json # ./public/pathway/integration_tests/rag_evals/gdrive_indexer.json
service_user_credentials_file: /integration_tests/rag_evals/gdrive_indexer.json # ./gdrive_indexer.json # /integration_tests/rag_evals/gdrive_indexer.json
file_name_pattern:
- "*.pdf"
- "*.docx"
object_size_limit: null
with_metadata: true
refresh_interval: 30
- !pw.io.gdrive.read
object_id: "1GC0jVKLd2_GZb4pJx1umgJwmjOCxzkDW" # synthetic dataset
service_user_credentials_file: /integration_tests/rag_evals/gdrive_indexer.json # ./gdrive_indexer.json # /integration_tests/rag_evals/gdrive_indexer.json
file_name_pattern:
- "*.pdf"
- "*.docx"
Expand Down
4 changes: 1 addition & 3 deletions integration_tests/rag_evals/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ def pw_ai_answer_question(
filter=None,
response_type="short",
model=None,
return_context_docs=False,
) -> dict:
api_url = f"{self.base_url}/v1/pw_ai_answer"
payload = {
Expand All @@ -188,8 +187,7 @@ def pw_ai_answer_question(

context_docs = self.index_client.query(prompt, metadata_filter=filter, k=6)

if return_context_docs:
result["context_docs"] = context_docs
result["context_docs"] = context_docs

return result

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions integration_tests/rag_evals/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Data:
label: str
file: str
reworded_question: str
reference_contexts: str | None = ""


@dataclass
Expand Down Expand Up @@ -134,7 +135,6 @@ def _predict_single(self, question: str, file: str) -> dict:
answer = self.connector.pw_ai_answer_question(
question,
filter,
return_context_docs=True,
)
return answer

Expand All @@ -144,7 +144,6 @@ async def _apredict_single(self, question: str, file: str) -> dict:
self.connector.pw_ai_answer_question,
question,
filter,
return_context_docs=True,
)
return answer

Expand All @@ -170,7 +169,9 @@ async def _apredict_dataset(self) -> list[dict]:
tasks.append(task)

print("Async predict dataset with number of tasks:", len(tasks))
logging.info(f"Async predict dataset with number of tasks: {len(tasks)}")
results = await asyncio.gather(*tasks)
logging.info("Async predicted the dataset.")
return results

def apredict_dataset(self) -> None:
Expand All @@ -185,18 +186,19 @@ def apredict_dataset(self) -> None:
file = dc.file
api_response: dict = results[idx]

# logging.info(f"api_response sample: {str(api_response['context_docs'])}")

pred = PredictedData(
question=question,
label=dc.label,
file=file,
reworded_question=dc.reworded_question,
pred=api_response["response"],
docs=api_response["context_docs"],
reference_contexts=dc.reference_contexts,
)
self.predicted_dataset.append(pred)

logging.info(f"Constructing predicted ds for file: {file}")

logging.info("Finished running `apredict_dataset`.")

# def _calculate_question_accuracy(
Expand Down
107 changes: 89 additions & 18 deletions integration_tests/rag_evals/experiment.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import logging
import os
import shutil
from dataclasses import asdict, dataclass
from datetime import datetime

import mlflow
import pandas as pd
from ragas import EvaluationDataset

from .connector import RagConnector
from .dataset import DatasetUtils
from .eval_questions import eval_questions, question_mapper
from .evaluator import RAGEvaluator, compare_sim_with_date
from .logging_utils import get_run_params
from .ragas_utils import create_ragas_dataset, run_ragas_evaluations
from .ragas_utils import (
create_ragas_dataset,
ragas_dataset_to_eval,
run_ragas_evaluations,
)
from .utils import save_pivot_table_as_confusion

mlflow.set_tracking_uri("https://mlflow.internal.pathway.com")
Expand All @@ -20,10 +26,67 @@
RUN_RAGAS_EVALS: bool = True


@dataclass
class NamedDataset:
name: str
dataset: EvaluationDataset
file: str


def load_synthetic_tests(dataset_folder_path: str) -> list[NamedDataset]:
dataset_paths = [
os.path.join(dataset_folder_path, f)
for f in os.listdir(dataset_folder_path)
if f.endswith(".jsonl")
]

dataset_ls = []
for data_path in dataset_paths:
logging.info(f"Loaded synthetic dataset: {data_path}")
dataset = EvaluationDataset.from_jsonl(data_path)
full_fname = data_path.split(os.path.sep)[-1]
named_ds = NamedDataset(
name=full_fname, dataset=dataset, file=full_fname.split("==")[0]
)
dataset_ls.append(named_ds)

return dataset_ls


def run_ragas_evals(
ragas_dataset: EvaluationDataset, subfolder_name: str, dataset_name: str
):
CORRECTNESS_CUTOFF: float = 0.65 # TODO: adjust
# ragas_dataset = create_ragas_dataset(evaluator.predicted_dataset)
cleaned_dataset_name = dataset_name.split(".")[0]

ragas_evals_dataset = run_ragas_evaluations(ragas_dataset)

ragas_scores_df = pd.DataFrame(ragas_evals_dataset.scores)
ragas_scores_df["answer_correctness_withcutoff"] = (
ragas_scores_df["answer_correctness"] > CORRECTNESS_CUTOFF
).astype(float)

ragas_scores: dict = ragas_scores_df.mean().to_dict()

for metric_name, value in ragas_scores.items():
mlflow.log_metric(f"Ragas-{cleaned_dataset_name}-{metric_name}", value=value)

ragas_evaluation_df = ragas_evals_dataset.to_pandas()

mlflow.log_table(
ragas_evaluation_df,
subfolder_name + f"/ragas_{dataset_name}_dataset.json",
)
# If this gets error: 'Request Entity Too Large for url'
# increase nginx body size


def run_eval_experiment(
experiment: str | None = None,
base_url: str = "http://0.0.0.0:8000",
dataset_path: str = "dataset/labeled.tsv",
synthetic_dataset_path: str = "dataset/synthetic_tests/",
config_file_path: str = "app.yaml",
cleanup_dir_on_exit: bool = False,
) -> float:
Expand Down Expand Up @@ -123,31 +186,39 @@ def run_eval_experiment(
subfolder_name + "/predicted_dataset_artifact.json",
)

if RUN_RAGAS_EVALS:
CORRECTNESS_CUTOFF: float = 0.65 # TODO: adjust
ragas_dataset = create_ragas_dataset(evaluator.predicted_dataset)
experiment_name: str = experiment.replace(":", "_")

ragas_evals_dataset = run_ragas_evaluations(ragas_dataset)
# if RUN_RAGAS_EVALS:
# ragas_dataset = create_ragas_dataset(evaluator.predicted_dataset)

ragas_scores_df = pd.DataFrame(ragas_evals_dataset.scores)
ragas_scores_df["answer_correctness_withcutoff"] = (
ragas_scores_df["answer_correctness"] > CORRECTNESS_CUTOFF
).astype(float)
# run_ragas_evals(ragas_dataset, experiment_name, dataset_name="main_dataset")

ragas_scores: dict = ragas_scores_df.mean().to_dict()
synthetic_datasets = load_synthetic_tests(synthetic_dataset_path)
logging.info(
f"Loaded synthetic datasets. Number of datasets: {len(synthetic_datasets)}"
)

for metric_name, value in ragas_scores.items():
mlflow.log_metric(f"Ragas-{metric_name}", value=value)
for named_ds in synthetic_datasets:
eval_dataset = ragas_dataset_to_eval(named_ds.dataset, named_ds.file)

ragas_evaluation_df = ragas_evals_dataset.to_pandas()
eval_dict_dataset = [asdict(d) for d in eval_dataset]
logging.info(f"eval_dataset sample-{str(eval_dict_dataset[: 4])}")
eval_dataset_name = named_ds.name

mlflow.log_table(
ragas_evaluation_df,
subfolder_name + "/ragas_dataset.json",
logging.info(f"Running predictions for: {eval_dataset_name}")

evaluator = RAGEvaluator(eval_dict_dataset, compare_sim_with_date, conn)
logging.info(f"eval_dataset sample-{str(evaluator.dataset[: 4])}")
evaluator.apredict_dataset()
logging.info(
f"predicted_dataset sample-{str(evaluator.predicted_dataset[: 4])}"
)
# Gets error: 'Request Entity Too Large for url'
logging.info(f"Creating RAGAS dataset for: {eval_dataset_name}")
ragas_dataset = create_ragas_dataset(evaluator.predicted_dataset)
logging.info(f"Calculating RAGAS metrics for: {eval_dataset_name}")
run_ragas_evals(ragas_dataset, experiment_name, dataset_name=eval_dataset_name)

mlflow.end_run()
mlflow.end_run() # this is also called if exception is thrown above (atexit)

if cleanup_dir_on_exit:
shutil.rmtree(dataset_name)
Expand Down
24 changes: 23 additions & 1 deletion integration_tests/rag_evals/ragas_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import AnswerCorrectness, Faithfulness

from .evaluator import PredictedData
from .evaluator import Data, PredictedData


def create_ragas_dataset(dataset: list[PredictedData]) -> EvaluationDataset:
Expand All @@ -19,13 +19,35 @@ def create_ragas_dataset(dataset: list[PredictedData]) -> EvaluationDataset:
and not isinstance(elem.label, float) # 1 instance of data is float nan
else "No information found."
),
reference_contexts=(
[str(doc) for doc in elem.reference_contexts] # type: ignore
if elem.reference_contexts and isinstance(elem.reference_contexts, list)
else None
),
)
for elem in dataset
]

return EvaluationDataset(samples=single_samples)


def ragas_dataset_to_eval(dataset: EvaluationDataset, file: str) -> list[Data]:
ls = []
for sample in dataset:
elem = Data(
question=sample.user_input,
reworded_question=sample.user_input,
# docs=sample.retrieved_contexts,
# pred=sample.response,
label=sample.reference,
file=file,
reference_contexts=sample.reference_contexts,
)
ls.append(elem)

return ls


def run_ragas_evaluations(dataset: EvaluationDataset):

evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
Expand Down
5 changes: 5 additions & 0 deletions integration_tests/rag_evals/run_locally.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
file_path="./app.yaml"

sed -i 's|service_user_credentials_file: /integration_tests/rag_evals/gdrive_indexer.json|service_user_credentials_file: ./gdrive_indexer.json|' "$file_path"

pytest test_eval.py
20 changes: 12 additions & 8 deletions integration_tests/rag_evals/test_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

TEST_TIMEOUT: float = 600.0 * 2

LOCAL_RUN: bool = os.environ.get("RUN_MODE", "CI") == "LOCAL"

logging.basicConfig(
level=logging.INFO,
Expand All @@ -34,9 +35,12 @@
)


file_handler = logging.FileHandler(
log_file = (
"/integration_tests/rag_integration_test_cache/rag_eval_logs.txt"
if LOCAL_RUN
else "rag_eval_logs.txt"
)
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(
logging.Formatter(
Expand Down Expand Up @@ -90,6 +94,7 @@ def test_rag_app_accuracy(port: int):
current_dir = os.path.dirname(__file__)
config_file_path: str = f"{current_dir}/app.yaml"
dataset_file = f"{current_dir}/dataset/labeled.tsv"
synthetic_dataset = f"{current_dir}/dataset/synthetic_tests/"

logging.error(f"Creating pathwap app on port: {port}.")
app = create_app(port, config_file=config_file_path)
Expand All @@ -100,7 +105,7 @@ def test_rag_app_accuracy(port: int):

def wait_for_start(retries: int = 10, interval: int | float = 45.0) -> bool:
logging.error("Running wait_for_start")
EXPECTED_DOCS_COUNT: int = 23 # 2
EXPECTED_DOCS_COUNT: int = 23 + 1 # +1 for synthetic data
docs: list[dict] = []

for iter in range(retries):
Expand All @@ -112,8 +117,8 @@ def wait_for_start(retries: int = 10, interval: int | float = 45.0) -> bool:
try:
docs = conn.pw_list_documents()
if docs and len(docs) >= EXPECTED_DOCS_COUNT:
logging.error(
f"Fetched docs: {docs} ({len(docs)}), \
logging.info(
f"Fetched docs: ({len(docs)}) List: {docs}, \
expected: {EXPECTED_DOCS_COUNT}"
)
return True
Expand All @@ -130,7 +135,7 @@ def wait_for_start(retries: int = 10, interval: int | float = 45.0) -> bool:
return False

def checker() -> bool:
MIN_ACCURACY: float = 0.6
MIN_ACCURACY: float = 0.0

logging.error("starting checker")

Expand All @@ -147,11 +152,10 @@ def checker() -> bool:
eval_accuracy: float = run_eval_experiment(
base_url=app_url,
dataset_path=dataset_file,
synthetic_dataset_path=synthetic_dataset,
config_file_path=config_file_path,
cleanup_dir_on_exit=True,
)
assert eval_accuracy >= MIN_ACCURACY # TODO: update

return eval_accuracy >= MIN_ACCURACY
return eval_accuracy >= MIN_ACCURACY # TODO: update

wait_result_with_checker(checker, TEST_TIMEOUT, target=app.run)

0 comments on commit 1f86733

Please sign in to comment.