From 3fed0f4bf87477fda893639364a759e6c398f0a4 Mon Sep 17 00:00:00 2001 From: Sayan Biswas Date: Fri, 12 Nov 2021 01:16:21 +0530 Subject: [PATCH] new: drag and drop module for Sibyl System API Signed-off-by: Sayan Biswas --- tg_bot/modules/sibylsystem.py | 151 ++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 tg_bot/modules/sibylsystem.py diff --git a/tg_bot/modules/sibylsystem.py b/tg_bot/modules/sibylsystem.py new file mode 100644 index 00000000..be32f660 --- /dev/null +++ b/tg_bot/modules/sibylsystem.py @@ -0,0 +1,151 @@ +from configparser import ConfigParser +import os, logging, threading +from telegram.error import BadRequest +from telegram.ext.commandhandler import CommandHandler +from telegram.ext.filters import Filters +from telegram.ext.messagehandler import MessageHandler + +from ..modules.helper_funcs.chat_status import user_admin +from ..import dispatcher +from telegram.ext import CallbackContext +from telegram import Update +from sqlalchemy import Column, String, Boolean + +from ..modules.sql import BASE, SESSION + +logging.info("Drag and drop Sibyl System Plugin by Sayan Biswas [github.com/Dank-del // t.me/dank_as_fuck] @Kaizoku") + +class SibylSettings(BASE): + __tablename__ = "chat_sibyl_settings" + chat_id = Column(String(14), primary_key=True) + setting = Column(Boolean, default=True, nullable=False) + + def __init__(self, chat_id, disabled): + self.chat_id = str(chat_id) + self.setting = disabled + + def __repr__(self): + return "".format(self.chat_id, self.setting) + +SibylSettings.__table__.create(checkfirst=True) + +SIBYL_SETTING_LOCK = threading.RLock() +SIBYLBAN_LIST = set() + +def enable_sibyl(chat_id): + with SIBYL_SETTING_LOCK: + chat = SESSION.query(SibylSettings).get(str(chat_id)) + if not chat: + chat = SibylSettings(chat_id, True) + + chat.setting = True + SESSION.add(chat) + SESSION.commit() + if str(chat_id) in SIBYLBAN_LIST: + SIBYLBAN_LIST.remove(str(chat_id)) + +def disable_sibyl(chat_id): + with SIBYL_SETTING_LOCK: + chat = SESSION.query(SibylSettings).get(str(chat_id)) + if not chat: + chat = SibylSettings(chat_id, False) + + chat.setting = False + SESSION.add(chat) + SESSION.commit() + SIBYLBAN_LIST.add(str(chat_id)) + +def __load_sibylban_list(): + global SIBYLBAN_LIST + try: + SIBYLBAN_LIST = { + x.chat_id for x in SESSION.query(SibylSettings).all() if not x.setting + } + finally: + SESSION.close() + + +def does_chat_sibylban(chat_id): + return str(chat_id) not in SIBYLBAN_LIST + + +if os.getenv("ENV", "False") == "False": + try: + p = ConfigParser() + p.read("config.ini") + sk = p.get("kigconfig", "SIBYL_KEY") + except BaseException as e: + logging.warning("Not loading Sibyl System plugin due to {}".format(e)) + sk = None +elif os.getenv("ENV", "False") == "True": + sk = os.getenv("SIBYL_KEY") + + +if sk: + try: + from SibylSystem import PsychoPass + from SibylSystem.exceptions import GeneralException + except ImportError as e: + logging.warning('Not loading Sibyl System plugin due to {}'.format(e)) + try: + client = PsychoPass(sk) + logging.info("Connection to Sibyl System was successful...") + except BaseException as e: + logging.warning("Not loading Sibyl System plugin due to {}".format(e)) + client = None +else: + client = None + +# Create in memory userid to avoid disk access +__load_sibylban_list() + +def sibyl_ban(update: Update, context: CallbackContext): + message = update.effective_message + chat = update.effective_chat + user = update.effective_user + if not user: + return + bot = context.bot + if not does_chat_sibylban(chat.id): + return + + if client: + try: + data = client.get_info(user.id) + except GeneralException: + return + except BaseException as e: + logging.error(e) + return + if data.banned: + try: + bot.kick_chat_member(chat_id=chat.id, user_id=user.id) + except BadRequest: + return + except BaseException as e: + logging.error("Failed to ban {} in {} due to {}".format(user.id, chat.id, e)) + txt = '''Dominator locked on {}\n'''.format(user.mention_html()) + txt += "Target was Eliminated with {}\n\n".format("Lethal Eliminator" if not data.is_bot else "Destroy Decomposer") + txt += "Reason: {}\n".format(data.reason) + txt += "Ban Flag(s): {}\n".format(", ".join(data.ban_flags)) + txt += "Inspector ID: {}\n".format(data.banned_by) + txt += "Ban time: {}\n\n".format(data.date) + txt += "If the enforcement was unjust in any way, kindly report it to @PublicSafetyBureau or disable this feature using /sibylban" + message.reply_html(text=txt, disable_web_page_preview=True) + +@user_admin +def toggle_sibyl(update: Update, _: CallbackContext): + message = update.effective_message + chat = update.effective_chat + do = does_chat_sibylban(chat.id) + if not do: + enable_sibyl(chat.id) + message.reply_text("Dominator enabled for {}".format(chat.title)) + else: + disable_sibyl(chat.id) + message.reply_text("Dominator disabled for {}".format(chat.title)) + + return + +dispatcher.add_handler(MessageHandler(filters=Filters.chat_type.groups, callback=sibyl_ban), group=101) +dispatcher.add_handler(CommandHandler(command="sibylban", callback=toggle_sibyl, run_async=True, filters=Filters.chat_type.groups), group=100) \ No newline at end of file