list_compute_submitrun_runs.py
#!/usr/bin/env python3
import logging
import sys

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

if __name__ == "__main__":
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
    )

    w = WorkspaceClient()

    # Set expand_tasks to True because the compute information lives on each run's tasks.
    job_runs = w.jobs.list_runs(expand_tasks=True)
    for run in job_runs:
        # Filter to one-time submit runs.
        if run.run_type == jobs.RunType.SUBMIT_RUN:
            compute_used = []
            # Iterate over the tasks in the run; guard against runs with no tasks.
            for task in run.tasks or []:
                # - Tasks on an all-purpose cluster carry an existing_cluster_id.
                # - Tasks on a jobs cluster carry the new_cluster as a ClusterSpec.
                # - SQL tasks carry a warehouse id on their sql_task.
                task_compute = (
                    {"existing_cluster_id": task.existing_cluster_id} if task.existing_cluster_id else
                    {"new_cluster": task.new_cluster} if task.new_cluster else
                    {"sql_warehouse_id": task.sql_task.warehouse_id} if task.sql_task else
                    {}
                )
                # Collect the compute info for every task in the run.
                compute_used.append(task_compute)
            logging.info(f"run_id: {run.run_id}, compute_used: {compute_used}")
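
The script filters submit runs client-side and logs raw compute references. In a workspace with many runs, the listing can also be narrowed server-side. The sketch below is illustrative and not part of the original example: it assumes the run_type and start_time_from parameters of list_runs (part of the Jobs 2.1 runs list API, which takes epoch-millisecond timestamps) and the spark_version, node_type_id, and num_workers fields of ClusterSpec.

import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Epoch milliseconds for 24 hours ago; the runs list API expects milliseconds.
day_ago_ms = int((time.time() - 24 * 60 * 60) * 1000)

# Ask the server for submit runs started in the last day, with tasks expanded,
# instead of filtering on run_type client-side as the script above does.
for run in w.jobs.list_runs(
    expand_tasks=True,
    run_type=jobs.RunType.SUBMIT_RUN,
    start_time_from=day_ago_ms,
):
    for task in run.tasks or []:
        if task.new_cluster:
            spec = task.new_cluster
            # Summarize the jobs cluster the task requested.
            print(f"run {run.run_id}: {spec.spark_version} on "
                  f"{spec.node_type_id} x {spec.num_workers}")

In both versions, WorkspaceClient() resolves credentials through the SDK's standard unified authentication, for example the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables or a configuration profile.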