forked from OpenMTC/OpenMTC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data-aggregation.py
102 lines (89 loc) · 3.41 KB
/
data-aggregation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import time
from collections import deque
from math import sqrt
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class DataAggregation(XAE):
remove_registration = True
remote_cse = '/mn-cse-1/onem2m'
period = 10
def _on_register(self):
# init variables
self.sensor_register = {}
self.dev_cnt_list = []
# start endless loop
self.periodic_discover(self.remote_cse,
{'labels': ["openmtc:sensor_data"]},
self.period, self.handle_discovery_sensor)
@staticmethod
def _time():
return format(round(time.time(), 3), '.3f')
def handle_discovery_sensor(self, discovery):
for uri in discovery:
self.sensor_register[uri] = {
'values': deque([], 10)
}
content = self.get_content(uri)
if content:
self.handle_sensor(uri, content)
self.add_container_subscription(uri, self.handle_sensor)
def create_sensor_structure(self, sensor_entry, content):
# dev_cnt
cnt_name = '_'.join(content[0]['bn'].split(':')[2:])
cnt_name += '_' + content[0]['n']
dev_cnt = Container(resourceName=cnt_name)
if dev_cnt not in self.dev_cnt_list:
sensor_entry['dev_cnt'] = dev_cnt = self.create_container(None, dev_cnt)
# mean cnt
mean_cnt = Container(resourceName='mean', labels=["openmtc:mean_data"])
sensor_entry['mean_cnt'] = self.create_container(dev_cnt, mean_cnt)
# Standard_deviation cnt
deviation_cnt = Container(resourceName='Standard_deviation', labels=["openmtc:Standard_deviation_data"])
sensor_entry['deviation_cnt'] = self.create_container(dev_cnt, deviation_cnt)
self.dev_cnt_list.append(dev_cnt)
else:
return dev_cnt,"already exists "
def handle_sensor(self, container, content):
sensor_entry = self.sensor_register[container]
values = sensor_entry['values']
try :
values.append(content[0]['v'])
except KeyError:
return
# check if container exists
try:
sensor_entry['dev_cnt']
except KeyError:
self.create_sensor_structure(sensor_entry, content)
# mean value
mean = sum(values) / len(values)
data = [{
'bn': content[0]['bn'],
'n': content[0]['n'] + '_mean',
'v': mean,
't': self._time()
}]
# Standard_deviation value
num_item = len(values)
standard_mean = sum(values) / num_item
differences = [((x - standard_mean) ** 2) ** 2 for x in values]
ssd = sum(differences)
variance = ssd / num_item
sd = sqrt(variance)
print sd
deviation_data = [{
'bn': content[0]['bn'],
'n': content[0]['n'] + '_Standard_deviation',
'v': sd,
't': self._time()
}]
try:
data[0]['u'] = content[0]['u']
except KeyError:
pass
self.push_content(sensor_entry['mean_cnt'], data)
self.push_content(sensor_entry['deviation_cnt'], deviation_data)
if __name__ == "__main__":
from openmtc_app.flask_runner import SimpleFlaskRunner as Runner
ep = "http://localhost:8000"
Runner(DataAggregation(), port=6050, host='auto').run(ep)