Skip to content

Instantly share code, notes, and snippets.

@freeatnet
Created November 11, 2012 15:36
Show Gist options
  • Save freeatnet/4055240 to your computer and use it in GitHub Desktop.
Save freeatnet/4055240 to your computer and use it in GitHub Desktop.
import cookielib
import gzip
import httplib
import StringIO
import urllib
import urllib2
class SmartRedirectHandler(urllib2.HTTPRedirectHandler):
def http_error_301(self, req, fp, code, msg, headers):
result = urllib2.HTTPRedirectHandler.http_error_301(
self, req, fp, code, msg, headers)
result.status = code
return result
def http_error_302(self, req, fp, code, msg, headers):
result = urllib2.HTTPRedirectHandler.http_error_302(
self, req, fp, code, msg, headers)
result.status = code
return result
class DefaultErrorHandler(urllib2.HTTPDefaultErrorHandler):
def http_error_default(self, req, fp, code, msg, headers):
result = urllib2.HTTPError(
req.get_full_url(), code, msg, headers, fp)
result.status = code
return result
class Conn:
__reqest = None
__opener = None
__fh = None
__host = ''
__page = ''
__extra_headers = {}
__data = {'get': {}, 'post': {}}
__cj = None
__result = {}
def __init__(self, host, page):
self.__host = host
self.__page = page
self.__data = {'get': {}, 'post': {}}
if not self.__class__.__extra_headers == {}:
self.__extra_headers = self.__class__.__extra_headers.copy()
return None
def reset_data(self):
self.__data = {'get': {}, 'post': {}}
return self
def get_data(self, data):
if not isinstance(data, dict):
raise TypeError, "Argument data is expected to be dict"
self.__set_data('get', data)
return self
def post_data(self, data):
if not isinstance(data, dict):
raise TypeError, "Argument data is expected to be dict"
self.__set_data('post', data)
return self
def __set_data(self, method, data):
self.__data[method].update(data)
return None
def set_cookiejar(self, jar=False, default=False):
if not (jar == False or isinstance(jar, cookielib.CookieJar)):
raise TypeError, "Argument jar is expected to be cookielib.CookieJar instance"
if default == True:
if not self.__class__.__cj == None:
self.__cj = self.__class__.__cj
else:
self.__cj = self.__class__.__cj = cookielib.CookieJar()
else:
self.__cj = jar
return self
def __set_global_headers(self, type, content):
self.__class__.__extra_headers[type] = content
return None
def __set_extra_headers(self, type, content):
self.__extra_headers[type] = content
return None
def set_authorization(self, auth_type, username, password):
auth_string = ""
if auth_type.lower() == "basic":
from base64 import b64encode
auth_string += "Basic "
auth_string += b64encode("%s:%s" % (username, password, ))
self.__set_global_headers('Authorization', auth_string)
print self.__class__.__extra_headers
return self
def set_useragent(self, agentstring):
self.__set_global_headers('User-Agent', agentstring)
return self
def set_referer(self, referer):
self.__set_extra_headers('Referer', referer)
return self
def set_xforwarded(self, client):
self.__set_extra_headers('Client-IP', client)
self.__set_extra_headers('X-Forwarded-For', client)
return self
def execute(self):
full_url = '%s/%s?%s' % (self.__host, self.__page, urllib.urlencode(self.__data['get']))
self.__request = urllib2.Request(full_url)
if len(self.__extra_headers) > 0:
for (k, v) in self.__extra_headers.items():
self.__request.add_header(k, v)
self.__request.add_header('Accept-encoding', 'gzip')
self.__request.add_header('Connection', 'close')
if not self.__data['post'] == {}:
self.__request.add_data(urllib.urlencode(self.__data['post']))
self.__opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1), SmartRedirectHandler(), DefaultErrorHandler())
if not self.__cj == None:
self.__opener.add_handler(urllib2.HTTPCookieProcessor(self.__cj))
self.__fh = self.__opener.open(self.__request)
return self
def read(self):
self.__result['data'] = self.__fh.read()
if hasattr(self.__fh, 'headers'):
if self.__fh.headers.get('content-encoding') == 'gzip':
# data came back gzip-compressed, decompress it
self.__result['data'] = gzip.GzipFile(fileobj=StringIO.StringIO(self.__result['data'])).read()
if self.__fh.headers.get('location'):
self.__result['location'] = self.__fh.headers.get('location')
if hasattr(self.__fh, 'url'):
self.__result['url'] = self.__fh.url
self.__result['status'] = 200
if hasattr(self.__fh, 'status'):
self.__result['status'] = self.__fh.status
self.__fh.close()
return self.__result
def get_url(self):
url = '%s/%s?%s' % (self.__host, self.__page, urllib.urlencode(self.__data['get']))
return url
dict = {'key1': 'value1'}
c = Conn('http://example.com', 'endpoint').get_data(dict).execute()
c.read()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment