Skip to content

Instantly share code, notes, and snippets.

@fbparis
Last active December 29, 2018 07:45
Show Gist options
  • Save fbparis/c728bef85937792aa56da52701fc42d8 to your computer and use it in GitHub Desktop.
Save fbparis/c728bef85937792aa56da52701fc42d8 to your computer and use it in GitHub Desktop.
Client and Server using LoremVPN protocol
import sys
import socket
import zlib
from loremvpn import LoremVPN
if __name__ == '__main__':
    # Command-line client: encrypt the arguments, send them to the server and
    # print the (decrypted) answer.
    VPN = LoremVPN("demo", "demo")
    HOST, PORT = "localhost", 9999
    data = " ".join(sys.argv[1:]).encode()
    # Encrypt() returns None instead of raising (e.g. when the encoding
    # tables could not be loaded), so check before calling .encode() on it.
    message = VPN.Encrypt(data)
    if message is None:
        sys.exit("Unable to encrypt the message")
    # Create a socket (SOCK_STREAM means a TCP socket)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Connect to server and send data
        sock.connect((HOST, PORT))
        sock.sendall(zlib.compress(message.encode()))
        # Receive data from the server and shut down.
        # TCP is a byte stream: one recv() may deliver only part of the
        # reply, so keep reading until the zlib stream reports its own end
        # or the server closes the connection.
        inflater = zlib.decompressobj()
        parts = []
        while not inflater.eof:
            chunk = sock.recv(1024)
            if not chunk:
                break
            parts.append(inflater.decompress(chunk))
    if inflater.eof:
        received, userId = VPN.Decrypt(b"".join(parts).decode())
    else:
        # The server closes the connection without answering when it rejects
        # a message; report that the same way as a failed decryption.
        received, userId = None, None
    print("Sent: {}".format(data))
    print("Received: {}".format(received))
import socketserver
import zlib
import logging
from loremvpn import LoremVPN
# Process-wide logging: INFO and above, each record prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Module logger used by the request handler below.
logger = logging.getLogger(__name__)
class LoremTCPHandler(socketserver.BaseRequestHandler):
    """
    Echo handler for the Lorem protocol.

    Each connection carries one zlib-compressed Lorem message. The message is
    decrypted with the module-level ``VPN`` object and, when valid, the same
    payload is encrypted again for the same user and sent back. Invalid
    messages are logged and the connection is closed without an answer.
    """

    # Upper bound on the decompressed size of one request, so that a client
    # cannot make the server buffer an arbitrary amount of data.
    MAX_MESSAGE_SIZE = 1 << 20

    def handle(self):
        """
        Read one message from the client and echo it back if it is valid.
        """
        peer = self.client_address[0]
        try:
            text = self._read_message()
        except (zlib.error, ValueError) as e:
            # Broken zlib stream, oversized message or non UTF-8 content
            # (UnicodeDecodeError is a ValueError): drop the request instead
            # of letting the exception escape the handler.
            logger.warning("invalid payload from %s: %s", peer, e)
            return
        data, userId = VPN.Decrypt(text)
        # Encrypt() may also return None, so only answer with a real message.
        response = None if data is None else VPN.Encrypt(data, userId=userId)
        if response is None:
            logger.info("closing connection with %s", peer)
            return
        logger.info("sending response to %s", peer)
        # just send back the same data
        self.request.sendall(zlib.compress(response.encode()))

    def _read_message(self):
        """
        Return the decompressed, decoded text of one complete zlib stream.

        self.request is the TCP socket connected to the client. A single
        recv() may return only part of the message, so chunks are fed to a
        streaming decompressor until it reports the end of the stream.

        Raises zlib.error for a corrupt or truncated stream and ValueError
        for an oversized or non UTF-8 message.
        """
        inflater = zlib.decompressobj()
        parts = []
        size = 0
        while not inflater.eof:
            chunk = self.request.recv(1024)
            if not chunk:
                raise zlib.error("connection closed before the end of the zlib stream")
            part = inflater.decompress(chunk)
            size += len(part)
            if size > self.MAX_MESSAGE_SIZE:
                raise ValueError("message larger than %d bytes" % self.MAX_MESSAGE_SIZE)
            parts.append(part)
        return b"".join(parts).decode()
if __name__ == "__main__":
    HOST, PORT = "localhost", 9999
    # Without credentials the object accepts every user of the users file;
    # the request handler reads it through this module-level name.
    VPN = LoremVPN()
    # Create the server, binding to localhost on port 9999
    with socketserver.TCPServer((HOST, PORT), LoremTCPHandler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the server: leave the
            # ``with`` block (which closes the socket) without a traceback.
            logger.info("server stopped")
import os
import logging
import pickle
from random import randint
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
# Process-wide logging: INFO and above, each record prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)

# Pickle file holding the (encoding table, decoding table) pair.
_TABLES_FILENAME = "lorem-tables.pkl"
# Pickle file holding the mapping of user names to user secrets.
_USERS_FILENAME = "lorem-users.pkl"
class LoremVPN(object):
    """
    Client/Server framework to implement Lorem Protocol

    Binary messages from and to client / server match the scheme::

        [UserID (16 bytes)][IV (16 bytes)][AES-CFB encrypted message]

    * UserID is the MD5 digest of the user name;
    * the AES key ("user secret" below) is the SHA-256 digest of the secret;
    * IV = MD5(key + SHA256(key + message))[:8] + os.urandom(8); the first
      8 bytes are also used to check message integrity on decryption.

    Messages are then converted to text with the encoding table (see
    ``Encode``) and should be compressed before transfers.

    An object created with credentials only talks for that user; an object
    created without credentials accepts every user of the users file.
    """

    def __init__(self, userId=None, userSecret=None):
        """
        Load the encoding tables and prepare the credentials.

        userId, userSecret -- plain-text user name and secret (str). When
        userId is None, all users are read from the users file, falling back
        to the single demo/demo account if that file cannot be loaded.

        ``self.ready`` is False when the tables could not be loaded; the
        public methods then return None (or ``(None, None)``) without doing
        anything.
        """
        # setup logger
        self.logger = logging.getLogger(self.__class__.__name__)
        # retrieve encoding and decoding tables
        # NOTE(security): pickle can execute arbitrary code while loading;
        # both pickle files must come from a trusted source.
        try:
            with open(_TABLES_FILENAME, "rb") as tablesFile:
                self.encodingTable, self.decodingTable = pickle.load(tablesFile)
        except Exception as e:
            self.logger.error(e)
            self.ready = False
        else:
            self.ready = True
        # retrieve valid users credentials or assign object to an unique user
        if userId is None:
            self.userId = None
            try:
                with open(_USERS_FILENAME, "rb") as usersFile:
                    validUsers = pickle.load(usersFile)
            except Exception as e:
                self.logger.warning(e)
                validUsers = {"demo": "demo"}
            # index the users by hashed id so that incoming messages can be
            # looked up directly with the 16 bytes they start with
            self.validUsers = {}
            for name, secret in validUsers.items():
                hashedId, key = self._normalize_credentials(name, secret)
                self.validUsers[hashedId] = key
        else:
            self.userId, self.userSecret = self._normalize_credentials(userId, userSecret)

    def _normalize_credentials(self, userId, userSecret):
        """
        Return ``(MD5(userId), SHA256(userSecret))`` as bytes.

        Both arguments are str. The 16-byte MD5 digest identifies the user in
        messages, the 32-byte SHA-256 digest is used as the AES key.
        """
        md5 = hashes.Hash(hashes.MD5(), backend=default_backend())
        md5.update(userId.encode())
        sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend())
        sha256.update(userSecret.encode())
        return md5.finalize(), sha256.finalize()

    def _integrity_tag(self, userSecret, data):
        """
        Return MD5(userSecret + SHA256(userSecret + data))[:8].

        These 8 bytes form the first half of the IV and are recomputed on
        decryption to check that the message was not altered.
        """
        sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend())
        sha256.update(userSecret + data)
        md5 = hashes.Hash(hashes.MD5(), backend=default_backend())
        md5.update(userSecret + sha256.finalize())
        return md5.finalize()[:8]

    def Encode(self, data):
        """
        Convert bytes to text using the encoding table.

        The table maps a state (tuple of the last emitted characters) to the
        list of characters allowed next. Each hexadecimal digit of ``data``
        is written as a run of characters whose indexes in their lists sum to
        the digit value, followed by an index-0 character emitted from a
        state offering several choices, which marks the end of the digit.

        Returns the encoded str, or None when the object is not ready or the
        data can not be encoded.
        """
        if not self.ready:
            return None
        try:
            table = self.encodingTable
            pieces = []
            k = next(iter(table))
            for digit in data.hex():
                c = int(digit, 16)
                while c > 0:
                    choices = table[k]
                    if len(choices) == 1:
                        # forced character: carries no information
                        i = 0
                    elif c >= len(choices):
                        # value too large for this state: spend a random
                        # non-zero part of it and continue with the rest
                        i = randint(1, len(choices) - 1)
                    else:
                        i = c
                    c -= i
                    symbol = choices[i]
                    pieces.append(symbol)
                    k = k[1:] + tuple(symbol[-1])
                # end-of-digit marker: index 0 from a state with a real
                # choice; forced characters met on the way are just copied
                while True:
                    choices = table[k]
                    symbol = choices[0]
                    pieces.append(symbol)
                    if len(choices) > 1:
                        break
                    k = k[1:] + tuple(symbol[-1])
                k = k[1:] + tuple(symbol[-1])
            return "".join(pieces)
        except Exception as e:
            self.logger.error(e)
            self.logger.warning("Data can not be encoded")
            return None

    def Decode(self, data):
        """
        Convert text produced by ``Encode`` back to bytes.

        Returns the decoded bytes, or None when the object is not ready or
        the text is not a valid encoding (unknown character, digit value
        above 15, odd number of digits...).
        """
        if not self.ready:
            return None
        try:
            digits = []
            k = next(iter(self.encodingTable))
            n = 0
            for c in data:
                ranks = self.decodingTable[k]
                i = ranks[c]
                n += i
                if i == 0 and len(ranks) > 1:
                    # end-of-digit marker; Encode never produces more than
                    # 15 per digit, anything else is corrupt input that
                    # would shift every following hexadecimal digit
                    if n > 15:
                        raise ValueError("decoded value %d is not an hexadecimal digit" % n)
                    digits.append(n)
                    n = 0
                k = k[1:] + tuple(c)
            return bytes.fromhex("".join("%x" % x for x in digits))
        except Exception as e:
            self.logger.error(e)
            self.logger.warning("Data can not be decoded")
            return None

    def Encrypt(self, data, userId=None):
        """
        Encrypt ``data`` (bytes) and return the encoded text message.

        userId -- hashed id (as returned by ``Decrypt``) of the recipient;
        only used when the object was created without credentials.

        Returns None when the object is not ready, the user is unknown or
        the message can not be encoded.
        """
        if not self.ready:
            return None
        if self.userId:
            userId, userSecret = self.userId, self.userSecret
        elif userId not in self.validUsers:
            self.logger.warning("Unknown user, aborting encryption")
            return None
        else:
            userSecret = self.validUsers[userId]
        # first part of iv will also be used to check data integrity,
        # the random second part completes it to the 128 bits AES needs
        iv = self._integrity_tag(userSecret, data) + os.urandom(8)
        # encrypt data with AES
        encryptor = Cipher(algorithms.AES(userSecret), modes.CFB(iv), backend=default_backend()).encryptor()
        return self.Encode(userId + iv + encryptor.update(data) + encryptor.finalize())

    def Decrypt(self, data):
        """
        Decode and decrypt a text message.

        Returns ``(data, userId)`` where data is the decrypted bytes and
        userId the hashed id of the sender, or ``(None, None)`` when the
        object is not ready, the message is malformed, the user is not
        accepted or the integrity check fails.
        """
        if not self.ready:
            return None, None
        data = self.Decode(data)
        # a message holds at least the 16-byte user id and the 16-byte iv;
        # a shorter iv would make the cipher constructor raise
        if data is None or len(data) < 32:
            self.logger.warning("Invalid data format")
            return None, None
        userId, iv, data = data[:16], data[16:32], data[32:]
        if self.userId:
            if userId != self.userId:
                self.logger.error("Unauthorized userId, aborting decryption")
                return None, None
            userSecret = self.userSecret
        elif userId not in self.validUsers:
            self.logger.warning("Unknown user, aborting decryption")
            return None, None
        else:
            userSecret = self.validUsers[userId]
        # decrypt data with AES
        decryptor = Cipher(algorithms.AES(userSecret), modes.CFB(iv), backend=default_backend()).decryptor()
        data = decryptor.update(data) + decryptor.finalize()
        # check integrity of the data
        if self._integrity_tag(userSecret, data) != iv[:8]:
            self.logger.warning("Data integrity compromised, aborting decryption")
            return None, None
        return data, userId
if __name__ == '__main__':
    # Round-trip demo: encrypt a short message with the demo account and
    # print what decrypting the result gives back.
    vpn = LoremVPN("demo", "demo")
    print(vpn.Decrypt(vpn.Encrypt(b"test")))
@fbparis
Copy link
Author

fbparis commented Dec 29, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment