Skip to content

Instantly share code, notes, and snippets.

@jroyalty
Created December 28, 2016 20:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jroyalty/67aeaaa5162a074a20e4f1e248def5a0 to your computer and use it in GitHub Desktop.
Save jroyalty/67aeaaa5162a074a20e4f1e248def5a0 to your computer and use it in GitHub Desktop.
Twisted listener for NFQ
import logging
import socket
from twisted.internet import abstract, address, interfaces, defer
from zope.interface import implementer
try:
import netfilterqueue
except:
netfilterqueue = None
LOG = logging.getLogger(__name__)
@implementer(interfaces.IListeningPort)
class tx_nfqueue_listener(object):
"""
Listen to Netfilter's QUEUE target and invoke a user-supplied callback.
:param int queue_num: The NFQUEUE number to bind to.
:param (netfilterqueue.Packet) callback_func:
"""
def __init__(self, queue_num, callback_func):
self._queue_num = queue_num
self._callback_func = callback_func
self.__nfq = None
self.__sock = None
self.__reader = None
self.__hostaddr = address._ProcessAddress()
def startListening(self):
if netfilterqueue is None:
LOG.error("NetfilterQueue module is not available! Aborting listener.")
self.__reader = None
return
assert self.__reader is None, "Already listening on NFQUEUE id = %d" % self._queue_num
# When running inside Docker this requires the NET_ADMIN capability.
self.__nfq = netfilterqueue.NetfilterQueue()
self.__nfq.bind(self._queue_num, self._callback_func)
self.__sock = socket.fromfd(self.__nfq.get_fd(), socket.AF_UNIX, socket.SOCK_RAW)
self.__sock.setblocking(False)
self.__reader = abstract.FileDescriptor()
self.__reader.doRead = self.__doRead
self.__reader.fileno = lambda: self.__sock
self.__reader.startReading()
def __doRead(self):
try:
self.__nfq.run_socket(self.__sock)
except:
LOG.exception("Error reading from NFQUEUE socket")
def stopListening(self):
if self.__reader is not None:
self.__reader.stopReading()
self.__sock.close()
self.__nfq.unbind()
return defer.succeed(True)
def getHost(self):
return self.__hostaddr
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment