Skip to content

Instantly share code, notes, and snippets.

@dclobato
Created December 1, 2023 19:36
Show Gist options
  • Save dclobato/b8c948d82a1ade57e5ea1c531e9643f3 to your computer and use it in GitHub Desktop.
Save dclobato/b8c948d82a1ade57e5ea1c531e9643f3 to your computer and use it in GitHub Desktop.
import socket
import mensagens
HOST, PORT = input().split()
PORT = int(PORT)
with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s):
s.connect((HOST, PORT))
mensagem = mensagens.Mensagem(mensagens.CODIFICAR, payload=["3", "Ola, velhinho"])
s.sendall(mensagem.dumps().encode())
entrada = s.recv(1024).decode()
resposta = mensagens.Mensagem()
resposta.loads(entrada)
if resposta.msgid == mensagem.msgid:
if resposta.opcode == mensagens.ERRO:
print("Erro!")
elif resposta.opcode == mensagens.CODIFICADO:
print(f"Recebido [{resposta.payload}]")
else:
print("nao tratado")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
mensagem = mensagens.Mensagem(mensagens.DECODIFICAR, payload=["3", "Rod, yhoklqkr"])
s.sendall(mensagem.dumps().encode())
entrada = s.recv(1024).decode()
resposta = mensagens.Mensagem()
resposta.loads(entrada)
if resposta.msgid == mensagem.msgid:
if resposta.opcode == mensagens.ERRO:
print("Erro!")
elif resposta.opcode == mensagens.DECODIFICADO:
print(f"Recebido [{resposta.payload}]")
else:
print("nao tratado")
import socket
import threading
import mensagens
def trabalho(host, porta, opcode, payload):
with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s):
s.connect((host, porta))
mensagem = mensagens.Mensagem(opcode, payload=payload)
s.sendall(mensagem.dumps().encode())
entrada = s.recv(1024).decode()
resposta = mensagens.Mensagem()
resposta.loads(entrada)
print(f"Recebido [{resposta.payload}] na thread {threading.current_thread().ident}")
HOST, PORT = input().split()
PORT = int(PORT)
acoes = [(mensagens.CODIFICAR,['3', 'Ola, velhinho']),
(mensagens.CODIFICAR,['4', 'Ola, velhinho']),
(mensagens.CODIFICAR,['5', 'Ola, velhinho']),
(mensagens.CODIFICAR,['6', 'Ola, velhinho']),
(mensagens.CODIFICAR,['7', 'Ola, velhinho']),
(mensagens.CODIFICAR,['8', 'Ola, velhinho']),
(mensagens.DECODIFICAR,['3', 'Rod, yhoklqkr'])]
threads = []
for acao in acoes:
threads.append(threading.Thread(target=trabalho,
args=(HOST, PORT, acao[0], acao[1])))
for t in threads:
t.start()
for t in threads:
t.join()
import binascii
import json
import uuid
from typing import List
CODIFICAR = 0
CODIFICADO = 1
DECODIFICAR = 2
DECODIFICADO = 3
ERRO = 99
class Mensagem:
"""
Uma classe para as mensagens da aplicação
"""
def __init__(self,
opcode: int = None,
msgid: str = str(uuid.uuid4()),
payload=None):
self._opcode = opcode
self._msgid = msgid
self._payload = payload
self._checksum = self._calculatechecksum()
@property
def opcode(self):
return self._opcode
@opcode.setter
def opcode(self, opcode):
self._opcode = opcode
self._checksum = self._calculatechecksum()
@property
def msgid(self):
return self._msgid
@property
def payload(self):
return self._payload
@property
def checksum(self):
return self._checksum
@payload.setter
def payload(self, payload: List[str]):
self._payload = payload
self._checksum = self._calculatechecksum()
def loads(self, texto: str):
t = json.loads(texto)
self._opcode = t["opcode"]
self._msgid = t["msgid"]
self._payload = t["payload"]
self._checksum = t["checksum"]
if not self.is_valid():
raise ValueError(f"Checksum esperado {self._calculatechecksum()}."
f" Checksum recebido {self._checksum}")
def dumps(self) -> str:
mensagem = {
"opcode": self.opcode,
"msgid": self.msgid,
"payload": self.payload,
"checksum": self.checksum
}
saida = json.dumps(mensagem, sort_keys=True)
return saida
def _calculatechecksum(self):
mensagem = {
"opcode": self.opcode,
"msgid": self.msgid,
"payload": self.payload
}
return hex(binascii.crc32(json.dumps(mensagem, sort_keys=True).encode()))
def is_valid(self):
return self.checksum == self._calculatechecksum()
import socket
import threading
import mensagens
def codifica(texto: str, passo: int = 3) -> str:
retorno = ""
for letra in texto:
if letra.isupper():
retorno += chr(((ord(letra) - 65 + passo) % 26) + 65)
elif letra.islower():
retorno += chr(((ord(letra) - 97 + passo) % 26) + 97)
else:
retorno += letra
return retorno
def decodifica(texto: str, passo: int = 3) -> str:
retorno = ""
for letra in texto:
if letra.isupper():
retorno += chr(((ord(letra) - 65 - passo) % 26) + 65)
elif letra.islower():
retorno += chr(((ord(letra) - 97 - passo) % 26) + 97)
else:
retorno += letra
return retorno
def trata_cliente(socket_do_cliente: socket.socket, addr):
try:
entrada = socket_do_cliente.recv(1024).decode()
mensagem = mensagens.Mensagem()
mensagem.loads(entrada)
resposta = mensagens.Mensagem(msgid=mensagem.msgid)
if mensagem.opcode == mensagens.CODIFICAR:
resposta.opcode = mensagens.CODIFICADO
resposta.payload = codifica(mensagem.payload[1], int(mensagem.payload[0]))
elif mensagem.opcode == mensagens.DECODIFICAR:
resposta.opcode = mensagens.DECODIFICADO
resposta.payload = decodifica(mensagem.payload[1], int(mensagem.payload[0]))
else:
resposta.opcode = mensagens.ERRO
resposta.payload = None
socket_do_cliente.sendall(resposta.dumps().encode())
except Exception as e:
print(f"Erro no atendimento ao cliente: {e}")
finally:
socket_do_cliente.close()
print(f"Encerrando conexao com o cliente {addr[0]}:{addr[1]}")
return
def executa_servidor(endereco: str = "0.0.0.0",
porta: int = 0):
print(f"Iniciando o servidor em {endereco}:{porta}...")
servidor = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
threads = []
try:
servidor.bind((endereco, porta))
servidor.listen()
endereco, porta = servidor.getsockname()
while True:
print(f"Aguardando conexao em {endereco}:{porta}...")
socket_do_cliente, addr = servidor.accept()
print(f"Nova conexao originada em {addr[0]}:{addr[1]}")
thread_cliente = threading.Thread(target=trata_cliente,
args=(socket_do_cliente, addr))
threads.append(thread_cliente)
thread_cliente.start()
except Exception as e:
print(f"Erro na execucao do servidor: {e}")
except KeyboardInterrupt:
print("Finalizando o servidor...")
finally:
for thread in threads:
thread.join()
servidor.close()
executa_servidor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment