forked from Azure-Samples/graphrag-accelerator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanage-indexing-jobs.py
120 lines (108 loc) · 4.45 KB
/
manage-indexing-jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
A naive implementation of a job manager that leverages k8s CronJob and CosmosDB
to schedule graphrag indexing jobs in a first-come-first-serve manner (based on epoch time).
"""
import os
import pandas as pd
import yaml
from kubernetes import (
client,
config,
)
from src.api.azure_clients import AzureStorageClientManager
from src.api.common import sanitize_name
from src.models import PipelineJob
from src.reporting.reporter_singleton import ReporterSingleton
from src.typing.pipeline import PipelineJobState
def schedule_indexing_job(index_name: str):
    """
    Launch a kubernetes Job that runs graphrag indexing for ``index_name``.

    The manager pod's own spec is inspected so the new job reuses the same
    container image and service account as this pod. If scheduling fails
    for any reason, the pipeline job is marked FAILED in CosmosDB so it is
    not re-scheduled forever.
    """
    try:
        config.load_incluster_config()
        # Look up this pod's own spec to reuse its image and service account.
        this_pod = client.CoreV1Api().read_namespaced_pod(
            name=os.environ["HOSTNAME"],
            namespace=os.environ["AKS_NAMESPACE"],
        )
        # Fill the job manifest template with values from the running pod.
        manifest = _generate_aks_job_manifest(
            docker_image_name=this_pod.spec.containers[0].image,
            index_name=index_name,
            service_account_name=this_pod.spec.service_account_name,
        )
        client.BatchV1Api().create_namespaced_job(
            body=manifest,
            namespace=os.environ["AKS_NAMESPACE"],
        )
    except Exception:
        ReporterSingleton().get_instance().on_error(
            "Index job manager encountered error scheduling indexing job",
        )
        # In the event of a catastrophic scheduling failure, something in k8s
        # or the job manifest is likely broken. Set job status to failed to
        # prevent an infinite loop of re-scheduling.
        pipeline_job = PipelineJob().load_item(sanitize_name(index_name))
        pipeline_job["status"] = PipelineJobState.FAILED
def _generate_aks_job_manifest(
    docker_image_name: str,
    index_name: str,
    service_account_name: str,
) -> dict:
    """Build the AKS Job manifest (as a dict) for one indexing run.

    Loads the YAML job template from disk and fills in the job name,
    service account, container image, and indexing command.
    """
    # NOTE: this file location is relative to the WORKDIR set in Dockerfile-backend
    with open("indexing-job-template.yaml", "r") as template_file:
        manifest = yaml.safe_load(template_file)
    manifest["metadata"]["name"] = f"indexing-job-{sanitize_name(index_name)}"
    # Hoist the nested pod spec / container paths into locals for readability.
    pod_spec = manifest["spec"]["template"]["spec"]
    pod_spec["serviceAccountName"] = service_account_name
    container = pod_spec["containers"][0]
    container["image"] = docker_image_name
    container["command"] = [
        "python",
        "run-indexing-job.py",
        f"-i={index_name}",
    ]
    return manifest
def main():
    """
    Poll CosmosDB for graphrag indexing jobs and schedule the oldest
    SCHEDULED job, first-come-first-serve by epoch request time.

    Exits immediately (scheduling nothing) if any job is already RUNNING,
    or if no SCHEDULED jobs exist.
    """
    azure_storage_client_manager = AzureStorageClientManager()
    job_container_store_client = (
        azure_storage_client_manager.get_cosmos_container_client(
            database_name="graphrag", container_name="jobs"
        )
    )
    # collect metadata for all jobs in the SCHEDULED state
    job_metadata = []
    for item in job_container_store_client.read_all_items():
        # exit if a job is running - only one indexing job may run at a time
        if item["status"] == PipelineJobState.RUNNING.value:
            print(
                f"Indexing job for '{item['human_readable_index_name']}' already running. Will not schedule another. Exiting..."
            )
            exit()
        if item["status"] == PipelineJobState.SCHEDULED.value:
            job_metadata.append(
                {
                    "human_readable_index_name": item["human_readable_index_name"],
                    "epoch_request_time": item["epoch_request_time"],
                    "status": item["status"],
                    "percent_complete": item["percent_complete"],
                }
            )
    # exit if no jobs found
    if not job_metadata:
        print("No jobs found")
        exit()
    # jobs are run in the order they were requested - pick the earliest
    # epoch_request_time. min() with a key replaces the previous pandas
    # DataFrame + full sort, which was O(n log n) work (and a heavy
    # dependency) just to find a single minimum; ties resolve to the first
    # occurrence, matching a stable ascending sort's first row.
    oldest_job = min(job_metadata, key=lambda job: job["epoch_request_time"])
    index_to_schedule = oldest_job["human_readable_index_name"]
    print(f"Scheduling job for index: {index_to_schedule}")
    schedule_indexing_job(index_to_schedule)
# Script entry point: runs one scheduling pass per invocation; the
# recurring schedule comes from the k8s CronJob that launches this script.
if __name__ == "__main__":
    main()