Skip to content

Instantly share code, notes, and snippets.

@robertschneiderman
Created November 18, 2020 21:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robertschneiderman/07190719fe39b04cef6932b428efa63a to your computer and use it in GitHub Desktop.
Save robertschneiderman/07190719fe39b04cef6932b428efa63a to your computer and use it in GitHub Desktop.
import socket
import sys
import os
import subprocess
import time
# TCP health-check responder gated on a RADIUS probe.
#
# Before every accept() the script runs `eapol_test` against the RADIUS
# server named in the environment. Only when that probe exits with status 0
# does it accept one pending TCP connection, send "Server Alive" and close it.
#
# Environment variables read: TCP_HEALTH_CHECK_PORT, RADIUS_SERVER_IP,
# TEST_ROUTER_IP, TEST_ROUTER_SECRET. A missing variable raises KeyError.

# Seconds to wait before probing RADIUS again after a failed probe.
RECHECK_INTERVAL_SECONDS = 10

# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the port to be re-bound right after a restart instead of failing with
# "Address already in use" while old connections linger in TIME_WAIT.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_name = "0.0.0.0"  # Modify this if you want to bind only on a specific IP Address
server_address = (server_name, int(os.environ["TCP_HEALTH_CHECK_PORT"]))
print("starting up on %s port %s" % server_address, file=sys.stdout)
sock.bind(server_address)
sock.listen(1)

while True:
    # Probe the RADIUS service. The probe's output is never inspected, only
    # its exit status, so it is discarded rather than buffered in memory.
    probe = subprocess.run(
        [
            "eapol_test",
            "-a",
            os.environ["RADIUS_SERVER_IP"],
            "-p",
            "1812",  # 1812 is the standard RADIUS port
            "-A",
            os.environ["TEST_ROUTER_IP"],
            "-s",
            os.environ["TEST_ROUTER_SECRET"],
            "-c",
            "ttls-eap.conf",
            "-r0",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    # If RADIUS is not available, let the health check fail by not accepting
    # connections, and probe again after RECHECK_INTERVAL_SECONDS to see
    # whether the server has recovered. If it does not recover on successive
    # tries:
    #   * consecutive health checks will fail
    #   * the current instance will be de-registered and drained
    # NOTE: the socket itself keeps listening during this time; clients are
    # simply left un-accepted in the (size 1) listen backlog.
    if probe.returncode != 0:
        time.sleep(RECHECK_INTERVAL_SECONDS)
        continue

    print("waiting for a connection", file=sys.stdout)
    connection, client_address = sock.accept()
    # The context manager closes the connection exactly once on every path.
    with connection:
        print("client connected:", client_address, file=sys.stdout)
        try:
            connection.sendall(b"Server Alive")
        except OSError as exc:
            # The peer may already have gone away (e.g. a health checker that
            # disconnected while it sat in the backlog). One failed reply must
            # not take the whole health-check process down.
            print("failed to answer health check:", exc, file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment