Skip to content

Instantly share code, notes, and snippets.

@maxcountryman
Created February 21, 2013 15:41
Show Gist options
  • Save maxcountryman/5005566 to your computer and use it in GitHub Desktop.
Save maxcountryman/5005566 to your computer and use it in GitHub Desktop.
Improved test suite for rauth.service.
# -*- coding: utf-8 -*-
'''
rauth.test_service
------------------
Test suite for rauth.service.
'''
from base import RauthTestCase
from rauth.service import OAuth1Service, OAuth2Service, OflyService, Service
from rauth.session import (OAUTH1_DEFAULT_TIMEOUT, OAUTH2_DEFAULT_TIMEOUT,
OFLY_DEFAULT_TIMEOUT, OAuth1Session, OAuth2Session,
OflySession)
from rauth.utils import FORM_URLENCODED
from datetime import datetime
from hashlib import sha1
from mock import patch
import rauth
import requests
import json
class MutableDatetime(datetime):
    '''
    `datetime` subclass whose class attributes can be reassigned.

    The built-in `datetime` type does not allow attribute assignment, so the
    tests patch this subclass in and then replace `utcnow` on it.
    '''

    def __new__(cls, *args, **kwargs):
        # Deliberately construct a plain `datetime` (not an instance of
        # `cls`): the subclass only exists to carry overridable attributes.
        plain = datetime.__new__(datetime, *args, **kwargs)
        return plain
class MockSha1(object):
    '''
    Replacement for `hashlib.sha1` that always yields the digest 'foo'.

    "Instantiating" the class returns the class itself, so
    `MockSha1(data).hexdigest()` resolves to the static method below no
    matter what arguments were passed.
    '''

    @staticmethod
    def hexdigest():
        '''Return the fixed fake digest.'''
        return 'foo'

    def __new__(cls, *args, **kwargs):
        # Arguments are accepted and ignored; no instance is ever created.
        return cls
class HttpMixin(object):
    '''
    Shared HTTP-verb tests for the service test cases.

    The host class must provide `self.service` (an object exposing `head`,
    `get`, `post`, `put` and `delete`) and `assertEqual`.
    '''

    def assert_ok(self, r):
        '''Assert that the response body decodes to `{'status': 'ok'}`.'''
        decoded = json.loads(r.content)
        self.assertEqual(decoded, {'status': 'ok'})

    def _check_verb(self, verb):
        '''Call the service's `verb` method on the test URL and check it.'''
        send = getattr(self.service, verb)
        self.assert_ok(send('http://example.com/'))

    def test_head(self):
        self._check_verb('head')

    def test_get(self):
        self._check_verb('get')

    def test_post(self):
        self._check_verb('post')

    def test_put(self):
        self._check_verb('put')

    def test_delete(self):
        self._check_verb('delete')
class OAuth1ServiceTestCase(RauthTestCase, HttpMixin):
    '''
    Tests for `OAuth1Service`.

    The service's `request` method is replaced by `fake_request`, which
    routes every call through a mocked `requests.Session.request` and checks
    the arguments that reach it. `self.response` is presumably supplied by
    `RauthTestCase.setUp` -- confirm against `base.py`.
    '''

    # Canned token-endpoint body shared by the request/access token tests.
    TOKEN_RESPONSE = 'oauth_token=foo&oauth_token_secret=bar'

    def setUp(self):
        '''Build an `OAuth1Service` with fixed credentials and fake request.'''
        RauthTestCase.setUp(self)

        request_token_url = 'http://example.com/request'
        access_token_url = 'http://example.com/access'
        authorize_url = 'http://example.com/authorize'
        base_url = 'http://example.com/api/'

        self.service = OAuth1Service('000',
                                     '111',
                                     name='service',
                                     request_token_url=request_token_url,
                                     access_token_url=access_token_url,
                                     authorize_url=authorize_url,
                                     base_url=base_url)

        self.service.request = self.fake_request
        self.service.access_token = '123'
        self.service.access_token_secret = '456'

    # Decorators apply bottom-up, so the mocks are injected after the
    # caller's positional arguments in the order: request, random, time, sign.
    @patch.object(rauth.session.HmacSha1Signature, 'sign')
    @patch.object(rauth.session, 'time')
    @patch.object(rauth.session, 'random')
    @patch.object(requests.Session, 'request')
    def fake_request(self,
                     method,
                     url,
                     mock_request,
                     mock_random,
                     mock_time,
                     mock_sig,
                     access_token=None,
                     access_token_secret=None,
                     header_auth=False,
                     **kwargs):
        '''
        Stand-in for `OAuth1Service.request` used by every test in this class.

        Performs the request through a generic `Service` with the network,
        clock, RNG and signature mocked, then asserts that
        `requests.Session.request` received the expected OAuth 1.0
        parameters.

        :param method: HTTP method name passed to the session.
        :param url: Absolute or base-relative URL.
        :param mock_request: Injected mock for `requests.Session.request`.
        :param mock_random: Injected mock for `rauth.session.random`.
        :param mock_time: Injected mock for `rauth.session.time`.
        :param mock_sig: Injected mock for `HmacSha1Signature.sign`.
        :param access_token: Optional token overriding the service default.
        :param access_token_secret: Optional secret overriding the default.
        :param header_auth: Forwarded to `Service.request`.
        :param kwargs: Extra keyword arguments forwarded to the request.
        :returns: Whatever `Service.request` returns (the mocked response).
        '''
        fake_random = 1
        fake_time = 1
        fake_sig = 'foo'

        mock_request.return_value = self.response
        mock_random.return_value = fake_random
        mock_time.return_value = fake_time
        mock_sig.return_value = fake_sig

        url = self.service._set_url(url)

        access_token, access_token_secret = \
            self.service._parse_access_tokens(access_token,
                                              access_token_secret)

        service = Service('service',
                          self.service.base_url,
                          self.service.authorize_url)
        session = self.service.get_session((access_token,
                                            access_token_secret))

        r = service.request(session,
                            method,
                            url,
                            header_auth=header_auth,
                            **kwargs)

        # Encoding keeps the digest identical on Python 2 while avoiding a
        # TypeError where `sha1` only accepts bytes.
        nonce = sha1(str(fake_random).encode('ascii')).hexdigest()

        oauth_params = {'oauth_consumer_key': self.service.consumer_key,
                        'oauth_nonce': nonce,
                        'oauth_signature_method': 'HMAC-SHA1',
                        'oauth_timestamp': fake_time,
                        'oauth_token': access_token,
                        'oauth_version': session.VERSION,
                        'oauth_signature': fake_sig}

        # Build the keyword arguments the session is expected to have been
        # called with: OAuth parameters travel in the body for POST/PUT and
        # in the query string otherwise.
        if method in ('POST', 'PUT'):
            kwargs.setdefault('data', {})
            kwargs['data'].update(**oauth_params)
            kwargs.setdefault('headers', {})
            kwargs['headers'].update({'Content-Type': FORM_URLENCODED})
        else:
            kwargs.setdefault('params', {})
            kwargs['params'].update(**oauth_params)

        mock_request.assert_called_with(method,
                                        url,
                                        timeout=OAUTH1_DEFAULT_TIMEOUT,
                                        **kwargs)
        return r

    def test_get_session(self):
        s = self.service.get_session()
        self.assertIsInstance(s, OAuth1Session)

    def test_get_session_with_token(self):
        token = ('foo', 'bar')
        s1 = self.service.get_session(token)
        s2 = self.service.get_session(token)

        # ensure we are getting back the same object
        self.assertIs(s1, s2)

    def test_get_raw_request_token(self):
        self.response.content = self.TOKEN_RESPONSE
        r = self.service.get_raw_request_token()
        self.assertEqual(r.content, self.TOKEN_RESPONSE)

    def test_get_raw_request_token_missing_request_token_url(self):
        self.service.request_token_url = None
        self.response.content = self.TOKEN_RESPONSE

        with self.assertRaises(TypeError) as e:
            self.service.get_raw_request_token()
        # Checked after the block: statements following the raising call
        # inside the `with` body would never execute.
        self.assertEqual(str(e.exception),
                         'request_token_url must not be None')

    def test_get_request_token(self):
        self.response.content = self.TOKEN_RESPONSE
        request_token, request_token_secret = self.service.get_request_token()
        self.assertEqual(request_token, 'foo')
        self.assertEqual(request_token_secret, 'bar')

    def test_get_authorize_url(self):
        self.response.content = self.TOKEN_RESPONSE
        request_token, _ = self.service.get_request_token()

        url = self.service.get_authorize_url(request_token)
        expected_fmt = 'http://example.com/authorize?oauth_token={0}'
        self.assertEqual(url, expected_fmt.format(request_token))

    def test_get_raw_access_token(self):
        self.response.content = self.TOKEN_RESPONSE
        request_token, request_token_secret = self.service.get_request_token()

        self.response.content = self.TOKEN_RESPONSE
        r = self.service.get_raw_access_token(request_token,
                                              request_token_secret)
        self.assertEqual(r.content, self.TOKEN_RESPONSE)

    def test_get_raw_access_token_missing_access_token_url(self):
        self.response.content = self.TOKEN_RESPONSE
        request_token, request_token_secret = self.service.get_request_token()

        self.service.access_token_url = None
        self.response.content = self.TOKEN_RESPONSE

        with self.assertRaises(TypeError) as e:
            self.service.get_raw_access_token(request_token,
                                              request_token_secret)
        self.assertEqual(str(e.exception),
                         'access_token_url must not be None')

    def test_get_access_token(self):
        self.response.content = self.TOKEN_RESPONSE
        request_token, request_token_secret = self.service.get_request_token()

        self.response.content = self.TOKEN_RESPONSE
        access_token, access_token_secret = \
            self.service.get_access_token(request_token,
                                          request_token_secret)
        self.assertEqual(access_token, 'foo')
        self.assertEqual(access_token_secret, 'bar')

    def test_request(self):
        r = self.service.request('GET', 'baz')
        self.assert_ok(r)

    def test_request_malformed(self):
        self.service.access_token_secret = None

        with self.assertRaises(TypeError) as e:
            self.service.request('GET',
                                 'baz',
                                 access_token='foo')
        self.assertEqual(str(e.exception),
                         'Either both or neither access_token and '
                         'access_token_secret must be supplied')
class OAuth2ServiceTestCase(RauthTestCase, HttpMixin):
    '''
    Tests for `OAuth2Service`.

    The service's `request` method is replaced by `fake_request`, which
    sends every call through a mocked `requests.Session.request` and checks
    the arguments that reach it. `self.response` is presumably supplied by
    `RauthTestCase.setUp` -- confirm against `base.py`.
    '''

    def setUp(self):
        '''Build an `OAuth2Service` with fixed credentials and fake request.'''
        RauthTestCase.setUp(self)

        self.access_token_url = 'https://example.com/access'
        self.authorize_url = 'https://example.com/authorize'
        # NOTE(review): this host lacks '.com', unlike the URLs above --
        # presumably a typo. Left unchanged; no real request is made.
        self.base_url = 'https://example/api/'

        self.service = OAuth2Service('000',
                                     '111',
                                     access_token_url=self.access_token_url,
                                     authorize_url=self.authorize_url,
                                     base_url=self.base_url)

        self.service.request = self.fake_request
        self.service.access_token = '123'

    @patch.object(requests.Session, 'request')
    def fake_request(self,
                     method,
                     url,
                     mock_request,
                     access_token=None,
                     **kwargs):
        '''
        Stand-in for `OAuth2Service.request` used by every test in this class.

        :param method: HTTP method name passed to the session.
        :param url: Absolute or base-relative URL.
        :param mock_request: Injected mock for `requests.Session.request`.
        :param access_token: Optional token; falls back to the service's.
        :param kwargs: Extra keyword arguments forwarded to the request.
        :returns: Whatever `Service.request` returns (the mocked response).
        '''
        mock_request.return_value = self.response

        access_token = access_token or self.service.access_token
        url = self.service._set_url(url)

        session = self.service.get_session(access_token)
        service = Service('service',
                          self.service.base_url,
                          self.service.authorize_url)

        r = service.request(session, method, url, **kwargs)

        # The session is expected to add the access token to the query
        # parameters before delegating to `requests`.
        kwargs.setdefault('params', {})
        kwargs['params']['access_token'] = access_token

        mock_request.assert_called_with(method,
                                        url,
                                        timeout=OAUTH2_DEFAULT_TIMEOUT,
                                        **kwargs)
        return r

    def test_get_session(self):
        s = self.service.get_session()
        self.assertIsInstance(s, OAuth2Session)

    def test_get_session_with_token(self):
        s1 = self.service.get_session('foo')
        s2 = self.service.get_session('foo')

        # ensure we are getting back the same object
        self.assertIs(s1, s2)

    def test_get_authorize_url(self):
        url = self.service.get_authorize_url()
        expected_fmt = 'https://example.com/authorize?client_id={0}'
        self.assertEqual(url, expected_fmt.format(self.service.client_id))

    def test_get_raw_access_token(self):
        resp = 'access_token=123'
        self.response.content = resp
        r = self.service.get_raw_access_token()
        self.assertEqual(r.content, resp)

    def test_get_raw_access_token_with_params(self):
        resp = 'access_token=123'
        self.response.content = resp
        r = self.service.get_raw_access_token(params={'a': 'b'})
        self.assertEqual(r.content, resp)

    def test_get_access_token(self):
        self.response.content = 'access_token=123'
        access_token = self.service.get_access_token()
        self.assertEqual(access_token, '123')

    def test_request(self):
        # No extra patching here: `fake_request` installs its own mock for
        # `requests.Session.request`.
        r = self.service.request('GET', 'foo')
        self.assert_ok(r)
class OflyServiceTestCase(RauthTestCase, HttpMixin):
    '''
    Tests for `OflyService`.

    The service's `request` method is replaced by `fake_request`. The clock
    (`rauth.session.datetime`) and hash (`rauth.session.sha1`) are swapped
    for `MutableDatetime` and `MockSha1` so the signed query string is
    deterministic. `self.response` is presumably supplied by
    `RauthTestCase.setUp` -- confirm against `base.py`.
    '''

    def setUp(self):
        '''Build an `OflyService` with fixed credentials and fake request.'''
        RauthTestCase.setUp(self)

        self.authorize_url = 'http://example.com/authorize'
        self.base_url = 'http://example.com/api/'

        self.service = OflyService('000',
                                   '111',
                                   name='service',
                                   authorize_url=self.authorize_url,
                                   base_url=self.base_url)

        self.service.request = self.fake_request

    @staticmethod
    def _freeze_utcnow():
        '''Pin `MutableDatetime.utcnow()` to 1900-01-01T00:00:00.'''
        MutableDatetime.utcnow = classmethod(lambda cls: datetime(1900, 1, 1))

    def _expected_params(self):
        '''Return the query string expected under the frozen clock and hash.'''
        return '&'.join(['oflyApiSig=foo',
                         'oflyAppId={0}'.format(self.service.app_id),
                         'oflyHashMeth=SHA1',
                         'oflyTimestamp=1900-01-01T00:00:00.0Z'])

    @patch('rauth.session.datetime', MutableDatetime)
    @patch('rauth.session.sha1', MockSha1)
    @patch.object(requests.Session, 'request')
    def fake_request(self,
                     method,
                     url,
                     mock_request,
                     header_auth=False,
                     **kwargs):
        '''
        Stand-in for `OflyService.request` used by every test in this class.

        :param method: HTTP method name passed to the session.
        :param url: Absolute or base-relative URL.
        :param mock_request: Injected mock for `requests.Session.request`
            (only `patch.object` injects an argument; the other two patches
            supply explicit replacements).
        :param header_auth: Accepted for signature compatibility; not used.
        :param kwargs: Extra keyword arguments forwarded to the request.
        :returns: Whatever `Service.request` returns (the mocked response).
        '''
        self._freeze_utcnow()

        mock_request.return_value = self.response

        url = self.service._set_url(url)
        params = self._expected_params()

        s = self.service.get_session()
        service = Service('service',
                          self.service.base_url,
                          self.service.authorize_url)
        # Kept from the original: the URL is resolved a second time through
        # the generic service, which shares the same base URL.
        url = service._set_url(url)

        r = service.request(s, method, url, **kwargs)

        mock_request.assert_called_with(method,
                                        url,
                                        headers={},
                                        params=params,
                                        timeout=OFLY_DEFAULT_TIMEOUT)
        return r

    def test_get_session(self):
        s = self.service.get_session()
        self.assertIsInstance(s, OflySession)

    @patch('rauth.session.datetime', MutableDatetime)
    @patch('rauth.session.sha1', MockSha1)
    def test_get_authorize_url(self):
        self._freeze_utcnow()

        expected_url = 'http://example.com/authorize?'
        url = self.service.get_authorize_url()
        self.assertEqual(url, expected_url + self._expected_params())

    def test_request(self):
        r = self.service.request('GET', 'foo')
        self.assert_ok(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment