from PyQt6.QtGui import QIcon, QFont, QPixmap, QRegularExpressionValidator from PyQt6.QtCore import QSize, QObject, QThread, pyqtSignal, Qt, QRegularExpression import sys import os import re import requests import json from PyQt6.QtWidgets import ( QApplication, QMainWindow, QLabel, QPushButton, QFileDialog, QLineEdit, QPlainTextEdit ) base_path = getattr(sys, '_MEIPASS', os.path.dirname(os.path.abspath(__file__))) icon = os.path.join(base_path, './assets/logo.png') github = os.path.join(base_path, './assets/github.png') twitter = os.path.join(base_path, './assets/twitter.png') output_file = os.path.join(base_path, './assets/file.png') listpng = os.path.join(base_path, './assets/list.png') createpng = os.path.join(base_path, './assets/create.png') deletepng = os.path.join(base_path, './assets/delete.png') exportpng = os.path.join(base_path, './assets/exportpng.png') max_ips = 100 class Worker(QObject): finished = pyqtSignal() progress = pyqtSignal(tuple) def list(self): with open('user_id.json', 'r') as json_file: user_data = json.load(json_file) email = user_data['email'] api_token = user_data['api_token'] zone_id = user_data['zone_id'] domain = user_data['domain'] dns_record_name = user_data['ip_dns_record'] url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records' headers = { 'X-Auth-Email': email, 'X-Auth-Key': api_token, 'Content-Type': 'application/json' } dns_records = [] page_num = 1 try: while True: # loop over all pages params = {'page': page_num, 'per_page': 100} # update pagination params response = requests.request('GET', url, headers=headers, params=params) data = response.json()['result'] if not data: # no more records to fetch break for record in data: # add records to the list dns_records.append(record) page_num += 1 i_num = 0 for record in dns_records: subdomain = record['name'] this_domain = record['zone_name'] this_dns_record = subdomain.replace(f".{this_domain}", "") if this_dns_record == dns_record_name: content = record['content'] self.progress.emit((i_num + 1, subdomain, content, "")) i_num += 1 self.finished.emit() self.finished.emit() except: self.progress.emit(("Error\nPlease check:\n- your Internet connection\n- your information",)) self.finished.emit() # export json def export_json(self): with open('user_id.json', 'r') as json_file: user_data = json.load(json_file) email = user_data['email'] api_token = user_data['api_token'] zone_id = user_data['zone_id'] domain = user_data['domain'] dns_record_name = user_data['ip_dns_record'] url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records' headers = { 'X-Auth-Email': email, 'X-Auth-Key': api_token, 'Content-Type': 'application/json' } dns_records = [] page_num = 1 export_json = {} export_json["workingIPs"] = [] #make json files with IPs and <<< same Dataed >>> # TODO change the Dates export_json["startDate"] = "2023-04-04T10:41:35.5737055-07:00" export_json["endDate"] = "2023-04-04T10:41:35.5737056-07:05" try: while True: # loop over all pages params = {'page': page_num, 'per_page': 100} # update pagination params response = requests.request('GET', url, headers=headers, params=params) data = response.json()['result'] if not data: # no more records to fetch break for record in data: # add records to the list dns_records.append(record) page_num += 1 i_num = 0 for number, record in enumerate(dns_records): subdomain = record['name'] this_domain = record['zone_name'] this_dns_record = subdomain.replace(f".{this_domain}", "") if this_dns_record == dns_record_name: content = record['content'] #add dns record working ips to json file export_json["workingIPs"].append({"delay": number, "ip": content}) self.progress.emit((i_num + 1, subdomain, content,"✓")) i_num += 1 export_json["totalFoundWorkingIPs"] = i_num export_json["totalFoundWorkingIPsCurrentRange"] = i_num with open(f"{dns_record_name}.{domain}.json", "w") as f: json.dump(export_json, f) f.close() self.finished.emit() self.finished.emit() except: self.progress.emit(("Error\nPlease check:\n- your Internet connection\n- your information",)) self.finished.emit() # craete DNS record from scan file def create(self): #for .json files def scan_to_iplist(): with open(self.json_path_create, 'r') as f: data = json.load(f) ip_list = [i['ip'] for i in data['workingIPs']] f.close() return ip_list # for .cf files def linux_scan_to_iplist(): with open(self.json_path_create, 'r') as f: data = f.readlines() ip_list = [] for item in data: # iterate over the list items result = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', item) # extract the IP using regular expression if result: # check if IP is found and add to the list ip_list.append(result.group(0)) return ip_list # for .csv files def online_scan_to_iplist(): with open(self.json_path_create, 'r') as f: data = f.readlines() ip_list = [] for item in data: # iterate over the list items result = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', item) # extract the IP using regular expression if result: # check if IP is found and add to the list ip_list.append(result.group(0)) return ip_list # open result file and save to list no_error = True try: if re.search(r'.+\.json$', self.json_path_create): iplist = scan_to_iplist() if iplist == []: self.progress.emit(('file has no valid ip',)) no_error = False else: with open('ip.txt', 'w') as f: f.write('\n'.join(iplist)) elif re.search(r'.+\.cf$', self.json_path_create): iplist = linux_scan_to_iplist() if iplist == []: self.progress.emit(('file has no valid ip',)) no_error = False else: with open('ip.txt', 'w') as f: f.write('\n'.join(iplist)) elif re.search(r'.+\.csv$', self.json_path_create): iplist = online_scan_to_iplist() if iplist == []: self.progress.emit(('file has no valid ip',)) no_error = False else: with open('ip.txt', 'w') as f: f.write('\n'.join(iplist)) except: self.progress.emit(('Error\nunsupported file',)) no_error = False def ip_list(): with open ('ip.txt', 'r') as f: myip = [line.strip() for line in f] f.close() return myip if no_error: all_ips = ip_list() len_all_ips = len(all_ips) def bestip(): filename = "best_ip.txt" topUnder100ip = [] ipn = 0 while(ipn < len_all_ips): topUnder100ip.append(all_ips[ipn]) ipn += 1 if ipn >= max_ips: break with open(filename, "w") as f: f.write("\n".join(topUnder100ip)) f.close() bestip() with open("best_ip.txt", "r") as ip: lines = ip.readlines() topUnder100ipList = [line.strip() for line in lines] with open('user_id.json', 'r') as json_file: user_data = json.load(json_file) email = user_data['email'] api_token = user_data['api_token'] zone_id = user_data['zone_id'] record_name = user_data["ip_dns_record"] domain = user_data["domain"] try: max_ip_user = int(user_data["max_ip"]) except: max_ip_user = len(topUnder100ipList) params_name = f'{record_name}.{domain}' url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records" headers = { "Content-Type": "application/json", "X-Auth-Email": email, "X-Auth-Key": api_token } ipn = 0 try: while(ipn < len(topUnder100ipList)): data = { "type": "A", "name": params_name, "content": f"{topUnder100ipList[ipn]}", "ttl": 1, "proxied": False } if (ipn >= max_ip_user): break response = requests.post(url, headers=headers, json=data) if response.status_code == 200: subdomain = response.json()["result"]["name"] message_content = response.json()["result"]["content"] self.progress.emit((ipn + 1, subdomain, message_content," ✓ added")) ipn += 1 if response.status_code == 400: subdomain = f"{record_name}.{domain}" message_content = topUnder100ipList[ipn] self.progress.emit((ipn + 1, subdomain, message_content, " ✘ exist")) ipn += 1 if ipn >= max_ips: break self.finished.emit() except: self.progress.emit(("Error\nPlease check:\n- your Internet connection\n- your information",)) self.finished.emit() else: self.finished.emit() # delete all ips of sumdomain def delete(self): with open('user_id.json', 'r') as json_file: user_data = json.load(json_file) email = user_data['email'] api_token = user_data['api_token'] zone_id = user_data['zone_id'] domain = user_data['domain'] dns_record_name = user_data["ip_dns_record"] url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records' headers = { 'X-Auth-Email': email, 'X-Auth-Key': api_token, 'Content-Type': 'application/json' } page_num = 1 i_num = 0 try: while True: params = {"page": page_num,"per_page": 100} response = requests.request("GET", url, headers=headers, params=params) data = response.json()["result"] if not data: break for record in data: if record["name"] == f"{dns_record_name}.{domain}": url_del = f"{url}/{record['id']}" requests.delete(url_del, headers=headers) self.progress.emit((i_num + 1, record["name"], record["content"], " ✓ deleted")) i_num += 1 page_num += 1 self.finished.emit() except: self.progress.emit(("Error\nPlease check:\n- your Internet connection\n- your information",)) self.finished.emit() # Main WindowApp class CloudflareDNS(QMainWindow): def __init__(self, parent=None): super().__init__(parent) self.setupUi() # open file dialog window to select result file self.createButton.setEnabled(False) self.json_path_create = None self.browse_dialog = QFileDialog(self) self.browse_dialog.setNameFilter('SCAN files (*.json *.cf *.csv)') self.browse_dialog.setFileMode(QFileDialog.FileMode.ExistingFile) self.browse_dialog.fileSelected.connect(self.open_file) # open user_id file file_path = 'user_id.json' if os.path.exists(file_path): self.load_input_values() # setup UI of main Window def setupUi(self): self.setFixedSize(260, 405) self.move(5, 5) self.setWindowTitle("CloudflareDNS") self.setWindowIcon(QIcon(icon)) self.label_1 = QLabel(self) self.label_1.move(5, 365) image_1 = QPixmap(github) self.label_1.setPixmap(image_1) self.label_1.setOpenExternalLinks(True) self.label_1.setText(f'') self.label_2 = QLabel(self) self.label_2.move(50, 365) image_2 = QPixmap(twitter) self.label_2.setPixmap(image_2) self.label_2.setOpenExternalLinks(True) self.label_2.setText(f'') self.email() self.api_token() self.zone_id() self.domain() self.name() self.show_output() self.maxip() self.listButton = QPushButton(self) # list button self.listButton.setGeometry(5, 160, 37, 37) self.listButton.setStyleSheet("border-radius : 10px; border : 2px solid black") self.listButton.setIcon(QIcon(listpng)) self.listButton.setIconSize(QSize(45, 48)) self.listButton.clicked.connect(self.list_clicked) self.exportlistButton = QPushButton(self) # list button self.exportlistButton.setGeometry(46, 160, 37, 37) self.exportlistButton.setStyleSheet("border-radius : 10px; border : 2px solid black") self.exportlistButton.setIcon(QIcon(exportpng)) self.exportlistButton.setIconSize(QSize(37, 28)) self.exportlistButton.clicked.connect(self.exportlist_clicked) self.createButton = QPushButton(self) # create button self.createButton.setGeometry(175, 160, 37, 37) self.createButton.setIcon(QIcon(createpng)) self.createButton.setIconSize((QSize(40, 40))) self.createButton.setStyleSheet("border-radius : 10px; border : 2px solid black") self.createButton.clicked.connect(self.create_clicked) # delete button self.deleteButton = QPushButton(self) self.deleteButton.setGeometry(215, 160, 37, 37) self.deleteButton.setIcon(QIcon(deletepng)) self.deleteButton.setIconSize((QSize(43, 43))) self.deleteButton.setStyleSheet("border-radius : 10px; border : 2px solid black") self.deleteButton.clicked.connect(self.delete_clicked) #result.json button self.resultButton = QPushButton(self) self.resultButton.setGeometry(87, 160, 85, 37) self.resultButton.setStyleSheet("border-radius : 10px; border : 2px solid black") self.resultButton.setIcon(QIcon(output_file)) self.resultButton.setIconSize(QSize(70, 70)) self.resultButton.clicked.connect(self.get_file_path) self.resultButton.clicked.connect(self.refresh_buttons) def get_file_path(self): file_dialog = QFileDialog(self) file_dialog.setNameFilter('SCAN files (*.json *.cf *.csv)') file_dialog.setFileMode(QFileDialog.FileMode.ExistingFile) file_dialog.fileSelected.connect(self.open_file) file_dialog.show() def open_file(self, file_path): if file_path: self.json_path = file_path self.json_path_create = file_path self.refresh_buttons() def refresh_buttons(self): is_create_enabled = False if self.json_path_create: is_create_enabled = True self.createButton.setEnabled(is_create_enabled) # show output in a QPlainText def show_output(self): self.otext = QPlainTextEdit(self) self.otext.setGeometry(5, 240, 250, 120) self.otext.setStyleSheet("background-color: black; color: white; border-radius : 10px") self.otext.setFont(QFont("Cascadia Code", 10)) self.otext.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff) self.otext.setReadOnly(True) # set user id in user_id file def email(self): self.input_text = QLineEdit(self) self.input_text.setObjectName("email") self.input_text.setGeometry(5, 10, 250, 25) self.input_text.setFont(QFont("Comic Sans MS", 10, QFont.Weight.Bold)) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") self.input_text.setPlaceholderText("example@mail.com") def api_token(self): self.input_text = QLineEdit(self) self.input_text.setObjectName("api_token") self.input_text.setFont(QFont("Comic Sans MS", 9, QFont.Weight.Bold)) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") self.input_text.setGeometry(5, 40, 250, 25) self.input_text.setPlaceholderText("Global API Key") def zone_id(self): self.input_text = QLineEdit(self) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") self.input_text.setFont(QFont("Comic Sans MS", 9, QFont.Weight.Bold)) self.input_text.setObjectName("zone_id") self.input_text.setGeometry(5, 70, 250, 25) self.input_text.setPlaceholderText("Zone ID") def domain(self): self.input_text = QLineEdit(self) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") self.input_text.setFont(QFont("Comic Sans MS", 10, QFont.Weight.Bold)) self.input_text.setObjectName("domain") self.input_text.setGeometry(5, 100, 250, 25) self.input_text.setPlaceholderText("mydomain.com") def name(self): self.input_text = QLineEdit(self) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") self.input_text.setFont(QFont("Comic Sans MS", 10, QFont.Weight.Bold)) self.input_text.setObjectName("name") self.input_text.setGeometry(5, 130, 250, 25) self.input_text.setPlaceholderText("DNS record name") def maxip(self): self.input_text = QLineEdit(self) self.input_text.setStyleSheet("border-radius : 10px; border : 2px solid black") reg_ex = QRegularExpression("(?!0)\\d{1,2}|100") validator = QRegularExpressionValidator(reg_ex, self.input_text) self.input_text.setValidator(validator) self.input_text.setFont(QFont("Comic Sans MS", 8, QFont.Weight.Bold)) self.input_text.setObjectName("maxip") self.input_text.setGeometry(5, 205, 90, 25) self.input_text.setPlaceholderText("1≤max ip≤100") def load_input_values(self): with open('user_id.json', 'r') as f: user_data = json.load(f) self.findChild(QLineEdit, "email").setText(user_data['email']) self.findChild(QLineEdit, "api_token").setText(user_data['api_token']) self.findChild(QLineEdit, "zone_id").setText(user_data['zone_id']) self.findChild(QLineEdit, "domain").setText(user_data['domain']) self.findChild(QLineEdit, "name").setText(user_data['ip_dns_record']) self.findChild(QLineEdit, "maxip").setText(user_data['max_ip']) self.refresh_buttons() def get_input_values(self): user_data = {} user_data['email'] = self.findChild(QLineEdit, "email").text() user_data['api_token'] = self.findChild(QLineEdit, "api_token").text() user_data['zone_id'] = self.findChild(QLineEdit, "zone_id").text() user_data['domain'] = self.findChild(QLineEdit, "domain").text() user_data['ip_dns_record'] = self.findChild(QLineEdit, "name").text() user_data['max_ip'] = self.findChild(QLineEdit, "maxip").text() return user_data def save_input_values(self): with open('user_id.json', 'w') as f: json.dump(self.get_input_values(), f) def update_text(self, data): try: row_num, ip, message = data[0], data[2], data[3] text = f"{row_num}) {ip} {message}\n" self.otext.insertPlainText(text) except: error_message = data[0] self.otext.insertPlainText(error_message) def list_clicked(self): self.save_input_values() self.thread = QThread() self.worker = Worker() self.otext.clear() self.worker.moveToThread(self.thread) self.worker.progress.connect(self.update_text) self.thread.started.connect(self.worker.list) self.worker.finished.connect(self.thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.thread.finished.connect(self.thread.deleteLater) self.thread.start() self.listButton.setEnabled(False) self.thread.finished.connect(lambda:self.listButton.setEnabled(True)) self.exportlistButton.setEnabled(False) self.thread.finished.connect(lambda:self.exportlistButton.setEnabled(True)) self.createButton.setEnabled(False) self.thread.finished.connect(lambda:self.refresh_buttons()) self.deleteButton.setEnabled(False) self.thread.finished.connect(lambda:self.deleteButton.setEnabled(True)) self.resultButton.setEnabled(False) self.thread.finished.connect(lambda:self.resultButton.setEnabled(True)) def exportlist_clicked(self): self.save_input_values() self.thread = QThread() self.worker = Worker() self.otext.clear() self.worker.moveToThread(self.thread) self.worker.progress.connect(self.update_text) self.thread.started.connect(self.worker.export_json) self.worker.finished.connect(self.thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.thread.finished.connect(self.thread.deleteLater) self.thread.start() self.listButton.setEnabled(False) self.thread.finished.connect(lambda:self.listButton.setEnabled(True)) self.exportlistButton.setEnabled(False) self.thread.finished.connect(lambda:self.exportlistButton.setEnabled(True)) self.createButton.setEnabled(False) self.thread.finished.connect(lambda:self.refresh_buttons()) self.deleteButton.setEnabled(False) self.thread.finished.connect(lambda:self.deleteButton.setEnabled(True)) self.resultButton.setEnabled(False) self.thread.finished.connect(lambda:self.resultButton.setEnabled(True)) def create_clicked(self): self.save_input_values() self.thread = QThread() self.worker = Worker() self.otext.clear() self.worker.json_path_create = self.json_path self.worker.moveToThread(self.thread) self.worker.progress.connect(self.update_text) self.thread.started.connect(self.worker.create) self.worker.finished.connect(self.thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.thread.finished.connect(self.thread.deleteLater) self.thread.start() self.listButton.setEnabled(False) self.thread.finished.connect(lambda:self.listButton.setEnabled(True)) self.exportlistButton.setEnabled(False) self.thread.finished.connect(lambda:self.exportlistButton.setEnabled(True)) self.createButton.setEnabled(False) self.thread.finished.connect(lambda:self.createButton.setEnabled(True)) self.deleteButton.setEnabled(False) self.thread.finished.connect(lambda:self.deleteButton.setEnabled(True)) self.resultButton.setEnabled(False) self.thread.finished.connect(lambda:self.resultButton.setEnabled(True)) def delete_clicked(self): self.save_input_values() self.thread = QThread() self.worker = Worker() self.otext.clear() self.worker.moveToThread(self.thread) self.worker.progress.connect(self.update_text) self.thread.started.connect(self.worker.delete) self.worker.finished.connect(self.thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.thread.finished.connect(self.thread.deleteLater) self.thread.start() self.listButton.setEnabled(False) self.thread.finished.connect(lambda:self.listButton.setEnabled(True)) self.exportlistButton.setEnabled(False) self.thread.finished.connect(lambda:self.exportlistButton.setEnabled(True)) self.createButton.setEnabled(False) self.thread.finished.connect(lambda:self.refresh_buttons()) self.deleteButton.setEnabled(False) self.thread.finished.connect(lambda:self.deleteButton.setEnabled(True)) self.resultButton.setEnabled(False) self.thread.finished.connect(lambda:self.resultButton.setEnabled(True)) app = QApplication(sys.argv) window = CloudflareDNS() window.show() sys.exit(app.exec())