Created
January 5, 2019 16:42
-
-
Save weaming/51b8afaf0009caee7aae5053bf6bbfef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from oauthlib.common import urldecode | |
from requests_oauthlib import OAuth1Session, OAuth1 | |
from django.db import models | |
from maid.models.base.model import ModelDateTimeMixin, ModelHelperMixin | |
from maid.utils import generate_uuid4 | |
class OAuth(ModelDateTimeMixin, ModelHelperMixin): | |
uuid = models.CharField(max_length=36, primary_key=True, default=generate_uuid4) | |
client_key = models.CharField(max_length=128) | |
client_secret = models.CharField(max_length=128) | |
request_token = models.CharField(max_length=128) | |
request_token_secret = models.CharField(max_length=128) | |
verifier = models.CharField(max_length=128, null=True) | |
token = models.CharField(max_length=128, null=True) | |
token_secret = models.CharField(max_length=128, null=True) | |
class Meta: | |
db_table = 'oauth' | |
class OAuthHelper: | |
def fetch_request_token(self, client_key, client_secret, *, uuid=None): | |
oauth = OAuth1(client_key, client_secret=client_secret, callback_uri='oob') | |
res = requests.post(url=API.oauth.request_token, auth=oauth) | |
token = dict(urldecode(res.text.strip())) | |
resource_owner_key = token['oauth_token'] | |
resource_owner_secret = token['oauth_token_secret'] | |
# save db | |
if uuid: | |
oa: OAuth = OAuth.objects.get(uuid=uuid) | |
update_x(oa, { | |
'client_key': client_key, | |
'client_secret': client_secret, | |
'request_token': resource_owner_key, | |
'request_token_secret': resource_owner_secret, | |
}) | |
else: | |
oa: OAuth = create_x(OAuth, { | |
'client_key': client_key, | |
'client_secret': client_secret, | |
'request_token': resource_owner_key, | |
'request_token_secret': resource_owner_secret, | |
}) | |
# get auth url | |
authorize_url = API.oauth.authorize + '?oauth_token=' | |
authorize_url += resource_owner_key | |
return oa, authorize_url | |
def fetch_access_token(self, uuid, verifier): | |
# query db | |
oa: OAuth = OAuth.objects.filter(uuid=uuid).first() | |
if not oa: | |
raise Exception(f'uuid {uuid} does not exist') | |
client_key, client_secret = oa.client_key, oa.client_secret | |
resource_owner_key, resource_owner_secret = oa.request_token, oa.request_token_secret | |
if not (oa.token and oa.token_secret): | |
oauth = OAuth1(client_key, | |
client_secret=client_secret, | |
resource_owner_key=resource_owner_key, | |
resource_owner_secret=resource_owner_secret, | |
verifier=verifier) | |
res = requests.post(url=API.oauth.access_token, auth=oauth) | |
if res.status_code == 200: | |
token_dict = dict(urldecode(res.text.strip())) | |
else: | |
raise ExceptionWithStatus(f'twitter {res.status_code}: {res.content}', status=res.status_code) | |
token = token_dict['oauth_token'] | |
token_secret = token_dict['oauth_token_secret'] | |
# update db | |
update_x(oa, {'token': token, 'token_secret': token_secret}) | |
oauth = OAuth1Session(client_key, | |
client_secret=client_secret, | |
resource_owner_key=oa.token, | |
resource_owner_secret=oa.token_secret) | |
return oauth |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment