Skip to content

Instantly share code, notes, and snippets.

@savon-noir
Last active August 29, 2015 13:58
Show Gist options
  • Save savon-noir/483da3fcfe1b5cce3994 to your computer and use it in GitHub Desktop.
Save savon-noir/483da3fcfe1b5cce3994 to your computer and use it in GitHub Desktop.
python-libnmap: poc new process.py
#!/usr/bin/env python
import multiprocessing
from threading import Thread
import subprocess
import sys
class NmapProcess(Thread):
"""
NmapProcess is a class which wraps around the nmap executable.
Consequently, in order to run an NmapProcess, nmap should be installed
on the host running the script. By default NmapProcess will produce
the output of the nmap scan in the nmap XML format. This could be then
parsed out via the NmapParser class from libnmap.parser module.
"""
def __init__(self, targets="127.0.0.1",
options="-sT", event_callback=None, safe_mode=True):
"""
Constructor of NmapProcess class.
:param targets: hosts to be scanned. Could be a string of hosts
separated with a coma or a python list of hosts/ip.
:type targets: string or list
:param options: list of nmap options to be applied to scan.
These options are all documented in nmap's man pages.
:param event_callback: callable function which will be ran
each time nmap process outputs data. This function will receive
two parameters:
1. the nmap process object
2. the data produced by nmap process. See readme for examples.
:param safe_mode: parameter to protect unsafe options like -oN, -oG,
-iL, -oA,...
:return: NmapProcess object
"""
Thread.__init__(self)
unsafe_opts = set(['-oG', '-oN', '-iL', '-oA', '-oS', '-oX',
'--iflist', '--resume', '--stylesheet',
'--datadir'])
nmap_binary_name = "nmap"
self.__nmap_fixed_options = "-oX - -vvv --stats-every 2s"
self.__nmap_binary = self._whereis(nmap_binary_name)
if self.__nmap_binary is None:
raise EnvironmentError(1, "nmap is not installed or could "
"not be found in system path")
if isinstance(targets, str):
self.__nmap_targets = targets.replace(" ", "").split(',')
elif isinstance(targets, list):
self.__nmap_targets = targets
else:
raise Exception("Supplied target list should be either a "
"string or a list")
self._nmap_options = set(options.split())
if safe_mode and not self._nmap_options.isdisjoint(unsafe_opts):
raise Exception("unsafe options activated while safe_mode "
"is set True")
self.__nmap_dynamic_options = options
self.__nmap_command_line = self.get_command_line()
if event_callback and callable(event_callback):
self.__nmap_event_callback = event_callback
else:
self.__nmap_event_callback = None
(self.DONE, self.READY, self.RUNNING,
self.CANCELLED, self.FAILED) = range(5)
# API usable in callback function
self.__nmap_proc = None
self.__nmap_rc = 0
self.__sudo_run = ''
self.__state = self.READY
self.__starttime = 0
self.__endtime = 0
self.__version = ''
self.__progress = 0
self.__etc = 0
self.__elapsed = ''
self.__summary = ''
self.__stdout = ''
self.__stderr = ''
def _whereis(self, program):
"""
Protected method enabling the object to find the full path of a binary
from its PATH environment variable.
:param program: name of a binary for which the full path needs to
be discovered.
:return: the full path to the binary.
:todo: add a default path list in case PATH is empty.
"""
for path in os.environ.get('PATH', '').split(':'):
if (os.path.exists(os.path.join(path, program)) and not
os.path.isdir(os.path.join(path, program))):
return os.path.join(path, program)
return None
def get_command_line(self):
"""
Public method returning the reconstructed command line ran via the lib
:return: the full nmap command line to run
:rtype: string
"""
return ("{0} {1} {2} {3} {4}".format(self.__sudo_run,
self.__nmap_binary,
self.__nmap_fixed_options,
self.__nmap_dynamic_options,
" ".join(self.__nmap_targets)))
def sudo_run(self, run_as='root'):
"""
Public method enabling the library's user to run the scan with
priviledges via sudo. The sudo configuration should be set manually
on the local system otherwise sudo will prompt for a password.
This method alters the command line by prefixing the sudo command to
nmap and will then call self.run()
:param run_as: user name to which the lib needs to sudo to run the scan
:return: return code from nmap execution
"""
sudo_user = run_as.split().pop()
try:
pwd.getpwnam(sudo_user).pw_uid
except KeyError:
_exmsg = "Username {0} does not exists. Please supply"
" a valid username".format(run_as)
raise EnvironmentError(_exmsg)
sudo_path = self._whereis("sudo")
if sudo_path is None:
raise EnvironmentError(2, "sudo is not installed or "
"could not be found in system path: "
"cannot run nmap with sudo")
self.__sudo_run = "{0} -u {1}".format(sudo_path, sudo_user)
rc = self.run()
self.__sudo_run = ""
return rc
def sudo_run_background(self, run_as='root'):
"""
Public method enabling the library's user to run in background a
nmap scan with priviledges via sudo.
The sudo configuration should be set manually on the local system
otherwise sudo will prompt for a password.
This method alters the command line by prefixing the sudo command to
nmap and will then call self.run()
:param run_as: user name to which the lib needs to sudo to run the scan
:return: return code from nmap execution
"""
sudo_user = run_as.split().pop()
try:
pwd.getpwnam(sudo_user).pw_uid
except KeyError:
_exmsg = "Username {0} does not exists. Please supply"
" a valid username".format(run_as)
raise EnvironmentError(_exmsg)
sudo_path = self._whereis("sudo")
if sudo_path is None:
raise EnvironmentError(2, "sudo is not installed or "
"could not be found in system path: "
"cannot run nmap with sudo")
self.__sudo_run = "{0} -u {1}".format(sudo_path, sudo_user)
super(NmapProcess, self).start()
def run(self):
"""
Public method which is usually called right after the constructor
of NmapProcess. This method starts the nmap executable's subprocess.
It will also bind a Process that will read from subprocess' stdout
and stderr and push the lines read in a python queue for futher
processing. This processing is waken-up each time data is pushed
from the nmap binary into the stdout reading routine. Processing
could be performed by a user-provided callback. The whole
NmapProcess object could be accessible asynchroneously.
return: return code from nmap execution
"""
def ioreader_routine(proc_stdout, io_queue, data_pushed, producing):
"""
local function that will read lines from a file descriptor
and put the data in a python queue for futher processing.
:param proc_stdout: file descriptor to read lines from.
:param io_queue: queue in which read lines will be pushed.
:param data_pushed: queue used to push data read from the
nmap stdout back into the parent process
:param producing: shared variable to notify the parent process
that processing is either running, either over.
"""
producing.value = 1
for streamline in iter(proc_stdout.readline, b''):
if streamline is not None:
io_queue.put(streamline)
data_pushed.set()
producing.value = 0
data_pushed.set()
producing = multiprocessing.Value('i', 1)
data_pushed = multiprocessing.Event()
qout = multiprocessing.Queue()
try:
self.__nmap_proc = subprocess.Popen(args=self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0)
ioreader = multiprocessing.Process(target=ioreader_routine,
args=(self.proc.stdout,
qout,
data_pushed,
producing))
ioreader.start()
self.__state = self.RUNNING
except OSError:
self.__state = self.FAILED
raise EnvironmentError(1, "nmap is not installed or could "
"not be found in system path")
while(self.proc.poll() is None or producing.value == 1):
if producing.value == 1:
data_pushed.wait()
try:
self.stdout += qout.get_nowait()
except:
pass
data_pushed.clear()
ioreader.join()
while not qout.empty():
self.stdout += qout.get_nowait()
if __name__ == '__main__':
p = NmapProcess()
p.run()
print len(p.stdout)
@savon-noir
Copy link
Author

What it does:

  • runs a subprocess (nmap)
  • start a multiprocess.Process which will read from the subprocess stdout file descriptor until EOF and send read data to a queue
  • start a loop that will event.wait() until data is sent in the queue and process the data
  • when EOF is reached and subprocess finished, collected data is used (not in the poc yet)

My problem:
Although it works quite good, when I run like 1000 times the code (for i in {1..1000} ; do python semafoire.py >> /tmp/abc ; done), I can see that although the results should always be the same, I sometimes (10 times out of 1000) a bunch of lost octets.

@savon-noir
Copy link
Author

problem mentioned in comment fixed! time to branch the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment