We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server/yepcord/yepcord/models/user.py
Line 214 in 7dfe87e
"accent_color": data.accent_color } } if guild_id and (guild := await models.Guild.get_or_none(id=guild_id)): if member := await guild.get_member(self.id): data["guild_member_profile"] = {"guild_id": str(guild_id)} data["guild_member"] = await member.ds_json() if mutual_friends_count: data["mutual_friends_count"] = 0 # TODO: add mutual friends count if with_mutual_guilds: query = Q(user=self) if self != other_user: query &= Q(guild__id__in=Subquery( models.GuildMember .filter(user__id__in=[self.id, other_user.id]) .group_by("guild_id") .annotate(user_count=Count("user__id", distinct=True)) .filter(user_count=2) .values_list("guild_id", flat=True) )) data["mutual_guilds"] = [ {"id": str(guild_id), "nick": nick} for nick, guild_id in await models.GuildMember.filter(query).values_list("nick", "guild_id") ] if self.is_bot: data["user"]["bot"] = True return data # noinspection PyMethodMayBeStatic async def get_another_user(self, user_id: int) -> User: # TODO: check for relationship, mutual guilds or mutual friends if (user := await User.y.get(user_id, False)) is None: # TODO: add test for nonexistent user raise UnknownUser return user def check_password(self, password: str) -> bool: return checkpw(self.y.prepare_password(password, self.id), self.password.encode("utf8")) def hash_new_password(self, password: str) -> str: return self.y.hash_password(password, self.id) async def change_password(self, new_password: str) -> None: self.password = self.hash_new_password(new_password) await self.save(update_fields=["password"]) async def change_username(self, username: str) -> None: data = await self.data discriminator = data.discriminator if await User.y.getByUsername(username, discriminator): discriminator = await self.y.get_free_discriminator(username) if discriminator is None: raise InvalidDataErr(400, Errors.make(50035, {"username": { "code": "USERNAME_TOO_MANY_USERS", "message": "This name is used by too many users. Please enter something else or try again." }})) data.username = username data.discriminator = discriminator await data.save(update_fields=["username", "discriminator"]) async def change_discriminator(self, new_discriminator: int, username_changed: bool = False) -> bool: data = await self.data username = data.username if await self.y.getByUsername(username, new_discriminator): if username_changed: return False raise InvalidDataErr(400, Errors.make(50035, {"username": { "code": "USERNAME_TOO_MANY_USERS", "message": "This discriminator already used by someone. Please enter something else." }})) data.discriminator = new_discriminator await data.save(update_fields=["discriminator"]) return True async def change_email(self, new_email: str) -> None: new_email = new_email.lower() if self.email == new_email: return if await User.exists(email=new_email): raise InvalidDataErr(400, Errors.make(50035, {"email": {"code": "EMAIL_ALREADY_REGISTERED", "message": "Email address already registered."}})) self.email = new_email self.verified = False await self.save() async def create_backup_codes(self) -> list[str]: codes = ["".join([choice('abcdefghijklmnopqrstuvwxyz0123456789') for _ in range(8)]) for _ in range(10)] await self.clear_backup_codes() await models.MfaCode.bulk_create([ models.MfaCode(user=self, code=code) for code in codes ]) return codes async def clear_backup_codes(self) -> None: await models.MfaCode.filter(user=self).delete() async def get_backup_codes(self) -> list[str]: return [code.code for code in await models.MfaCode.filter(user=self).limit(10)] async def use_backup_code(self, code: str) -> bool: if (code := await models.MfaCode.get_or_none(user=self, code=code, used=False)) is None: return False code.used = True await code.save(update_fields=["used"]) return True async def y_delete(self) -> None: await self.update(deleted=True, email=f"deleted_{self.id}@yepcord.ml", password="") data = await self.data await data.update(discriminator=0, username=f"Deleted User {hex(self.id)[2:]}", avatar=None, public_flags=0, avatar_decoration=None) await models.Session.filter(user=self).delete() await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).delete() await models.MfaCode.filter(user=self).delete() await models.GuildMember.filter(user=self).delete() await models.UserSettings.filter(user=self).delete() await models.FrecencySettings.filter(user=self).delete() await models.Invite.filter(inviter=self).delete() await models.ReadState.filter(user=self).delete() async def get_guilds(self) -> list[models.Guild]: return [ member.guild for member in await models.GuildMember.filter(user=self).select_related("guild", "guild__owner") ] async def get_private_channels(self) -> list[models.Channel]: return [ channel for channel in await models.Channel.filter(recipients__id=self.id).select_related("owner") if not await channel.dm_is_hidden(self) ] async def get_relationships(self) -> list[models.Relationship]: return [ relationship for relationship in await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).select_related("from_user", "to_user") if not (relationship.type == RelationshipType.BLOCK and relationship.from_user.id != self.id) ] async def get_related_users(self) -> list[models.User]: users = { relationship.other_user(self).id: relationship.other_user(self) for relationship in await self.get_relationships() } for channel in await models.Channel.filter(recipients__id=self.id): for recipient in await channel.recipients.all(): if recipient.id in users or recipient == self: continue users[recipient.id] = recipient return list(users.values()) async def get_mfa_key(self) -> str | None: return cast(str, await models.UserSettings.get(user=self).values_list("mfa", flat=True)) async def generate_mfa_nonce(self) -> tuple[str, str]: key = b64decode(Config.KEY) exp = time() + 600 code = b64encode(urandom(16)) nonce = JWT.encode({"t": MfaNonceType.NORMAL, "c": code, "u": self.id}, key, exp) rnonce = JWT.encode({"t": MfaNonceType.REGENERATE, "c": code, "u": self.id}, key, exp) return nonce, rnonce async def verify_mfa_nonce(self, nonce: str, nonce_type: MfaNonceType) -> None: key = b64decode(Config.KEY) assert_(payload := JWT.decode(nonce, key), InvalidKey) assert_(payload["u"] == self.id, InvalidKey) assert_(nonce_type == payload["t"], InvalidKey) async def update_read_state(self, channel: models.Channel, count: int, last: int) -> None: await models.ReadState.update_or_create(user=self, channel=channel, defaults={ "last_read_id": last, "count": count, }) async def send_verification_email(self) -> None: token = JWT.encode({"id": self.id, "email": self.email}, b64decode(Config.KEY), expires_after=600) await EmailMsg.send_verification(self.email, token) async def get_read_states(self) -> list[models.ReadState]: if self.is_bot: return [] return await models.ReadState.filter(user=self).select_related("channel")
The text was updated successfully, but these errors were encountered:
No branches or pull requests
server/yepcord/yepcord/models/user.py
Line 214 in 7dfe87e
The text was updated successfully, but these errors were encountered: