-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrequest_monitor.py
145 lines (125 loc) · 5.22 KB
/
request_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import json
import sys
import pymongo
import requests
from time import sleep, time
from traceback import print_exception
from typing import List
from datetime import datetime, timedelta
from loguru import logger
from lib.constant import *
from lib.state import State
from lib.tool import tool_utils
from lib.monitor.database_mgr import StateRecord
from lib.monitor.info_report import *
from lib.monitor import post_utils
from lib.utils import misc
from lib.monitor.extend_config import extend_run_config
# MAX_CONCURRENT_PIPELINE_NUM = 20
# Idle threshold, in seconds (it is compared against differences of time()):
# once no new request has been pulled for this long, the monitor loop queries
# the stored records and dispatches them to the pipeline.
# NOTE(review): the commented-out value below is presumably the production
# setting and the active one a shortened test value - confirm before deploying.
# WAIT_UNTIL_START = 15 * 60
WAIT_UNTIL_START = 30
# Pause between two polling iterations, in seconds (it is passed to sleep()).
# NOTE(review): same remark as above regarding the commented-out value.
# REQUEST_PERIOD = 60
REQUEST_PERIOD = 10
def request_due(r, d=3, now=None):
    """Return True when request ``r`` is strictly more than ``d`` days old.

    Args:
        r: Mapping with a ``"time_stamp"`` entry formatted as
            ``%Y%m%d_%H%M%S`` (a naive date-time).
        d: Age threshold in days.
        now: Naive ``datetime`` to measure the age against. Defaults to
            ``datetime.now()``; mainly useful for deterministic checks.

    Returns:
        bool: True if ``now - time_stamp`` exceeds ``d`` days, else False.

    Raises:
        KeyError: if ``r`` has no ``"time_stamp"`` entry.
        ValueError: if the time stamp does not match the expected format.
    """
    # Deliberately not called ``time``: this module imports ``time()`` from
    # the ``time`` module and a local of that name would shadow it.
    stamped_at = datetime.strptime(r["time_stamp"], "%Y%m%d_%H%M%S")
    reference = datetime.now() if now is None else now
    return reference - stamped_at > timedelta(days=d)
def compose_requests(records: List[StateRecord], info_report: InfoReport) -> List[dict]:
    """Decode stored records into request dicts and mark each as POST_RECEIVE.

    Args:
        records: Records whose ``'request_json'`` entry holds a JSON-encoded
            request.
        info_report: Reporter used to persist the per-request state and
            error message.

    Returns:
        The decoded requests whose state update succeeded, in input order.
        A request whose state update raised is logged, flagged as
        ``State.RECEIVE_ERROR`` (with the error text stored) and left out.
    """
    composed = []
    if len(records) == 0:
        return composed

    for record in records:
        # Decoding is done outside the try block, so a malformed payload
        # propagates to the caller instead of being recorded as an error.
        request = json.loads(record['request_json'])
        try:
            info_report.update_state(hash_id=request[HASH_ID], state=State.POST_RECEIVE)
        except Exception as exc:
            message = str(exc)
            logger.error(message)
            info_report.update_state(hash_id=request[HASH_ID], state=State.RECEIVE_ERROR)
            info_report.update_error_message(
                hash_id=request[HASH_ID], error_msg=message
            )
            print_exception(*sys.exc_info())
        else:
            composed.append(request)
    return composed
def pipelineWorker(request_dicts, pipeline_url="http://10.0.0.12:8081/pipeline", timeout=None):
    """POST a batch of request dicts to the pipeline service (best effort).

    The payload is sent as JSON in the form ``{"requests": request_dicts}``.
    Any failure - connection error, time-out, non-2xx response, payload that
    cannot be serialised - is logged and swallowed so the caller keeps running.

    Args:
        request_dicts: JSON-serialisable list of request dicts.
        pipeline_url: Endpoint that receives the batch.
        timeout: Forwarded to ``requests.post``. ``None`` waits indefinitely,
            which is what happens when no timeout is given.
            NOTE(review): consider passing a finite value from the caller so
            an unresponsive service cannot stall the monitor - confirm how
            long the endpoint may legitimately take to answer first.

    Returns:
        None.
    """
    try:
        logger.info(f"------- Requests of pipeline task: {request_dicts}")
        response = requests.post(
            pipeline_url, json={'requests': request_dicts}, timeout=timeout
        )
        # Surface 4xx/5xx replies in the log instead of ignoring them.
        response.raise_for_status()
    except Exception as e:
        logger.error(str(e))
if __name__ == "__main__":
    # Monitor entry point. Each iteration:
    #   1. pulls the currently visible requests through post_utils;
    #   2. if there are none and the idle threshold has elapsed, queries the
    #      stored records in state RECEIVED and hands them to the pipeline;
    #   3. otherwise stores every pulled request through info_report and
    #      marks it invisible through post_utils.
    logger.configure(**MONITOR_LOGGING_CONFIG)
    logger.info("------- Start to monitor...")
    info_report = InfoReport()
    last_received_time = time()
    while True:
        _cur_time = time()
        try:
            _requests = post_utils.pull_visible()
            if len(_requests) > 0:
                logger.info(f"------- The number of requests: {len(_requests)}")
        except requests.exceptions.RequestException as e:
            # Pull failed: report it and retry on the next cycle.
            logger.warning(f"Failed to pull visible requests: {str(e)}")
            sleep(REQUEST_PERIOD)
            continue

        if len(_requests) == 0:
            waited_time = _cur_time - last_received_time
            # logger.info(f"------- The time has waited: {waited_time}")
            if waited_time >= WAIT_UNTIL_START:
                records = info_report.dbmgr.query(
                    {VISIBLE: 1, STATE: State.RECEIVED.name}
                )
                records = list(records)
                logger.info(f"------- Received records: {records}")
                if len(records) > 0:
                    for rcds in misc.chunk_generate(records, chunk_size=1):
                        try:
                            # todo
                            request_dicts = compose_requests(
                                rcds, info_report=info_report
                            )
                            logger.info(
                                f"start processing {len(request_dicts)} requests"
                                f"\n{json.dumps(request_dicts)}"
                            )
                            pipelineWorker(request_dicts)
                        except Exception:
                            # ``Exception`` rather than a bare ``except`` so
                            # KeyboardInterrupt / SystemExit can still stop
                            # the monitor.
                            print_exception(*sys.exc_info())
                        # For test
                        # NOTE(review): this break means only the first chunk
                        # is dispatched per idle window; it is marked as
                        # test-only - confirm whether it should be removed.
                        break
                # Restart the idle timer once the threshold has been handled.
                # NOTE(review): assumed to run whether or not records were
                # found - confirm the intended nesting.
                last_received_time = _cur_time
            sleep(REQUEST_PERIOD)
            continue

        for r in _requests:
            # Requests older than the default age get an explicit
            # ``submit = False`` unless the key is already present.
            if request_due(r) and "submit" not in r:
                r["submit"] = False
            try:
                # Senders whose name starts with "test" are not stored.
                if not r[SENDER].startswith("test"):
                    info_report.insert_new_request(extend_run_config(r))
            except pymongo.errors.PyMongoError as e:
                # Insert failed: fall back to resetting the stored record.
                # NOTE(review): presumably the record already exists - confirm.
                logger.warning(
                    f"Error when update with hash_id {r[HASH_ID]} : {str(e)}\n "
                    f"retrying to reset existing record",
                )
                info_report.update_state(hash_id=r[HASH_ID], state=State.RECEIVED)
                if "run_config" not in r:
                    logger.warning(
                        f"run_config not in {json.dumps(r)}, \n"
                        f"trying extending the request to "
                        f"{json.dumps(extend_run_config(r))}"
                    )
                info_report.update_request(
                    hash_id=r[HASH_ID], request=extend_run_config(r)
                )
                info_report.update_visible(hash_id=r[HASH_ID], visible=1)
            try:
                post_utils.set_invisible(hash_id=r[HASH_ID])
            except requests.exceptions.RequestException as e:
                # Report the failure, back off, then move on to the next
                # request.
                logger.warning(
                    f"Failed to set request {r[HASH_ID]} invisible: {str(e)}"
                )
                sleep(REQUEST_PERIOD)
                continue
        last_received_time = _cur_time
        sleep(REQUEST_PERIOD)