Skip to content

Instantly share code, notes, and snippets.

@dannyroa
Created May 1, 2012 20:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannyroa/2571132 to your computer and use it in GitHub Desktop.
Save dannyroa/2571132 to your computer and use it in GitHub Desktop.
yelp client
import json
import oauth2
import optparse
import urllib
import urllib2
from django.conf import settings
class YelpClient:
host = 'api.yelp.com'
token = None
token_secret = None
consumer = None
consumer_key = None
consumer_secret = None
def __init__(self, consumer_key=None, consumer_secret=None, token=None, token_secret=None):
if consumer_key:
self.consumer_key = consumer_key
else:
self.consumer_key = settings.YELP_CONSUMER_KEY
if consumer_key:
self.consumer_secret = consumer_secret
else:
self.consumer_secret = settings.YELP_CONSUMER_SECRET
if token:
self.token = token
else:
self.token = settings.YELP_TOKEN
if token_secret:
self.token_secret = token_secret
else:
self.token_secret = settings.YELP_TOKEN_SECRET
self.consumer = oauth2.Consumer(self.consumer_key, self.consumer_secret)
def get_business(self, yelp_id):
path = '/v2/business/%s' % yelp_id
url = 'http://%s%s' % (self.host, path)
# Sign the URL
oauth_request = oauth2.Request('GET', url, {})
oauth_request.update({'oauth_nonce': oauth2.generate_nonce(),
'oauth_timestamp': oauth2.generate_timestamp(),
'oauth_token': self.token,
'oauth_consumer_key': self.consumer_key})
oauth_token = oauth2.Token(self.token, self.token_secret)
oauth_request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, oauth_token)
signed_url = oauth_request.to_url()
#print 'Signed URL: %s\n' % (signed_url,)
# Connect
try:
conn = urllib2.urlopen(signed_url, None)
try:
response = json.loads(conn.read())
finally:
conn.close()
except urllib2.HTTPError, error:
response = json.loads(error.read())
return response
def search_businesses(self, term="", location=None, bounds=None, ll=None, offset=None, limit=20, cc=None, lang=None, cll=None, radius_filter=40000, category_filter=None):
url_params = {}
if term:
url_params['term'] = term
if location:
url_params['location'] = location
if bounds:
url_params['bounds'] = bounds
if ll:
url_params['ll'] = ll
if offset:
url_params['offset'] = offset
if limit:
url_params['limit'] = limit
if cc:
url_params['cc'] = cc
if lang:
url_params['lang'] = lang
if cll:
url_params['cll'] = cll
if radius_filter:
url_params['radius_filter'] = radius_filter
if category_filter:
url_params['category_filter'] = category_filter
encoded_params = None
if url_params:
encoded_params = urllib.urlencode(url_params)
path = '/v2/search'
url = 'http://%s%s?%s' % (self.host, path, encoded_params)
# Sign the URL
oauth_request = oauth2.Request('GET', url, {})
oauth_request.update({'oauth_nonce': oauth2.generate_nonce(),
'oauth_timestamp': oauth2.generate_timestamp(),
'oauth_token': self.token,
'oauth_consumer_key': self.consumer_key})
oauth_token = oauth2.Token(self.token, self.token_secret)
oauth_request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, oauth_token)
signed_url = oauth_request.to_url()
#print 'Signed URL: %s\n' % (signed_url,)
# Connect
try:
conn = urllib2.urlopen(signed_url, None)
try:
response = json.loads(conn.read())
finally:
conn.close()
except urllib2.HTTPError, error:
response = json.loads(error.read())
return response['businesses']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment