Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add test for nonexistent user #237

Open
github-actions bot opened this issue Oct 22, 2024 · 0 comments
Open

add test for nonexistent user #237

github-actions bot opened this issue Oct 22, 2024 · 0 comments
Labels

Comments

@github-actions
Copy link

if (user := await User.y.get(user_id, False)) is None: # TODO: add test for nonexistent user

                "accent_color": data.accent_color
            }
        }
        if guild_id and (guild := await models.Guild.get_or_none(id=guild_id)):
            if member := await guild.get_member(self.id):
                data["guild_member_profile"] = {"guild_id": str(guild_id)}
                data["guild_member"] = await member.ds_json()
        if mutual_friends_count:
            data["mutual_friends_count"] = 0  # TODO: add mutual friends count
        if with_mutual_guilds:
            query = Q(user=self)
            if self != other_user:
                query &= Q(guild__id__in=Subquery(
                    models.GuildMember
                    .filter(user__id__in=[self.id, other_user.id])
                    .group_by("guild_id")
                    .annotate(user_count=Count("user__id", distinct=True))
                    .filter(user_count=2)
                    .values_list("guild_id", flat=True)
                ))

            data["mutual_guilds"] = [
                {"id": str(guild_id), "nick": nick}
                for nick, guild_id in await models.GuildMember.filter(query).values_list("nick", "guild_id")
            ]
        if self.is_bot:
            data["user"]["bot"] = True

        return data

    # noinspection PyMethodMayBeStatic
    async def get_another_user(self, user_id: int) -> User:
        # TODO: check for relationship, mutual guilds or mutual friends
        if (user := await User.y.get(user_id, False)) is None:  # TODO: add test for nonexistent user
            raise UnknownUser
        return user

    def check_password(self, password: str) -> bool:
        return checkpw(self.y.prepare_password(password, self.id), self.password.encode("utf8"))

    def hash_new_password(self, password: str) -> str:
        return self.y.hash_password(password, self.id)

    async def change_password(self, new_password: str) -> None:
        self.password = self.hash_new_password(new_password)
        await self.save(update_fields=["password"])

    async def change_username(self, username: str) -> None:
        data = await self.data
        discriminator = data.discriminator
        if await User.y.getByUsername(username, discriminator):
            discriminator = await self.y.get_free_discriminator(username)
            if discriminator is None:
                raise InvalidDataErr(400, Errors.make(50035, {"username": {
                    "code": "USERNAME_TOO_MANY_USERS",
                    "message": "This name is used by too many users. Please enter something else or try again."
                }}))
        data.username = username
        data.discriminator = discriminator
        await data.save(update_fields=["username", "discriminator"])

    async def change_discriminator(self, new_discriminator: int, username_changed: bool = False) -> bool:
        data = await self.data
        username = data.username
        if await self.y.getByUsername(username, new_discriminator):
            if username_changed:
                return False
            raise InvalidDataErr(400, Errors.make(50035, {"username": {
                "code": "USERNAME_TOO_MANY_USERS",
                "message": "This discriminator already used by someone. Please enter something else."
            }}))
        data.discriminator = new_discriminator
        await data.save(update_fields=["discriminator"])
        return True

    async def change_email(self, new_email: str) -> None:
        new_email = new_email.lower()
        if self.email == new_email:
            return
        if await User.exists(email=new_email):
            raise InvalidDataErr(400, Errors.make(50035, {"email": {"code": "EMAIL_ALREADY_REGISTERED",
                                                                    "message": "Email address already registered."}}))
        self.email = new_email
        self.verified = False
        await self.save()

    async def create_backup_codes(self) -> list[str]:
        codes = ["".join([choice('abcdefghijklmnopqrstuvwxyz0123456789') for _ in range(8)]) for _ in range(10)]

        await self.clear_backup_codes()
        await models.MfaCode.bulk_create([
            models.MfaCode(user=self, code=code) for code in codes
        ])

        return codes

    async def clear_backup_codes(self) -> None:
        await models.MfaCode.filter(user=self).delete()

    async def get_backup_codes(self) -> list[str]:
        return [code.code for code in await models.MfaCode.filter(user=self).limit(10)]

    async def use_backup_code(self, code: str) -> bool:
        if (code := await models.MfaCode.get_or_none(user=self, code=code, used=False)) is None:
            return False
        code.used = True
        await code.save(update_fields=["used"])
        return True

    async def y_delete(self) -> None:
        await self.update(deleted=True, email=f"deleted_{self.id}@yepcord.ml", password="")
        data = await self.data
        await data.update(discriminator=0, username=f"Deleted User {hex(self.id)[2:]}", avatar=None, public_flags=0,
                          avatar_decoration=None)
        await models.Session.filter(user=self).delete()
        await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).delete()
        await models.MfaCode.filter(user=self).delete()
        await models.GuildMember.filter(user=self).delete()
        await models.UserSettings.filter(user=self).delete()
        await models.FrecencySettings.filter(user=self).delete()
        await models.Invite.filter(inviter=self).delete()
        await models.ReadState.filter(user=self).delete()

    async def get_guilds(self) -> list[models.Guild]:
        return [
            member.guild for member in
            await models.GuildMember.filter(user=self).select_related("guild", "guild__owner")
        ]

    async def get_private_channels(self) -> list[models.Channel]:
        return [
            channel
            for channel in await models.Channel.filter(recipients__id=self.id).select_related("owner")
            if not await channel.dm_is_hidden(self)
        ]

    async def get_relationships(self) -> list[models.Relationship]:
        return [
            relationship
            for relationship in
            await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).select_related("from_user", "to_user")
            if not (relationship.type == RelationshipType.BLOCK and relationship.from_user.id != self.id)
        ]

    async def get_related_users(self) -> list[models.User]:
        users = {
            relationship.other_user(self).id: relationship.other_user(self)
            for relationship in await self.get_relationships()
        }
        for channel in await models.Channel.filter(recipients__id=self.id):
            for recipient in await channel.recipients.all():
                if recipient.id in users or recipient == self:
                    continue
                users[recipient.id] = recipient

        return list(users.values())

    async def get_mfa_key(self) -> str | None:
        return cast(str, await models.UserSettings.get(user=self).values_list("mfa", flat=True))

    async def generate_mfa_nonce(self) -> tuple[str, str]:
        key = b64decode(Config.KEY)
        exp = time() + 600
        code = b64encode(urandom(16))
        nonce = JWT.encode({"t": MfaNonceType.NORMAL, "c": code, "u": self.id}, key, exp)
        rnonce = JWT.encode({"t": MfaNonceType.REGENERATE, "c": code, "u": self.id}, key, exp)
        return nonce, rnonce

    async def verify_mfa_nonce(self, nonce: str, nonce_type: MfaNonceType) -> None:
        key = b64decode(Config.KEY)
        assert_(payload := JWT.decode(nonce, key), InvalidKey)
        assert_(payload["u"] == self.id, InvalidKey)
        assert_(nonce_type == payload["t"], InvalidKey)

    async def update_read_state(self, channel: models.Channel, count: int, last: int) -> None:
        await models.ReadState.update_or_create(user=self, channel=channel, defaults={
            "last_read_id": last,
            "count": count,
        })

    async def send_verification_email(self) -> None:
        token = JWT.encode({"id": self.id, "email": self.email}, b64decode(Config.KEY), expires_after=600)
        await EmailMsg.send_verification(self.email, token)

    async def get_read_states(self) -> list[models.ReadState]:
        if self.is_bot:
            return []

        return await models.ReadState.filter(user=self).select_related("channel")
@github-actions github-actions bot added the todo label Oct 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

0 participants