Created
February 21, 2013 15:41
-
-
Save maxcountryman/5005566 to your computer and use it in GitHub Desktop.
Improved test suite for rauth.service.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
''' | |
rauth.test_service | |
------------------ | |
Test suite for rauth.service. | |
''' | |
from base import RauthTestCase | |
from rauth.service import OAuth1Service, OAuth2Service, OflyService, Service | |
from rauth.session import (OAUTH1_DEFAULT_TIMEOUT, OAUTH2_DEFAULT_TIMEOUT, | |
OFLY_DEFAULT_TIMEOUT, OAuth1Session, OAuth2Session, | |
OflySession) | |
from rauth.utils import FORM_URLENCODED | |
from datetime import datetime | |
from hashlib import sha1 | |
from mock import patch | |
import rauth | |
import requests | |
import json | |
class MutableDatetime(datetime): | |
def __new__(cls, *args, **kwargs): | |
return datetime.__new__(datetime, *args, **kwargs) | |
class MockSha1(object): | |
def __new__(cls, *args, **kwargs): | |
return cls | |
@staticmethod | |
def hexdigest(): | |
return 'foo' | |
class HttpMixin(object): | |
def assert_ok(self, r): | |
self.assertEqual(json.loads(r.content), {'status': 'ok'}) | |
def test_head(self): | |
r = self.service.head('http://example.com/') | |
self.assert_ok(r) | |
def test_get(self): | |
r = self.service.get('http://example.com/') | |
self.assert_ok(r) | |
def test_post(self): | |
r = self.service.post('http://example.com/') | |
self.assert_ok(r) | |
def test_put(self): | |
r = self.service.put('http://example.com/') | |
self.assert_ok(r) | |
def test_delete(self): | |
r = self.service.delete('http://example.com/') | |
self.assert_ok(r) | |
class OAuth1ServiceTestCase(RauthTestCase, HttpMixin): | |
def setUp(self): | |
RauthTestCase.setUp(self) | |
request_token_url = 'http://example.com/request' | |
access_token_url = 'http://example.com/access' | |
authorize_url = 'http://example.com/authorize' | |
base_url = 'http://example.com/api/' | |
self.service = OAuth1Service('000', | |
'111', | |
name='service', | |
request_token_url=request_token_url, | |
access_token_url=access_token_url, | |
authorize_url=authorize_url, | |
base_url=base_url) | |
self.service.request = self.fake_request | |
self.service.access_token = '123' | |
self.service.access_token_secret = '456' | |
@patch.object(rauth.session.HmacSha1Signature, 'sign') | |
@patch.object(rauth.session, 'time') | |
@patch.object(rauth.session, 'random') | |
@patch.object(requests.Session, 'request') | |
def fake_request(self, | |
method, | |
url, | |
mock_request, | |
mock_random, | |
mock_time, | |
mock_sig, | |
access_token=None, | |
access_token_secret=None, | |
header_auth=False, | |
**kwargs): | |
fake_random = 1 | |
fake_time = 1 | |
fake_sig = 'foo' | |
mock_request.return_value = self.response | |
mock_random.return_value = fake_random | |
mock_time.return_value = fake_time | |
mock_sig.return_value = fake_sig | |
method = method | |
url = self.service._set_url(url) | |
access_token, access_token_secret = \ | |
self.service._parse_access_tokens(access_token, | |
access_token_secret) | |
service = Service('service', | |
self.service.base_url, | |
self.service.authorize_url) | |
session = self.service.get_session((access_token, access_token_secret)) | |
r = service.request(session, | |
method, | |
url, | |
header_auth=header_auth, | |
**kwargs) | |
oauth_params = {'oauth_consumer_key': self.service.consumer_key, | |
'oauth_nonce': sha1(str(fake_random)).hexdigest(), | |
'oauth_signature_method': 'HMAC-SHA1', | |
'oauth_timestamp': fake_time, | |
'oauth_token': access_token, | |
'oauth_version': session.VERSION, | |
'oauth_signature': fake_sig} | |
headers = {'Content-Type': FORM_URLENCODED} | |
if method in ('POST', 'PUT'): | |
kwargs.setdefault('data', {}) | |
kwargs['data'].update(**oauth_params) | |
kwargs.setdefault('headers', {}) | |
kwargs['headers'].update(**headers) | |
else: | |
kwargs.setdefault('params', {}) | |
kwargs['params'].update(**oauth_params) | |
mock_request.assert_called_with(method, | |
url, | |
timeout=OAUTH1_DEFAULT_TIMEOUT, | |
**kwargs) | |
return r | |
def test_get_session(self): | |
s = self.service.get_session() | |
self.assertIsInstance(s, OAuth1Session) | |
def test_get_session_with_token(self): | |
token = ('foo', 'bar') | |
s1 = self.service.get_session(token) | |
s2 = self.service.get_session(token) | |
# ensure we are getting back the same object | |
self.assertIs(s1, s2) | |
def test_get_raw_request_token(self): | |
resp = 'oauth_token=foo&oauth_token_secret=bar' | |
self.response.content = resp | |
r = self.service.get_raw_request_token() | |
self.assertEqual(r.content, resp) | |
def test_get_raw_request_token_missing_request_token_url(self): | |
self.service.request_token_url = None | |
resp = 'oauth_token=foo&oauth_token_secret=bar' | |
self.response.content = resp | |
with self.assertRaises(TypeError) as e: | |
self.service.get_raw_request_token() | |
self.assertEqual(str(e.exception), | |
'request_token_url must not be None') | |
def test_get_request_token(self): | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
request_token, request_token_secret = self.service.get_request_token() | |
self.assertEqual(request_token, 'foo') | |
self.assertEqual(request_token_secret, 'bar') | |
def test_get_authorize_url(self): | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
request_token, request_token_secret = self.service.get_request_token() | |
url = self.service.get_authorize_url(request_token) | |
expected_fmt = 'http://example.com/authorize?oauth_token={0}' | |
self.assertEqual(url, expected_fmt.format(request_token)) | |
def test_get_raw_access_token(self): | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
request_token, request_token_secret = self.service.get_request_token() | |
resp = 'oauth_token=foo&oauth_token_secret=bar' | |
self.response.content = resp | |
r = self.service.get_raw_access_token(request_token, | |
request_token_secret) | |
self.assertEqual(r.content, resp) | |
def test_get_raw_access_token_missing_access_token_url(self): | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
request_token, request_token_secret = self.service.get_request_token() | |
self.service.access_token_url = None | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
with self.assertRaises(TypeError) as e: | |
self.service.get_raw_access_token(request_token, | |
request_token_secret) | |
self.assertEqual(str(e.exception), | |
'access_token_url must not be None') | |
def test_get_access_token(self): | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
request_token, request_token_secret = self.service.get_request_token() | |
self.response.content = 'oauth_token=foo&oauth_token_secret=bar' | |
access_token, access_token_secret = \ | |
self.service.get_access_token(request_token, | |
request_token_secret) | |
self.assertEqual(access_token, 'foo') | |
self.assertEqual(access_token_secret, 'bar') | |
def test_request(self): | |
r = self.service.request('GET', 'baz') | |
self.assertEqual(json.loads(r.content), {'status': 'ok'}) | |
def test_request_malformed(self): | |
self.service.access_token_secret = None | |
with self.assertRaises(TypeError) as e: | |
self.service.request('GET', | |
'baz', | |
access_token='foo') | |
self.assertEqual(str(e.exception), | |
'Either both or neither access_token and ' | |
'access_token_secret must be supplied') | |
class OAuth2ServiceTestCase(RauthTestCase, HttpMixin): | |
def setUp(self): | |
RauthTestCase.setUp(self) | |
self.access_token_url = 'https://example.com/access' | |
self.authorize_url = 'https://example.com/authorize' | |
self.base_url = 'https://example/api/' | |
self.service = OAuth2Service('000', | |
'111', | |
access_token_url=self.access_token_url, | |
authorize_url=self.authorize_url, | |
base_url=self.base_url) | |
self.service.request = self.fake_request | |
self.service.access_token = '123' | |
@patch.object(requests.Session, 'request') | |
def fake_request(self, | |
method, | |
url, | |
mock_request, | |
access_token=None, | |
**kwargs): | |
mock_request.return_value = self.response | |
access_token = access_token or self.service.access_token | |
url = self.service._set_url(url) | |
session = self.service.get_session(access_token) | |
service = Service('service', | |
self.service.base_url, | |
self.service.authorize_url) | |
r = service.request(session, method, url, **kwargs) | |
kwargs.setdefault('params', {}) | |
kwargs['params'].update(**{'access_token': access_token}) | |
mock_request.assert_called_with(method, | |
url, | |
timeout=OAUTH2_DEFAULT_TIMEOUT, | |
**kwargs) | |
return r | |
def test_get_session(self): | |
s = self.service.get_session() | |
self.assertIsInstance(s, OAuth2Session) | |
def test_get_session_with_token(self): | |
s1 = self.service.get_session('foo') | |
s2 = self.service.get_session('foo') | |
# ensure we are getting back the same object | |
self.assertIs(s1, s2) | |
def test_get_authorize_url(self): | |
url = self.service.get_authorize_url() | |
expected_fmt = 'https://example.com/authorize?client_id={0}' | |
self.assertEqual(url, expected_fmt.format(self.service.client_id)) | |
def test_get_raw_access_token(self): | |
resp = 'access_token=123' | |
self.response.content = resp | |
r = self.service.get_raw_access_token() | |
self.assertEqual(r.content, resp) | |
def test_get_raw_access_token_with_params(self): | |
resp = 'access_token=123' | |
self.response.content = resp | |
r = self.service.get_raw_access_token(params={'a': 'b'}) | |
self.assertEqual(r.content, resp) | |
def test_get_access_token(self): | |
self.response.content = 'access_token=123' | |
access_token = self.service.get_access_token() | |
self.assertEqual(access_token, '123') | |
@patch.object(requests.Session, 'request') | |
def test_request(self, mock_request): | |
r = self.service.request('GET', 'foo') | |
self.assertEqual(json.loads(r.content), {'status': 'ok'}) | |
class OflyServiceTestCase(RauthTestCase, HttpMixin): | |
def setUp(self): | |
RauthTestCase.setUp(self) | |
self.authorize_url = 'http://example.com/authorize' | |
self.base_url = 'http://example.com/api/' | |
self.service = OflyService('000', | |
'111', | |
name='service', | |
authorize_url=self.authorize_url, | |
base_url=self.base_url) | |
self.service.request = self.fake_request | |
@patch('rauth.session.datetime', MutableDatetime) | |
@patch('rauth.session.sha1', MockSha1) | |
@patch.object(requests.Session, 'request') | |
def fake_request(self, | |
method, | |
url, | |
mock_request, | |
header_auth=False, | |
**kwargs): | |
MutableDatetime.utcnow = classmethod(lambda cls: datetime(1900, 1, 1)) | |
mock_request.return_value = self.response | |
url = self.service._set_url(url) | |
params = 'oflyApiSig=foo&' | |
params += 'oflyAppId={0}&'.format(self.service.app_id) | |
params += 'oflyHashMeth=SHA1&' | |
params += 'oflyTimestamp=1900-01-01T00:00:00.0Z' | |
s = self.service.get_session() | |
service = Service('service', | |
self.service.base_url, | |
self.service.authorize_url) | |
url = service._set_url(url) | |
r = service.request(s, method, url, **kwargs) | |
mock_request.assert_called_with(method, | |
url, | |
headers={}, | |
params=params, | |
timeout=OFLY_DEFAULT_TIMEOUT) | |
return r | |
def test_get_session(self): | |
s = self.service.get_session() | |
self.assertIsInstance(s, OflySession) | |
@patch('rauth.session.datetime', MutableDatetime) | |
@patch('rauth.session.sha1', MockSha1) | |
def test_get_authorize_url(self): | |
MutableDatetime.utcnow = classmethod(lambda cls: datetime(1900, 1, 1)) | |
params = 'oflyApiSig=foo&' | |
params += 'oflyAppId={0}&'.format(self.service.app_id) | |
params += 'oflyHashMeth=SHA1&' | |
params += 'oflyTimestamp=1900-01-01T00:00:00.0Z' | |
expected_url = 'http://example.com/authorize?' | |
url = self.service.get_authorize_url() | |
self.assertEqual(url, expected_url + params) | |
def test_request(self): | |
r = self.service.request('GET', 'foo') | |
self.assertEqual(json.loads(r.content), {'status': 'ok'}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment