Skip to content

@lrvick /kralr.py
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Tasks to support Twitter kralr.py
import httplib,pycurl,json
from django.conf import settings
from tasks import ProcessTweet
# Presumably the default keyword handed to Twitter.run(); nothing in the
# visible code reads this constant -- TODO confirm against the caller.
QUERY="android"
class Twitter(object):
    """Stream statuses matching a track query and queue each one for processing.

    ``run`` opens a pycurl connection to the statuses/filter endpoint and
    blocks while pycurl feeds received chunks to ``on_receive``.
    ``on_receive`` accumulates chunks and hands every complete payload to
    the ``ProcessTweet`` Celery task.
    """

    default_retry_delay = 5

    def __init__(self):
        # Set up-front so on_receive() is usable even before run() is called.
        self.buffer = ""
        self.stream = None

    def run(self, query):
        """Connect to the streaming endpoint and block while data arrives.

        ``query`` is interpolated verbatim into the ``track`` URL parameter,
        so the caller must pass a value that is already URL-safe.
        """
        self.buffer = ""
        self.stream = pycurl.Curl()
        try:
            self.stream.setopt(pycurl.USERPWD, "%s:%s" % (settings.TWITTER_USER, settings.TWITTER_PASS))
            self.stream.setopt(pycurl.URL, "http://stream.twitter.com/1/statuses/filter.json?track=%s" % (query))
            self.stream.setopt(pycurl.WRITEFUNCTION, self.on_receive)
            self.stream.perform()
        finally:
            # Release the curl handle whether perform() returned or raised.
            self.stream.close()

    def on_receive(self, data):
        """Buffer one received chunk; dispatch the buffer once it is complete.

        A buffer is treated as complete when the chunk just received ends
        with "\\r\\n". Whitespace-only buffers are never dispatched.
        Returns None, which pycurl accepts as "all data consumed".
        """
        self.buffer += data
        if data.endswith("\r\n") and self.buffer.strip():
            # Queue the payload through the task's delay() entry point rather
            # than passing it to the task constructor.
            ProcessTweet.delay(self.buffer)
            self.buffer = ""
import httplib,urlparse,pycurl,json,time,re,sys,time,datetime,os,threading
from celery.task.base import Task
from celery.decorators import task
from django.conf import settings
from models import *
from kral.models import *
class ProcessTweet(Task):
    """Celery task: parse one JSON-encoded tweet, upsert its author, store it.

    The class is a task by virtue of subclassing ``Task``; the
    function-oriented ``@task()`` decorator is deliberately not applied.
    """

    # Layout of the ``created_at`` strings parsed below (always "+0000").
    TWITTER_DATE_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'

    def run(self, data):
        """Process one raw JSON payload.

        Returns None when the payload was handled or carries no user id
        (nothing is stored in that case), and False when storing the tweet
        itself failed. Errors while parsing the JSON or while saving the
        user propagate to Celery.
        """
        content = json.loads(data)
        # Payloads without a "user" object are skipped instead of raising.
        user = content.get("user") or {}
        user_id = user.get('id_str')
        if user_id is None:
            return None

        self._queue_urls(content)
        twitter_user = self._sync_user(user_id, user)
        try:
            self._store_tweet(content, twitter_user)
        except Exception:
            # Best-effort, as in the original: a tweet that cannot be stored
            # is reported with False rather than failing the task.
            return False
        return None

    def _parse_date(self, value):
        """Turn a ``created_at`` string into a naive datetime (UTC wall clock)."""
        # Parsed directly so the value never passes through local time.
        return datetime.datetime.strptime(value, self.TWITTER_DATE_FORMAT)

    def _queue_urls(self, content):
        """Queue an ExpandURL task for every link attached to the tweet."""
        # Assumes the standard tweet layout where links live under
        # content["entities"]["urls"] -- TODO confirm for this stream.
        entities = content.get("entities") or {}
        for url in entities.get("urls") or []:
            target = url.get('expanded_url') or url.get('url')
            if target:
                ExpandURL.delay(target)

    def _sync_user(self, user_id, user):
        """Update the stored TwitterUser counters, or create the user."""
        try:
            twitter_user = TwitterUser.objects.get(user_id=user_id)
        except TwitterUser.DoesNotExist:
            # location, time_zone and utc_offset are not stored; they were
            # commented out in the original.
            twitter_user = TwitterUser(
                user_id=user_id,
                user_name=user["screen_name"],
                real_name=user["name"],
                avatar=user["profile_image_url"],
                date=self._parse_date(user["created_at"]),
                language=user["lang"],
                total_tweets=user["statuses_count"],
                listed=user["listed_count"],
                following=user["friends_count"],
                followers=user["followers_count"],
                geo_enabled=user["geo_enabled"],
                contributors_enabled=user["contributors_enabled"],
            )
        else:
            # Plain values: the original's trailing commas stored 1-tuples.
            twitter_user.total_tweets = user["statuses_count"]
            twitter_user.listed = user["listed_count"]
            twitter_user.following = user["friends_count"]
            twitter_user.followers = user["followers_count"]
        twitter_user.save()
        return twitter_user

    def _store_tweet(self, content, twitter_user):
        """Create and save the TwitterTweet row for this payload."""
        # NOTE(review): ``geo`` is filled from the author's profile location;
        # the tweet-level content['geo'] was commented out in the original.
        twitter_tweet = TwitterTweet(
            date=self._parse_date(content["created_at"]),
            tweet_id=content["id_str"],
            user_id=twitter_user,
            text=content["text"],
            truncated=content['truncated'],
            geo=content["user"]["location"],
            contributors=content["contributors"],
        )
        twitter_tweet.save()
class ExpandURL(Task):
    """Celery task: follow a URL's redirects and queue the final URL.

    Each hop is one HEAD request. When a response carries no ``Location``
    header the URL is handed to ``ProcessURL``. The class is a task by
    virtue of subclassing ``Task``; ``@task()`` is deliberately not applied.
    """

    # run() gives up once its hop counter exceeds this value.
    max_hops = 3

    def run(self, url, n=1):
        """Resolve ``url``; ``n`` is the hop counter used by the recursion.

        Returns "Connection request failed" when the HEAD request fails,
        otherwise None (or whatever the nested hop returned).
        """
        headers = {"User-Agent": "Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.7.6) Gecko/20050512 Firefox"}
        parsed_url = urlparse.urlsplit(url)
        # An empty path would give an invalid request line, so default to "/".
        request = urlparse.urlunsplit(
            ('', '', parsed_url.path or '/', parsed_url.query, parsed_url.fragment))
        if parsed_url.scheme == "https":
            connection = httplib.HTTPSConnection(parsed_url.netloc)
        else:
            connection = httplib.HTTPConnection(parsed_url.netloc)
        try:
            connection.request('HEAD', request, "", headers)
            response = connection.getresponse()
            current_url = response.getheader('Location')
        except Exception:
            # Best-effort, as in the original: report the failure as a string.
            return "Connection request failed"
        finally:
            connection.close()

        n += 1
        if n > self.max_hops:
            # The original left this branch empty; stop without recording.
            return None
        if current_url is None:
            # No further redirect: this URL is final.
            ProcessURL.delay(url)
            return None
        # Location may be relative, so resolve it against the current URL.
        return self.run(urlparse.urljoin(url, current_url), n)
class ProcessURL(Task):
    """Celery task: count one mention of a URL in the WebLink table.

    The class is a task by virtue of subclassing ``Task``; ``@task()`` is
    deliberately not applied.
    """

    def run(self, url):
        """Increment ``total_mentions`` for ``url``, creating the row if needed."""
        try:
            link = WebLink.objects.get(url=url)
        except WebLink.DoesNotExist:
            # First sighting: create the row, then count it below. This
            # replaces the original's create-then-reprocess recursion.
            link = WebLink(url=url)
        # Tolerates an unset counter on a freshly created row.
        link.total_mentions = (link.total_mentions or 0) + 1
        link.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.