-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
231 lines (201 loc) · 8.62 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import urllib
import aiohttp
from oauthlib.oauth1 import Client
from .exceptions import USOSAPIException
from .logger import get_logger
_LOGGER = get_logger("AuthManager")
class AuthManager:
"""
A manager for the USOS API authentication.
"""
REQUEST_TOKEN_SUFFIX = "services/oauth/request_token"
AUTHORIZE_SUFFIX = "services/oauth/authorize"
ACCESS_TOKEN_SUFFIX = "services/oauth/access_token"
REVOKE_TOKEN_SUFFIX = "services/oauth/revoke_token"
# List of available scopes can be found at https://apps.usos.edu.pl/developers/api/authorization/#scopes
SCOPES = "|".join(["offline_access", "studies"])
def __init__(
self,
api_base_address: str,
consumer_key: str,
consumer_secret: str,
trust_env: bool = False,
):
"""
Initialize the authentication manager.
:param api_base_address: The base address of the USOS API.
:param consumer_key: Consumer key obtained from the USOS API.
:param consumer_secret: Consumer secret obtained from the USOS API.
:param trust_env: Whether to trust the environment variables for the connection, see https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession for more information.
"""
self.base_address = api_base_address.rstrip("/") + "/"
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self._session = None
self.trust_env = trust_env
self._oauth_client = Client(consumer_key, consumer_secret)
async def __aenter__(self) -> "AuthManager":
"""
Enter the manager.
:return: The manager.
"""
await self.open()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
"""
Exit the manager.
:param exc_type: The exception type.
:param exc_value: The exception value.
:param traceback: The traceback.
"""
await self.close()
async def open(self):
"""
Open the manager.
"""
self._session = aiohttp.ClientSession(trust_env=self.trust_env)
async def close(self):
"""
Close the manager.
"""
if self._session:
await self._session.close()
async def _generate_request_token(self, callback_url: str) -> None:
"""
Generate a new request token.
:param callback_url:
"""
url = f"{self.base_address}{self.REQUEST_TOKEN_SUFFIX}"
params = {
"oauth_callback": callback_url,
"scopes": self.SCOPES,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
url, headers, body = Client(
self.consumer_key, client_secret=self.consumer_secret
).sign(
url, http_method="POST", body=params, headers=headers
) # Use a new client to avoid using the access token if it's set
async with self._session.post(url, data=body, headers=headers) as response:
await self._handle_response_errors(response)
data = dict(urllib.parse.parse_qsl(await response.text()))
self._request_token = data["oauth_token"]
self._request_token_secret = data["oauth_token_secret"]
self._oauth_client.resource_owner_key = self._request_token
self._oauth_client.resource_owner_secret = self._request_token_secret
_LOGGER.info(f"New request token generated: {self._request_token}")
async def get_authorization_url(
self, callback_url: str, confirm_user: bool = False
) -> str:
"""
Get the authorization URL.
:param callback_url: The callback URL.
:param confirm_user: Whether to confirm the user.
:return: The authorization URL.
"""
await self._generate_request_token(callback_url)
if confirm_user:
return f"{self.base_address}{self.AUTHORIZE_SUFFIX}?oauth_token={self._request_token}&interactivity=confirm_user"
return f"{self.base_address}{self.AUTHORIZE_SUFFIX}?oauth_token={self._request_token}"
async def authorize(self, verifier: str, request_token, request_token_secret):
"""
Authorize the client with verifier and optionally token and token secret.
:param verifier: The verifier to authorize the client with.
:param token: The OAuth token obtained from the previous step.
:param token_secret: The OAuth token secret obtained from the previous step.
:return: The access token and secret.
"""
self._oauth_client.verifier = verifier
if request_token:
self._oauth_client.resource_owner_key = request_token
if request_token_secret:
self._oauth_client.resource_owner_secret = request_token_secret
url = f"{self.base_address}{self.ACCESS_TOKEN_SUFFIX}"
url, headers, body = self._oauth_client.sign(url, http_method="POST")
try:
async with self._session.post(url, data=body, headers=headers) as response:
await self._handle_response_errors(response)
data = dict(urllib.parse.parse_qsl(await response.text()))
self.load_access_token(data["oauth_token"], data["oauth_token_secret"])
_LOGGER.info(
f"Authorization successful, received access token: {self.access_token}"
)
return self.access_token, self.access_token_secret
except AttributeError as e:
if e.args[0] == "'NoneType' object has no attribute 'post'":
raise USOSAPIException(
"Authorization failed. Did you forget to open the manager?"
)
raise
def load_access_token(self, access_token: str, access_token_secret: str):
"""
Load the access token and secret into the manager.
:param access_token: The access token.
:param access_token_secret: The access token secret.
"""
self.access_token = access_token
self.access_token_secret = access_token_secret
self._oauth_client = Client(
self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.access_token,
resource_owner_secret=self.access_token_secret,
)
def get_access_token(self):
return self.access_token, self.access_token_secret
def get_request_token(self):
return self._request_token, self._request_token_secret
def sign_request(
self, url: str, http_method: str = "GET", **kwargs
) -> tuple[str, dict, dict]:
"""
Sign a request with the OAuth client.
:param url: The URL to sign.
:param http_method: The HTTP method to use.
:param kwargs: Additional parameters to pass.
:return: The signed URL, headers, and body.
"""
if not self.access_token:
raise USOSAPIException("Access token not set. Did you forget to authorize?")
url, headers, body = self._oauth_client.sign(
url, http_method=http_method, **kwargs
)
return url, headers, body
async def _handle_response_errors(self, response: aiohttp.ClientResponse):
"""
Handle errors in the response.
:param response: The response to handle.
:raises USOSAPIException: If an error occurred.
"""
if response.status != 200:
text = await response.text()
if response.status == 401:
_LOGGER.error(
f"HTTP 401: Unauthorized. Your access key probably expired. Response: {text}"
)
raise USOSAPIException(
"HTTP 401: Unauthorized. Your access key probably expired."
)
elif response.status == 400:
raise USOSAPIException(f"HTTP 400: Bad request: {text}")
else:
raise USOSAPIException(f"HTTP {response.status}: {text}")
async def _revoke_token(self):
"""
Revoke the current access token.
"""
url = f"{self.base_address}{self.REVOKE_TOKEN_SUFFIX}"
url, headers, body = self._oauth_client.sign(url, http_method="POST")
async with self._session.post(url, data=body, headers=headers) as response:
await self._handle_response_errors(response)
_LOGGER.info("Token revoked successfully.")
async def logout(self):
"""
Log out the user.
"""
if not self.access_token:
return
await self._revoke_token()
self.access_token = None