Created
April 21, 2021 16:10
-
-
Save ckxng/b31d666c0cb11cdca1760b9d2faa377a to your computer and use it in GitHub Desktop.
quick ldap dsa healthcheck service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""simple socket service simulating an HTTP response to see if LDAP DSA is alive | |
""" | |
from subprocess import Popen, PIPE | |
from json import dumps | |
import socket | |
import sys | |
# Address to listen on; the empty string binds every local IPv4 interface.
bind_host = ""
# TCP port the health-check endpoint listens on.
bind_port = 1200
# Backlog of pending connections handed to socket.listen().
conn_queue = 10
def checkdsa(uri="ldap://localhost", base="cn=state",
             ldapsearch="/usr/bin/ldapsearch"):
    """checks if dsa is active

    Runs ``ldapsearch -xZZH <uri> -b <base>`` and looks for a line reading
    ``dsaIsActive: TRUE`` in its standard output.

    Requires:
    * /usr/bin/ldapsearch (or whatever ``ldapsearch`` points at)

    Parameters:
    uri(str) - LDAP URI to query (default "ldap://localhost")
    base(str) - search base to query (default "cn=state")
    ldapsearch(str) - path of the ldapsearch executable

    Will pass along any errors raised while starting the process
    (for example OSError when the executable is missing).

    Returns:
    (result, detail)
    result(bool) - true if dsa is active, false otherwise
    detail(object)
    detail.alive(bool) - same as result
    detail.rc(int) - returncode of check
    detail.out(list of str) - lines of output
    detail.err(list of str) - lines of error
    """
    proc = Popen(
        [ldapsearch, "-xZZH", uri, "-b", base],
        stdout=PIPE,
        stderr=PIPE,
    )
    # communicate() drains both pipes and waits for the process to exit,
    # so the return code is already available afterwards.
    (out, err) = proc.communicate()
    rc = proc.returncode

    # Decode leniently: a stray non-UTF-8 byte in the tool's output must not
    # turn a health check into an unhandled UnicodeDecodeError.
    detail = {
        'alive': False,
        'rc': rc,
        'out': out.decode("utf-8", errors="replace").split("\n"),
        'err': err.decode("utf-8", errors="replace").split("\n"),
    }

    # Only a successful search can report an active DSA.
    if rc == 0:
        detail['alive'] = any(
            line.rstrip() == "dsaIsActive: TRUE" for line in detail['out']
        )
    return (detail['alive'], detail)
def printhttp(code, data):
    """writes the fake http response for code and data to stdout

    Parameters:
    code(str) - a string including the http code and title (ie. "200 OK")
    data(object) - a payload to deliver via. JSON
    """
    rendered = printshttp(code, data)
    print(rendered)
def printshttp(code, data):
    """builds a fake http response and returns it as a string

    The status line and headers are terminated with CRLF and separated from
    the body by an empty line, so that HTTP clients can tell where the
    headers end. Content-Length is the size of the UTF-8 encoded body.

    Parameters:
    code(str) - a string including the http code and title (ie. "200 OK")
    data(object) - a payload to deliver to the browser via. JSON

    Returns:
    str - the complete response (status line, headers, blank line, body)
    """
    body = dumps(data)
    # Measure the encoded form: Content-Length counts bytes, not characters.
    length = len(body.encode("utf-8"))
    head = [
        "HTTP/1.0 %s" % code,
        "Content-Type: application/json; charset=UTF-8",
        "Content-Length: %d" % length,
    ]
    return "\r\n".join(head) + "\r\n\r\n" + body
if __name__ == "__main__":
    # Minimal single-threaded server: every accepted connection triggers one
    # DSA check and receives one JSON response, then the connection is closed.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((bind_host, bind_port))
    except OSError as msg:
        # socket errors are OSError; the socket module has no "err" attribute.
        print("Bind failed, the service might already be running: %s" % msg)
        sys.exit(1)
    s.listen(conn_queue)

    while True:
        conn, addr = s.accept()
        # The context manager closes the connection even if something below
        # raises, so failed requests do not leak sockets.
        with conn:
            try:
                (ok, detail) = checkdsa()
            except OSError as exc:
                # The check itself could not be run (e.g. ldapsearch is
                # missing); report that as unhealthy instead of dying.
                ok = False
                detail = {'alive': False, 'error': str(exc)}

            status = "200 OK" if ok else "500 Internal Server Error"
            try:
                conn.sendall(printshttp(status, detail).encode())
            except OSError as exc:
                # A client that hung up early must not take the service down.
                print("Failed to send response to %s: %s" % (addr, exc),
                      file=sys.stderr)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit]
Description=DSA Check Service

[Service]
# Type=notify makes systemd wait for a READY=1 message (sd_notify) and fail
# the start when none arrives. The health-check script published with this
# unit never sends one, so the plain "simple" type is used instead.
# NOTE(review): assumes ExecStart points at that script - confirm.
Type=simple
ExecStart=/usr/local/bin/dsa-http-svc-chk
UMask=0066
Restart=on-failure

[Install]
WantedBy=multi-user.target
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment