Skip to content

Instantly share code, notes, and snippets.

@ilyashupta
Created March 17, 2020 12:33
Show Gist options
  • Save ilyashupta/e3ae0914cbb137edf590ec68b49f098e to your computer and use it in GitHub Desktop.
Save ilyashupta/e3ae0914cbb137edf590ec68b49f098e to your computer and use it in GitHub Desktop.
#!/bin/python3
import asyncio
import sys
from ast import literal_eval
from optparse import OptionParser
async def check_service(name, address, port, timeout=10.0):
    """Report whether a TCP connection to ``address:port`` can be opened.

    Args:
        name: Label for the service; returned unchanged so callers can match
            results to services.
        address: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait for the connection to be
            established. Defaults to 10 seconds.

    Returns:
        A ``(name, reachable)`` tuple, where ``reachable`` is ``True`` when
        the connection was established within ``timeout`` and ``False``
        otherwise.
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(address, port), timeout=timeout
        )
    except Exception:
        # Deliberately broad: refused connections, resolution failures,
        # timeouts and invalid port values all mean "not reachable" here,
        # and one bad entry must not abort the other checks.
        return name, False

    writer.close()
    try:
        # Wait for the transport to be torn down so the socket is not left
        # half-closed when the caller shuts the event loop down.
        await writer.wait_closed()
    except Exception:
        # The connection was already established, so an error while closing
        # does not change the verdict. This also covers Python < 3.7, where
        # StreamWriter has no wait_closed().
        pass
    return name, True
async def _check_all(services, address):
    """Run check_service() concurrently for every entry in *services*."""
    # gather() is awaited inside a coroutine so it binds to the running loop
    # instead of relying on an implicit "current" event loop.
    return await asyncio.gather(
        *(check_service(service['name'], address, service['port'])
          for service in services)
    )


def main(argv=None):
    """Check every configured service and return a process exit status.

    The configuration file must contain a Python literal dict. Its optional
    ``service_list`` key holds a list of dicts, each with a ``name`` and a
    ``port`` entry.

    Args:
        argv: Command-line arguments without the program name. ``None``
            makes optparse fall back to ``sys.argv[1:]``.

    Returns:
        ``0`` when every service accepted a connection, ``1`` when at least
        one did not.
    """
    parser = OptionParser()
    parser.add_option('-c', '--config', dest='config', help='Configuration file', metavar='CONFIG')
    parser.add_option('-a', '--address', dest='address', default='localhost', help='Host to check', metavar='ADDRESS')
    options, _ = parser.parse_args(argv)
    if options.config is None:
        # Without this guard open(None) fails with an opaque TypeError.
        parser.error('a configuration file is required (-c/--config)')

    with open(options.config, 'rt') as config_file:
        # literal_eval only accepts Python literals, so the file cannot
        # execute arbitrary code.
        config = literal_eval(config_file.read())

    # A fresh loop avoids asyncio.get_event_loop(), which is deprecated when
    # no loop is running; the finally clause closes it even if a check raises.
    loop = asyncio.new_event_loop()
    try:
        results = loop.run_until_complete(
            _check_all(config.get('service_list', []), options.address)
        )
    finally:
        loop.close()

    print(results)
    return int(any(not reachable for _, reachable in results))


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment