-
Notifications
You must be signed in to change notification settings - Fork 1
/
archiver_proxy.py
119 lines (94 loc) · 3.47 KB
/
archiver_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
import datetime
import urllib
import urllib.parse
from itertools import count
from pathlib import Path

import httpx
import numpy as np
import yaml
from caproto.server import PVGroup, template_arg_parser, pvproperty, run, SubGroup
from dateutil import parser, tz
from dateutil.relativedelta import relativedelta
KEYS = ("mean", "std", "min", "max", "num_samples")
async def get_data(url, keys=KEYS):
    """Fetch archiver JSON from *url* and transpose it into per-field arrays.

    Returns a dict mapping each name in *keys*, plus ``"time"``, to a 1-D
    numpy array with one entry per archived sample.  When a sample's
    ``"val"`` is a bare float (un-binned data) only ``"mean"`` is filled;
    otherwise ``"val"`` is unpacked positionally against *keys*.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        samples = response.json()[0]["data"]
        n_samples = len(samples)
        # TODO also do the alarms
        out = {name: np.zeros(n_samples) for name in keys}
        out["time"] = np.zeros(n_samples)
        # Transpose: archiver sends a list of samples, we want one array
        # per statistic field.
        for idx, sample in enumerate(samples):
            out["time"][idx] = sample["secs"]
            value = sample["val"]
            if isinstance(value, float):
                out["mean"][idx] = value
            else:
                for name, component in zip(keys, value):
                    out[name][idx] = component
        return out
def format_url(
    base: str,
    pv: str,
    since: datetime.datetime,
    until: datetime.datetime,
    bin_count: int = 800,
) -> str:
    """Build an Archiver Appliance ``getData.json`` retrieval URL for *pv*.

    Parameters
    ----------
    base : str
        Root URL of the archiver appliance (no trailing slash).
    pv : str
        PV name; wrapped in the server-side ``optimized_<bin_count>``
        post-processor so at most *bin_count* bins come back.
    since, until : datetime.datetime
        Query time range, sent as ISO-8601 strings.
    bin_count : int, optional
        Maximum number of bins the server should return (default 800,
        matching the historical hard-coded value).
    """
    query = {
        "pv": f"optimized_{bin_count}({pv})",
        "from": since.isoformat(),
        "to": until.isoformat(),
        "fetchLatestMetadata": True,
    }
    # NOTE: requires ``import urllib.parse`` at the top of the file — a bare
    # ``import urllib`` only exposes the submodule by accident.
    return f"{base}/retrieval/data/getData.json?{urllib.parse.urlencode(query)}"
class ArchiverProxy(PVGroup):
    """caproto PVGroup that mirrors archived history of one PV as waveforms.

    Every 60 s it queries the archiver for the last ``window`` hours of
    ``target_pv`` and republishes the binned means and timestamps as two
    waveform PVs (``:archived_{window}_mean`` / ``:archived_{window}_timebase``).
    The ``{window}`` macro in the PV names is filled in by the caller via
    ``macros=`` (see the ``__main__`` block).
    """

    # Root URL of the archiver appliance to query.
    base_url: str
    # Name of the PV whose history is proxied.
    target_pv: str
    # TODO do all of the keys
    # max_length=850 leaves headroom over the 800 bins optimized_800 returns.
    mean = pvproperty(
        name=":archived_{window}_mean", dtype=float, max_length=850, value=[]
    )
    time = pvproperty(
        name=":archived_{window}_timebase", dtype=float, max_length=850, value=[]
    )
    # Incremented on each successful refresh; doubles as a heartbeat.
    read_count = pvproperty(name=":read_counter", dtype=int, value=0)

    def __init__(self, base_url: str, pv: str, window: int, **kwargs):
        super().__init__(**kwargs)
        self.base_url = base_url
        self.target_pv = pv
        self.window_in_hours = window

    # caproto's .scan() rebinds ``read_count`` to a periodic callback that
    # fires every 60 seconds while the IOC runs.
    @read_count.scan(period=60)
    async def read_count(self, instance, async_lib):
        payload = await get_data(self.get_current_url())
        # Only "time" and "mean" have matching pvproperties so far
        # (see the TODO above) — skip the other statistic fields.
        for k in KEYS + ("time",):
            if k not in ("time", "mean"):
                continue
            await getattr(self, k).write(payload[k])
        await instance.write(instance.value + 1)

    def get_current_url(self):
        # Query window: [now - window_in_hours, now], in UTC.
        now = datetime.datetime.now(tz.UTC)
        then = now + relativedelta(hours=-self.window_in_hours)
        return format_url(self.base_url, self.target_pv, then, now)
if __name__ == "__main__":
    # Named ``arg_parser`` (not ``parser``) so we don't shadow
    # ``dateutil.parser`` imported at the top of the file.
    arg_parser, split_args = template_arg_parser(
        default_prefix="",
        desc="Proxy queries to the archiver to a waveform for phoebus web.",
    )
    arg_parser.add_argument(
        "--config", help="path to configuration file to use", required=True, type=Path
    )
    args = arg_parser.parse_args()
    ioc_options, run_options = split_args(args)
    print(args.config)
    with open(args.config, "r") as fin:
        config = list(yaml.safe_load(fin.read()))
    # Number the subgroups globally across all archivers so attribute
    # names (pv0, pv1, ...) never collide between config entries.
    pv_count = count()
    body = {}
    print(config)
    for archiver in config:
        for j, pv_spec in zip(pv_count, archiver["pvs"]):
            body[f"pv{j}"] = SubGroup(
                ArchiverProxy,
                base_url=archiver["archiver_url"],
                pv=pv_spec["name"],
                # Escape braces in the PV name so caproto's macro expansion
                # does not treat them as macro placeholders.
                prefix=pv_spec["name"].replace("{", "{{").replace("}", "}}"),
                macros={"window": f'{pv_spec["window"]}h'},
                window=pv_spec["window"],
            )
    # The set of subgroups is only known after reading the config, so the
    # IOC class is assembled dynamically.
    IOCClass = type("IOCClass", (PVGroup,), body)
    ioc = IOCClass(**ioc_options)
    run(ioc.pvdb, **run_options)