From f0c4800bfef48bb5a81cd51d94864e6e8617b221 Mon Sep 17 00:00:00 2001 From: chris_mc1 <29151206+chris-mc1@users.noreply.github.com> Date: Mon, 9 Sep 2024 14:30:20 +0200 Subject: [PATCH] Fix blocking I/O in shares Parser (#13) * Make msg_parser async * use httpx in shares parser * run msg_parser as async task * remove requests from requirements.txt * fix indentation --- app/main.py | 10 +-- app/parsers.py | 165 ++++++++++++++++++++++--------------------- app/requirements.txt | 3 +- 3 files changed, 90 insertions(+), 88 deletions(-) diff --git a/app/main.py b/app/main.py index ea18776..db42334 100644 --- a/app/main.py +++ b/app/main.py @@ -15,7 +15,7 @@ class UnRAIDServer(object): - def __init__(self, mqtt_config, unraid_config): + def __init__(self, mqtt_config, unraid_config, loop: asyncio.AbstractEventLoop): # Unraid config unraid_host = unraid_config.get('host') unraid_port = unraid_config.get('port') @@ -50,6 +50,8 @@ def __init__(self, mqtt_config, unraid_config): unraid_logger.setFormatter(unraid_logger_formatter) self.logger.addHandler(unraid_logger) + self.loop = loop + def on_connect(self, client, flags, rc, properties): self.logger.info('Successfully connected to mqtt server') @@ -237,13 +239,13 @@ async def ws_connect(self): if sub_channel not in self.mqtt_history: self.logger.info(f'Create config for {sub_channel}') self.mqtt_history[sub_channel] = (time.time() - self.scan_interval) - msg_parser(self, msg_data, create_config=True) + self.loop.create_task(msg_parser(self, msg_data, create_config=True)) # Parse content if self.scan_interval <= (time.time() - self.mqtt_history.get(sub_channel, time.time())): self.logger.info(f'Parse data for {sub_channel}') self.mqtt_history[sub_channel] = time.time() - msg_parser(self, msg_data, create_config=False) + self.loop.create_task(msg_parser(self, msg_data, create_config=False)) except (httpx.ConnectTimeout, httpx.ConnectError): self.logger.error('Failed to connect to unraid due to a timeout or connection 
issue...') @@ -280,7 +282,7 @@ async def ws_connect(self): # Create unraid instances for unraid_config in config.get('unraid'): - UnRAIDServer(config.get('mqtt'), unraid_config) + UnRAIDServer(config.get('mqtt'), unraid_config, loop) # Loop forever loop.run_forever() diff --git a/app/parsers.py b/app/parsers.py index 97845fa..06fae5e 100644 --- a/app/parsers.py +++ b/app/parsers.py @@ -1,20 +1,20 @@ import re import math -import requests +import httpx from lxml import etree from utils import Preferences from humanfriendly import parse_size -def default(self, msg_data, create_config): +async def default(self, msg_data, create_config): pass -def session(self, msg_data, create_config): +async def session(self, msg_data, create_config): self.csrf_token = msg_data -def cpuload(self, msg_data, create_config): +async def cpuload(self, msg_data, create_config): prefs = Preferences(msg_data) state_value = int(prefs.as_dict()['cpu']['host']) @@ -28,7 +28,7 @@ def cpuload(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', state_value, create_config=create_config) -def disks(self, msg_data, create_config): +async def disks(self, msg_data, create_config): prefs = Preferences(msg_data) disks = prefs.as_dict() @@ -57,7 +57,7 @@ def disks(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', disk_temp, json_attributes, create_config=create_config, retain=True) -def shares(self, msg_data, create_config): +async def shares(self, msg_data, create_config): prefs = Preferences(msg_data) shares = prefs.as_dict() @@ -72,77 +72,78 @@ def shares(self, msg_data, create_config): if share_use_cache in ['no', 'yes', 'prefer']: - # unRAID 6.11 - if self.unraid_version.startswith('6.11'): - - # Auth header - headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'} - - # Calculate used space - params = { - 'cmd': '/webGui/scripts/share_size', - 'arg1': share_nameorig, - 'arg2': 'ssz1', - 'arg3': share_cachepool, - 'csrf_token': self.csrf_token - } - 
requests.get(f'{self.unraid_url}/update.htm', params=params, headers=headers) - - # Read result - params = { - 'compute': 'no', - 'path': 'Shares', - 'scale': 1, - 'fill': 'ssz', - 'number': '.' - } - - r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers) - - # unRAID 6.12+ - else: - - # Auth header - headers = {'Cookie': self.unraid_cookie} - - # Read result - data = { - 'compute': share_nameorig, - 'path': 'Shares', - 'all': 1, - 'csrf_token': self.csrf_token - } - - r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers) - - if r.ok: - tree = etree.HTML(r.text) - - size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()') - size_total_used = next(iter(size_total_used or []), '0').strip() - size_total_used = parse_size(size_total_used) - - size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()') - size_total_free = next(iter(size_total_free or []), '0').strip() - size_total_free = parse_size(size_total_free) - - size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()') - size_cache_used = next(iter(size_cache_used or []), '0').strip() - size_cache_used = parse_size(size_cache_used) - - size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[7]/text()') - size_cache_free = next(iter(size_cache_free or []), '0').strip() - size_cache_free = parse_size(size_cache_free) - - # # Debug - # from humanfriendly import format_size - # print(f'Share: {share_nameorig}') - # print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}') - # print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}') - - # Recalculate used and free space, converted from bytes to kbytes - share['used'] = 
int(size_total_used / 1000) - share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000) + async with httpx.AsyncClient() as http: + + # unRAID 6.11 + if self.unraid_version.startswith('6.11'): + + # Auth header + headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'} + + # Calculate used space + params = { + 'cmd': '/webGui/scripts/share_size', + 'arg1': share_nameorig, + 'arg2': 'ssz1', + 'arg3': share_cachepool, + 'csrf_token': self.csrf_token + } + await http.get(f'{self.unraid_url}/update.htm', params=params, headers=headers) + + # Read result + params = { + 'compute': 'no', + 'path': 'Shares', + 'scale': 1, + 'fill': 'ssz', + 'number': '.' + } + + r = await http.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers, timeout=600) + + # unRAID 6.12+ + else: + + # Auth header + headers = {'Cookie': self.unraid_cookie} + + # Read result + data = { + 'compute': share_nameorig, + 'path': 'Shares', + 'all': 1, + 'csrf_token': self.csrf_token + } + r = await http.request("GET", url=f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers, timeout=600) + + if r.is_success: + tree = etree.HTML(r.text) + + size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()') + size_total_used = next(iter(size_total_used or []), '0').strip() + size_total_used = parse_size(size_total_used) + + size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()') + size_total_free = next(iter(size_total_free or []), '0').strip() + size_total_free = parse_size(size_total_free) + + size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()') + size_cache_used = next(iter(size_cache_used or []), '0').strip() + size_cache_used = parse_size(size_cache_used) + + size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk 
"))]/../td[7]/text()') + size_cache_free = next(iter(size_cache_free or []), '0').strip() + size_cache_free = parse_size(size_cache_free) + + # # Debug + # from humanfriendly import format_size + # print(f'Share: {share_nameorig}') + # print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}') + # print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}') + + # Recalculate used and free space, converted from bytes to kbytes + share['used'] = int(size_total_used / 1000) + share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000) # Skip empty shares if share['used'] == 0: @@ -169,7 +170,7 @@ def shares(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', share_used_pct, json_attributes, create_config=create_config, retain=True) -def temperature(self, msg_data, create_config): +async def temperature(self, msg_data, create_config): tree = etree.HTML(msg_data) sensors = tree.xpath('.//span[@title]') @@ -205,7 +206,7 @@ def temperature(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', device_value, create_config=create_config) -def update1(self, msg_data, create_config): +async def update1(self, msg_data, create_config): memory_categories = ['RAM', 'Flash', 'Log', 'Docker'] for (memory_name, memory_usage) in zip(memory_categories, re.findall(re.compile(r'(\d+%)'), msg_data)): memory_value = ''.join(c for c in memory_usage if c.isdigit()) @@ -240,7 +241,7 @@ def update1(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', fan_value, create_config=create_config) -def update3(self, msg_data, create_config): +async def update3(self, msg_data, create_config): network_download = 0 network_upload = 0 @@ -272,7 +273,7 @@ def update3(self, msg_data, create_config): self.mqtt_publish(payload_upload, 'sensor', network_upload, create_config=create_config) -def parity(self, msg_data, create_config): +async def parity(self, 
msg_data, create_config): data = msg_data.split(';') if len(data) < 5: @@ -303,7 +304,7 @@ def parity(self, msg_data, create_config): self.mqtt_publish(payload, 'sensor', state_value, json_attributes, create_config=create_config) -def var(self, msg_data, create_config): +async def var(self, msg_data, create_config): msg_data = f'[var]\n{msg_data}' prefs = Preferences(msg_data) var = prefs.as_dict() diff --git a/app/requirements.txt b/app/requirements.txt index c5d2083..f7139e3 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -13,5 +13,4 @@ pyreadline3==3.4.1 PyYAML==6.0.1 rfc3986==1.5.0 sniffio==1.2.0 -websockets==10.3 -requests==2.28.1 \ No newline at end of file +websockets==10.3 \ No newline at end of file