Skip to content

Instantly share code, notes, and snippets.

@ckxng
Created April 21, 2021 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ckxng/b31d666c0cb11cdca1760b9d2faa377a to your computer and use it in GitHub Desktop.
Save ckxng/b31d666c0cb11cdca1760b9d2faa377a to your computer and use it in GitHub Desktop.
quick ldap dsa healthcheck service
#!/usr/bin/env python3
"""simple socket service simulating an HTTP response to see if LDAP DSA is alive
"""
from subprocess import Popen, PIPE
from json import dumps
import socket
import sys
bind_host = ""
bind_port = 1200
conn_queue = 10
def checkdsa():
"""checks if dsa is active
Requires:
* /usr/bin/ldapsearch
Will pass along any raised errors.
Returns:
(result, detail)
result(bool) - true if dsa is active, false otherwise
detail(object)
detail.alive(bool) - same as result
detail.rc(int) - returncode of check
detail.out(str) - lines of output
detail.err(str) - lines of error
"""
proc = Popen([
"/usr/bin/ldapsearch",
"-xZZH",
"ldap://localhost",
"-b",
"cn=state"
], stdout=PIPE, stderr=PIPE)
(out, err) = proc.communicate()
rc = proc.wait()
detail = {}
detail['alive'] = False
detail['rc'] = rc
detail['out'] = out.decode("utf-8").split("\n")
detail['err'] = err.decode("utf-8").split("\n")
if rc == 0:
for line in detail['out']:
if line == "dsaIsActive: TRUE":
detail['alive'] = True
return (True, detail)
return (False, detail)
def printhttp(code, data):
print(printshttp(code,data))
def printshttp(code, data):
"""prints a fake http response
Parameters:
code(str) - a string including the http code and title (ie. "200 OK")
data(object) - a payload to deliver to the browser via. JSON
"""
response = dumps(data)
return("""HTTP/1.0 %s
Content-Type: application/json; charset=UTF-8
Content-Length: %s
%s
"""%(code, len(response), response))
if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((bind_host,bind_port))
except socket.err as msg:
print("Bind failed, the service might already be running: %s"%msg)
sys.exit(1)
s.listen(conn_queue)
while True:
conn, addr = s.accept()
(ok, detail) = checkdsa()
if ok:
conn.sendall(printshttp("200 OK", detail).encode())
else:
conn.sendall(printshttp("500 Internal Server Error", detail).encode())
conn.close()
[Unit]
Description=DSA Check Service
[Service]
Type=notify
ExecStart=/usr/local/bin/dsa-http-svc-chk
UMask=0066
Restart=on-failure
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment