Skip to content

Instantly share code, notes, and snippets.

@rcarmo
Created October 20, 2021 08:04
Show Gist options
  • Save rcarmo/823bd16c87cadd9e692285484a59ef1a to your computer and use it in GitHub Desktop.
Save rcarmo/823bd16c87cadd9e692285484a59ef1a to your computer and use it in GitHub Desktop.
Azure Blob Storage Tiering for Synology Backups:
import datetime
from typing import Iterator
from requests import Session, Response
from base64 import b64encode, b64decode
from datetime import datetime, timedelta, timezone
from email.utils import formatdate, parsedate_to_datetime
from hashlib import sha256, md5
from hmac import HMAC
from lxml.etree import Element, tostring, fromstring, iterwalk
from os import environ
from logging import getLogger, basicConfig
import azure.functions as func
# Module-wide logger. Verbosity is taken from the LOGLEVEL environment
# variable and defaults to DEBUG. The value is stripped and upper-cased
# because logging only recognises upper-case level names ("debug" would
# raise ValueError); an empty/blank value falls back to the default.
log = getLogger(__name__)
basicConfig(
    format='time=%(asctime)s loc=%(funcName)s:%(lineno)d msg="%(message)s"',
    level=(environ.get('LOGLEVEL', '').strip() or 'DEBUG').upper(),
)
class BlobClient:
    """Small Azure Blob Storage REST client signing requests with SharedKeyLite.

    Requests are sent through a ``requests``-style session. The instance can
    be used as a context manager, in which case the session is closed on exit.
    """

    account = None
    auth = None
    session = None

    # <Properties> child elements copied into the dicts yielded by the
    # listing methods (keys are the lower-cased tag names).
    _CONTAINER_PROPS = frozenset((
        "Creation-Time", "Last-Modified", "Etag", "Content-Length",
        "Content-Type", "Content-Encoding", "Content-MD5", "Cache-Control",
    ))
    _BLOB_PROPS = _CONTAINER_PROPS | {"AccessTier"}

    def __init__(self, account: str, auth=None, session=None) -> None:
        """Create a BlobClient instance.

        Args:
            account: storage account name (used in the host name and signature).
            auth: base64-encoded account key; stored decoded in ``self.auth``.
            session: optional session object to reuse; a new ``Session`` is
                created when omitted.
        """
        self.account = account
        if auth is not None:
            self.auth = b64decode(auth)
        if session is None:
            session = Session()
        self.session = session

    def __enter__(self) -> "BlobClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying session."""
        self.session.close()

    def _headers(self, headers=None, date=None) -> dict:
        """Return the default REST headers, overridden/extended by *headers*.

        Args:
            headers: optional extra headers; they win over the defaults.
            date: value for ``x-ms-date``; the current time is used if falsy.
        """
        if not date:
            date = formatdate(usegmt=True)  # if you don't use GMT, the API breaks
        return {
            'x-ms-date': date,
            'x-ms-version': '2018-03-28',
            'Content-Type': 'application/octet-stream',
            'Connection': 'Keep-Alive',
            **(headers or {}),
        }

    def _sign_for_blobs(self, verb: str, canonicalized: str, headers=None, payload='') -> dict:
        """Compute the SharedKeyLite authorization header and add standard headers.

        Args:
            verb: HTTP method name, e.g. ``"GET"``.
            canonicalized: canonicalized resource string for the request.
            headers: optional extra headers merged over the defaults.
            payload: request body; only its byte length is used here.

        Returns:
            The merged headers plus ``Authorization`` and ``Content-Length``.
        """
        headers = self._headers(headers)
        # Canonicalized headers: every x-ms-* header, lower-cased name, sorted.
        ms_headers = sorted(
            (name.lower(), value)
            for name, value in headers.items()
            if name.lower().startswith('x-ms-')
        )
        canon_headers = "\n".join(f"{name}:{value}" for name, value in ms_headers)
        # The two empty fields are the Content-MD5 and Date slots.
        string_to_sign = "\n".join(
            [verb, '', headers['Content-Type'], '', canon_headers, canonicalized]
        ).encode('utf-8')
        signature = b64encode(HMAC(self.auth, string_to_sign, sha256).digest()).decode('utf-8')
        # Content-Length is a byte count, so measure text after UTF-8 encoding.
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        return {
            'Authorization': f'SharedKeyLite {self.account}:{signature}',
            'Content-Length': str(len(payload)),
            **headers,
        }

    def _list_pages(self, uri: str, canon: str, marker=None) -> Iterator:
        """Yield one parsed XML document per page of a list operation.

        Follows ``NextMarker`` iteratively until it is absent or empty. A
        non-OK response is logged and ends the iteration without raising.
        """
        while True:
            # Let the session percent-encode the continuation marker.
            params = None if marker is None else {'marker': marker}
            res = self.session.get(uri, params=params, headers=self._sign_for_blobs("GET", canon))
            if not res.ok:
                log.error(res.status_code)
                log.error(res.text)
                return
            # Parse the raw bytes so the XML declaration decides the encoding.
            doc = fromstring(res.content)
            yield doc
            tag = doc.find("NextMarker")
            marker = tag.text if tag is not None else None
            if not marker:
                return

    def createContainer(self, container_name) -> "Response":
        """Create a new Container"""
        canon = f'/{self.account}/{container_name}'
        uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container'
        return self.session.put(uri, headers=self._sign_for_blobs("PUT", canon))

    def deleteContainer(self, container_name) -> "Response":
        """Delete a Container"""
        canon = f'/{self.account}/{container_name}'
        uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container'
        return self.session.delete(uri, headers=self._sign_for_blobs("DELETE", canon))

    def listContainers(self, marker=None) -> Iterator[dict]:
        """Yield a dict per container in the account.

        Each dict has ``name`` plus the lower-cased property tags found in the
        listing; ``last-modified`` is parsed into a datetime when present.

        Args:
            marker: optional continuation marker to start from.
        """
        canon = f'/{self.account}/?comp=list'
        uri = f'https://{self.account}.blob.core.windows.net/?comp=list'
        for doc in self._list_pages(uri, canon, marker):
            for container in doc.xpath("//Container"):
                item = {"name": container.find("Name").text}
                for prop in container.xpath("./Properties/*"):
                    if prop.tag not in self._CONTAINER_PROPS:
                        continue
                    if prop.tag == "Last-Modified" and prop.text:
                        item[prop.tag.lower()] = parsedate_to_datetime(prop.text)
                    else:
                        item[prop.tag.lower()] = prop.text
                yield item

    def listBlobs(self, container_name, marker=None) -> Iterator[dict]:
        """Yield a dict per blob in *container_name*.

        Each dict has ``name`` plus the lower-cased, non-empty property tags:
        dates become datetimes, ``content-length`` an int and ``content-md5``
        the decoded digest bytes. Empty properties are omitted.

        Args:
            container_name: container to enumerate.
            marker: optional continuation marker to start from.
        """
        canon = f'/{self.account}/{container_name}?comp=list'
        uri = (f'https://{self.account}.blob.core.windows.net/{container_name}'
               '?restype=container&comp=list&include=metadata')
        for doc in self._list_pages(uri, canon, marker):
            for blob in doc.xpath("//Blob"):
                item = {"name": blob.find("Name").text}
                for prop in blob.xpath("./Properties/*"):
                    if prop.tag not in self._BLOB_PROPS or not prop.text:
                        continue
                    key = prop.tag.lower()
                    if prop.tag in ("Last-Modified", "Creation-Time"):
                        item[key] = parsedate_to_datetime(prop.text)
                    elif prop.tag == "Content-Length":
                        item[key] = int(prop.text)
                    elif prop.tag == "Content-MD5":
                        item[key] = b64decode(prop.text.encode('utf-8'))
                    else:
                        item[key] = prop.text
                yield item

    def putBlob(self, container_name: str, blob_path: str, payload, mimetype="application/octet-stream") -> "Response":
        """Upload a blob as a block blob.

        Text payloads are sent UTF-8 encoded so the body matches the byte
        length used for ``Content-Length``.
        """
        # NOTE(review): blob_path is interpolated verbatim into the URL and the
        # signed resource; names needing percent-encoding may need attention.
        canon = f'/{self.account}/{container_name}/{blob_path}'
        uri = f'https://{self.account}.blob.core.windows.net/{container_name}/{blob_path}'
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        headers = {
            'x-ms-blob-type': 'BlockBlob',  # TODO: support also append blobs later
            'x-ms-blob-content-type': mimetype,
            'Content-Type': mimetype,
        }
        return self.session.put(uri, data=payload, headers=self._sign_for_blobs("PUT", canon, headers, payload))

    def setBlobTier(self, container_name: str, blob_path: str, tier: str) -> "Response":
        """Request access tier *tier* for a blob via ``x-ms-access-tier``."""
        canon = f'/{self.account}/{container_name}/{blob_path}?comp=tier'
        uri = f'https://{self.account}.blob.core.windows.net/{container_name}/{blob_path}?comp=tier'
        headers = {
            'x-ms-access-tier': tier,
        }
        return self.session.put(uri, headers=self._sign_for_blobs("PUT", canon, headers))
def main(mytimer: func.TimerRequest) -> None:
    """Timer entry point: move stale HyperBackup index blobs to the Cool tier.

    Walks the containers named in the INSPECT_CONTAINERS environment variable
    (comma- and/or whitespace-separated, default ``homes``). A blob is
    re-tiered when its name contains both ``HyperBackup.hbk/Pool/0`` and
    ``index``, it is not already ``Cool`` and it was last modified more than
    24 hours ago. Failures to change the tier are logged and do not stop the
    run.

    Args:
        mytimer: the timer trigger; only ``past_due`` is read.
    """
    now = datetime.now(timezone.utc)
    cool_window = now - timedelta(hours=24)
    log.info(f'Starting run {now}')
    if mytimer.past_due:
        log.info('The timer is past due!')
    # Exact-name membership: a plain `in` on the raw string would also match
    # any substring of it (e.g. "home" inside "homes").
    wanted = set(environ.get("INSPECT_CONTAINERS", "homes").replace(",", " ").split())
    client = BlobClient(environ.get("STORAGE_ACCOUNT_NAME"), environ.get("STORAGE_ACCOUNT_KEY"))
    try:
        for container in client.listContainers():
            if container['name'] not in wanted:
                continue
            for blob in client.listBlobs(container['name']):
                name = blob['name']
                if "HyperBackup.hbk/Pool/0" not in name or "index" not in name:
                    continue
                # listBlobs omits empty properties, so both keys may be absent;
                # leave blobs with an unknown tier or timestamp untouched.
                tier = blob.get('accesstier')
                modified = blob.get('last-modified')
                if tier is None or tier == "Cool":
                    continue
                if modified is None or modified >= cool_window:
                    continue
                log.info(f"{modified} <-> {cool_window} for {blob}")
                res = client.setBlobTier(container['name'], name, "Cool")
                if not res.ok:
                    log.error(f"error setting tier: {res.text}")
    finally:
        # Always release the HTTP session, even if listing raised.
        client.close()
    log.info('ending run')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment