Last active
May 17, 2021 06:27
-
-
Save foozlevazquez/1f0181f50e2eba5e7020b04f65ec9906 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Dict | |
import ssl | |
import asyncio | |
import websockets | |
import requests | |
import logging | |
import warnings | |
import logging | |
from pprint import pprint | |
# Silence warnings attributed to urllib3.connectionpool (the requests calls in
# this file pass verify=False).
warnings.filterwarnings('ignore', module='urllib3.connectionpool')

# Set DEBUG to False to leave the root logger unconfigured.
DEBUG = True
# Defined unconditionally so the name exists whatever DEBUG is set to.
LOG_FMT = '%(asctime)s %(name)s:%(levelname)s %(message)s'
if DEBUG:
    logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)

logger = logging.getLogger(__name__)
async def exec(cmd_args: List[str], inst: str, api_addr='10.20.5.1:8443',
               cert=('lxd.crt', 'lxd.key')):
    """Run a command in an LXD instance and collect its output.

    POSTs a non-interactive exec request to ``/1.0/instances/{inst}/exec``,
    then connects to the operation's websockets using the secrets found in
    the response's ``fds`` metadata (stdin "0", stdout "1", stderr "2" and
    "control") while polling the operation until its metadata contains a
    ``return`` entry.

    Args:
        cmd_args: The command and its arguments, sent as the ``command``
            field of the exec request. Must not be empty.
        inst: Instance name, interpolated into the request URL.
        api_addr: ``host:port`` of the LXD API endpoint.
        cert: ``(certfile, keyfile)`` pair used as the client certificate
            for both the HTTPS requests and the websocket TLS context.

    Returns:
        A dict ``{'stdout': [...], 'stderr': [...]}`` holding the messages
        received on each websocket, in arrival order. The same dict is
        also printed.

    Raises:
        ValueError: If ``cmd_args`` is empty.
        RuntimeError: If the exec request does not answer with HTTP 202 or
            the LXD ``status_code`` in the body is not 100.
    """
    if not cmd_args:
        raise ValueError("cmd_args must not be empty")

    # requests is blocking; run it in the default executor so the event loop
    # (and the websocket readers below) are not stalled while it waits.
    loop = asyncio.get_running_loop()

    payload = {'command': list(cmd_args),
               'environment': {},
               'wait-for-websocket': True,
               'interactive': False}
    # NOTE(review): verify=False disables server certificate verification.
    resp = await loop.run_in_executor(
        None,
        lambda: requests.post(
            f'https://{api_addr}/1.0/instances/{inst}/exec',
            cert=cert, verify=False, json=payload))
    if resp.status_code != 202:
        raise RuntimeError(
            "Unknown HTTP return code: {}".format(vars(resp)))
    data = resp.json()
    logger.debug(data)
    if data['status_code'] != 100:
        raise RuntimeError("Unknown LXD return code: {}".format(data))

    op = data['operation']
    # stdin, stdout, stderr
    # 'fds': {'0': '1e52...',
    #         '1': 'f671...', ..}
    fds: Dict[str, str] = data['metadata']['metadata']['fds']

    # NOTE(review): hostname checking and certificate verification are
    # switched off for the websocket connections as well.
    ssl_ctxt = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctxt.load_cert_chain(*cert)
    ssl_ctxt.check_hostname = False
    ssl_ctxt.verify_mode = ssl.CERT_NONE

    control_secret = fds["control"]
    stdin_secret = fds["0"]
    stdout_secret = fds["1"]
    stderr_secret = fds["2"]
    stdout = []
    stderr = []

    async def get_op(op):
        """Poll the operation until its metadata contains 'return'."""
        url = f'https://{api_addr}{op}'
        while True:
            try:
                op_resp = await loop.run_in_executor(
                    None,
                    lambda: requests.get(url, cert=cert, verify=False))
                body = op_resp.json()
                logger.debug("OP = {} {}".format(op_resp, body))
                if 'return' in body['metadata']['metadata']:
                    break
            except Exception as exc:
                logger.error("Got exception: " + str(exc))
            # Sleep on the failure path too, so a persistent error does not
            # turn this into a busy loop.
            await asyncio.sleep(0.5)
        logger.debug("Exiting get_op()")

    async def ws_recv(secret, dest):
        """Append every non-empty message from the websocket to dest."""
        uri = f'wss://{api_addr}{op}/websocket?secret={secret}'
        async with websockets.connect(
                uri, ssl=ssl_ctxt, close_timeout=0) as ws:
            while True:
                try:
                    resp = await ws.recv()
                except Exception as exc:
                    # Any receive failure ends the read loop.
                    logger.error("Got exception: " + str(exc))
                    break
                logger.debug(f"{secret} received: \"{resp}\"")
                if resp:
                    dest.append(resp)
                else:
                    # An empty message is treated as end of stream.
                    break
            logger.debug(f"{secret} closing")
            await ws.close()
            logger.debug(f"{secret} leaving ctmgr")
        logger.debug(f"{secret} exiting")

    async def ws_send(secret, msg):
        """Send a single message on the websocket, then close it."""
        uri = f'wss://{api_addr}{op}/websocket?secret={secret}'
        async with websockets.connect(
                uri, ssl=ssl_ctxt, close_timeout=0) as ws:
            try:
                resp = await ws.send(msg)
                logger.debug(f"stdin got {resp}")
            except Exception as exc:
                logger.error("Got exception: " + str(exc))
            logger.debug(f"stdin {secret} closing")
            await ws.close()
            logger.debug(f"stdin {secret} leaving ctmgr")
        logger.debug(f"stdin {secret} exiting")

    handle_stdin = ws_send(stdin_secret, "")
    handle_stdout = ws_recv(stdout_secret, stdout)
    handle_stderr = ws_recv(stderr_secret, stderr)
    # Control-channel messages are read but discarded.
    handle_control = ws_recv(control_secret, [])
    handle_op = get_op(op)
    await asyncio.gather(handle_control,
                         handle_stdin,
                         handle_stdout,
                         handle_stderr, handle_op)

    result = {'stdout': stdout, 'stderr': stderr}
    print(result)
    return result
# Drive the exec() coroutine to completion on a fresh event loop, passing
# ['hostname'] as cmd_args for the instance 'pytest-lxd-foo3'.
asyncio.run(
    exec(
        cmd_args=['hostname'],
        inst='pytest-lxd-foo3',
    )
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment