Skip to content

Commit

Permalink
OemGatewayBuffer: if send fails, buffer and retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Jérôme Lafréchoux committed Jun 21, 2013
1 parent 2d96eea commit 46e68cb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
5 changes: 5 additions & 0 deletions oemgateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def run(self):
for b in self._buffers.itervalues():
b.send(values)

# For all buffers
for b in self._buffers.itervalues():
# Execture run method
b.run()

# Sleep until next iteration
time.sleep(0.2);

Expand Down
58 changes: 39 additions & 19 deletions oemgatewaybuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self):
self._log = logging.getLogger("OemGateway")

# Initialize variables
# self._data_buffer = []
self._data_buffer = []
self._settings = {}

def set(self, **kwargs):
Expand All @@ -49,7 +49,7 @@ def set(self, **kwargs):
def send(self, data):
"""Send data to server.
TODO: In case of send error, buffer data to retry later.
In case of send error, buffer data to retry later.
data (list): node and values (eg: '[node,val1,val2,...]')
Expand All @@ -62,26 +62,17 @@ def send(self, data):
self._settings['domain'] + self._settings['path'] +
" -> send data: " + str(data))

# Distinct copy: we don't want to modify data
datacopy = list(data)

# Try to send data, if failure, store to resend later
self.send_data(datacopy)
# TODO:
# - detect failure, store data in buffer array
# - retry sending and flush buffer regularly
# - write to file when buffer size too big
#if not self.send_data(datacopy):
# Insert timestamp before data
# datacopy.insert(0,time.time())
# Append data set [timestamp, node, val1, val2, val3,...]
if not self._send_data(data):
# Append data set [timestamp, [node, val1, val2, val3,...]]
# to _data_buffer
# self._data_buffer.append(datacopy)
self._data_buffer.append([int(time.time()), data])

def send_data(self, data):
def _send_data(self, data, time=None):
"""Send data to server.
data (list): node and values (eg: '[node,val1,val2,...]')
time (int): timestamp, time when sample was recorded
return True if data sent correctly
Expand All @@ -90,6 +81,30 @@ def send_data(self, data):
"""
pass

def run(self):
"""Placeholder for background tasks.
Buffer management: Retry to send data that couldn't be sent
Can be extended in subclasses to add specific actions to be executed
on a regular basis.
"""

# Buffer management
# If data buffer not empty, try to send a sample
if self._data_buffer != []:
time, data = self._data_buffer[0]
self._log.debug("Server " +
self._settings['domain'] + self._settings['path'] +
" -> send again data: " + str(data))
if self._send_data(data, time=time):
del self._data_buffer[0]
# If buffer size reaches maximum, trash oldest values
# TODO: optionnal write to file instead of losing data
size = len(self._data_buffer)
if size > 1000:
self._data_buffer = self._data_buffer[size - 1000:]

"""class OemGatewayEmoncmsBuffer
Expand All @@ -98,13 +113,18 @@ def send_data(self, data):
"""
class OemGatewayEmoncmsBuffer(OemGatewayBuffer):

def send_data(self, data):
def _send_data(self, data, time=None):
"""Send data to server."""

# Prepare data string with the values in data buffer
data_string = ''
# Timestamp
if time is not None:
data_string += '&time=' + str(time)
# Node ID
data_string = 'node=' + str(data[0]) + '&json={'
data_string += '&node=' + str(data[0])
# Data
data_string += '&json={'
for i, val in enumerate(data[1:]):
data_string += str(i+1) + ':' + str(val)
data_string += ','
Expand All @@ -117,7 +137,7 @@ def send_data(self, data):
# &node=10&json={1:1806, 2:1664}'
url_string = self._settings['protocol'] + self._settings['domain'] + \
self._settings['path'] + '/input/post.json?apikey=' + \
self._settings['apikey'] + '&' + data_string
self._settings['apikey'] + data_string
self._log.debug("URL string: " + url_string)

# Send data to server
Expand Down

0 comments on commit 46e68cb

Please sign in to comment.