Skip to content

Instantly share code, notes, and snippets.

@aleroddepaz
Created August 26, 2021 19:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aleroddepaz/47c0abb9a770070e8562f9182345393c to your computer and use it in GitHub Desktop.
Save aleroddepaz/47c0abb9a770070e8562f9182345393c to your computer and use it in GitHub Desktop.
PyHeartBeat example
import sys
import time
import socket
import asyncio
DEFAULT_IP = '127.0.0.1' # local host, just for testing
DEFAULT_PORT = 43278 # an arbitrary UDP port
BEATWAIT = 1 # number of seconds between heartbeats
async def main():
ip = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_IP
port = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_PORT
print(f'PyHeartBeat client sending to IP {ip}, port {port}\n')
print('*** Press Ctrl-C to terminate ***\n')
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
s.sendto(b'Thump!', (ip, port))
print('Time:', time.ctime(time.time()))
await asyncio.sleep(BEATWAIT)
if __name__ == '__main__':
try: asyncio.run(main())
except KeyboardInterrupt: pass
import sys
import time
import socket
import threading
DEFAULT_PORT = 43278 # an arbitrary UDP port
DEFAULT_WAIT = 10 # number of seconds between heartbeats
class BeatReceiver(threading.Thread):
"Receive UDP packets, log them in heartbeat dictionary"
def __init__(self, port, expiry):
super().__init__()
self._expiry = expiry
self._dict = dict()
self._lock = threading.Lock()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.bind(('', port))
def __repr__(self):
when = time.time() - self._expiry
msg = 'IP address: {} - Last time: {}'
with self._lock:
items = filter(lambda x: x[1] >= when, self._dict.items())
return '\n'.join(msg.format(k, time.ctime(v)) for k, v in items)
def run(self):
while True:
try:
_, addr = self._socket.recvfrom(6)
except:
return
with self._lock:
self._dict[addr[0]] = time.time()
def stop(self):
self._socket.shutdown(socket.SHUT_WR)
self._socket.close()
def main():
port = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_PORT
checkwait = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_WAIT
print(f'PyHeartBeat server listening on port {port}\n')
print('*** Press Ctrl-C to stop ***\n')
receiver = BeatReceiver(port, checkwait)
receiver.start()
while receiver.is_alive():
try:
time.sleep(checkwait)
print('Active clients:')
print(receiver)
except KeyboardInterrupt:
print('Exiting')
receiver.stop()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment