Skip to content

Instantly share code, notes, and snippets.

@chishaku
Created June 8, 2016 16:55
Show Gist options
  • Save chishaku/bbadf169a32586d16104d6ef9eb9feb4 to your computer and use it in GitHub Desktop.
Save chishaku/bbadf169a32586d16104d6ef9eb9feb4 to your computer and use it in GitHub Desktop.
import os
import hashlib
from b2 import b2
import requests
from bunch import bunchify
from logger import get_logger
class B2ApiModified(b2.B2Api):
"""An extension of the base B2Api maintained by backblaze."""
def open_file_from_url(self, url, authorization=True,
headers_received_cb=None):
"""Opens a file from a given url."""
request_headers = {}
if authorization:
request_headers['Authorization'] = self.account_info.get_account_auth_token()
with b2.OpenUrl(url, None, request_headers) as response:
info = response.info()
if headers_received_cb is not None:
headers_received_cb(info) # may raise an exception to abort
file_size = int(info['content-length'])
file_sha1 = info['x-bz-content-sha1']
block_size = 4096
digest = hashlib.sha1()
bytes_read = 0
while 1:
data = response.read(block_size)
if len(data) == 0:
break
digest.update(data)
bytes_read += len(data)
return data
if bytes_read != int(info['content-length']):
raise b2.TruncatedOutput(bytes_read, file_size)
if digest.hexdigest() != file_sha1:
raise b2.ChecksumMismatch(
checksum_type='sha1',
expected=file_sha1,
actual=digest.hexdigest()
)
class B2Client(object):
"""A B2 client built on top of the actively developed Backblaze client."""
def __init__(self, account_id='', application_key='', log_path='.'):
self.log = get_logger('b2', log_path=log_path, console_level=2)
self.log.debug('Authorizing B2 account.')
self.client = B2ApiModified()
self.client.authorize_account(realm_url='https://api.backblaze.com',
account_id=account_id, application_key=application_key)
auth_token = self.client.account_info.get_account_auth_token()
self.headers = { 'Authorization': auth_token }
def create_bucket(self, bucket_name, bucket_type='allPrivate'):
"""Create a B2 bucket.
Args:
bucket_name (str): The name of the B2 bucket.
bucket_type (str): allPrivate or allPublic. Default to 'allPrivate'.
"""
try:
self.client.create_bucket(bucket_name, bucket_type)
self.log.info('"{}" bucket was created successfully.'.format(bucket_name))
except b2.DuplicateBucketName:
self.log.error('"{}" bucket already exists.'.format(bucket_name))
def delete_bucket(self, bucket_name):
"""Delete a B2 bucket.
Args:
bucket_name (str): The name of the B2 bucket.
"""
bucket_id = self.get_bucket_id(bucket_name)
bucket = b2.Bucket(self.client, bucket_id, bucket_name)
self.client.delete_bucket(bucket)
self.log.info('"{}" bucket was deleted successfully.'.format(bucket_name))
def upload(self, bucket_name, filepath):
"""Upload a B2 file.
Args:
bucket_name (str): The name of the B2 bucket.
filepath (str): The full path of the file to be uploaded
"""
bucket_id = self.get_bucket_id(bucket_name)
bucket = b2.Bucket(self.client, bucket_id, bucket_name)
directory, filename = os.path.split(filepath)
if not self.get_file_info(bucket_name, filename):
bucket.upload_file(filepath, filename)
self.log.info('"{}" was added to the "{}" bucket.'.format(filename, bucket_name))
else:
self.log.error('"{}" already exists in "{}" bucket. The file was not uploaded.'.format(filename, bucket_name))
def open(self, bucket_name, filename):
"""Return the content of a B2 file.
Args:
bucket_name (str): The name of the B2 bucket.
filename (str): The name of the B2 file.
"""
file_info = self.get_file_info(bucket_name, filename)
if file_info:
url = file_info.download_url
return self.client.open_file_from_url(url)
def download(self, bucket_name, filename, dest='.'):
"""Download a B2 file.
Args:
bucket_name (str): The name of the B2 bucket.
filename (str): The name of the B2 file.
dest (str): Destination directory.
"""
file_info = self.get_file_info(bucket_name, filename)
if file_info:
url = file_info.download_url
outfile = os.path.join(dest, filename)
if os.path.exists(outfile):
self.log.error('"{}" target file already exists. Nothing was downloaded.'.format(outfile))
else:
with open(filename, 'w') as f:
self.client.download_file_from_url(url, f)
self.log.info('"{}" was downloaded successfully.'.format(outfile))
else:
self.log.error('"{}" does not exist in the "{}" bucket. Nothing was downloaded.'.format(filename, bucket_name))
def delete(self, bucket_name, filename):
"""Delete a B2 file.
Args:
bucket_name (str): The name of the B2 bucket.
filename (str): The name of the B2 file.
"""
file_info = self.get_file_info(bucket_name, filename)
if file_info:
file_id = file_info.file_id
self.client.delete_file_version(file_id, filename)
self.log.info('"{}" was deleted successfully.'.format(filename))
else:
self.log.error('"{}" does not exist in the "{}" bucket. Nothing was deleted.'.format(filename, bucket_name))
def get_files(self, bucket_name):
"""Return a list of B2 files.
TODO: convert camelcase keys from B2Api.
Each file is represented by a dictionary including the following keys:
* fileName
* fileId
Args:
bucket_name (str): The name of the B2 bucket.
"""
bucket_id = self.get_bucket_id(bucket_name)
bucket = b2.Bucket(self.client, bucket_id, bucket_name)
return bunchify(bucket.list_file_names()).files
def list_files(self, bucket_name):
"""Print out a list of files from a bucket.
Args:
bucket_name (str): The name of the B2 bucket.
"""
self.client.list_buckets() # Needed to refresh bucket list, not sure why.
print 'Listing all files from "{}" bucket.'.format(bucket_name)
for f in self.get_files(bucket_name):
print '\t', f.fileName, '-', f.fileId
def list_buckets(self):
"""Print out a list of buckets."""
buckets = self.client.list_buckets()
print 'Listing all buckets.'
all_bucket_names = []
for b in buckets:
bucket_name = repr(b).split(',')[1]
bucket_type = repr(b).split(',')[2]
print '\t', bucket_name, '-', bucket_type
all_bucket_names.append(bucket_name)
return ','.join(all_bucket_names)
def get_bucket_id(self, bucket_name):
"""Get a bucket_id given the name of the bucket.
Args:
bucket_name (str): The name of the B2 bucket.
"""
try:
return self.client.get_bucket_by_name(bucket_name).id_
except b2.NonExistentBucket:
self.log.error('"{}" bucket does not exist.'.format(bucket_name))
raise
def get_file_info(self, bucket_name, filename):
"""Get info for a B2 file.
Args:
bucket_name (str): The name of the B2 bucket.
filename (str): The name of the B2 file.
"""
bucket_id = self.get_bucket_id(bucket_name)
bucket = b2.Bucket(self.client, bucket_id, bucket_name)
url = bucket.get_download_url(filename)
r = requests.head(url, headers=self.headers)
if r.status_code != 200:
return False
else:
file_id = r.headers['x-bz-file-id']
file_info = bunchify(self.client.get_file_info(file_id))
file_info.download_url = url
file_info.file_id = file_info.pop('fileId')
return file_info
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment