Skip to content

Instantly share code, notes, and snippets.

@foozlevazquez
Last active May 17, 2021 06:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save foozlevazquez/1f0181f50e2eba5e7020b04f65ec9906 to your computer and use it in GitHub Desktop.
Save foozlevazquez/1f0181f50e2eba5e7020b04f65ec9906 to your computer and use it in GitHub Desktop.
from typing import List, Dict
import ssl
import asyncio
import websockets
import requests
import logging
import warnings
import logging
from pprint import pprint
# Ignore every warning raised from urllib3's connection pool module. The
# requests issued below pass verify=False, which is the likely source of the
# warnings being silenced here.
warnings.filterwarnings('ignore', module='urllib3.connectionpool')

# Set to False to leave the logging configuration untouched.
DEBUG = True
if DEBUG:
    LOG_FMT = '%(asctime)s %(name)s:%(levelname)s %(message)s'
    logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)

# Module-level logger; defined unconditionally because the code below logs
# through it whether or not DEBUG is enabled.
logger = logging.getLogger(__name__)
async def exec(cmd_args: List[str], inst: str, api_addr='10.20.5.1:8443',
               cert=('lxd.crt', 'lxd.key')):
    """Run ``cmd_args`` inside the LXD instance ``inst`` and collect its output.

    Posts an exec request to the LXD REST API at ``api_addr``, connects to the
    websockets advertised by the resulting operation (control, stdin, stdout
    and stderr) and polls the operation until its metadata contains a
    ``'return'`` entry.

    NOTE: this function shadows the ``exec`` builtin inside this module; the
    name is kept so that existing callers continue to work.

    Args:
        cmd_args: Command and arguments to execute, e.g. ``['hostname']``.
        inst: Instance name, interpolated into the request URL.
        api_addr: ``host:port`` of the LXD HTTPS endpoint.
        cert: ``(certfile, keyfile)`` pair used as the TLS client certificate
            for both the HTTP requests and the websocket connections.

    Returns:
        ``{'stdout': [...], 'stderr': [...]}``, each list holding the
        non-empty messages received on that websocket in arrival order. The
        same dict is also printed.

    Raises:
        Exception: if the HTTP status of the exec request is not 202, or the
            ``status_code`` field of the response body is not 100.
    """
    # requests is a blocking library; run its calls in the default executor
    # so the websocket coroutines are not stalled while a request is pending.
    loop = asyncio.get_running_loop()

    payload = {'command': list(cmd_args),
               'environment': {},
               'wait-for-websocket': True,
               'interactive': False}
    # SECURITY NOTE: verify=False disables server certificate verification.
    resp = await loop.run_in_executor(
        None,
        lambda: requests.post(
            f'https://{api_addr}/1.0/instances/{inst}/exec',
            cert=cert, verify=False, json=payload))
    if resp.status_code != 202:
        raise Exception("Unknown HTTP return code: {}".format(vars(resp)))
    data = resp.json()
    logger.debug(data)
    if data['status_code'] != 100:
        raise Exception("Unknown LXD return code: {}".format(data))
    op = data['operation']

    # stdin, stdout, stderr
    # 'fds': {'0': '1e52...',
    #         '1': 'f671...', ..}
    fds: Dict[str, str] = data['metadata']['metadata']['fds']

    # SECURITY NOTE: the websocket connections present the client certificate
    # but, like the HTTP requests, do not verify the server.
    ssl_ctxt = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctxt.load_cert_chain(*cert)
    # check_hostname must be cleared before verify_mode can be CERT_NONE.
    ssl_ctxt.check_hostname = False
    ssl_ctxt.verify_mode = ssl.CERT_NONE

    control_secret = fds["control"]
    stdin_secret = fds["0"]
    stdout_secret = fds["1"]
    stderr_secret = fds["2"]

    stdout = []
    stderr = []

    async def get_op(op):
        # Poll the operation until its metadata reports a 'return' entry.
        url = f'https://{api_addr}{op}'
        while True:
            try:
                resp = await loop.run_in_executor(
                    None,
                    lambda: requests.get(url, cert=cert, verify=False))
                body = resp.json()
                logger.debug("OP = {} {}".format(resp, body))
                if 'return' in body['metadata']['metadata']:
                    break
            except Exception as exc:
                logger.error("Got exception: " + str(exc))
            # Sleep after failures as well as after "not finished yet", so a
            # persistent error cannot turn this into a tight retry loop.
            await asyncio.sleep(0.5)
        logger.debug("Exiting get_op()")

    async def ws_recv(secret, dest):
        # Append every non-empty message from the websocket to ``dest``; stop
        # on the first empty message or on any receive error.
        uri = f'wss://{api_addr}{op}/websocket?secret={secret}'
        async with websockets.connect(uri, ssl=ssl_ctxt,
                                      close_timeout=0) as ws:
            while True:
                try:
                    resp = await ws.recv()
                except Exception as exc:
                    logger.error("Got exception: " + str(exc))
                    break
                logger.debug(f"{secret} received: \"{resp}\"")
                if not resp:
                    break
                dest.append(resp)
            logger.debug(f"{secret} closing")
            # Explicit close kept from the original; leaving the context
            # manager closes the connection as well.
            await ws.close()
            logger.debug(f"{secret} leaving ctmgr")
        logger.debug(f"{secret} exiting")

    async def ws_send(secret, msg):
        # Send a single message on the websocket, then close it.
        uri = f'wss://{api_addr}{op}/websocket?secret={secret}'
        async with websockets.connect(uri, ssl=ssl_ctxt,
                                      close_timeout=0) as ws:
            try:
                resp = await ws.send(msg)
                logger.debug(f"stdin got {resp}")
            except Exception as exc:
                logger.error("Got exception: " + str(exc))
            logger.debug(f"stdin {secret} closing")
            await ws.close()
            logger.debug(f"stdin {secret} leaving ctmgr")
        logger.debug(f"stdin {secret} exiting")

    # Nothing is fed to the command: a single empty message goes to stdin.
    handle_stdin = ws_send(stdin_secret, "")
    handle_stdout = ws_recv(stdout_secret, stdout)
    handle_stderr = ws_recv(stderr_secret, stderr)
    # Messages from the control socket are read but discarded.
    handle_control = ws_recv(control_secret, [])
    handle_op = get_op(op)

    await asyncio.gather(handle_control,
                         handle_stdin,
                         handle_stdout,
                         handle_stderr, handle_op)

    result = {'stdout': stdout, 'stderr': stderr}
    print(result)
    return result
if __name__ == '__main__':
    # Demo invocation: run `hostname` in the 'pytest-lxd-foo3' instance using
    # the default API address and client certificate. Guarded so that merely
    # importing this module does not trigger network I/O.
    asyncio.run(exec(['hostname'], 'pytest-lxd-foo3'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment