# api.py
"""Module to access iNaturalist API."""
from json import JSONDecodeError
import logging
from time import time
from types import SimpleNamespace
from typing import List, Optional, Union
from aiohttp import (
ClientConnectorError,
ClientSession,
ContentTypeError,
ServerDisconnectedError,
TraceConfig,
TraceRequestStartParams,
)
from aiohttp_retry import RetryClient, ExponentialRetry
from aiolimiter import AsyncLimiter
from bs4 import BeautifulSoup
import html2markdown
logger = logging.getLogger("red.dronefly." + __name__)
API_BASE_URL = "https://api.inaturalist.org"
RETRY_EXCEPTIONS = [
ServerDisconnectedError,
ConnectionResetError,
ClientConnectorError,
JSONDecodeError,
TimeoutError,
]


class INatAPI:
    """Access the iNat API and assets via (api|static).inaturalist.org."""

    def __init__(self):
        # pylint: disable=unused-argument
        async def on_request_start(
            session: ClientSession,
            trace_config_ctx: SimpleNamespace,
            params: TraceRequestStartParams,
        ) -> None:
            current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"]
            if current_attempt > 1:
                logger.info(
                    "iNat request attempt #%d: %s", current_attempt, repr(params)
                )

        trace_config = TraceConfig()
        trace_config.on_request_start.append(on_request_start)
        self.session = RetryClient(
            raise_for_status=False,
            trace_configs=[trace_config],
        )
        self.request_time = time()
        self.places_cache = {}
        self.projects_cache = {}
        self.users_cache = {}
        self.users_login_cache = {}
        self.taxa_cache = {}
        # api_v1_limiter:
        # ---------------
        # - Allow up to 50 requests over a 60 second time period (i.e.
        #   a burst of up to 50 within the period, after which requests
        #   are throttled until more capacity is available)
        # - This honours "try to keep it to 60 requests per minute or lower"
        #   - https://api.inaturalist.org/v1/docs/
        self.api_v1_limiter = AsyncLimiter(50, 60)

    async def _get_rate_limited(self, full_url, **kwargs):
        """Query API, respecting 60 requests per minute rate limit."""
        logger.debug('_get_rate_limited("%s", %s)', full_url, repr(kwargs))
        async with self.api_v1_limiter:
            # i.e. wait 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, and finally give up
            retry_options = ExponentialRetry(
                attempts=6,
                exceptions=RETRY_EXCEPTIONS,
            )
            try:
                async with self.session.get(
                    full_url, params=kwargs, retry_options=retry_options
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        try:
                            json = await response.json()
                            msg = f"{json.get('error')} ({json.get('status')})"
                        except ContentTypeError:
                            data = await response.text()
                            document = BeautifulSoup(data, "html.parser")
                            # Only use the body, if present
                            if document.body:
                                text = document.body.find().text
                            else:
                                text = document
                            # Treat as much as we can as markdown
                            markdown = html2markdown.convert(text)
                            # Punt the rest back to bs4 to drop unhandled tags
                            msg = BeautifulSoup(markdown, "html.parser").text
                        lookup_failed_msg = f"Lookup failed: {msg}"
                        logger.error(lookup_failed_msg)
                        raise LookupError(lookup_failed_msg)
            except Exception as e:  # pylint: disable=broad-except,invalid-name
                if any(isinstance(e, exc) for exc in retry_options.exceptions):
                    attempts = retry_options.attempts
                    msg = f"iNat not responding after {attempts} attempts. Please try again later."
                    logger.error(msg)
                    raise LookupError(msg) from e
                raise e

        return None

    async def get_controlled_terms(self, *args, **kwargs):
        """Query API for controlled terms."""
        endpoint = "/".join(("/v1/controlled_terms", *args))
        full_url = f"{API_BASE_URL}{endpoint}"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_observations(self, *args, **kwargs):
        """Query API for observations.

        Parameters
        ----------
        *args
            - If the first positional argument is given, it is passed through
              as-is, appended to the /v1/observations endpoint.
        **kwargs
            - All kwargs are passed as params on the API call.
        """
        endpoint = "/v1/observations"
        id_arg = f"/{args[0]}" if args else ""
        full_url = f"{API_BASE_URL}{endpoint}{id_arg}"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_observation_bounds(self, taxon_ids):
        """Get the bounds for the specified observations."""
        kwargs = {
            "return_bounds": "true",
            "verifiable": "true",
            "taxon_id": ",".join(map(str, taxon_ids)),
            "per_page": 0,
        }
        result = await self.get_observations(**kwargs)
        if result and "total_bounds" in result:
            return result["total_bounds"]

        return None

    async def get_places(
        self, query: Union[int, str, list], refresh_cache=False, **kwargs
    ):
        """Get places for the specified ids or text query."""
        first_place_id = None
        if isinstance(query, list):
            cached = set(query).issubset(set(self.places_cache))
            request = f"/v1/places/{','.join(map(str, query))}"
        elif isinstance(query, int):
            cached = query in self.places_cache
            if cached:
                first_place_id = query
            request = f"/v1/places/{query}"
        else:
            cached = False
            request = f"/v1/places/{query}"
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or not cached:
            results = await self._get_rate_limited(full_url, **kwargs)
            if results:
                places = results.get("results") or []
                for place in places:
                    key = place.get("id")
                    if key:
                        if not first_place_id:
                            first_place_id = key
                        record = {
                            "total_results": 1,
                            "page": 1,
                            "per_page": 1,
                            "results": [place],
                        }
                        self.places_cache[key] = record

        if isinstance(query, list):
            return {
                place_id: self.places_cache[int(place_id)]
                for place_id in query
                if int(place_id) in self.places_cache
            }
        if first_place_id in self.places_cache:
            return self.places_cache[first_place_id]
        return None

    async def get_projects(
        self, query: Union[str, int, list], refresh_cache=False, **kwargs
    ):
        """Get projects for the specified ids or text query."""
        first_project_id = None
        if isinstance(query, list):
            cached = set(query).issubset(set(self.projects_cache))
            request = f"/v1/projects/{','.join(map(str, query))}"
        elif isinstance(query, int):
            cached = query in self.projects_cache
            if cached:
                first_project_id = query
            request = f"/v1/projects/{query}"
        else:
            cached = False
            request = f"/v1/projects/{query}"
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or not cached:
            results = await self._get_rate_limited(full_url, **kwargs)
            if results:
                projects = results.get("results") or []
                for project in projects:
                    key = project.get("id")
                    if key:
                        if not first_project_id:
                            first_project_id = key
                        record = {
                            "total_results": 1,
                            "page": 1,
                            "per_page": 1,
                            "results": [project],
                        }
                        self.projects_cache[key] = record

        if isinstance(query, list):
            return {
                project_id: self.projects_cache[int(project_id)]
                for project_id in query
                if self.projects_cache.get(int(project_id))
            }
        if first_project_id in self.projects_cache:
            return self.projects_cache[first_project_id]
        return None

    async def get_observers_stats(self, **kwargs):
        """Query API for user counts & rankings."""
        request = "/v1/observations/observers"
        # TODO: validate kwargs includes project_id
        # TODO: support queries with > 500 observers (one page, default)
        full_url = f"{API_BASE_URL}{request}"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_search_results(self, **kwargs):
        """Get site search results."""
        if "is_active" in kwargs and kwargs["is_active"] == "any":
            full_url = f"{API_BASE_URL}/v1/taxa"
        else:
            full_url = f"{API_BASE_URL}/v1/search"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_users(
        self, query: Union[int, str], refresh_cache=False, by_login_id=False, **kwargs
    ):
        """Get the users for the specified login, user_id, or query."""
        request = f"/v1/users/{query}"
        if isinstance(query, int) or query.isnumeric():
            user_id = int(query)
            key = user_id
        elif by_login_id:
            user_id = None
            key = query
        else:
            user_id = None
            request = f"/v1/users/autocomplete?q={query}"
            key = query
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or (
            key not in self.users_cache and key not in self.users_login_cache
        ):
            # TODO: provide means to expire the cache (other than reloading the cog).
            json_data = await self._get_rate_limited(full_url, **kwargs)

            if json_data:
                results = json_data.get("results")
                if not results:
                    return None
                if user_id is None:
                    if len(results) == 1:
                        # String query matched exactly one result; cache it:
                        user = results[0]
                        # The entry itself is put in the main cache, indexed by user_id.
                        self.users_cache[user["id"]] = json_data
                        # The lookaside cache by login stores only a linkage to the
                        # entry just stored in the main cache.
                        self.users_login_cache[user["login"]] = user["id"]
                        # Additionally add an entry to the main cache for the
                        # query string, but only when it isn't an exact login id
                        # match, as that would serve no purpose. This is slightly
                        # wasteful, but makes for simpler code.
                        if user["login"] != key:
                            self.users_cache[key] = json_data
                    else:
                        # Cache multiple results matched by string.
                        self.users_cache[key] = json_data
                        # Additionally, synthesize a cache result per matched
                        # user, as if each had been queried individually.
                        for user in results:
                            user_json = {}
                            user_json["results"] = [user]
                            self.users_cache[user["id"]] = user_json
                            # Only index the login in the lookaside cache if it
                            # isn't the query string itself, already indexed above
                            # in the main cache.
                            # - i.e. it's possible a search for a login matches
                            #   more than one entry (e.g. david, david99, etc.)
                            #   so retrieving it from cache must always return
                            #   all matching results, not just one for the login
                            #   itself
                            if user["login"] != key:
                                self.users_login_cache[user["login"]] = user["id"]
                else:
                    # i.e. a lookup by user_id only returns one match
                    user = results[0]
                    if user:
                        self.users_cache[key] = json_data
                        self.users_login_cache[user["login"]] = key

            self.request_time = time()

        if key in self.users_cache:
            return self.users_cache[key]
        # - The lookaside cache for login is only consulted if the key is not
        #   found in the main users_cache.
        # - This is important, since a lookup by user_id could prime the
        #   lookaside cache with the single login entry, and then a subsequent
        #   search by login could return multiple results into the main cache.
        #   From then on, searching for the login should return the cached
        #   multiple results from the main cache, not the single result that the
        #   lookaside users_login_cache supports.
        # - This shortcut seems like it would return incomplete results depending
        #   on the order in which lookups are performed. However, since the login
        #   lookaside is primarily in support of iNat login lookups from already
        #   cached project members, this is OK. The load of the whole project
        #   membership at once (get_observers_from_projects) for that use case
        #   ensures all relevant matches are already individually cached.
        if key in self.users_login_cache:
            user_id = self.users_login_cache[key]
            return self.users_cache[user_id]
        return None

    async def get_observers_from_projects(
        self, project_ids: Optional[List] = None, user_ids: Optional[List] = None
    ):
        """Get observers for a list of project ids.

        Since the cache is filled as a side effect, this method can be
        used to prime the cache prior to fetching multiple users at once
        by id.

        Users may also be specified, and in that case, project ids may be
        omitted. The cache will then be primed from a list of user ids.
        """
        if not (project_ids or user_ids):
            return

        page = 1
        more = True
        users = []
        # Note: This will only handle up to 10,000 users. Anything more
        # needs to set id_above and id_below. With luck, we won't ever
        # need to deal with projects this big!
        while more:
            params = {"page": page}
            if project_ids:
                params["project_id"] = ",".join(map(str, project_ids))
            if user_ids:
                params["user_id"] = ",".join(map(str, user_ids))
            response = await self.get_observations("observers", **params)
            results = response.get("results") or []
            for observer in results:
                user = observer.get("user")
                if user:
                    user_id = user.get("id")
                    if user_id:
                        # Synthesize a single result as if returned by a get_users
                        # lookup of a single user_id, and cache it:
                        user_json = {}
                        user_json["results"] = [user]
                        users.append(user)
                        self.users_cache[user_id] = user_json
                        self.users_login_cache[user["login"]] = user_id
            # default values provided defensively to exit loop if missing
            per_page = response.get("per_page") or len(results)
            total_results = response.get("total_results") or len(results)
            if results and (page * per_page < total_results):
                page += 1
            else:
                more = False

        # return all user results as a single page
        return {
            "total_results": len(users),
            "pages": 1,
            "per_page": len(users),
            "results": users,
        }
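

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module): it shows how the
# rate-limited, cached lookups above are typically driven from an asyncio
# program. The `main` coroutine, the example taxon_id, and the placeholder
# login "some_login" are illustrative assumptions, not values from the source.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def main():
        inat = INatAPI()
        try:
            # Fetch one small page of verifiable observations for a taxon id.
            observations = await inat.get_observations(
                taxon_id=1, verifiable="true", per_page=5
            )
            if observations:
                print(observations.get("total_results"))
            # Look up a user by login; the result is cached for later calls.
            user = await inat.get_users("some_login", by_login_id=True)
            if user and user.get("results"):
                print(user["results"][0]["login"])
        finally:
            # RetryClient wraps an aiohttp session, so close it when done.
            await inat.session.close()

    asyncio.run(main())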