Skip to content

Instantly share code, notes, and snippets.

@ProgramCrafter
Created May 7, 2022 10:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ProgramCrafter/3b6a2faa6f4da3f74b0ca52b5aee4dc5 to your computer and use it in GitHub Desktop.
Save ProgramCrafter/3b6a2faa6f4da3f74b0ca52b5aee4dc5 to your computer and use it in GitHub Desktop.
import traceback
import socket
import time
sock = socket.socket()
def safe_send(s):
if isinstance(s, int): s = chr(s).encode('ascii')
if isinstance(s, str): s = s.encode('utf-8')
i = 0
while i < len(s): i += sock.send(s[i:])
def safe_recv(n):
data = b''
while len(data) < n: data += sock.recv(n - len(data))
return data
def send_package(_type, _id, _message=None):
pack = chr(_type).encode('ascii')
if _type == 3 or _type == 4:
pack += _id
else:
pack += chr(len(_id)).encode('ascii')
pack += _id.encode('utf-8')
if _message: pack += _message.encode('utf-8')
safe_send(len(pack) // 256)
safe_send(len(pack) % 256)
safe_send(pack)
def subscribe(channel):
send_package(1, channel)
def unsubscribe(channel):
send_package(2, channel)
def send(channel, msg):
send_package(0, channel, msg)
sock.connect(('stem.fomalhaut.me', 5733))
subscribe('thundernet')
try:
while True:
pack_len = safe_recv(2)
pack_type = safe_recv(1)
if pack_type == b'\0': # incoming msg
ch_len = safe_recv(1)[0]
ch_id = safe_recv(ch_len)
msg_len = pack_len[0] * 256 + pack_len[1]
msg_len -= ch_len + 2
msg = safe_recv(msg_len)
print(time.strftime('%d.%m.%Y %H:%M:%S | ') + '\7' * 5, msg)
elif pack_type == b'\3': # pinging
msg_len = pack_len[0] * 256 + pack_len[1] - 1
send_package(4, sock.recv(msg_len))
except:
traceback.print_exc()
input()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment