Last active
December 29, 2018 07:45
-
-
Save fbparis/c728bef85937792aa56da52701fc42d8 to your computer and use it in GitHub Desktop.
Client and Server using LoremVPN protocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
import zlib | |
from loremvpn import LoremVPN | |
if __name__ == '__main__': | |
VPN = LoremVPN("demo", "demo") | |
HOST, PORT = "localhost", 9999 | |
data = " ".join(sys.argv[1:]).encode() | |
# Create a socket (SOCK_STREAM means a TCP socket) | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
# Connect to server and send data | |
sock.connect((HOST, PORT)) | |
sock.sendall(zlib.compress(VPN.Encrypt(data).encode())) | |
# Receive data from the server and shut down | |
received, userId = VPN.Decrypt(zlib.decompress(sock.recv(1024)).decode()) | |
print("Sent: {}".format(data)) | |
print("Received: {}".format(received)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socketserver | |
import zlib | |
import logging | |
from loremvpn import LoremVPN | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s %(name)s %(levelname)s %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
class LoremTCPHandler(socketserver.BaseRequestHandler): | |
""" | |
""" | |
def handle(self): | |
""" | |
""" | |
# self.request is the TCP socket connected to the client | |
data, userId = VPN.Decrypt(zlib.decompress(self.request.recv(1024)).decode()) | |
if data is None: | |
logger.info("closing connection with %s" % self.client_address[0]) | |
else: | |
logger.info("sending response to %s" % self.client_address[0]) | |
# just send back the same data | |
self.request.sendall(zlib.compress(VPN.Encrypt(data, userId=userId).encode())) | |
if __name__ == "__main__": | |
HOST, PORT = "localhost", 9999 | |
VPN = LoremVPN() | |
# Create the server, binding to localhost on port 9999 | |
with socketserver.TCPServer((HOST, PORT), LoremTCPHandler) as server: | |
# Activate the server; this will keep running until you | |
# interrupt the program with Ctrl-C | |
server.serve_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import pickle | |
from random import randint | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s %(name)s %(levelname)s %(message)s' | |
) | |
_TABLES_FILENAME = "lorem-tables.pkl" | |
_USERS_FILENAME = "lorem-users.pkl" | |
class LoremVPN(object): | |
""" | |
Client/Server framework to implement Lorem Protocol | |
Messages from and to client / server must match the scheme: | |
[UserID (16 bytes)][IV (16 bytes)][AES encoded message with key=UserSecret and IV=IV] | |
Where IV = MD5(UserSecret + SHA256(UserSecret + message))[:8] + os.urandom(8) (first 8 bytes are also used to check message integrity) | |
Messages are then converted in LoremIpsum text and should be gzipped before transfers. | |
""" | |
def __init__(self, userId=None, userSecret=None): | |
""" | |
""" | |
# setup logger | |
self.logger = logging.getLogger(self.__class__.__name__) | |
# retrieve encoding and decoding tables | |
try: | |
self.encodingTable, self.decodingTable = pickle.load(open(_TABLES_FILENAME, "rb")) | |
except Exception as e: | |
self.logger.error(e) | |
self.ready = False | |
else: | |
self.ready = True | |
# retrieve valid users credentials or assign object to an unique user | |
if userId is None: | |
self.userId = None | |
try: | |
validUsers = pickle.load(open(_USERS_FILENAME, "rb")) | |
except Exception as e: | |
self.logger.warning(e) | |
validUsers = {"demo": "demo"} | |
finally: | |
self.validUsers = {} | |
for userId, userSecret in validUsers.items(): | |
userId, userSecret = self._normalize_credentials(userId, userSecret) | |
self.validUsers[userId] = userSecret | |
else: | |
self.userId, self.userSecret = self._normalize_credentials(userId, userSecret) | |
def _normalize_credentials(self, userId, userSecret): | |
""" | |
""" | |
md5 = hashes.Hash(hashes.MD5(), backend=default_backend()) | |
md5.update(userId.encode()) | |
sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend()) | |
sha256.update(userSecret.encode()) | |
return md5.finalize(), sha256.finalize() | |
def Encode(self, data): | |
""" | |
""" | |
if not self.ready: | |
return None | |
try: | |
encoded = "" | |
k = next(iter(self.encodingTable)) | |
hexData = data.hex() | |
for c in [int(hexData[i:i+1], 16) for i in range(0, len(hexData), 1)]: | |
while c > 0: | |
if len(self.encodingTable[k]) == 1: | |
i = 0 | |
elif c >= len(self.encodingTable[k]): | |
i = randint(1, len(self.encodingTable[k]) - 1) | |
else: | |
i = c | |
c -= i | |
encoded += self.encodingTable[k][i] | |
k = k[1:] + tuple(encoded[-1]) | |
while 1: | |
encoded += self.encodingTable[k][0] | |
if len(self.encodingTable[k]) > 1: | |
break | |
k = k[1:] + tuple(encoded[-1]) | |
k = k[1:] + tuple(encoded[-1]) | |
return encoded | |
except Exception as e: | |
self.logger.error(e) | |
self.logger.warning("Data can not be encoded") | |
return None | |
def Decode(self, data): | |
""" | |
""" | |
if not self.ready: | |
return None | |
try: | |
decoded = [] | |
k = next(iter(self.encodingTable)) | |
n = 0 | |
for c in data: | |
i = self.decodingTable[k][c] | |
n += i | |
if i == 0 and len(self.decodingTable[k]) > 1: | |
decoded.append(n) | |
n = 0 | |
k = k[1:] + tuple(c) | |
return bytes.fromhex("".join([hex(x)[2:] for x in decoded])) | |
except Exception as e: | |
self.logger.error(e) | |
self.logger.warning("Data can not be decoded") | |
return None | |
def Encrypt(self, data, userId=None): | |
""" | |
""" | |
if not self.ready: | |
return None | |
if self.userId: | |
userId, userSecret = self.userId, self.userSecret | |
elif userId not in self.validUsers: | |
self.logger.warning("Unknown user, aborting encryption") | |
return None | |
else: | |
userSecret = self.validUsers[userId] | |
sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend()) | |
md5 = hashes.Hash(hashes.MD5(), backend=default_backend()) | |
# first part of iv will also be used to check data integrity | |
sha256.update(userSecret + data) | |
iv = sha256.finalize() | |
# we need a 128 bits iv | |
md5.update(userSecret + iv) | |
iv = md5.finalize()[:8] + os.urandom(8) | |
# encrypt data with AES | |
encryptor = Cipher(algorithms.AES(userSecret), modes.CFB(iv), backend=default_backend()).encryptor() | |
return self.Encode(userId + iv + encryptor.update(data) + encryptor.finalize()) | |
def Decrypt(self, data): | |
""" | |
""" | |
if not self.ready: | |
return None, None | |
data = self.Decode(data) | |
try: | |
userId, iv, data = data[:16], data[16:32], data[32:] | |
except Exception as e: | |
self.logger.warning(e) | |
self.logger.warning("Invalid data format") | |
return None, None | |
if self.userId: | |
if userId != self.userId: | |
self.logger.error("Unauthorized userId, aborting decryption") | |
return None, None | |
userSecret = self.userSecret | |
elif userId not in self.validUsers: | |
self.logger.warning("Unknown user, aborting decryption") | |
return None, None | |
else: | |
userSecret = self.validUsers[userId] | |
# decrypt data with AES | |
decryptor = Cipher(algorithms.AES(userSecret), modes.CFB(iv), backend=default_backend()).decryptor() | |
data = decryptor.update(data) + decryptor.finalize() | |
# check integrity of the data | |
sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend()) | |
md5 = hashes.Hash(hashes.MD5(), backend=default_backend()) | |
sha256.update(userSecret + data) | |
controle = sha256.finalize() | |
md5.update(userSecret + controle) | |
controle = md5.finalize() | |
if controle[:8] != iv[:8]: | |
self.logger.warning("Data integrity compromised, aborting decryption") | |
return None, None | |
return data, userId | |
if __name__ == '__main__': | |
vpn = LoremVPN("demo", "demo") | |
ct = vpn.Encrypt(b"test") | |
print(vpn.Decrypt(ct)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
use https://gist.github.com/fbparis/24c24c106c76862235b8230fd8c4ed28 to generate lorem-tables.pkl :)