Created
September 4, 2013 14:03
-
-
Save Eeko/6437376 to your computer and use it in GitHub Desktop.
A mock socket server for simulating vulnerabilities for remote exploits
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
# -*- coding: utf-8 -*- | |
# A multi-threadable server program to provide mock-answers to socket queries. | |
# Usable for simulating server processes for various remote exploits | |
# by Eeko, 2013 | |
# inspired by http://stackoverflow.com/questions/15486064/simple-tcp-server-in-python | |
import socket | |
import SocketServer | |
import time | |
import threading | |
import re | |
import struct | |
class ReplyRequestHandler(SocketServer.BaseRequestHandler): | |
def setup(self): | |
self.port = self.request.getsockname()[1] | |
def handle(self): | |
reply = "Default Reply" # Set default reply (overridable) | |
data = "" | |
if self.port == 80: | |
replies = { # a dict for possible replies of an individual server thread | |
"GET" : "HTTP/1.1 200 OK", | |
"POST" : "HTTP/1.1 404 NOT FOUND" | |
} | |
elif self.port == 8080: | |
# Sample with a bit more constructed binary reply | |
answerstring = "foobar\x00barfoo\x00foobar\x00field3\x00Data".encode('utf-16') | |
omnireply1 = struct.pack('>L', len(answerstring)) + answerstring | |
replies = { | |
"Admin" : "1024", | |
"\x00A\x00A\x00e\x00e\x00k\x00o\x00-\x001\x003": omnireply1 | |
} | |
while not re.search(killcmd, data): # FIN gets sent when handle() is finished | |
data = self.request.recv(2048) | |
print 'C:%s> %s' % (str(self.request.getpeername()[1]).ljust(6), repr(data)) | |
for key, value in replies.iteritems(): # iter on both keys and values | |
if re.search(key, data): | |
reply = value | |
self.request.send(reply) | |
print 'S:%s> %s' % (str(self.request.getsockname()[1]).ljust(6), repr(reply)) | |
return | |
class ServerThreads(threading.Thread): | |
def __init__(self,port): | |
self.server = None | |
self.port = port | |
threading.Thread.__init__(self) | |
def run(self): | |
if self.server == None: | |
address = ('', self.port) | |
self.server = SocketServer.TCPServer(address, ReplyRequestHandler) | |
self.server.serve_forever() | |
if __name__ == '__main__': | |
thr = ServerThreads(8080) | |
thr.setDaemon(True) | |
thr.start() | |
print 'Port 8080 listening...' | |
thr2 = ServerThreads(80) | |
thr2.setDaemon(True) | |
thr2.start() | |
print 'Port 80 listening...' | |
time.sleep(1) | |
raw_input("Press Enter to stop...\n--\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment