Skip to content

Commit

Permalink
Fix blocking I/O in shares Parser (#13)
Browse files Browse the repository at this point in the history
* Make msg_parser async

* Use httpx in the shares parser

* Run msg_parser as an async task

* Remove requests from requirements.txt

* Fix indentation
  • Loading branch information
chris-mc1 authored Sep 9, 2024
1 parent c203536 commit f0c4800
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 88 deletions.
10 changes: 6 additions & 4 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class UnRAIDServer(object):
def __init__(self, mqtt_config, unraid_config):
def __init__(self, mqtt_config, unraid_config, loop: asyncio.AbstractEventLoop):
# Unraid config
unraid_host = unraid_config.get('host')
unraid_port = unraid_config.get('port')
Expand Down Expand Up @@ -50,6 +50,8 @@ def __init__(self, mqtt_config, unraid_config):
unraid_logger.setFormatter(unraid_logger_formatter)
self.logger.addHandler(unraid_logger)

self.loop = loop

def on_connect(self, client, flags, rc, properties):
self.logger.info('Successfully connected to mqtt server')

Expand Down Expand Up @@ -237,13 +239,13 @@ async def ws_connect(self):
if sub_channel not in self.mqtt_history:
self.logger.info(f'Create config for {sub_channel}')
self.mqtt_history[sub_channel] = (time.time() - self.scan_interval)
msg_parser(self, msg_data, create_config=True)
self.loop.create_task(msg_parser(self, msg_data, create_config=True))

# Parse content
if self.scan_interval <= (time.time() - self.mqtt_history.get(sub_channel, time.time())):
self.logger.info(f'Parse data for {sub_channel}')
self.mqtt_history[sub_channel] = time.time()
msg_parser(self, msg_data, create_config=False)
self.loop.create_task(msg_parser(self, msg_data, create_config=False))

except (httpx.ConnectTimeout, httpx.ConnectError):
self.logger.error('Failed to connect to unraid due to a timeout or connection issue...')
Expand Down Expand Up @@ -280,7 +282,7 @@ async def ws_connect(self):

# Create unraid instances
for unraid_config in config.get('unraid'):
UnRAIDServer(config.get('mqtt'), unraid_config)
UnRAIDServer(config.get('mqtt'), unraid_config, loop)

# Loop forever
loop.run_forever()
165 changes: 83 additions & 82 deletions app/parsers.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import re
import math
import requests
import httpx
from lxml import etree
from utils import Preferences
from humanfriendly import parse_size


def default(self, msg_data, create_config):
async def default(self, msg_data, create_config):
    """No-op handler for websocket channels that have no dedicated parser.

    Accepts the same (self, msg_data, create_config) signature as every
    other parser so the dispatcher can call it uniformly, and does nothing.
    """
    return None


def session(self, msg_data, create_config):
async def session(self, msg_data, create_config):
    """Record the CSRF token delivered on the session channel.

    The raw message payload IS the token; it is stored on the server
    instance for later authenticated requests. `create_config` is
    accepted for signature uniformity with the other parsers and unused.
    """
    setattr(self, 'csrf_token', msg_data)


def cpuload(self, msg_data, create_config):
async def cpuload(self, msg_data, create_config):
prefs = Preferences(msg_data)
state_value = int(prefs.as_dict()['cpu']['host'])

Expand All @@ -28,7 +28,7 @@ def cpuload(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', state_value, create_config=create_config)


def disks(self, msg_data, create_config):
async def disks(self, msg_data, create_config):
prefs = Preferences(msg_data)
disks = prefs.as_dict()

Expand Down Expand Up @@ -57,7 +57,7 @@ def disks(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', disk_temp, json_attributes, create_config=create_config, retain=True)


def shares(self, msg_data, create_config):
async def shares(self, msg_data, create_config):
prefs = Preferences(msg_data)
shares = prefs.as_dict()

Expand All @@ -72,77 +72,78 @@ def shares(self, msg_data, create_config):

if share_use_cache in ['no', 'yes', 'prefer']:

# unRAID 6.11
if self.unraid_version.startswith('6.11'):

# Auth header
headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'}

# Calculate used space
params = {
'cmd': '/webGui/scripts/share_size',
'arg1': share_nameorig,
'arg2': 'ssz1',
'arg3': share_cachepool,
'csrf_token': self.csrf_token
}
requests.get(f'{self.unraid_url}/update.htm', params=params, headers=headers)

# Read result
params = {
'compute': 'no',
'path': 'Shares',
'scale': 1,
'fill': 'ssz',
'number': '.'
}

r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers)

# unRAID 6.12+
else:

# Auth header
headers = {'Cookie': self.unraid_cookie}

# Read result
data = {
'compute': share_nameorig,
'path': 'Shares',
'all': 1,
'csrf_token': self.csrf_token
}

r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers)

if r.ok:
tree = etree.HTML(r.text)

size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()')
size_total_used = next(iter(size_total_used or []), '0').strip()
size_total_used = parse_size(size_total_used)

size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()')
size_total_free = next(iter(size_total_free or []), '0').strip()
size_total_free = parse_size(size_total_free)

size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()')
size_cache_used = next(iter(size_cache_used or []), '0').strip()
size_cache_used = parse_size(size_cache_used)

size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[7]/text()')
size_cache_free = next(iter(size_cache_free or []), '0').strip()
size_cache_free = parse_size(size_cache_free)

# # Debug
# from humanfriendly import format_size
# print(f'Share: {share_nameorig}')
# print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}')
# print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}')

# Recalculate used and free space, converted from bytes to kbytes
share['used'] = int(size_total_used / 1000)
share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000)
async with httpx.AsyncClient() as http:

# unRAID 6.11
if self.unraid_version.startswith('6.11'):

# Auth header
headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'}

# Calculate used space
params = {
'cmd': '/webGui/scripts/share_size',
'arg1': share_nameorig,
'arg2': 'ssz1',
'arg3': share_cachepool,
'csrf_token': self.csrf_token
}
await http.get(f'{self.unraid_url}/update.htm', params=params, headers=headers)

# Read result
params = {
'compute': 'no',
'path': 'Shares',
'scale': 1,
'fill': 'ssz',
'number': '.'
}

r = await http.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers, timeout=600)

# unRAID 6.12+
else:

# Auth header
headers = {'Cookie': self.unraid_cookie}

# Read result
data = {
'compute': share_nameorig,
'path': 'Shares',
'all': 1,
'csrf_token': self.csrf_token
}
r = await http.request("GET", url=f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers, timeout=600)

if r.ok:
tree = etree.HTML(r.text)

size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()')
size_total_used = next(iter(size_total_used or []), '0').strip()
size_total_used = parse_size(size_total_used)

size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()')
size_total_free = next(iter(size_total_free or []), '0').strip()
size_total_free = parse_size(size_total_free)

size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()')
size_cache_used = next(iter(size_cache_used or []), '0').strip()
size_cache_used = parse_size(size_cache_used)

size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[7]/text()')
size_cache_free = next(iter(size_cache_free or []), '0').strip()
size_cache_free = parse_size(size_cache_free)

# # Debug
# from humanfriendly import format_size
# print(f'Share: {share_nameorig}')
# print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}')
# print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}')

# Recalculate used and free space, converted from bytes to kbytes
share['used'] = int(size_total_used / 1000)
share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000)

# Skip empty shares
if share['used'] == 0:
Expand All @@ -169,7 +170,7 @@ def shares(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', share_used_pct, json_attributes, create_config=create_config, retain=True)


def temperature(self, msg_data, create_config):
async def temperature(self, msg_data, create_config):
tree = etree.HTML(msg_data)
sensors = tree.xpath('.//span[@title]')

Expand Down Expand Up @@ -205,7 +206,7 @@ def temperature(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', device_value, create_config=create_config)


def update1(self, msg_data, create_config):
async def update1(self, msg_data, create_config):
memory_categories = ['RAM', 'Flash', 'Log', 'Docker']
for (memory_name, memory_usage) in zip(memory_categories, re.findall(re.compile(r'(\d+%)'), msg_data)):
memory_value = ''.join(c for c in memory_usage if c.isdigit())
Expand Down Expand Up @@ -240,7 +241,7 @@ def update1(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', fan_value, create_config=create_config)


def update3(self, msg_data, create_config):
async def update3(self, msg_data, create_config):
network_download = 0
network_upload = 0

Expand Down Expand Up @@ -272,7 +273,7 @@ def update3(self, msg_data, create_config):
self.mqtt_publish(payload_upload, 'sensor', network_upload, create_config=create_config)


def parity(self, msg_data, create_config):
async def parity(self, msg_data, create_config):
data = msg_data.split(';')

if len(data) < 5:
Expand Down Expand Up @@ -303,7 +304,7 @@ def parity(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', state_value, json_attributes, create_config=create_config)


def var(self, msg_data, create_config):
async def var(self, msg_data, create_config):
msg_data = f'[var]\n{msg_data}'
prefs = Preferences(msg_data)
var = prefs.as_dict()
Expand Down
3 changes: 1 addition & 2 deletions app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ pyreadline3==3.4.1
PyYAML==6.0.1
rfc3986==1.5.0
sniffio==1.2.0
websockets==10.3
requests==2.28.1
websockets==10.3

0 comments on commit f0c4800

Please sign in to comment.