Skip to content

Instantly share code, notes, and snippets.

@rhaidiz
Last active April 22, 2023 21:22
Show Gist options
  • Save rhaidiz/7c5b1f752faf75681463 to your computer and use it in GitHub Desktop.
Save rhaidiz/7c5b1f752faf75681463 to your computer and use it in GitHub Desktop.
sqlmap APIs wrapper
#!/usr/bin/env python3.5
"""
This library provides convenient methods for executing
and accessing sqlmap APIs.
NOTE: Tested only with python3.5
"""
import time
import json
import atexit
import shutil
import pexpect
import os.path
import requests
import threading
import subprocess
import logging
from modules.logger import logger
logging.getLogger("requests").setLevel(logging.DEBUG) # configure loggin to log only from WARNING up
logger = logging.getLogger("wsfast")
SQLMAP_API = "./sqlmapapi.py -s"
SQLMAP_SERVER_IP = "127.0.0.1"
SQLMAP_SERVER_PORT = "8775"
SQLMAP_BASE_URL = "http://"+SQLMAP_SERVER_IP+":"+SQLMAP_SERVER_PORT
sqlmap_process = None
""" Run the api as a web server """
def run_api_server():
global sqlmap_process
atexit.register(exiting)
logger.info("starting sqlmap APIs")
# NOTE: when executing sqlmapapi.py the working directory must be ./sqlmap/ otherwise when the analysis
# is started, it raises a not file execeptio 'cause it cannot find sqlmap.py
sqlmap_process = subprocess.Popen(SQLMAP_API.split(" "),stderr=subprocess.PIPE, stdout=subprocess.PIPE,cwd="./sqlmap/")
while True:
line = sqlmap_process.stdout.readline()
if "REST-JSON API server connected to IPC database" in line.decode('utf-8'):
# the webserver is up and running
return True
return False
""" Kill sqlmap API process """
def kill():
sqlmap_process.kill()
""" Set a value for a specified task_id """
def set_option(option,value,task_id):
url = SQLMAP_BASE_URL+"/option/"+task_id+"/set"
params = { option : value }
logger.debug("task %s setting %s to %s" %(task_id, option, value))
r = requests.post(url,json=params)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
if json_result['success'] == True:
return True
else:
return False
""" Create a new task """
def new_task():
url = SQLMAP_BASE_URL+"/task/new"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
if json_result['success'] == True:
return json_result['taskid']
else:
return False
""" Delete a task """
def del_task(task_id):
url = SQLMAP_BASE_URL+"/task/"+task_id+"/delete"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
return json_result['success']
""" Start scanning """
def start_scan(url_to_scan,task_id):
url = SQLMAP_BASE_URL+"/scan/"+task_id+"/start"
logger.debug(url)
params = { "url" : url_to_scan }
r = requests.post(url,json=params)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("start_scan JSON decoder error")
logger.debug(r.text)
exit()
if json_result['success'] == True:
return json_result['engineid']
else:
return False
""" Retrive sqlmap status """
def get_status(task_id):
url = SQLMAP_BASE_URL+"/scan/"+task_id+"/status"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
return json_result['status']
""" Retrieve sqlmap log """
def get_log(task_id):
url = SQLMAP_BASE_URL+"/scan/"+task_id+"/log"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
return json_result['log']
""" Get result data """
def get_data(task_id):
url = SQLMAP_BASE_URL+"/scan/"+task_id+"/data"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
return json_result
""" Kill a specified task """
def kill_task(task_id):
url = SQLMAP_BASE_URL+"/scan/"+task_id+"/kill"
r = requests.get(url)
try:
json_result = json.loads(r.text)
except json.decoder.JSONDecodeError as e:
logger.critical("JSON decoder error")
exit()
if json_result['success'] == True:
return json_result
else:
return False
""" Exiting """
def exiting():
kill()
logger.debug("Done!")
if __name__ == "__main__":
#---------------------#
# Testing the library #
#---------------------#
run_api_server()
task = new_task()
print("Created a new task " + task)
## configuring the scanner for testing the chained attack case
print(set_option("authType","Basic",task))
print(set_option("authCred","regis:password",task))
print(set_option("data","username=a&password=0",task))
print(set_option("getTables","true",task))
# starting the scan
print(start_scan("https://157.27.244.25/chained/chained/index.php",task))
stopFlag = threading.Event()
# checking when sqlmap ends
while not stopFlag.wait(5):
r = get_status(task)
if "terminated" in r:
print("Analysis terminated")
logger.info(get_data(task))
stopFlag.set()
kill()
else:
print("Analysis in progress ... ")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment