Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking I/O in shares Parser #13

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class UnRAIDServer(object):
def __init__(self, mqtt_config, unraid_config):
def __init__(self, mqtt_config, unraid_config, loop: asyncio.AbstractEventLoop):
# Unraid config
unraid_host = unraid_config.get('host')
unraid_port = unraid_config.get('port')
Expand Down Expand Up @@ -50,6 +50,8 @@ def __init__(self, mqtt_config, unraid_config):
unraid_logger.setFormatter(unraid_logger_formatter)
self.logger.addHandler(unraid_logger)

self.loop = loop

def on_connect(self, client, flags, rc, properties):
self.logger.info('Successfully connected to mqtt server')

Expand Down Expand Up @@ -237,13 +239,13 @@ async def ws_connect(self):
if sub_channel not in self.mqtt_history:
self.logger.info(f'Create config for {sub_channel}')
self.mqtt_history[sub_channel] = (time.time() - self.scan_interval)
msg_parser(self, msg_data, create_config=True)
self.loop.create_task(msg_parser(self, msg_data, create_config=True))

# Parse content
if self.scan_interval <= (time.time() - self.mqtt_history.get(sub_channel, time.time())):
self.logger.info(f'Parse data for {sub_channel}')
self.mqtt_history[sub_channel] = time.time()
msg_parser(self, msg_data, create_config=False)
self.loop.create_task(msg_parser(self, msg_data, create_config=False))

except (httpx.ConnectTimeout, httpx.ConnectError):
self.logger.error('Failed to connect to unraid due to a timeout or connection issue...')
Expand Down Expand Up @@ -280,7 +282,7 @@ async def ws_connect(self):

# Create unraid instances
for unraid_config in config.get('unraid'):
UnRAIDServer(config.get('mqtt'), unraid_config)
UnRAIDServer(config.get('mqtt'), unraid_config, loop)

# Loop forever
loop.run_forever()
165 changes: 83 additions & 82 deletions app/parsers.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import re
import math
import requests
import httpx
from lxml import etree
from utils import Preferences
from humanfriendly import parse_size


def default(self, msg_data, create_config):
async def default(self, msg_data, create_config):
pass


def session(self, msg_data, create_config):
async def session(self, msg_data, create_config):
self.csrf_token = msg_data


def cpuload(self, msg_data, create_config):
async def cpuload(self, msg_data, create_config):
prefs = Preferences(msg_data)
state_value = int(prefs.as_dict()['cpu']['host'])

Expand All @@ -28,7 +28,7 @@ def cpuload(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', state_value, create_config=create_config)


def disks(self, msg_data, create_config):
async def disks(self, msg_data, create_config):
prefs = Preferences(msg_data)
disks = prefs.as_dict()

Expand Down Expand Up @@ -57,7 +57,7 @@ def disks(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', disk_temp, json_attributes, create_config=create_config, retain=True)


def shares(self, msg_data, create_config):
async def shares(self, msg_data, create_config):
prefs = Preferences(msg_data)
shares = prefs.as_dict()

Expand All @@ -72,77 +72,78 @@ def shares(self, msg_data, create_config):

if share_use_cache in ['no', 'yes', 'prefer']:

# unRAID 6.11
if self.unraid_version.startswith('6.11'):

# Auth header
headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'}

# Calculate used space
params = {
'cmd': '/webGui/scripts/share_size',
'arg1': share_nameorig,
'arg2': 'ssz1',
'arg3': share_cachepool,
'csrf_token': self.csrf_token
}
requests.get(f'{self.unraid_url}/update.htm', params=params, headers=headers)

# Read result
params = {
'compute': 'no',
'path': 'Shares',
'scale': 1,
'fill': 'ssz',
'number': '.'
}

r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers)

# unRAID 6.12+
else:

# Auth header
headers = {'Cookie': self.unraid_cookie}

# Read result
data = {
'compute': share_nameorig,
'path': 'Shares',
'all': 1,
'csrf_token': self.csrf_token
}

r = requests.get(f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers)

if r.ok:
tree = etree.HTML(r.text)

size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()')
size_total_used = next(iter(size_total_used or []), '0').strip()
size_total_used = parse_size(size_total_used)

size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()')
size_total_free = next(iter(size_total_free or []), '0').strip()
size_total_free = parse_size(size_total_free)

size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()')
size_cache_used = next(iter(size_cache_used or []), '0').strip()
size_cache_used = parse_size(size_cache_used)

size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[7]/text()')
size_cache_free = next(iter(size_cache_free or []), '0').strip()
size_cache_free = parse_size(size_cache_free)

# # Debug
# from humanfriendly import format_size
# print(f'Share: {share_nameorig}')
# print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}')
# print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}')

# Recalculate used and free space, converted from bytes to kbytes
share['used'] = int(size_total_used / 1000)
share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000)
async with httpx.AsyncClient() as http:

# unRAID 6.11
if self.unraid_version.startswith('6.11'):

# Auth header
headers = {'Cookie': self.unraid_cookie + ';ssz=ssz'}

# Calculate used space
params = {
'cmd': '/webGui/scripts/share_size',
'arg1': share_nameorig,
'arg2': 'ssz1',
'arg3': share_cachepool,
'csrf_token': self.csrf_token
}
await http.get(f'{self.unraid_url}/update.htm', params=params, headers=headers)

# Read result
params = {
'compute': 'no',
'path': 'Shares',
'scale': 1,
'fill': 'ssz',
'number': '.'
}

r = await http.get(f'{self.unraid_url}/webGui/include/ShareList.php', params=params, headers=headers, timeout=600)

# unRAID 6.12+
else:

# Auth header
headers = {'Cookie': self.unraid_cookie}

# Read result
data = {
'compute': share_nameorig,
'path': 'Shares',
'all': 1,
'csrf_token': self.csrf_token
}
r = await http.request("GET", url=f'{self.unraid_url}/webGui/include/ShareList.php', data=data, headers=headers, timeout=600)

if r.ok:
tree = etree.HTML(r.text)

size_total_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[6]/text()')
size_total_used = next(iter(size_total_used or []), '0').strip()
size_total_used = parse_size(size_total_used)

size_total_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/ancestor::tr[1]/td[7]/text()')
size_total_free = next(iter(size_total_free or []), '0').strip()
size_total_free = parse_size(size_total_free)

size_cache_used = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[6]/text()')
size_cache_used = next(iter(size_cache_used or []), '0').strip()
size_cache_used = parse_size(size_cache_used)

size_cache_free = tree.xpath(f'//td/a[text()="{share_nameorig}"]/following::tr[1]/td[1][not(contains(text(), "Disk "))]/../td[7]/text()')
size_cache_free = next(iter(size_cache_free or []), '0').strip()
size_cache_free = parse_size(size_cache_free)

# # Debug
# from humanfriendly import format_size
# print(f'Share: {share_nameorig}')
# print(f'Used (total): {format_size(size_total_used)} Free (total): {format_size(size_total_free)}')
# print(f'Used (cache): {format_size(size_cache_used)} Free (total): {format_size(size_cache_free)}')

# Recalculate used and free space, converted from bytes to kbytes
share['used'] = int(size_total_used / 1000)
share['free'] = int((size_total_free - size_cache_free - size_cache_used) / 1000)

# Skip empty shares
if share['used'] == 0:
Expand All @@ -169,7 +170,7 @@ def shares(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', share_used_pct, json_attributes, create_config=create_config, retain=True)


def temperature(self, msg_data, create_config):
async def temperature(self, msg_data, create_config):
tree = etree.HTML(msg_data)
sensors = tree.xpath('.//span[@title]')

Expand Down Expand Up @@ -205,7 +206,7 @@ def temperature(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', device_value, create_config=create_config)


def update1(self, msg_data, create_config):
async def update1(self, msg_data, create_config):
memory_categories = ['RAM', 'Flash', 'Log', 'Docker']
for (memory_name, memory_usage) in zip(memory_categories, re.findall(re.compile(r'(\d+%)'), msg_data)):
memory_value = ''.join(c for c in memory_usage if c.isdigit())
Expand Down Expand Up @@ -240,7 +241,7 @@ def update1(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', fan_value, create_config=create_config)


def update3(self, msg_data, create_config):
async def update3(self, msg_data, create_config):
network_download = 0
network_upload = 0

Expand Down Expand Up @@ -272,7 +273,7 @@ def update3(self, msg_data, create_config):
self.mqtt_publish(payload_upload, 'sensor', network_upload, create_config=create_config)


def parity(self, msg_data, create_config):
async def parity(self, msg_data, create_config):
data = msg_data.split(';')

if len(data) < 5:
Expand Down Expand Up @@ -303,7 +304,7 @@ def parity(self, msg_data, create_config):
self.mqtt_publish(payload, 'sensor', state_value, json_attributes, create_config=create_config)


def var(self, msg_data, create_config):
async def var(self, msg_data, create_config):
msg_data = f'[var]\n{msg_data}'
prefs = Preferences(msg_data)
var = prefs.as_dict()
Expand Down
3 changes: 1 addition & 2 deletions app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ pyreadline3==3.4.1
PyYAML==6.0.1
rfc3986==1.5.0
sniffio==1.2.0
websockets==10.3
requests==2.28.1
websockets==10.3