forked from lafrech/oem_gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
oemgatewaybuffer.py
160 lines (122 loc) · 5.2 KB
/
oemgatewaybuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
This code is released under the GNU Affero General Public License.
OpenEnergyMonitor project:
http://openenergymonitor.org
"""
import urllib2, httplib
import time
import logging
"""class OemGatewayBuffer
Stores server parameters and buffers the data between two HTTP requests
This class is meant to be inherited by subclasses specific to their
destination server.
"""
class OemGatewayBuffer(object):
    """Buffer timestamped samples until they can be sent to a server.

    Holds the destination settings and a FIFO list of
    [timestamp, data] entries. Subclasses implement _send_data() for
    their destination server.
    """

    # Maximum number of samples kept in the buffer; flush() discards the
    # oldest entries beyond this limit.
    _MAX_BUFFER_SIZE = 1000

    def __init__(self):
        """Create an empty data buffer with no settings configured yet."""

        # Initialize logger
        self._log = logging.getLogger("OemGateway")

        # Initialize variables
        self._data_buffer = []
        self._settings = {}

    def set(self, **kwargs):
        """Update settings.

        **kwargs (dict): settings to be modified.

        domain (string): domain name (eg: 'domain.tld')
        path (string): emoncms path with leading slash (eg: '/emoncms')
        apikey (string): API key with write access
        active (string): whether the data buffer is active (True/False)

        """
        # dict.update() behaves the same on Python 2 and Python 3,
        # unlike iteritems().
        self._settings.update(kwargs)

    def _server_name(self):
        """Return domain + path for log messages ('' for unset parts)."""
        return (self._settings.get('domain', '') +
                self._settings.get('path', ''))

    def add(self, data):
        """Append data to buffer.

        data (list): node and values (eg: '[node,val1,val2,...]')

        Nothing is stored when the 'active' setting is 'False' (string)
        or False (boolean). An unset 'active' setting counts as active.

        """
        active = self._settings.get('active')
        if active is False or active == 'False':
            return

        # Timestamp = now
        t = round(time.time(), 2)

        self._log.debug("Server %s -> buffer data: %s, timestamp: %s",
                        self._server_name(), data, t)

        # Append data set [timestamp, [node, val1, val2, val3,...]]
        # to _data_buffer
        self._data_buffer.append([t, data])

    def _send_data(self, data, time):
        """Send data to server.

        data (list): node and values (eg: '[node,val1,val2,...]')
        time (int): timestamp, time when sample was recorded

        return True if data sent correctly

        To be implemented in subclass.

        """
        pass

    def flush(self):
        """Send oldest data in buffer, if any.

        The sample is removed from the buffer only when _send_data()
        reports success; otherwise it stays queued for the next call.
        Afterwards the buffer is trimmed to _MAX_BUFFER_SIZE entries,
        dropping the oldest ones.

        """
        # Buffer management
        # If data buffer not empty, send a set of values
        if self._data_buffer:
            # 'timestamp' rather than 'time' so the time module is not
            # shadowed inside this method
            timestamp, data = self._data_buffer[0]
            self._log.debug("Server %s -> send data: %s, timestamp: %s",
                            self._server_name(), data, timestamp)
            if self._send_data(data, timestamp):
                # In case of success, delete sample set from buffer
                del self._data_buffer[0]

        # If buffer size reaches maximum, trash oldest values
        # TODO: optional write to file instead of losing data
        excess = len(self._data_buffer) - self._MAX_BUFFER_SIZE
        if excess > 0:
            self._data_buffer = self._data_buffer[excess:]
"""class OemGatewayEmoncmsBuffer
Emoncms-specific buffer: stores server parameters, buffers the data between
two HTTP requests and posts each sample to the emoncms input API
"""
class OemGatewayEmoncmsBuffer(OemGatewayBuffer):
    """Buffer that posts each sample to an emoncms server over HTTP."""

    def _send_data(self, data, time):
        """Send data to server.

        data (list): node and values (eg: '[node,val1,val2,...]')
        time (int): timestamp, time when sample was recorded

        return True if the server answered 'ok', False otherwise
        (request error, read error or any other answer)

        Reads the 'protocol', 'domain', 'path' and 'apikey' settings.

        """
        # Prepare data string with the values in data buffer.
        # Values are keyed by their 1-based position: {1:val1,2:val2,...}.
        # join() also gives a well-formed '{}' when there are no values,
        # where stripping a trailing comma would eat the opening brace.
        json_body = ','.join(str(i) + ':' + str(val)
                             for i, val in enumerate(data[1:], 1))
        data_string = ('&time=' + str(time) +      # Timestamp
                       '&node=' + str(data[0]) +   # Node ID
                       '&json={' + json_body + '}')  # Data
        self._log.debug("Data string: " + data_string)

        # Prepare URL string of the form
        # 'http://domain.tld/emoncms/input/post.json?apikey=12345
        # &node=10&json={1:1806, 2:1664}'
        url_string = (self._settings['protocol'] + self._settings['domain'] +
                      self._settings['path'] + '/input/post.json?apikey=' +
                      self._settings['apikey'] + data_string)
        # NOTE(review): this debug message contains the API key
        self._log.debug("URL string: " + url_string)

        # Send data to server
        self._log.info("Sending to " +
                       self._settings['domain'] + self._settings['path'])
        try:
            result = urllib2.urlopen(url_string, timeout=60)
            try:
                # Read inside the try block so that a failure while
                # reading the answer is logged like any other send failure
                reply = result.readline()
            finally:
                # Always release the connection
                result.close()
        except urllib2.HTTPError as e:
            self._log.warning("Couldn't send to server, HTTPError: " +
                              str(e.code))
        except urllib2.URLError as e:
            self._log.warning("Couldn't send to server, URLError: " +
                              str(e.reason))
        except httplib.HTTPException:
            self._log.warning("Couldn't send to server, HTTPException")
        except Exception:
            # Last-resort handler: log the traceback and report failure,
            # so the sample stays in the buffer
            import traceback
            self._log.warning("Couldn't send to server, Exception: " +
                              traceback.format_exc())
        else:
            if reply == 'ok':
                self._log.debug("Send ok")
                return True
            self._log.warning("Send failure")
        return False