# GraphRAG Quickstart

## Prerequisites
Install 3rd party packages, not part of the Python Standard Library, to run the notebook

In [None]:
! pip install devtools python-magic requests tqdm

In [None]:
import getpass
import json
import time
from pathlib import Path

import magic
import requests
from devtools import pprint
from tqdm import tqdm

## (REQUIRED) User Configuration
Set the API subscription key, API base endpoint, and some file directory names that will be referenced later in the notebook.

#### API subscription key

APIM supports multiple forms of authentication and access control (e.g. managed identity). For this notebook demonstration, we will use a **[subscription key](https://learn.microsoft.com/en-us/azure/api-management/api-management-subscriptions)**. To locate this key, visit the Azure Portal. The subscription key can be found under ` --> --> --> --> Primary Key`. For multiple API users, individual subscription keys can be generated.

In [None]:
ocp_apim_subscription_key = getpass.getpass(
 "Enter the subscription key to the GraphRag APIM:"
)

"""
"Ocp-Apim-Subscription-Key": 
 This is a custom HTTP header used by Azure API Management service (APIM) to 
 authenticate API requests. The value for this key should be set to the subscription 
 key provided by the Azure APIM instance in your GraphRAG resource group.
"""
headers = {"Ocp-Apim-Subscription-Key": ocp_apim_subscription_key}

#### Setup directories and API endpoint

For demonstration purposes, please use the provided `get-wiki-articles.py` script to download a small set of wikipedia articles or provide your own data (graphrag requires txt files to be utf-8 encoded).

In [None]:
"""
These parameters must be defined by the notebook user:

- file_directory: a local directory of text files. The file structure should be flat,
 with no nested directories. (i.e. file_directory/file1.txt, file_directory/file2.txt, etc.)
- storage_name: a unique name to identify a blob storage container in Azure where files
 from `file_directory` will be uploaded.
- index_name: a unique name to identify a single graphrag knowledge graph index.
 Note: Multiple indexes may be created from the same `storage_name` blob storage container.
- endpoint: the base/endpoint URL for the GraphRAG API (this is the Gateway URL found in the APIM resource).
"""

file_directory = ""
storage_name = ""
index_name = ""
endpoint = ""

In [None]:
assert (
 file_directory != "" and storage_name != "" and index_name != "" and endpoint != ""
)

## Upload Files

For a demonstration of how to index data in graphrag, we first need to ingest a few files into graphrag.

In [None]:
def upload_files(
 file_directory: str,
 storage_name: str,
 batch_size: int = 100,
 overwrite: bool = True,
 max_retries: int = 5,
) -> requests.Response | list[Path]:
 """
 Upload files to a blob storage container.

 Args:
 file_directory - a local directory of .txt files to upload. All files must have utf-8 encoding.
 storage_name - a unique name for the Azure storage blob container.
 batch_size - the number of files to upload in a single batch.
 overwrite - whether or not to overwrite files if they already exist in the storage blob container.
 max_retries - the maximum number of times to retry uploading a batch of files if the API is busy.

 NOTE: Uploading files may sometimes fail if the blob container was recently deleted
 (i.e. a few seconds before. The solution "in practice" is to sleep a few seconds and try again.
 """
 url = endpoint + "/data"

 def upload_batch(
 files: list, storage_name: str, overwrite: bool, max_retries: int
 ) -> requests.Response:
 for _ in range(max_retries):
 response = requests.post(
 url=url,
 files=files,
 params={"storage_name": storage_name, "overwrite": overwrite},
 headers=headers,
 )
 # API may be busy, retry
 if response.status_code == 500:
 print("API busy. Sleeping and will try again.")
 time.sleep(10)
 continue
 return response
 return response

 batch_files = []
 accepted_file_types = ["text/plain"]
 filepaths = list(Path(file_directory).iterdir())
 for file in tqdm(filepaths):
 # validate that file is a file, has acceptable file type, has a .txt extension, and has utf-8 encoding
 if (
 not file.is_file()
 or file.suffix != ".txt"
 or magic.from_file(str(file), mime=True) not in accepted_file_types
 ):
 print(f"Skipping invalid file: {file}")
 continue
 # open and decode file as utf-8, ignore bad characters
 batch_files.append(
 ("files", open(file=file, mode="r", encoding="utf-8", errors="ignore"))
 )
 # upload batch of files
 if len(batch_files) == batch_size:
 response = upload_batch(batch_files, storage_name, overwrite, max_retries)
 # if response is not ok, return early
 if not response.ok:
 return response
 batch_files.clear()
 # upload remaining files
 if len(batch_files) > 0:
 response = upload_batch(batch_files, storage_name, overwrite, max_retries)
 return response

In [None]:
response = upload_files(
 file_directory=file_directory,
 storage_name=storage_name,
 batch_size=100,
 overwrite=True,
)
if not response.ok:
 print(response.text)
else:
 print(response)

## Build an Index

After data files have been uploaded, we can construct a knowledge graph by building a search index.

In [None]:
def build_index(
 storage_name: str,
 index_name: str,
) -> requests.Response:
 """Create a search index.
 This function kicks off a job that builds a knowledge graph index from files located in a blob storage container.
 """
 url = endpoint + "/index"
 request = {"storage_name": storage_name, "index_name": index_name}
 return requests.post(url, params=request, headers=headers)

In [None]:
response = build_index(storage_name=storage_name, index_name=index_name)
print(response)
if response.ok:
 print(response.text)
else:
 print(f"Failed to submit job.\nStatus: {response.text}")

### Check status of an indexing job

Please wait for your index to reach 100 percent completion before continuing on to the next section (running queries). You may rerun the next cell multiple times to monitor status. Note: the indexing speed of graphrag is directly correlated to the TPM quota of the Azure OpenAI model you are using.

In [None]:
def index_status(index_name: str) -> requests.Response:
 url = endpoint + f"/index/status/{index_name}"
 return requests.get(url, headers=headers)


response = index_status(index_name)
pprint(response.json())

## Query

Once an indexing job is complete, the knowledge graph is ready to query. Two types of queries (global and local) are currently supported. We encourage you to try both and experience the difference in responses. Note that query response time is also correlated to the TPM quota of the Azure OpenAI model you are using.

In [None]:
# a helper function to parse out the result from a query response
def parse_query_response(
 response: requests.Response, return_context_data: bool = False
) -> requests.Response | dict[list[dict]]:
 """
 Prints response['result'] value and optionally
 returns associated context data.
 """
 if response.ok:
 print(json.loads(response.text)["result"])
 if return_context_data:
 return json.loads(response.text)["context_data"]
 return response
 else:
 print(response.reason)
 print(response.content)
 return response

### Global Query 

Global queries are resource-intensive, but provide good responses to questions that require an understanding of the dataset as a whole.

In [None]:
%%time


def global_search(index_name: str | list[str], query: str) -> requests.Response:
 """Run a global query over the knowledge graph(s) associated with one or more indexes"""
 url = endpoint + "/query/global"
 request = {"index_name": index_name, "query": query}
 return requests.post(url, json=request, headers=headers)


global_response = global_search(
 index_name=index_name, query="Summarize the main topics of this data"
)
global_response_data = parse_query_response(global_response, return_context_data=True)
global_response_data

### Local Query

Local search queries are best suited for narrow-focused questions that require an understanding of specific entities mentioned in the documents (e.g. What are the healing properties of chamomile?)

In [None]:
%%time


def local_search(index_name: str | list[str], query: str) -> requests.Response:
 """Run a local query over the knowledge graph(s) associated with one or more indexes"""
 url = endpoint + "/query/local"
 request = {"index_name": index_name, "query": query}
 return requests.post(url, json=request, headers=headers)


# perform a local query
local_response = local_search(
 index_name=index_name, query="Who are the primary actors in these communities?"
)
local_response_data = parse_query_response(local_response, return_context_data=True)
local_response_data