Skip to content

Instantly share code, notes, and snippets.

@gccyugi
Created May 11, 2012 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save gccyugi/2658028 to your computer and use it in GitHub Desktop.
Save gccyugi/2658028 to your computer and use it in GitHub Desktop.
upyun-python-api
# -*- coding: utf-8 -*-
#
# :author yugi
# :email gccyugi@gmail.com
# :update 2012/5/18 15:06
import md5 as mmd5
DEBUG = False
api_upyun = 'api.upyun.com'
ENDPOINT_V0 = 'v0.' + api_upyun
ENDPOINT_V1 = 'v1.' + api_upyun
ENDPOINT_V2 = 'v2.' + api_upyun
ENDPOINT_V3 = 'v3.' + api_upyun
default_endpoint = ENDPOINT_V0
def md5(e):
m = mmd5.new()
m.update(e)
return m.hexdigest()
def md5_fd(fobj):
m = mmd5.new()
while True:
data = fobj.read(8096)
if not data:
break
m.update(data)
fobj.seek(0)
return m.hexdigest()
def set_endpoint(e):
global default_endpoint
default_endpoint = e
def get_endpoint():
return default_endpoint
class UpyunCore(object):
def __init__(self, auth_type='upyun'):
self._auth_type = auth_type
def _basic_auth(self, headers, username, password):
import base64
data = base64.encodestring(username + ':' + password).strip()
headers['Authorization'] = 'Basic ' + data
def _upyun_auth(self, headers, method, path, username, password):
import time
headers['Date'] = time.strftime('%a, %d %b %Y %X GMT', time.gmtime())
content_length = headers.get('Content-Length') or 0
signature = '&'.join([method, path, headers['Date'], str(content_length),
md5(password)])
headers['Authorization'] = 'UpYun %s:%s' % (username, md5(signature))
def request(self, host, method, path, data, username, password, options):
import httplib
headers = options.copy()
if self._auth_type == 'basic':
self._basic_auth(headers, username, password)
else:
self._upyun_auth(headers, method, path, username, password)
conn = httplib.HTTPConnection(host)
conn.request(method, path, data, headers)
res = conn.getresponse()
if DEBUG:
print '%s %s => %s %s' % (method, host + path, res.status, res.reason)
return res
class UpyunEntry(object):
def __init__(self, name, type, size, date):
self.name = name
self.type = 'folder' if type == 'F' else 'file'
self.size = size
self.date = date
def __str__(self):
return '<%s: %s>' % (self.type, self.name)
class Upyun(object):
def __init__(self, bucket, username, password, transport=UpyunCore()):
self._bucket = bucket
self._username = username
self._password = password
self._core = transport
def __str__(self):
return '#<bucket: %s, username: %s>' % (self._bucket, self._username)
def touch(self, path, data, md5_checksum=None, auto_mkdir=False):
options = {}
if md5_checksum:
options['Content-MD5'] = md5_checksum
if auto_mkdir:
options['mkdir'] = 'true'
if type(data) != file:
options['Content-Length'] = len(data)
res = self._request('PUT', self._generate_path(path), data, options)
return self._is_res_ok(res)
def touch_image(self, path, data, secret_code=None,
md5_checksum=None, auto_mkdir=False):
options = {}
if secret_code:
options['Content-Secret'] = secret_code
if md5_checksum:
options['Content-MD5'] = md5_checksum
if auto_mkdir:
options['mkdir'] = 'true'
if type(data) != file:
options['Content-Length'] = len(data)
res = self._request('PUT', self._generate_path(path), data, options)
if self._is_res_ok(res):
info = {}
info['width'] = int(res.getheader('x-upyun-width'))
info['height'] = int(res.getheader('x-upyun-height'))
info['frames'] = int(res.getheader('x-upyun-frames'))
info['is_folder'] = res.getheader('x-upyun-file-type') != 'file'
return True, info
return False, self._get_res_errmsg(res)
def cat(self, path):
res = self._request('GET', self._generate_path(path))
if self._is_res_ok(res):
return True, res.read()
return False, self._get_res_errmsg(res)
def stat(self, path):
res = self._request('HEAD', self._generate_path(path))
if self._is_res_ok(res):
info = {}
info['is_folder'] = res.getheader('x-upyun-file-type') != 'file'
info['size'] = int(res.getheader('x-upyun-file-size'))
info['timestamp'] = int(res.getheader('x-upyun-file-date'))
return True, info
return False, self._get_res_errmsg(res)
def usage(self, path='/'):
res = self._request('GET', self._generate_path(path) + '?usage')
if self._is_res_ok(res):
return True, int(res.read())
return False, self._get_res_errmsg(res)
def ls(self, path='/'):
res = self._request('GET', self._generate_path(path))
if self._is_res_ok(res):
data = res.read()
for e in data.split('\n'):
info_lst = e.split('\t')
if len(info_lst) >= 4:
yield UpyunEntry(*info_lst)
def mkdir(self, path, auto_mkdir=False):
options = {}
options['folder'] = 'create'
if auto_mkdir:
options['mkdir'] = 'true'
res = self._request('POST', self._generate_path(path), None, options)
return self._is_res_ok(res)
def rmdir(self, path):
return self.rm(path)
def rm(self, path):
res = self._request('DELETE', self._generate_path(path))
return self._is_res_ok(res)
def _is_res_ok(self, res):
return res.status == 200
def _get_res_errmsg(self, res):
return '%s %s: %s' % (res.status, res.reason, res.read())
def _request(self, method, path, data=None, options={}):
return self._core.request(get_endpoint(), method, path, data,
self._username, self._password,
options)
def _generate_path(self, filename=None):
path = '/' + self._bucket
return path + filename if filename else path
if __name__ == '__main__':
s = Upyun('<bucket-name>', '<operator-name>', '<operator-pwd>')
# endpoint
print 'endpoint: %s' % get_endpoint()
set_endpoint(ENDPOINT_V1)
print 'endpoint: %s' % get_endpoint()
# usage
rc, usage = s.usage()
if rc:
print 'usage: %d' % usage
# mkdir
s.mkdir('/tmp')
s.mkdir('/tmp/1/2', True)
# rmdir
s.rmdir('/tmp/1/2')
s.rmdir('/tmp/1')
s.rmdir('/tmp')
# touch
s.touch('/a.txt', 'hijcak')
# cat
rc, data = s.cat('/a.txt')
if rc:
print 'data of a.txt is: ' + data
# stat
rc, info = s.stat('/a.txt')
if rc:
print info
# rm
s.rm('/a.txt')
# ls
for e in s.ls():
print e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment