forked from Flexget/Flexget
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugin.py
663 lines (548 loc) · 22.5 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
import os
import re
import time
from functools import total_ordering
from http.client import BadStatusLine
from importlib import import_module
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Union
from urllib.error import HTTPError, URLError
import loguru
from requests import RequestException
try:
# This is in our requirements for python versions older than 3.10 to get the new style selectable entry points
from importlib_metadata import entry_points
except ImportError:
# Python 3.10 and higher has the new functionality
from importlib.metadata import entry_points
from flexget import components as components_pkg
from flexget import config_schema
from flexget import plugins as plugins_pkg
from flexget.event import Event, event, fire_event, remove_event_handlers
from flexget.event import add_event_handler as add_phase_handler
logger = loguru.logger.bind(name='plugin')
PRIORITY_DEFAULT = 128
PRIORITY_LAST = -255
PRIORITY_FIRST = 255
class DependencyError(Exception):
"""Plugin depends on other plugin, but it cannot be loaded.
Args:
issued_by: name of the plugin trying to do the import
missing: name of the plugin or library that is missing
message: customized user readable error message
All args are optional.
"""
def __init__(
self,
issued_by: Optional[str] = None,
missing: Optional[str] = None,
message: Optional[str] = None,
silent: bool = False,
):
super().__init__()
self.issued_by = issued_by
self.missing = missing
self._message = message
self.silent = silent
def _get_message(self) -> str:
if self._message:
return self._message
return f'Plugin `{self.issued_by}` requires dependency `{self.missing}`'
def _set_message(self, message: str) -> None:
self._message = message
def has_message(self) -> bool:
return self._message is not None
message = property(_get_message, _set_message)
def __str__(self) -> str:
return f'<DependencyError(issued_by={self.issued_by!r},missing={self.missing!r},message={self.message!r},silent={self.silent!r})>'
class RegisterException(Exception):
def __init__(self, value):
super().__init__()
self.value = value
def __str__(self):
return repr(self.value)
class PluginWarning(Warning):
def __init__(self, value, logger: 'loguru.Logger' = logger, **kwargs):
super().__init__()
self.value = value
self.logger = logger
self.kwargs = kwargs
def __str__(self):
return self.value
class PluginError(Exception):
def __init__(self, value, logger: 'loguru.Logger' = logger, **kwargs):
super().__init__()
# Value is expected to be a string
if not isinstance(value, str):
value = str(value)
self.value = value
self.logger = logger
self.kwargs = kwargs
def __str__(self):
return self.value
# TODO: move to utils or somewhere more appropriate
class internet:
"""@internet decorator for plugin phase methods.
Catches all internet related exceptions and raises PluginError with relevant message.
Task handles PluginErrors by aborting the task.
"""
def __init__(self, logger_: 'loguru.Logger' = logger):
if logger_:
self.logger = logger_
else:
self.logger = logger.bind(name='@internet')
def __call__(self, func: Callable) -> Callable:
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except RequestException as e:
logger.opt(exception=True).debug(
'decorator caught RequestException. handled traceback:'
)
raise PluginError(f'RequestException: {e}')
except HTTPError as e:
raise PluginError(f'HTTPError {e.code}', self.logger)
except URLError as e:
logger.opt(exception=True).debug('decorator caught urlerror. handled traceback:')
raise PluginError(f'URLError {e.reason}', self.logger)
except BadStatusLine:
logger.opt(exception=True).debug(
'decorator caught badstatusline. handled traceback:'
)
raise PluginError('Got BadStatusLine', self.logger)
except ValueError as e:
logger.opt(exception=True).debug('decorator caught ValueError. handled traceback:')
raise PluginError(e)
except OSError as e:
logger.opt(exception=True).debug('decorator caught OSError. handled traceback:')
if hasattr(e, 'reason'):
raise PluginError(f'Failed to reach server. Reason: {e.reason}', self.logger)
if hasattr(e, 'code'):
raise PluginError(
f"The server couldn't fulfill the request. Error code: {e.code}",
self.logger,
)
raise PluginError(f'OSError when connecting to server: {e}', self.logger)
return wrapped_func
def priority(value: int) -> Callable[[Callable], Callable]:
"""Priority decorator for phase methods"""
def decorator(target: Callable) -> Callable:
target.priority = value
return target
return decorator
# task phases, in order of their execution; note that this can be extended by
# registering new phases at runtime
task_phases = [
'prepare',
'start',
'input',
'metainfo',
'filter',
'download',
'modify',
'output',
'learn',
'exit',
]
# map phase names to method names
phase_methods = {
# task
'abort': 'on_task_abort' # special; not a task phase that gets called normally
}
phase_methods.update((_phase, 'on_task_' + _phase) for _phase in task_phases) # DRY
# Mapping of plugin name to PluginInfo instance (logical singletons)
plugins: Dict[str, 'PluginInfo'] = {}
# Loading done?
plugins_loaded = False
_loaded_plugins = {}
_plugin_options = []
_new_phase_queue: Dict[str, List[Optional[str]]] = {}
def register_task_phase(name: str, before: Optional[str] = None, after: Optional[str] = None):
"""Adds a new task phase to the available phases."""
if before and after:
raise RegisterException('You can only give either before or after for a phase.')
if not before and not after:
raise RegisterException('You must specify either a before or after phase.')
if name in task_phases or name in _new_phase_queue:
raise RegisterException(f'Phase {name} already exists.')
def add_phase(phase_name: str, before: Optional[str], after: Optional[str]):
if before is not None and before not in task_phases:
return False
if after is not None and after not in task_phases:
return False
# add method name to phase -> method lookup table
phase_methods[phase_name] = 'on_task_' + phase_name
# place phase in phase list
if before is None and after is not None:
task_phases.insert(task_phases.index(after) + 1, phase_name)
if after is None and before is not None:
task_phases.insert(task_phases.index(before), phase_name)
return True
# if can't add yet (dependencies) queue addition
if not add_phase(name, before, after):
_new_phase_queue[name] = [before, after]
for phase_name, args in list(_new_phase_queue.items()):
if add_phase(phase_name, *args):
del _new_phase_queue[phase_name]
@total_ordering
class PluginInfo(dict):
"""
Allows accessing key/value pairs of this dictionary subclass via
attributes. Also instantiates a plugin and initializes properties.
"""
# Counts duplicate registrations
dupe_counter = 0
def __init__(
self,
plugin_class: type,
name: Optional[str] = None,
interfaces: Optional[List[str]] = None,
builtin: bool = False,
debug: bool = False,
api_ver: int = 1,
category: Optional[str] = None,
) -> None:
"""
Register a plugin.
:param plugin_class: The plugin factory.
:param string name: Name of the plugin (if not given, default to factory class name in underscore form).
:param list interfaces: Interfaces this plugin implements.
:param bool builtin: Auto-activated?
:param bool debug: True if plugin is for debugging purposes.
:param int api_ver: Signature of callback hooks (1=task; 2=task,config).
:param string category: The type of plugin. Can be one of the task phases.
Defaults to the package name containing the plugin.
"""
dict.__init__(self)
if interfaces is None:
interfaces = ['task']
if name is None:
# Convention is to take camel-case class name and rewrite it to an underscore form,
# e.g. 'PluginName' to 'plugin_name'
name = re.sub(
'[A-Z]+', lambda i: '_' + i.group(0).lower(), plugin_class.__name__
).lstrip('_')
if category is None and plugin_class.__module__.startswith('flexget.plugins'):
# By default look at the containing package of the plugin.
category = plugin_class.__module__.split('.')[-2]
# Check for unsupported api versions
if api_ver < 2:
raise PluginError(f'Api versions <2 are no longer supported. Plugin {name}')
# Set basic info attributes
self.api_ver = api_ver
self.name = name
self.interfaces = interfaces
self.builtin = builtin
self.debug = debug
self.category = category
self.phase_handlers: Dict[str, Event] = {}
self.schema: config_schema.JsonSchema = {}
self.schema_id: Optional[str] = None
self.plugin_class: type = plugin_class
self.instance: object = None
if self.name in plugins:
PluginInfo.dupe_counter += 1
logger.critical(
'Error while registering plugin {}. A plugin with the same name is already registered',
self.name,
)
else:
plugins[self.name] = self
def initialize(self) -> None:
if self.instance is not None:
# We already initialized
return
# Create plugin instance
self.instance = self.plugin_class()
self.instance.plugin_info = self # give plugin easy access to its own info
self.instance.logger = logger.bind(
name=getattr(self.instance, "LOGGER_NAME", None) or self.name
)
if hasattr(self.instance, 'schema'):
self.schema = self.instance.schema
elif hasattr(self.instance, 'validator'):
self.schema = self.instance.validator().schema()
else:
# TODO: I think plugins without schemas should not be allowed in config, maybe rethink this
self.schema = {}
if self.schema is not None:
self.schema_id = f'/schema/plugin/{self.name}'
config_schema.register_schema(self.schema_id, self.schema)
self.build_phase_handlers()
def build_phase_handlers(self) -> None:
"""(Re)build phase_handlers in this plugin"""
for phase, method_name in phase_methods.items():
if phase in self.phase_handlers:
continue
if hasattr(self.instance, method_name):
method = getattr(self.instance, method_name)
if not callable(method):
continue
# check for priority decorator
if hasattr(method, 'priority'):
handler_prio = method.priority
else:
handler_prio = PRIORITY_DEFAULT
event = add_phase_handler(f'plugin.{self.name}.{phase}', method, handler_prio)
# provides backwards compatibility
event.plugin = self
self.phase_handlers[phase] = event
def __getattr__(self, attr: str):
if attr in self:
return self[attr]
return dict.__getattribute__(self, attr)
def __setattr__(self, attr: str, value):
self[attr] = value
def __str__(self):
return f'<PluginInfo(name={self.name})>'
def _is_valid_operand(self, other):
return hasattr(other, 'name')
def __eq__(self, other):
return self.name == other.name
def __lt__(self, other):
return self.name < other.name
__repr__ = __str__
register = PluginInfo
def _get_standard_plugins_path() -> List[str]:
"""
:returns: List of directories where traditional plugins should be tried to load from.
"""
# Get basic path from environment
paths = []
env_path = os.environ.get('FLEXGET_PLUGIN_PATH')
if env_path:
paths = env_path.split(os.pathsep)
# Add flexget.plugins directory (core plugins)
paths.append(os.path.abspath(os.path.dirname(plugins_pkg.__file__)))
return paths
def _get_standard_components_path() -> List[str]:
"""
:returns: List of directories where component plugins should be tried to load from.
"""
# Get basic path from environment
paths = []
env_path = os.environ.get('FLEXGET_COMPONENT_PATH')
if env_path:
paths = env_path.split(os.pathsep)
# Add flexget.plugins directory (core plugins)
paths.append(os.path.abspath(os.path.dirname(components_pkg.__file__)))
return paths
def _check_phase_queue() -> None:
if _new_phase_queue:
for phase, args in _new_phase_queue.items():
logger.error(
'Plugin {} requested new phase {}, but it could not be created at requested point (before, after). '
'Plugin is not working properly.',
args[0],
phase,
)
def _import_plugin(module_name: str, plugin_path: Union[str, Path]) -> None:
try:
import_module(module_name)
except DependencyError as e:
if e.has_message():
msg = e.message
else:
msg = 'Plugin `{}` requires plugin `{}` to load.'.format(
e.issued_by or module_name,
e.missing or 'N/A',
)
if not e.silent:
logger.warning(msg)
else:
logger.debug(msg)
except ImportError:
logger.opt(exception=True).critical(
'Plugin `{}` failed to import dependencies', module_name
)
except ValueError as e:
# Debugging #2755
logger.error(
'ValueError attempting to import `{}` (from {}): {}', module_name, plugin_path, e
)
except Exception:
logger.opt(exception=True).critical('Exception while loading plugin {}', module_name)
raise
else:
logger.trace('Loaded module {} from {}', module_name, plugin_path)
def _load_plugins_from_dirs(dirs: List[str]) -> None:
"""
:param list dirs: Directories from where plugins are loaded from
"""
logger.debug(f'Trying to load plugins from: {dirs}')
dir_paths = [Path(d) for d in dirs if os.path.isdir(d)]
# add all dirs to plugins_pkg load path so that imports work properly from any of the plugin dirs
plugins_pkg.__path__ = [str(d) for d in dir_paths]
for plugins_dir in dir_paths:
for plugin_path in plugins_dir.glob('**/*.py'):
if plugin_path.name == '__init__.py':
continue
# Split the relative path from the plugins dir to current file's parent dir to find subpackage names
plugin_subpackages = [
_f for _f in plugin_path.relative_to(plugins_dir).parent.parts if _f
]
module_name = '.'.join([plugins_pkg.__name__, *plugin_subpackages, plugin_path.stem])
_import_plugin(module_name, plugin_path)
_check_phase_queue()
# TODO: this is now identical to _load_plugins_from_dirs, REMOVE
def _load_components_from_dirs(dirs: List[str]) -> None:
"""
:param list dirs: Directories where plugin components are loaded from
"""
logger.debug('Trying to load components from: {}', dirs)
dir_paths = [Path(d) for d in dirs if os.path.isdir(d)]
for component_dir in dir_paths:
for component_path in component_dir.glob('**/*.py'):
if component_path.name == '__init__.py':
continue
# Split the relative path from the plugins dir to current file's parent dir to find subpackage names
plugin_subpackages = [
_f for _f in component_path.relative_to(component_dir).parent.parts if _f
]
package_name = '.'.join(
[components_pkg.__name__, *plugin_subpackages, component_path.stem]
)
_import_plugin(package_name, component_path)
_check_phase_queue()
def _load_plugins_from_packages() -> None:
"""Load plugins installed via PIP"""
for entrypoint in entry_points(group='FlexGet.plugins'):
try:
plugin_module = entrypoint.load()
except DependencyError as e:
if e.has_message():
msg = e.message
else:
msg = (
'Plugin `%s` requires `%s` to load.',
e.issued_by or entrypoint.module_name,
e.missing or 'N/A',
)
if not e.silent:
logger.warning(msg)
else:
logger.debug(msg)
except ImportError:
logger.opt(exception=True).critical(
'Plugin `{}` failed to import dependencies', entrypoint.module_name
)
except Exception:
logger.opt(exception=True).critical(
'Exception while loading plugin {}', entrypoint.module_name
)
raise
else:
logger.trace(
'Loaded packaged module {} from {}', entrypoint.module, plugin_module.__file__
)
_check_phase_queue()
def load_plugins(
extra_plugins: Optional[List[str]] = None, extra_components: Optional[List[str]] = None
) -> None:
"""
Load plugins from the standard plugin and component paths.
:param list extra_plugins: Extra directories from where plugins are loaded.
:param list extra_components: Extra directories from where components are loaded.
"""
global plugins_loaded
if extra_plugins is None:
extra_plugins = []
if extra_components is None:
extra_components = []
# Add flexget.plugins and flexget.components directories (core dist)
extra_plugins.extend(_get_standard_plugins_path())
extra_components.extend(_get_standard_components_path())
start_time = time.time()
# Import all the plugins
_load_plugins_from_dirs(extra_plugins)
_load_components_from_dirs(extra_components)
_load_plugins_from_packages()
# Register them
fire_event('plugin.register')
# Plugins should only be registered once, remove their handlers after
remove_event_handlers('plugin.register')
# After they have all been registered, instantiate them
for plugin in list(plugins.values()):
plugin.initialize()
took = time.time() - start_time
plugins_loaded = True
logger.debug(
'Plugins took {:.2f} seconds to load. {} plugins in registry.', took, len(plugins.keys())
)
def get_plugins(
phase: Optional[str] = None,
interface: Optional[str] = None,
category: Optional[str] = None,
name: Optional[str] = None,
min_api: Optional[int] = None,
) -> Iterable[PluginInfo]:
"""
Query other plugins characteristics.
:param string phase: Require phase
:param string interface: Plugin must implement this interface.
:param string category: Type of plugin, phase names.
:param string name: Name of the plugin.
:param int min_api: Minimum api version.
:return: List of PluginInfo instances.
:rtype: list
"""
def matches(plugin):
if phase is not None and phase not in phase_methods:
raise ValueError(f'Unknown phase {phase}')
if phase and phase not in plugin.phase_handlers:
return False
if interface and interface not in plugin.interfaces:
return False
if category and not category == plugin.category:
return False
if name is not None and name != plugin.name:
return False
if min_api is not None and plugin.api_ver < min_api:
return False
return True
return filter(matches, iter(plugins.values()))
def plugin_schemas(**kwargs) -> 'config_schema.JsonSchema':
"""Create a dict schema that matches plugins specified by `kwargs`"""
return {
'type': 'object',
'properties': {p.name: {'$ref': p.schema_id} for p in get_plugins(**kwargs)},
'additionalProperties': False,
'error_additionalProperties': '{{message}} Only known plugin names are valid keys.',
'patternProperties': {'^_': {'title': 'Disabled Plugin'}},
}
@event('config.register')
def register_schema():
config_schema.register_schema('/schema/plugins', plugin_schemas)
def get_phases_by_plugin(name: str) -> List[str]:
"""Return all phases plugin :name: hooks"""
return list(get_plugin_by_name(name).phase_handlers)
def get_plugin_by_name(name: str, issued_by: str = '???') -> PluginInfo:
"""
Get plugin by name, preferred way since this structure may be changed at some point.
Getting plugin via `.get` function is recommended for normal use.
This results much shorter and cleaner code::
plugin.get_plugin_by_name('parsing').instance.parse_movie(data=entry['title'])
Shortens into::
plugin.get('parsing', self).parse_movie(data=entry['title'])
This function is still useful if you need to access plugin information (PluginInfo).
:returns PluginInfo instance
"""
if name not in plugins:
raise DependencyError(issued_by=issued_by, missing=name)
return plugins[name]
def get(name: str, requested_by: Union[str, object]) -> object:
"""
:param str name: Name of the requested plugin
:param requested_by: Plugin class instance OR string value who is making the request.
:return: Instance of Plugin class
"""
if name not in plugins:
if hasattr(requested_by, 'plugin_info'):
who = requested_by.plugin_info.name
else:
who = requested_by
raise DependencyError(issued_by=who, missing=name)
instance = plugins[name].instance
if instance is None:
raise Exception('Plugin referred before system initialized?')
return instance