Skip to content

Instantly share code, notes, and snippets.

@gimmi
Last active April 29, 2024 12:54
Show Gist options
  • Save gimmi/c4a830d3db6180441a4beceb138351b9 to your computer and use it in GitHub Desktop.
Save gimmi/c4a830d3db6180441a4beceb138351b9 to your computer and use it in GitHub Desktop.
Python
import argparse
from pathlib import Path
import json
default_path = Path(__file__, '..', 'defaults.json')
parser = argparse.ArgumentParser()
def defaults_from_json(*args):
global default_path
default_path = Path(*args)
def add_argument(*args, **kwargs):
parser.add_argument(*args, **kwargs)
def parse_args():
args = json.loads(default_path.read_text())
args = argparse.Namespace(**args)
return parser.parse_args(namespace=args)
import os
import sys
from types import SimpleNamespace
from urllib.request import Request
import urllib.parse
import json
import argparse
from pathlib import Path
import email.utils
import shutil
def get_access_token(tenant, app_id, password, resource):
# See https://docs.microsoft.com/en-us/azure/active-directory/azuread-dev/v1-oauth2-client-creds-grant-flow
req = Request(
f'https://login.microsoftonline.com/{tenant}/oauth2/token',
method='POST',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
},
data=urllib.parse.urlencode({
'grant_type': 'client_credentials',
'client_id': app_id,
'client_secret': password,
'resource': resource,
}).encode(),
)
with urllib.request.urlopen(req) as res:
config = json.load(res)
return config['access_token']
def split_blob_url(url):
(scheme, netloc, path, _, _) = urllib.parse.urlsplit(url)
resource = urllib.parse.urlunsplit((scheme, netloc, '', '', ''))
(_, container_name, blob_name) = path.split('/', 2)
return (resource, container_name, blob_name)
def download_blob(tenant, app_id, password, blob_url, out_fileobj):
(resource, container_name, blob_name) = split_blob_url(blob_url)
# This require the "Storage Blob Data Reader" role on the service principal
access_token = get_access_token(tenant, app_id, password, resource)
req = Request(
f'{resource}/{container_name}/{blob_name}',
method='GET',
headers={
'Authorization': f'Bearer {access_token}',
'Date': email.utils.formatdate(timeval=None, localtime=False, usegmt=True),
'x-ms-version': '2019-07-07',
},
)
with urllib.request.urlopen(req) as res:
# https://stackoverflow.com/a/42011644/66629
shutil.copyfileobj(res, out_fileobj)
def put_blob(tenant, app_id, password, blob_url, path):
(resource, container_name, blob_name) = split_blob_url(blob_url)
# This require the "Storage Blob Data Reader" role on the service principal
access_token = get_access_token(tenant, app_id, password, resource)
content_length = path.stat().st_size
with path.open('rb') as file:
req = Request(
f'{resource}/{container_name}/{blob_name}',
method='PUT',
headers={
'Authorization': f'Bearer {access_token}',
'Date': email.utils.formatdate(timeval=None, localtime=False, usegmt=True),
'x-ms-version': '2019-07-07',
'x-ms-blob-type': 'BlockBlob',
'Content-Length': content_length,
},
data=file,
)
urllib.request.urlopen(req)
if __name__ == '__main__':
tenant = os.environ['AZCOPY_TENANT_ID']
app_id = os.environ['AZCOPY_SPA_APPLICATION_ID']
password = os.environ['AZCOPY_SPA_CLIENT_SECRET']
src = sys.argv[1]
dst = Path(sys.argv[2])
print(f'{src} => {dst}')
dst.parent.mkdir(parents=True, exist_ok=True)
with dst.open('wb') as file:
download_blob(tenant, app_id, password, src, file)
import urllib.request, json, base64, ssl
def http_get_json(username, password, url, verify=True):
ctx = None
if not verify:
# https://stackoverflow.com/a/58337431/66629
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url, headers={ 'Accept': 'application/json' })
if password:
authorization = f'{username}:{password}'.encode('ascii')
authorization = base64.b64encode(authorization).decode('ascii')
req.add_header('Authorization', f'Basic {authorization}')
with urllib.request.urlopen(req, context=ctx) as res:
return json.load(res)
urls = [
'https://httpbin.org/anything', # OK
'https://httpbin.org/redirect-to?url=http%3A%2F%2Flocalhost%2Fanything', # OK
'https://httpbin.org/status/404', # HTTP Error 404: NOT FOUND
'https://httpbin.org/status/500', # HTTP Error 500: INTERNAL SERVER ERROR
]
for url in urls:
try:
print(f'---- GET {url}')
print(http_get_json(url))
except Exception as e:
print(e)
import logging
import re
class OneLineFormatter(logging.Formatter):
def __init__(self):
super().__init__('%(levelname)s:%(name)s:%(message)s')
def format(self, record):
log = super().format(record)
return re.sub(r'\s*\n+\s*', ' | ', log)
def configure_logging():
formatter = OneLineFormatter()
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)
configure_logging()
logging.debug('This is a DEBUG')
logging.info('This is a INFO')
logging.warning('This is a WARNING')
logging.error('This is a ERROR')
logging.critical('This is a CRITICAL')
logging.getLogger().setLevel(logging.DEBUG)
logging.debug('This is a DEBUG')
logging.info('This is a INFO')
logging.warning('This is a WARNING')
logging.error('This is a ERROR')
logging.critical('This is a CRITICAL')
try:
raise Exception('message')
except Exception as e:
logging.exception('This is a EXCEPTION')
# Inspired from https://stackoverflow.com/a/4628446/66629
import sys
args = iter(sys.argv)
next(args) # Discard script name
for key, val in zip(args, args):
print(f'{key} -> {val}')
args = iter(sys.argv)
next(args) # Discard script name
print(dict(zip(args, args)))
import os
import subprocess
import sys
import textwrap
# subprocess.check_call(['scp -i', r'C:\Users\dev\.ssh\bastion-rke', os.path.join(os.path.dirname(__file__), 'xxx.py'), 'rke@192.168.201.105:/tmp/xxx.py'])
# subprocess.check_call(['ssh -i', r'C:\Users\dev\.ssh\bastion-rke rke@192.168.201.105 python3 /tmp/xxx.py'])
# subprocess.check_call(['ssh -i', r'C:\Users\dev\.ssh\bastion-rke rke@192.168.201.105 python3 -c print("llll")'])
subprocess.run(['ssh -i', r'C:\Users\dev\.ssh\bastion-rke rke@192.168.201.105', 'python3', '-', 'a', 'b', 'c'], check=True, text=True, input=textwrap.dedent('''
import sys
print(f'Running python code with {len(sys.argv)} args:')
for i, arg in enumerate(sys.argv):
print(f"Argument {i:>6}: {arg}")
'''))
# TODO big problems with interpreting windows line endings
subprocess.run(['ssh', '-i', r'C:\Users\dev\.ssh\bastion-rke', 'rke@192.168.201.105', 'bash -s'], check=True, text=True, input='echo "hello from bash"')
subprocess.run(['ssh', '-i', r'C:\Users\dev\.ssh\bastion-rke', 'rke@192.168.201.105', 'docker', 'run', '-it', '--rm', 'hello-world'], check=True)
import tarfile
tar_file = tarfile.open('file.tar', 'r:')
print(tar_file.getnames())
for name in tar_file.getnames():
print(f'-- name: {name} ---------------------')
with tar_file.extractfile(name) as rfile:
print(rfile.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment