Created
October 22, 2013 15:28
-
-
Save mariocesar/7102729 to your computer and use it in GitHub Desktop.
Ubuntu One OAuth1 API / Request, Authorize and Access
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
Reference: | |
http://bazaar.launchpad.net/~ubuntuone-control-tower/ubuntuone-file-storage-api/trunk/view/head:/ubuntuone/filestorageapi/volumes.py | |
""" | |
import json | |
import urllib | |
import uuid | |
import time | |
import requests | |
from django.core.mail import send_mail | |
from django.utils import timezone | |
from django.utils.translation import ugettext_lazy as _ | |
from django.contrib.auth.models import (AbstractBaseUser, | |
UserManager) | |
from django.db import models | |
class User(AbstractBaseUser):
    """Custom user model that logs in with an e-mail address.

    ``email`` is declared as ``USERNAME_FIELD``.  The other columns hold an
    optional photo, the first/last name, the superuser/staff/active flags,
    the join date and an optional one-to-one link to the
    ``accounts.OAuthAccess`` row.
    """

    email = models.EmailField(
        verbose_name='email address', max_length=255, unique=True,
        db_index=True)
    photo = models.ImageField(upload_to='photo', null=True, blank=True)
    first_name = models.CharField(_('first name'), max_length=30, blank=True)
    last_name = models.CharField(_('last name'), max_length=30, blank=True)

    is_superuser = models.BooleanField(
        verbose_name=_('superuser status'),
        default=False,
        help_text=_('Designates that this user has all permissions without '
                    'explicitly assigning them.'))
    is_staff = models.BooleanField(
        verbose_name=_('staff status'),
        default=False,
        help_text=_('Designates whether the user can log into this admin '
                    'site.'))
    is_active = models.BooleanField(
        verbose_name=_('active'),
        default=True,
        help_text=_('Designates whether this user should be treated as '
                    'active. Unselect this instead of deleting accounts.'))

    date_joined = models.DateTimeField(_('date joined'), default=timezone.now)
    oauth_access_token = models.OneToOneField(
        'accounts.OAuthAccess', null=True, blank=True)

    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'

    objects = UserManager()

    def get_full_name(self):
        """Return "first_name last_name" with surrounding whitespace stripped."""
        name_parts = (self.first_name, self.last_name)
        return ('%s %s' % name_parts).strip()

    def get_short_name(self):
        """Return the user's first name as the short display name."""
        return self.first_name

    def email_user(self, subject, message, from_email=None):
        """Send ``message`` with ``subject`` to this user's e-mail address.

        ``from_email`` is passed through to ``send_mail`` unchanged.
        """
        recipients = [self.email]
        send_mail(subject, message, from_email, recipients)

    def has_perm(self, perm, obj=None):
        """Return the superuser flag; ``perm`` and ``obj`` are not inspected."""
        return self.is_superuser

    def has_module_perms(self, app_label):
        """Return the superuser flag; ``app_label`` is not inspected."""
        return self.is_superuser
class OAuthRequest(models.Model):
    """OAuth 1.0 request token (token + secret) obtained from Ubuntu One.

    Besides storing the token pair, the class knows how to ask the service
    for a new request token and how to exchange a stored one for an access
    token once the user has authorized it.
    """

    token = models.TextField()
    token_secret = models.TextField()
    created = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'oauth_request'
        verbose_name = 'OAuth Request Token'
        verbose_name_plural = 'OAuth Request Tokens'

    def __unicode__(self):
        return u'%s' % self.token

    @classmethod
    def request_authorization(cls, callback='http://127.0.0.1:8000/verify/'):
        """Ask Ubuntu One for a new request token.

        :param callback: URL the service should send the user back to after
            authorization.  Defaults to the local development URL that was
            previously hard-coded, so existing callers are unaffected.
        :returns: the ``requests`` response of the POST; nothing is parsed
            or saved here.
        """
        params = {
            'nonce': uuid.uuid1().hex,
            'timestamp': int(time.time()),
            # urllib.quote leaves '/' unescaped by default; kept as-is so the
            # header sent for the default callback is unchanged.
            'callback': urllib.quote(callback),
        }
        # PLAINTEXT signature: "<consumer secret>&<token secret>".  No token
        # exists yet, so the part after '&' is empty.
        authorization = (
            'OAuth realm="",'
            'oauth_version="1.0",'
            'oauth_nonce="{nonce}",'
            'oauth_timestamp={timestamp},'
            'oauth_consumer_key="ubuntuone",'
            'oauth_signature_method="PLAINTEXT",'
            'oauth_signature="hammertime&",'
            'oauth_callback="{callback}"'
        ).format(**params)
        return requests.post('https://one.ubuntu.com/oauth/request/',
                             headers={'Authorization': authorization})

    def verify_authorization(self, oauth_verifier):
        """Exchange this request token for an access token.

        :param oauth_verifier: verifier string to include in the header.
        :returns: the ``requests`` response of the POST to the access
            endpoint.
        """
        return requests.post(
            'https://one.ubuntu.com/oauth/access/',
            headers=self.authorization_header(oauth_verifier)
        )

    def authorization_header(self, verifier):
        """Build the ``Authorization`` header for the access-token request.

        :param verifier: value placed in ``oauth_verifier``.
        :returns: a dict with a single ``'Authorization'`` key.
        """
        params = {
            'nonce': uuid.uuid1().hex,
            'timestamp': int(time.time()),
            'oauth_token': self.token,
            'oauth_token_secret': self.token_secret,
            'oauth_verifier': verifier,
        }
        authorization = (
            'OAuth realm="",'
            'oauth_version="1.0",'
            'oauth_nonce="{nonce}",'
            'oauth_timestamp={timestamp},'
            'oauth_consumer_key="ubuntuone",'
            'oauth_token="{oauth_token}",'
            'oauth_verifier="{oauth_verifier}",'
            'oauth_signature_method="PLAINTEXT",'
            'oauth_signature="hammertime&{oauth_token_secret}"'
        ).format(**params)
        return {'Authorization': authorization}
class OAuthAccess(models.Model):
    """OAuth 1.0 access token for Ubuntu One plus thin API helpers.

    Every helper signs its request with :meth:`authorization_header` and,
    unless noted otherwise, returns the raw ``requests`` response.
    """

    token = models.TextField()
    token_secret = models.TextField()
    request = models.OneToOneField(OAuthRequest)
    created = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'oauth_access'
        verbose_name = 'OAuth Access Token'
        verbose_name_plural = 'OAuth Access Tokens'

    def __unicode__(self):
        return u'%s' % self.token

    def authorization_header(self):
        """Return a dict holding the PLAINTEXT-signed ``Authorization`` header.

        A fresh nonce and timestamp are generated on every call.
        """
        params = {
            'nonce': uuid.uuid1().hex,
            'timestamp': int(time.time()),
            'oauth_token': self.token,
            'oauth_token_secret': self.token_secret,
        }
        authorization = (
            'OAuth realm="",'
            'oauth_version="1.0",'
            'oauth_nonce="{nonce}",'
            'oauth_timestamp={timestamp},'
            'oauth_consumer_key="ubuntuone",'
            'oauth_token="{oauth_token}",'
            'oauth_signature_method="PLAINTEXT",'
            'oauth_signature="hammertime&{oauth_token_secret}"'
        ).format(**params)
        return {'Authorization': authorization}

    def get_user_or_create(self):
        """Find the local user for this token's account, creating it if needed.

        The account data is fetched once through :attr:`account` (one HTTP
        request) and the user is matched case-insensitively on e-mail.

        :returns: ``(user, created)`` as returned by ``get_or_create``.
        """
        account = self.account
        email = account['email']

        user, created = User.objects.get_or_create(
            email__iexact=email,
            defaults={
                # get_or_create() ignores lookups containing '__' when it
                # builds the new object, so the address has to be supplied
                # here or the user would be created with an empty e-mail.
                'email': email,
                'first_name': account['first_name'],
                'last_name': account['last_name'],
                'oauth_access_token': self,
                'is_active': True
            })

        # Fill in the name of a user whose first or last name is still empty.
        if not user.first_name or not user.last_name:
            user.first_name = account['first_name']
            user.last_name = account['last_name']
            user.save()

        return user, created

    @property
    def account(self):
        """Return information about the user account.

        Performs a GET on the account endpoint on every access and returns
        the decoded JSON body, described upstream as::

            {
                ID information
                "username": (string) Internal U1 username
                "openid": (string) OpenID identifier for account
                "first_name": (string) user-displayable first name (may be empty)
                "last_name": (string) user-displayable last name (may be empty)
                "nickname": (string) user-displayable nickname (may be empty)
                "email": (string) user-displayable email address (may be empty)
                "id": (int) Internal U1 ID number

                CouchDB (store data) information
                "couchdb_root": "https://couchdb.one.ubuntu.com/u/ABC/DEF/NNNNNN"
                "couchdb": {
                    "host": "https://couchdb.one.ubuntu.com",
                    "root": "https://couchdb.one.ubuntu.com/u/ABC/DEF/NNNNNN",
                    "dbpath": "u/ABC/DEF/NNNNNN"
                },

                Information about the user's subscriptions and features available
                "features": [
                    A list of codenames for available features. Possible features:
                    "contacts_syncml"
                    "couch_storage"
                    "file_storage"
                    "music_streaming"
                    "notes_sync"
                ],
                "subscription": {
                    "description": (string) user-displayable description of account
                    "currency": (string) three-character currency code for account payments
                    "started": (string) ISO 8601 date for subscription start
                    "is_paid": (boolean) subscription paid status
                    "expires": (string) ISO 8601 date for subscription expiry
                    "id": (int) Internal U1 subscription ID
                    "price": (float) subscription cost
                    "trial": (boolean) is subscription a trial?
                    "qty": (int) subscription quantity
                    "upgrade_available": (boolean) is upgrade available?
                    "name": (string) user-displayable name of subscription
                },
                "current_plan": (string, embedded newlines) user-displayable description of current plan
            }
        """
        resp = requests.get(
            'https://one.ubuntu.com/api/account/',
            headers=self.authorization_header())
        return resp.json()

    def get_file_storage(self, volume=None, path=None, include_children=False):
        """GET the file-storage root, or a node when volume and path are given.

        :param volume: volume path; appended directly after ``.../v1``, so it
            is presumably expected to start with ``/`` -- verify against
            callers.
        :param path: node path inside the volume.
        :param include_children: when true, ``?include_children=true`` is
            appended to the URL.
        :returns: the ``requests`` response.
        """
        endpoint = 'https://one.ubuntu.com/api/file_storage/v1'
        # Quote only after checking: urllib.quote(None) raises TypeError, so
        # quoting up front made the documented None defaults unusable.
        if volume and path:
            endpoint += '%s/%s' % (urllib.quote(volume), urllib.quote(path))
        if include_children:
            endpoint += '?include_children=true'
        return requests.get(
            endpoint,
            headers=self.authorization_header()
        )

    def get_file_storage_volumes(self):
        """GET the list of file-storage volumes; returns the raw response."""
        return requests.get(
            'https://one.ubuntu.com/api/file_storage/v1/volumes',
            headers=self.authorization_header()
        )

    def put_file_storage(self, volume, path, data=None):
        """PUT ``data`` to the node at ``volume``/``path``.

        :param volume: volume path, URL-quoted before use.
        :param path: node path, URL-quoted before use.
        :param data: optional payload; serialized to JSON when truthy,
            otherwise sent as given.
        :returns: the ``requests`` response.
        """
        volume = urllib.quote(volume)
        path = urllib.quote(path)
        if data:
            data = json.dumps(data)
        return requests.put(
            'https://one.ubuntu.com/api/file_storage/v1%s/%s/' % (
                volume, path),
            headers=self.authorization_header(),
            data=data)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urlparse | |
from django.contrib.auth import authenticate | |
from django.contrib.auth.decorators import login_required | |
from django.utils.decorators import method_decorator | |
from django.contrib.auth import get_user_model | |
from django.shortcuts import get_object_or_404 | |
from django.http import HttpResponse, HttpResponseRedirect | |
from django.contrib.auth import login | |
from solar.accounts.models import OAuthRequest, OAuthAccess | |
User = get_user_model() | |
def authorize(request):
    """Start the OAuth flow: fetch a request token and redirect to Ubuntu One.

    On a 200 answer the token pair is stored as an ``OAuthRequest`` row and
    the browser is redirected to the service's authorize page.  Any other
    answer is returned to the client as the response body.
    """
    # Imported locally so this view does not rely on a module-level
    # ``urllib`` import.
    import urllib

    resp = OAuthRequest.request_authorization()
    if resp.status_code != 200:
        return HttpResponse(resp.text)

    tokens = urlparse.parse_qs(resp.text)
    oauth_token = tokens['oauth_token'][0]
    oauth_token_secret = tokens['oauth_token_secret'][0]

    OAuthRequest.objects.create(
        token=oauth_token,
        token_secret=oauth_token_secret
    )

    # urlencode escapes reserved characters in both values; interpolating
    # them raw would let a '&' or '=' in either one corrupt the query string.
    # A list of pairs keeps the parameter order fixed.
    query = urllib.urlencode([
        ('oauth_token', oauth_token),
        ('description', 'Ubuntu One @ web.c1.com.bo [solar]'),
    ])
    return HttpResponseRedirect(
        'https://one.ubuntu.com/oauth/authorize/?' + query)
def verify(request):
    """OAuth callback: trade the request token for an access token and log in.

    Reads ``oauth_token`` and ``oauth_verifier`` from the query string,
    answers 404 when no stored ``OAuthRequest`` matches the token, and on a
    200 answer from the service stores the access token, authenticates with
    it and redirects to ``/websites/``.  Any other answer from the service
    is returned to the client as the response body.
    """
    oauth_token = request.GET.get('oauth_token')
    oauth_verifier = request.GET.get('oauth_verifier')

    oauth_request = get_object_or_404(OAuthRequest, token=oauth_token)
    resp = oauth_request.verify_authorization(oauth_verifier)
    if resp.status_code != 200:
        return HttpResponse(resp.text)

    tokens = urlparse.parse_qs(resp.text)
    access_token = OAuthAccess.objects.create(
        token=tokens['oauth_token'][0],
        token_secret=tokens['oauth_token_secret'][0],
        request=oauth_request,
    )

    user = authenticate(access_token=access_token)
    if user is None:
        # authenticate() returns None when no backend accepts the
        # credentials; login() must not be called without a real user.
        return HttpResponse('Authentication failed', status=403)

    login(request, user)
    return HttpResponseRedirect('/websites/')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment