-
-
Save parweb/5ae1d8cd139aa0ec28933f25072f19e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket
from datetime import datetime
def pack_payload(args, null=True):
    """Serialize a sequence of values into a wire-format payload.

    Encoding rules:

    * ``str`` longer than one character: ASCII bytes, followed by a NUL
      terminator when *null* is true.
    * ``str`` of length zero or one: the ASCII bytes only, never
      NUL-terminated (a single character is treated as a raw byte field).
    * ``int``: four bytes, big-endian. Negative values are encoded as
      two's-complement signed 32-bit; non-negative values up to
      ``2**32 - 1`` are encoded unsigned.
    * Any other type (including ``bool``) is skipped.

    Args:
        args: Iterable of values to encode, in order.
        null: Whether multi-character strings get a trailing NUL byte.

    Returns:
        A ``bytearray`` holding the concatenated encodings.

    Raises:
        UnicodeEncodeError: If a string contains non-ASCII characters.
        OverflowError: If an integer does not fit in 32 bits.
    """
    payload = bytearray()
    for arg in args:
        # Exact type checks on purpose: ``bool`` is a subclass of ``int`` and
        # must keep being skipped rather than encoded as 0/1.
        if type(arg) is str:
            payload += arg.encode('ascii')
            if null and len(arg) > 1:
                payload += b'\x00'
        elif type(arg) is int:
            # Switch to signed encoding only for negative values so that the
            # byte output for every non-negative value stays unchanged.
            payload += arg.to_bytes(4, byteorder='big', signed=arg < 0)
    return payload
def create_message_pack(key: str, payload, null=True):
    """Build one complete tagged message: type byte, length, payload.

    Args:
        key: Message type tag, encoded as ASCII and placed first.
        payload: Iterable of values handed to ``pack_payload``.
        null: Forwarded to ``pack_payload``; controls NUL termination of
            multi-character strings.

    Returns:
        ``bytes`` laid out as ``key + length + payload``, where ``length`` is
        a 4-byte big-endian integer.
    """
    body = pack_payload(payload, null=null)
    # The length field counts itself (4 bytes) but not the type tag.
    size = (len(body) + 4).to_bytes(4, byteorder='big')
    return key.encode('ascii') + size + body
def split_reply(data):
    """Split a raw client message into a list of printable string fields.

    A message whose first byte is a known type tag (``p``, ``P``, ``Q`` or
    ``X``) followed by a NUL byte is treated as a tagged message: its 5-byte
    header (tag + 4-byte length) is removed, the rest is split on NUL bytes,
    and a human-readable label is put in front of the fields.

    Anything else (for example an untagged startup packet) is decoded as a
    whole and split on NUL bytes, with no label.

    Args:
        data: Raw bytes as received from the socket.

    Returns:
        A list of non-empty strings. For tagged messages the first element is
        the label (``'PASSWORD'``, ``'FUNCTION CALL'``, ``'QUERY'`` or
        ``'TERMINATE'``). The list is empty when *data* holds no printable
        field.
    """
    # NOTE(review): in the PostgreSQL protocol the 'P' tag is Parse, while
    # FunctionCall is 'F'. The label is kept as-is so the returned strings
    # stay stable - confirm before renaming.
    labels = {
        b'p': 'PASSWORD',
        b'P': 'FUNCTION CALL',
        b'Q': 'QUERY',
        b'X': 'TERMINATE',
    }
    label = labels.get(bytes(data[:1]))
    # Require the byte after the tag to be NUL (the high byte of the length)
    # so arbitrary text that merely starts with one of these letters is not
    # mistaken for a tagged message.
    tagged = label is not None and bytes(data[1:2]) in (b'', b'\x00')

    # Drop the binary header *before* decoding. Decoding it as text leaks the
    # low byte of the length into the first field, and blindly stripping one
    # character afterwards corrupts the field whenever that byte is
    # non-ASCII (dropped by errors='ignore') or zero.
    body = data[5:] if tagged else data
    text = bytes(body).decode('ascii', errors='ignore')
    fields = [field for field in text.split('\0') if field]

    if tagged:
        return [label, *fields]
    return fields
# Minimal single-connection server that speaks just enough of the PostgreSQL
# wire protocol to log a client's password and queries. It accepts one client,
# requests a cleartext password, accepts whatever is sent, then answers every
# subsequent message with ReadyForQuery until the client terminates.

# Magic "protocol version" a client sends to request SSL.
ssl_request_code = (80877103).to_bytes(4, byteorder='big')

# Context managers guarantee both sockets are closed on any exit path.
with socket.create_server(('127.0.0.1', 54302), family=socket.AF_INET) as s:
    s.listen()
    conn, details = s.accept()
    with conn:
        ip, port = details

        # The first request in a connection is either SSLRequest or
        # StartupMessage. Bytes 0-3 are the length, bytes 4-7 the code.
        startup = conn.recv(1024*4096)
        if startup[4:8] == ssl_request_code:
            print(f'[{ip}] [{datetime.now()}] Refusing SSLRequest')
            conn.sendall('N'.encode('ascii'))  # Refuse SSL
            # After a refusal the client sends its real StartupMessage.
            # Consume it here so it is not mistaken for the password reply.
            startup = conn.recv(1024*4096)
        # We received a StartupMessage, we can ignore it
        print(f'[{ip}] [{datetime.now()}] Ignoring StartupMessage')

        authpass = create_message_pack('R', [3,])
        authok = create_message_pack('R', [0,])
        backendkeydata = create_message_pack('K', [1,1])
        p1 = create_message_pack('S', ['server_version', '15.1'])
        p2 = create_message_pack('S', ['server_encoding', 'utf-8'])
        readyforquery = create_message_pack('Z', ['I'])

        # sendall() is used throughout: send() may transmit only part of the
        # buffer and the return value was never checked.
        conn.sendall(authpass)  # Request plaintext password
        data = conn.recv(1024*1024)  # Receive password
        print(f'[{ip}] [{datetime.now()}]', split_reply(data))
        conn.sendall(authok)  # Respond that the password is okay
        conn.sendall(backendkeydata)  # Send the PID and key
        conn.sendall(p1)  # Send our version
        conn.sendall(p2)  # Send our encoding
        conn.sendall(readyforquery)  # Tell the client it can send queries

        while True:  # Interaction loop
            data = conn.recv(1024*1024)
            if not data:
                # An empty read means the peer closed the connection; looping
                # again would spin forever on further empty reads.
                break
            reply = split_reply(data)
            print(f'[{ip}] [{datetime.now()}]', reply)
            if reply and reply[0] == 'TERMINATE':
                break  # Close the connection when we receive a termination signal
            conn.sendall(readyforquery)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment