-
-
Save savon-noir/483da3fcfe1b5cce3994 to your computer and use it in GitHub Desktop.
python-libnmap: poc new process.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import multiprocessing | |
from threading import Thread | |
import subprocess | |
import sys | |
class NmapProcess(Thread): | |
""" | |
NmapProcess is a class which wraps around the nmap executable. | |
Consequently, in order to run an NmapProcess, nmap should be installed | |
on the host running the script. By default NmapProcess will produce | |
the output of the nmap scan in the nmap XML format. This could be then | |
parsed out via the NmapParser class from libnmap.parser module. | |
""" | |
def __init__(self, targets="127.0.0.1", | |
options="-sT", event_callback=None, safe_mode=True): | |
""" | |
Constructor of NmapProcess class. | |
:param targets: hosts to be scanned. Could be a string of hosts | |
separated with a coma or a python list of hosts/ip. | |
:type targets: string or list | |
:param options: list of nmap options to be applied to scan. | |
These options are all documented in nmap's man pages. | |
:param event_callback: callable function which will be ran | |
each time nmap process outputs data. This function will receive | |
two parameters: | |
1. the nmap process object | |
2. the data produced by nmap process. See readme for examples. | |
:param safe_mode: parameter to protect unsafe options like -oN, -oG, | |
-iL, -oA,... | |
:return: NmapProcess object | |
""" | |
Thread.__init__(self) | |
unsafe_opts = set(['-oG', '-oN', '-iL', '-oA', '-oS', '-oX', | |
'--iflist', '--resume', '--stylesheet', | |
'--datadir']) | |
nmap_binary_name = "nmap" | |
self.__nmap_fixed_options = "-oX - -vvv --stats-every 2s" | |
self.__nmap_binary = self._whereis(nmap_binary_name) | |
if self.__nmap_binary is None: | |
raise EnvironmentError(1, "nmap is not installed or could " | |
"not be found in system path") | |
if isinstance(targets, str): | |
self.__nmap_targets = targets.replace(" ", "").split(',') | |
elif isinstance(targets, list): | |
self.__nmap_targets = targets | |
else: | |
raise Exception("Supplied target list should be either a " | |
"string or a list") | |
self._nmap_options = set(options.split()) | |
if safe_mode and not self._nmap_options.isdisjoint(unsafe_opts): | |
raise Exception("unsafe options activated while safe_mode " | |
"is set True") | |
self.__nmap_dynamic_options = options | |
self.__nmap_command_line = self.get_command_line() | |
if event_callback and callable(event_callback): | |
self.__nmap_event_callback = event_callback | |
else: | |
self.__nmap_event_callback = None | |
(self.DONE, self.READY, self.RUNNING, | |
self.CANCELLED, self.FAILED) = range(5) | |
# API usable in callback function | |
self.__nmap_proc = None | |
self.__nmap_rc = 0 | |
self.__sudo_run = '' | |
self.__state = self.READY | |
self.__starttime = 0 | |
self.__endtime = 0 | |
self.__version = '' | |
self.__progress = 0 | |
self.__etc = 0 | |
self.__elapsed = '' | |
self.__summary = '' | |
self.__stdout = '' | |
self.__stderr = '' | |
def _whereis(self, program): | |
""" | |
Protected method enabling the object to find the full path of a binary | |
from its PATH environment variable. | |
:param program: name of a binary for which the full path needs to | |
be discovered. | |
:return: the full path to the binary. | |
:todo: add a default path list in case PATH is empty. | |
""" | |
for path in os.environ.get('PATH', '').split(':'): | |
if (os.path.exists(os.path.join(path, program)) and not | |
os.path.isdir(os.path.join(path, program))): | |
return os.path.join(path, program) | |
return None | |
def get_command_line(self): | |
""" | |
Public method returning the reconstructed command line ran via the lib | |
:return: the full nmap command line to run | |
:rtype: string | |
""" | |
return ("{0} {1} {2} {3} {4}".format(self.__sudo_run, | |
self.__nmap_binary, | |
self.__nmap_fixed_options, | |
self.__nmap_dynamic_options, | |
" ".join(self.__nmap_targets))) | |
def sudo_run(self, run_as='root'): | |
""" | |
Public method enabling the library's user to run the scan with | |
priviledges via sudo. The sudo configuration should be set manually | |
on the local system otherwise sudo will prompt for a password. | |
This method alters the command line by prefixing the sudo command to | |
nmap and will then call self.run() | |
:param run_as: user name to which the lib needs to sudo to run the scan | |
:return: return code from nmap execution | |
""" | |
sudo_user = run_as.split().pop() | |
try: | |
pwd.getpwnam(sudo_user).pw_uid | |
except KeyError: | |
_exmsg = "Username {0} does not exists. Please supply" | |
" a valid username".format(run_as) | |
raise EnvironmentError(_exmsg) | |
sudo_path = self._whereis("sudo") | |
if sudo_path is None: | |
raise EnvironmentError(2, "sudo is not installed or " | |
"could not be found in system path: " | |
"cannot run nmap with sudo") | |
self.__sudo_run = "{0} -u {1}".format(sudo_path, sudo_user) | |
rc = self.run() | |
self.__sudo_run = "" | |
return rc | |
def sudo_run_background(self, run_as='root'): | |
""" | |
Public method enabling the library's user to run in background a | |
nmap scan with priviledges via sudo. | |
The sudo configuration should be set manually on the local system | |
otherwise sudo will prompt for a password. | |
This method alters the command line by prefixing the sudo command to | |
nmap and will then call self.run() | |
:param run_as: user name to which the lib needs to sudo to run the scan | |
:return: return code from nmap execution | |
""" | |
sudo_user = run_as.split().pop() | |
try: | |
pwd.getpwnam(sudo_user).pw_uid | |
except KeyError: | |
_exmsg = "Username {0} does not exists. Please supply" | |
" a valid username".format(run_as) | |
raise EnvironmentError(_exmsg) | |
sudo_path = self._whereis("sudo") | |
if sudo_path is None: | |
raise EnvironmentError(2, "sudo is not installed or " | |
"could not be found in system path: " | |
"cannot run nmap with sudo") | |
self.__sudo_run = "{0} -u {1}".format(sudo_path, sudo_user) | |
super(NmapProcess, self).start() | |
def run(self): | |
""" | |
Public method which is usually called right after the constructor | |
of NmapProcess. This method starts the nmap executable's subprocess. | |
It will also bind a Process that will read from subprocess' stdout | |
and stderr and push the lines read in a python queue for futher | |
processing. This processing is waken-up each time data is pushed | |
from the nmap binary into the stdout reading routine. Processing | |
could be performed by a user-provided callback. The whole | |
NmapProcess object could be accessible asynchroneously. | |
return: return code from nmap execution | |
""" | |
def ioreader_routine(proc_stdout, io_queue, data_pushed, producing): | |
""" | |
local function that will read lines from a file descriptor | |
and put the data in a python queue for futher processing. | |
:param proc_stdout: file descriptor to read lines from. | |
:param io_queue: queue in which read lines will be pushed. | |
:param data_pushed: queue used to push data read from the | |
nmap stdout back into the parent process | |
:param producing: shared variable to notify the parent process | |
that processing is either running, either over. | |
""" | |
producing.value = 1 | |
for streamline in iter(proc_stdout.readline, b''): | |
if streamline is not None: | |
io_queue.put(streamline) | |
data_pushed.set() | |
producing.value = 0 | |
data_pushed.set() | |
producing = multiprocessing.Value('i', 1) | |
data_pushed = multiprocessing.Event() | |
qout = multiprocessing.Queue() | |
try: | |
self.__nmap_proc = subprocess.Popen(args=self.cmd, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
bufsize=0) | |
ioreader = multiprocessing.Process(target=ioreader_routine, | |
args=(self.proc.stdout, | |
qout, | |
data_pushed, | |
producing)) | |
ioreader.start() | |
self.__state = self.RUNNING | |
except OSError: | |
self.__state = self.FAILED | |
raise EnvironmentError(1, "nmap is not installed or could " | |
"not be found in system path") | |
while(self.proc.poll() is None or producing.value == 1): | |
if producing.value == 1: | |
data_pushed.wait() | |
try: | |
self.stdout += qout.get_nowait() | |
except: | |
pass | |
data_pushed.clear() | |
ioreader.join() | |
while not qout.empty(): | |
self.stdout += qout.get_nowait() | |
if __name__ == '__main__': | |
p = NmapProcess() | |
p.run() | |
print len(p.stdout) |
problem mentioned in comment fixed! time to branch the code.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What it does:
My problem:
Although it works quite good, when I run like 1000 times the code (for i in {1..1000} ; do python semafoire.py >> /tmp/abc ; done), I can see that although the results should always be the same, I sometimes (10 times out of 1000) a bunch of lost octets.