-
Notifications
You must be signed in to change notification settings - Fork 23.9k
/
api.py
178 lines (144 loc) · 5.65 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright: (c) 2015, Brian Coca, <bcoca@ansible.com>
#
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
"""
This module adds shared support for generic API modules.
To use it, import the helpers you need from this module into a custom
module.
The 'api' module provides the following common argument specs:
* rate limit spec
- rate: maximum number of requests allowed per rate_limit window (int)
- rate_limit: length of the time window, in seconds, over which rate applies (int)
* retry spec
- retries: number of attempts
- retry_pause: delay between attempts in seconds
"""
from __future__ import annotations
import copy
import functools
import itertools
import secrets
import sys
import time
import ansible.module_utils.compat.typing as t
def rate_limit_argument_spec(spec=None):
    """Build the argument spec used by modules that support rate limiting.

    :param spec: Optional extra (or overriding) argument definitions that are merged on top of the defaults.
    :returns: A new dict holding the ``rate`` and ``rate_limit`` integer options plus everything from ``spec``.
    """
    merged = {
        'rate': {'type': 'int'},
        'rate_limit': {'type': 'int'},
    }
    if not spec:
        return merged
    merged.update(spec)
    return merged
def retry_argument_spec(spec=None):
    """Build the argument spec used by modules that support retrying.

    :param spec: Optional extra (or overriding) argument definitions that are merged on top of the defaults.
    :returns: A new dict holding the ``retries`` (int) and ``retry_pause`` (float, default 1) options
        plus everything from ``spec``.
    """
    options = dict(
        retries={'type': 'int'},
        retry_pause={'type': 'float', 'default': 1},
    )
    # A falsy spec contributes nothing, exactly like skipping the merge.
    options.update(spec or {})
    return options
def basic_auth_argument_spec(spec=None):
    """Build the argument spec used by modules that authenticate with a username and password.

    :param spec: Optional extra (or overriding) argument definitions that are merged on top of the defaults.
    :returns: A new dict holding the ``api_username``, ``api_password`` (marked ``no_log``),
        ``api_url`` and ``validate_certs`` (bool, default True) options plus everything from ``spec``.
    """
    options = {
        'api_username': {'type': 'str'},
        'api_password': {'type': 'str', 'no_log': True},
        'api_url': {'type': 'str'},
        'validate_certs': {'type': 'bool', 'default': True},
    }
    if not spec:
        return options
    options.update(spec)
    return options
def rate_limit(rate=None, rate_limit=None):
    """Rate limiting decorator.

    Spaces out calls to the decorated function so that at most ``rate`` calls
    are started per ``rate_limit`` seconds, i.e. consecutive calls begin at
    least ``rate_limit / rate`` seconds apart.

    :param rate: Number of calls allowed per time window.
    :param rate_limit: Length of the time window in seconds.
    :returns: A decorator. When either argument is ``None`` the decorated
        function is called without any throttling.
    :raises ZeroDivisionError: If ``rate`` is zero.
    """
    min_interval = None
    if rate is not None and rate_limit is not None:
        min_interval = float(rate_limit) / float(rate)

    def wrapper(f):
        # Start time of the previous call; None until the first call so that
        # the first call is never delayed.
        last_call = None

        @functools.wraps(f)
        def ratelimited(*args, **kwargs):
            nonlocal last_call
            if min_interval is not None:
                if last_call is not None:
                    # Use a wall-clock style monotonic timer: a CPU-time clock
                    # does not advance while sleeping or waiting on I/O, which
                    # would make every call wait for the full interval.
                    remaining = min_interval - (time.monotonic() - last_call)
                    if remaining > 0:
                        time.sleep(remaining)
                last_call = time.monotonic()
            return f(*args, **kwargs)

        return ratelimited
    return wrapper
def retry(retries=None, retry_pause=1):
    """Retry decorator.

    Calls the decorated function repeatedly until it returns a truthy value.
    Exceptions raised by the function and falsy return values both count as
    failed attempts.

    :param retries: Retry limit. The limit counts as exceeded once the attempt
        counter reaches this value, so the function is called at most
        ``retries - 1`` times (``retries=1`` raises without calling it).
        When ``None`` the function is simply called once with no retry handling.
    :param retry_pause: Delay in seconds between attempts.
    :returns: A decorator whose wrapped function returns the first truthy
        result of the decorated function.
    :raises Exception: ``"Retry limit exceeded: <retries>"`` when no attempt
        succeeded; the last exception raised by the decorated function (if any)
        is attached as ``__cause__``.
    """
    def wrapper(f):
        @functools.wraps(f)
        def retried(*args, **kwargs):
            if retries is None:
                # No retry policy configured: behave like the plain function
                # instead of silently skipping the call.
                return f(*args, **kwargs)

            last_error = None
            attempt = 1
            while attempt < retries:
                try:
                    result = f(*args, **kwargs)
                except Exception as exc:
                    # Remember the failure so it can be reported if we give up.
                    last_error = exc
                else:
                    if result:
                        return result
                attempt += 1
                # Only pause when another attempt will actually follow.
                if attempt < retries:
                    time.sleep(retry_pause)

            raise Exception("Retry limit exceeded: %d" % retries) from last_error

        return retried
    return wrapper
def generate_jittered_backoff(retries=10, delay_base=3, delay_threshold=60):
    """The "Full Jitter" backoff strategy.

    Ref: https://www.awsarchitectureblog.com/2015/03/backoff.html

    Each generated delay is a random integer between 0 and
    ``min(delay_threshold, delay_base * 2 ** attempt)``, both ends included.

    :param retries: The number of delays to generate.
    :param delay_base: The base time in seconds used to calculate the exponential backoff.
    :param delay_threshold: The maximum time in seconds for any delay.
    :returns: A generator yielding ``retries`` integer delays in seconds.
    """
    for attempt in range(retries):
        ceiling = min(delay_threshold, delay_base * 2 ** attempt)
        # randbelow() excludes its argument and rejects non-positive bounds,
        # so add one to make the ceiling reachable and to allow a ceiling of 0.
        yield secrets.randbelow(ceiling + 1)
def retry_never(exception_or_result):
    """Retry predicate that refuses every retry.

    :param exception_or_result: Ignored.
    :returns: Always ``False``.
    """
    return False
def retry_with_delays_and_condition(backoff_iterator, should_retry_error=None):
    """Generic retry decorator.

    The decorated function is attempted once per delay in ``backoff_iterator``
    and then one final time, so an empty iterable still results in a single call.

    :param backoff_iterator: An iterable of delays in seconds.
    :param should_retry_error: A callable that takes an exception of the decorated function and decides
        whether to retry or not (returns a bool). Defaults to never retrying.
    """
    def _fresh_delay_iterators(source):  # type: (t.Iterable[t.Any]) -> t.Generator
        # Ref: https://stackoverflow.com/a/30232619/595220
        # Keep one tee'd iterator untouched and hand out copies of it, so each
        # call of the decorated function replays the delays from the start.
        pristine, first = itertools.tee(source)
        yield first
        while True:
            yield copy.copy(pristine)

    delay_iterators = _fresh_delay_iterators(backoff_iterator)
    del backoff_iterator  # prevent accidental use elsewhere

    if should_retry_error is None:
        should_retry_error = retry_never

    def function_wrapper(function):
        @functools.wraps(function)
        def run_function(*args, **kwargs):
            """Call the wrapped function, retrying after each delay while the error is retryable.

            With no delays available the function is still run a single time with no delay.
            """
            attempt = functools.partial(function, *args, **kwargs)

            for pause in next(delay_iterators):
                try:
                    outcome = attempt()
                except Exception as error:
                    if not should_retry_error(error):
                        raise
                else:
                    return outcome
                time.sleep(pause)

            # Only or final attempt: any error propagates to the caller.
            return attempt()

        return run_function
    return function_wrapper