diff --git a/sample_config.ini b/sample_config.ini
index 423d5287..2b17083e 100644
--- a/sample_config.ini
+++ b/sample_config.ini
@@ -30,3 +30,4 @@ SPAMMERS =
LASTFM_API_KEY = https://www.last.fm/api/account/create
CF_API_KEY = coffehouse.intellivoid.net
BOT_API_URL = https://api.telegram.org/bot
+BOT_API_FILE_URL = https://api.telegram.org/file/bot
\ No newline at end of file
diff --git a/tg_bot/__init__.py b/tg_bot/__init__.py
index e3f66b6c..daeca5df 100644
--- a/tg_bot/__init__.py
+++ b/tg_bot/__init__.py
@@ -2,6 +2,7 @@
import os
import sys
import time
+from typing import List
import spamwatch
import telegram.ext as tg
from telethon import TelegramClient
@@ -48,41 +49,42 @@ def get_user_list(key):
class KigyoINIT:
def __init__(self, parser: ConfigParser):
self.parser = parser
- self.SYS_ADMIN = self.parser.getint('SYS_ADMIN', 0)
- self.OWNER_ID = self.parser.getint('OWNER_ID')
- self.OWNER_USERNAME = self.parser.get('OWNER_USERNAME', None)
- self.APP_ID = self.parser.getint("APP_ID")
- self.API_HASH = self.parser.get("API_HASH")
- self.WEBHOOK = self.parser.getboolean('WEBHOOK', False)
- self.URL = self.parser.get('URL', None)
- self.CERT_PATH = self.parser.get('CERT_PATH', None)
- self.PORT = self.parser.getint('PORT', None)
- self.INFOPIC = self.parser.getboolean('INFOPIC', False)
- self.DEL_CMDS = self.parser.getboolean("DEL_CMDS", False)
- self.STRICT_GBAN = self.parser.getboolean("STRICT_GBAN", False)
- self.ALLOW_EXCL = self.parser.getboolean("ALLOW_EXCL", False)
- self.CUSTOM_CMD = ['/', '!']
- self.BAN_STICKER = self.parser.get("BAN_STICKER", None)
- self.TOKEN = self.parser.get("TOKEN")
- self.DB_URI = self.parser.get("SQLALCHEMY_DATABASE_URI")
+ self.SYS_ADMIN: int = self.parser.getint('SYS_ADMIN', 0)
+ self.OWNER_ID: int = self.parser.getint('OWNER_ID')
+ self.OWNER_USERNAME: str = self.parser.get('OWNER_USERNAME', None)
+ self.APP_ID: str = self.parser.getint("APP_ID")
+ self.API_HASH: str = self.parser.get("API_HASH")
+ self.WEBHOOK: bool = self.parser.getboolean('WEBHOOK', False)
+ self.URL: str = self.parser.get('URL', None)
+ self.CERT_PATH: str = self.parser.get('CERT_PATH', None)
+ self.PORT: int = self.parser.getint('PORT', None)
+ self.INFOPIC: bool = self.parser.getboolean('INFOPIC', False)
+ self.DEL_CMDS: bool = self.parser.getboolean("DEL_CMDS", False)
+ self.STRICT_GBAN: bool = self.parser.getboolean("STRICT_GBAN", False)
+ self.ALLOW_EXCL: bool = self.parser.getboolean("ALLOW_EXCL", False)
+ self.CUSTOM_CMD: List[str] = ['/', '!']
+ self.BAN_STICKER: str = self.parser.get("BAN_STICKER", None)
+ self.TOKEN: str = self.parser.get("TOKEN")
+ self.DB_URI: str = self.parser.get("SQLALCHEMY_DATABASE_URI")
self.LOAD = self.parser.get("LOAD").split()
- self.LOAD = list(map(str, self.LOAD))
- self.MESSAGE_DUMP = self.parser.getint('MESSAGE_DUMP', None)
- self.GBAN_LOGS = self.parser.getint('GBAN_LOGS', None)
+ self.LOAD: List[str] = list(map(str, self.LOAD))
+ self.MESSAGE_DUMP: int = self.parser.getint('MESSAGE_DUMP', None)
+ self.GBAN_LOGS: int = self.parser.getint('GBAN_LOGS', None)
self.NO_LOAD = self.parser.get("NO_LOAD").split()
- self.NO_LOAD = list(map(str, self.NO_LOAD))
- self.spamwatch_api = self.parser.get('spamwatch_api', None)
- self.CASH_API_KEY = self.parser.get('CASH_API_KEY', None)
- self.TIME_API_KEY = self.parser.get('TIME_API_KEY', None)
- self.WALL_API = self.parser.get('WALL_API', None)
- self.LASTFM_API_KEY = self.parser.get('LASTFM_API_KEY', None)
- self.CF_API_KEY = self.parser.get("CF_API_KEY", None)
+ self.NO_LOAD: List[str] = list(map(str, self.NO_LOAD))
+ self.spamwatch_api: str = self.parser.get('spamwatch_api', None)
+ self.CASH_API_KEY: str = self.parser.get('CASH_API_KEY', None)
+ self.TIME_API_KEY: str = self.parser.get('TIME_API_KEY', None)
+ self.WALL_API: str = self.parser.get('WALL_API', None)
+ self.LASTFM_API_KEY: str = self.parser.get('LASTFM_API_KEY', None)
+ self.CF_API_KEY: str = self.parser.get("CF_API_KEY", None)
self.bot_id = 0 #placeholder
self.bot_name = "Kigyo" #placeholder
self.bot_username = "KigyoRobot" #placeholder
- self.DEBUG = self.parser.getboolean("IS_DEBUG", False)
- self.DROP_UPDATES = self.parser.getboolean("DROP_UPDATES", True)
- self.BOT_API_URL = self.parser.get('BOT_API_URL', "https://api.telegram.org/bot")
+ self.DEBUG: bool = self.parser.getboolean("IS_DEBUG", False)
+ self.DROP_UPDATES: bool = self.parser.getboolean("DROP_UPDATES", True)
+ self.BOT_API_URL: str = self.parser.get('BOT_API_URL', "https://api.telegram.org/bot")
+ self.BOT_API_FILE_URL: str = self.parser.get('BOT_API_FILE_URL', "https://api.telegram.org/file/bot")
def init_sw(self):
@@ -142,10 +144,10 @@ def init_sw(self):
from tg_bot.modules.sql import SESSION
if not KInit.DROP_UPDATES:
- updater = tg.Updater(token=TOKEN, base_url=KInit.BOT_API_URL, workers=min(32, os.cpu_count() + 4), request_kwargs={"read_timeout": 10, "connect_timeout": 10}, persistence=PostgresPersistence(session=SESSION))
+ updater = tg.Updater(token=TOKEN, base_url=KInit.BOT_API_URL, base_file_url=KInit.BOT_API_FILE_URL, workers=min(32, os.cpu_count() + 4), request_kwargs={"read_timeout": 10, "connect_timeout": 10}, persistence=PostgresPersistence(session=SESSION))
else:
- updater = tg.Updater(token=TOKEN, base_url=KInit.BOT_API_URL, workers=min(32, os.cpu_count() + 4), request_kwargs={"read_timeout": 10, "connect_timeout": 10})
+ updater = tg.Updater(token=TOKEN, base_url=KInit.BOT_API_URL, base_file_url=KInit.BOT_API_FILE_URL, workers=min(32, os.cpu_count() + 4), request_kwargs={"read_timeout": 10, "connect_timeout": 10})
telethn = TelegramClient(MemorySession(), APP_ID, API_HASH)
dispatcher = updater.dispatcher
diff --git a/tg_bot/__main__.py b/tg_bot/__main__.py
index 90a93b6b..c8051b83 100644
--- a/tg_bot/__main__.py
+++ b/tg_bot/__main__.py
@@ -118,7 +118,6 @@ def test(update: Update, context: CallbackContext):
update: Update -
context: CallbackContext -
'''
-
# pprint(ast.literal_eval(str(update)))
# update.effective_message.reply_text("Hola tester! _I_ *have* `markdown`", parse_mode=ParseMode.MARKDOWN)
update.effective_message.reply_text("This person edited a message")
@@ -126,7 +125,7 @@ def test(update: Update, context: CallbackContext):
@kigcallback(pattern=r'start_back')
@kigcmd(command='start', pass_args=True)
-def start(update: Update, context: CallbackContext): # sourcery no-metrics
+def start(update: Update, context: CallbackContext): # sourcery no-metrics
'''#TODO
Params:
@@ -137,7 +136,7 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
args = context.args
if hasattr(update, 'callback_query'):
- query = update.callback_query
+ query = update.callback_query
if hasattr(query, 'id'):
first_name = update.effective_user.first_name
update.effective_message.edit_text(
@@ -152,7 +151,7 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
[
InlineKeyboardButton(
text=gs(chat.id, "support_chat_link_btn"),
- url=f"https://t.me/YorktownEagleUnion",
+ url='https://t.me/YorktownEagleUnion',
),
InlineKeyboardButton(
text=gs(chat.id, "updates_channel_link_btn"),
@@ -162,11 +161,8 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
text=gs(chat.id, "src_btn"),
url="https://github.com/Dank-del/EnterpriseALRobot",
),
-
],
-
[
-
InlineKeyboardButton(
text="Try inline",
switch_inline_query_current_chat="",
@@ -174,18 +170,18 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
InlineKeyboardButton(
text="Help",
callback_data="help_back",
- ),
+ ),
InlineKeyboardButton(
text=gs(chat.id, "add_bot_to_group_btn"),
url="t.me/{}?startgroup=true".format(
context.bot.username
),
),
- ]
-
+ ],
]
),
)
+
context.bot.answer_callback_query(query.id)
return
@@ -223,7 +219,7 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
[
InlineKeyboardButton(
text=gs(chat.id, "support_chat_link_btn"),
- url=f"https://t.me/YorktownEagleUnion",
+ url='https://t.me/YorktownEagleUnion',
),
InlineKeyboardButton(
text=gs(chat.id, "updates_channel_link_btn"),
@@ -233,33 +229,30 @@ def start(update: Update, context: CallbackContext): # sourcery no-metrics
text=gs(chat.id, "src_btn"),
url="https://github.com/Dank-del/EnterpriseALRobot",
),
-
],
-
[
-
- InlineKeyboardButton(
- text="Try inline",
- switch_inline_query_current_chat="",
- ),
- InlineKeyboardButton(
+ InlineKeyboardButton(
+ text="Try inline",
+ switch_inline_query_current_chat="",
+ ),
+ InlineKeyboardButton(
text="Help",
callback_data="help_back",
- ),
+ ),
InlineKeyboardButton(
text=gs(chat.id, "add_bot_to_group_btn"),
url="t.me/{}?startgroup=true".format(
context.bot.username
),
),
- ]
-
+ ],
]
),
)
+
else:
update.effective_message.reply_text(gs(chat.id, "grp_start_text"))
-
+
if hasattr(update, 'callback_query'):
query = update.callback_query
if hasattr(query, 'id'):
diff --git a/tg_bot/modules/approve.py b/tg_bot/modules/approve.py
index baf8cb8c..54b6b6ee 100644
--- a/tg_bot/modules/approve.py
+++ b/tg_bot/modules/approve.py
@@ -115,12 +115,13 @@ def approval(update, context):
chat = update.effective_chat
args = context.args
user_id = extract_user(message, args)
- member = chat.get_member(int(user_id))
if not user_id:
message.reply_text(
"I don't know who you're talking about, you're going to need to specify a user!"
)
return ""
+ member = chat.get_member(int(user_id))
+
if sql.is_approved(message.chat_id, user_id):
message.reply_text(
f"{member.user['first_name']} is an approved user. Locks, antiflood, and blocklists won't apply to them."
diff --git a/tg_bot/modules/error_handler.py b/tg_bot/modules/error_handler.py
index e3cffb72..1c31b8ad 100644
--- a/tg_bot/modules/error_handler.py
+++ b/tg_bot/modules/error_handler.py
@@ -5,7 +5,7 @@
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import CallbackContext, CommandHandler
-
+from psycopg2 import errors as sqlerrors
from tg_bot import KInit, dispatcher, DEV_USERS, OWNER_ID, log
@@ -36,7 +36,7 @@ def error_callback(update: Update, context: CallbackContext):
if e.find(KInit.TOKEN) != -1:
e = e.replace(KInit.TOKEN, "TOKEN")
- if update.effective_chat.type != "channel":
+ if update.effective_chat.type != "channel" and KInit.DEBUG:
try:
context.bot.send_message(update.effective_chat.id,
f"Sorry I ran into an error!\nError: {e}
\nThis incident has been logged. No further action is required.",
@@ -108,9 +108,10 @@ def list_errors(update: Update, context: CallbackContext):
context.bot.send_document(
update.effective_chat.id,
open("errors_msg.txt", "rb"),
- caption=f"Too many errors have occured..",
+ caption='Too many errors have occured..',
parse_mode="html",
)
+
return
update.effective_message.reply_text(msg, parse_mode="html")
diff --git a/tg_bot/modules/misc.py b/tg_bot/modules/misc.py
index 34dfbf51..77da41ec 100644
--- a/tg_bot/modules/misc.py
+++ b/tg_bot/modules/misc.py
@@ -19,7 +19,8 @@
WHITELIST_USERS,
INFOPIC,
sw,
- StartTime
+ StartTime,
+ KInit
)
from tg_bot.__main__ import STATS, USER_INFO, TOKEN
from tg_bot.modules.sql import SESSION
@@ -174,10 +175,8 @@ def info(update: Update, context: CallbackContext): # sourcery no-metrics
try:
user_member = chat.get_member(user.id)
if user_member.status == "administrator":
- result = requests.post(
- f"https://api.telegram.org/bot{TOKEN}/getChatMember?chat_id={chat.id}&user_id={user.id}"
- )
- result = result.json()["result"]
+ result = bot.get_chat_member(chat.id, user.id).to_json()
+ result = result["result"]
if "custom_title" in result.keys():
custom_title = result["custom_title"]
text += f"\nThis user holds the title {custom_title} here."
@@ -186,22 +185,22 @@ def info(update: Update, context: CallbackContext): # sourcery no-metrics
if user.id == OWNER_ID:
- text += f"\nThis person is my owner"
+ text += '\nThis person is my owner'
Nation_level_present = True
elif user.id in DEV_USERS:
- text += f"\nThis Person is a part of Eagle Union"
+ text += '\nThis Person is a part of Eagle Union'
Nation_level_present = True
elif user.id in SUDO_USERS:
- text += f"\nThe Nation level of this person is Royal"
+ text += '\nThe Nation level of this person is Royal'
Nation_level_present = True
elif user.id in SUPPORT_USERS:
- text += f"\nThe Nation level of this person is Sakura"
+ text += '\nThe Nation level of this person is Sakura'
Nation_level_present = True
elif user.id in SARDEGNA_USERS:
- text += f"\nThe Nation level of this person is Sardegna"
+ text += '\nThe Nation level of this person is Sardegna'
Nation_level_present = True
elif user.id in WHITELIST_USERS:
- text += f"\nThe Nation level of this person is Neptunia"
+ text += '\nThe Nation level of this person is Neptunia'
Nation_level_present = True
if Nation_level_present: