Skip to content

Instantly share code, notes, and snippets.

@parweb

parweb/main.py Secret

Created March 21, 2024 00:00
Show Gist options
  • Save parweb/5ae1d8cd139aa0ec28933f25072f19e6 to your computer and use it in GitHub Desktop.
Save parweb/5ae1d8cd139aa0ec28933f25072f19e6 to your computer and use it in GitHub Desktop.
import socket
from datetime import datetime
def pack_payload(args, null=True):
payload = bytearray()
for arg in list(args):
if type(arg) == str:
if len(arg) > 1:
# When encoding a string we add a null terminator
if null:
payload += arg.encode('ascii') + 0x0.to_bytes(1, byteorder='big')
else:
payload += arg.encode('ascii')
else:
# When we encode a char, we only encode the char byte
payload += arg.encode('ascii')
elif type(arg) == int:
# All integer values are encoded as 32-bits (4 bytes)
payload += arg.to_bytes(4, byteorder='big')
return payload
def create_message_pack(key:str, payload, null=True):
payload = pack_payload(payload, null=null)
length = len(payload)+4 # We add four bytes for the len param itself
size = (length).to_bytes(4, byteorder='big')
key = key.encode('ascii')
return key + size + payload
def split_reply(data):
string = data.decode('ascii', errors='ignore')
values = string.split('\0')
values = [ v for v in values if v != '' ]
match values[0]:
case 'p':
values[0] = 'PASSWORD'
case 'P':
values[0] = 'FUNCTION CALL'
case 'Q':
values[0] = 'QUERY'
values[1] = values[1][1:] # Remove ampersand prefix
case 'X':
values[0] = 'TERMINATE'
case '':
values[0] = 'STARTUP'
return values
s = socket.create_server(('127.0.0.1', 54302), family=socket.AF_INET)
s.listen()
conn, details = s.accept()
ip, port = details
# The first request in a connection is either SSLRequest or StartupMessage
startup = conn.recv(1024*4096)
length = startup[:4]
value = startup[4:]
if value == (80877103).to_bytes(4, byteorder='big'):
print(f'[{ip}] [{datetime.now()}] Refusing SSLRequest')
# We have received an SSLRequest, refuse it
nossl = 'N'.encode('ascii')
conn.send(nossl) # Refuse SSL
else:
# We received a StartupMessage, we can ignore it
print(f'[{ip}] [{datetime.now()}] Ignoring StartupMessage')
pass
authpass = create_message_pack('R', [3,])
authok = create_message_pack('R', [0,])
backendkeydata = create_message_pack('K', [1,1])
p1 = create_message_pack('S', ['server_version', '15.1'])
p2 = create_message_pack('S', ['server_encoding', 'utf-8'])
readyforquery = create_message_pack('Z', ['I'])
conn.send(authpass) # Request plaintext password
data = conn.recv(1024*1024) # Receive password
print(f'[{ip}] [{datetime.now()}]', split_reply(data))
conn.send(authok) # Respond that the password is okay
conn.send(backendkeydata) # Send the PID and key
conn.send(p1) # Send our version
conn.send(p2) # Send our encoding
conn.send(readyforquery) # Tell the client it can send queries
while(True): # Interaction loop
data = conn.recv(1024*1024)
if len(data) == 0:
continue
reply = split_reply(data)
print(f'[{ip}] [{datetime.now()}]', reply)
if reply[0] == 'TERMINATE':
break # Close the connection when we recive a termination signal
conn.send(readyforquery)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment