Created
May 24, 2024 14:25
-
-
Save jul/2a336fa577233b99626f53839e9bc035 to your computer and use it in GitHub Desktop.
pubsub library for python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S python3 -u | |
I_PREFER_MULTI_THREADING=1 | |
if I_PREFER_MULTI_THREADING: | |
from threading import Thread, Lock, Event | |
from queue import Queue | |
else: | |
from multiprocessing import Process as Thread, Lock, Event, Queue | |
from time import sleep | |
from subprocess import Popen, PIPE | |
SEE_DEBUG=True | |
trigger = Event() | |
pager = Event() | |
def process_out(q,qp,fd, pager): | |
anchor_seen=False | |
while True: | |
with Lock(): | |
line=fd.readline().rstrip("\n") | |
if line.strip("\r") and anchor_seen \ | |
and (SEE_DEBUG or not line.startswith("#")): | |
q.put([line]) | |
qp.put([line]) | |
anchor_seen|= anchor_seen or line == "BEGINNING!" | |
if callback: | |
pager.set() | |
sleep(.001) | |
def process_in(q,fd, trigger): | |
while True: | |
trigger.wait() | |
line=q.get() | |
fd.write(line+"\n") | |
fd.flush() | |
qin=Queue() | |
qout=Queue() | |
qcallback=Queue() | |
def callback(qout, pager): | |
while True: | |
pager.wait() | |
while not qout.empty(): | |
line = qout.get() | |
for l in line: | |
if "general" not in l: | |
print("*" + l) | |
def pubsub(qin=qin, qout=qout,host="pubsub.home.", port=2023, user="pubsub", | |
pwd="pubsub", client_cert="./client.pem", server_ca="./server.crt", | |
callback=None, pager=None): | |
p = Popen( | |
f"socat - OPENSSL-CONNECT:{host}:{port},cert={client_cert},cafile={server_ca}".split(" "), | |
stdout=PIPE, stdin=PIPE,encoding='utf-8') | |
if callback: | |
assert pager | |
callb = Thread(target=callback, args=(qcallback, pager)) | |
callb.start() | |
reader=Thread(target=process_out, args=(qout, qcallback,p.stdout, pager)) | |
writer=Thread(target=process_in, args=(qin, p.stdin,trigger)) | |
reader.start() | |
writer.start() | |
sleep(2) | |
p.stdin.write(user + "\n") | |
p.stdin.flush() | |
sleep(2) | |
p.stdin.write(pwd + "\n") | |
p.stdin.flush() | |
def send(line): | |
with Lock(): | |
qin.put(line) | |
trigger.set() | |
def collect(): | |
res= [] | |
while not qout.empty(): | |
res+=qout.get() | |
return res | |
pubsub( | |
qin, qout,"pubsub.home.", 2023, "pubsub", "pubsub","./client.pem", | |
"./server.crt",callback, pager | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment