Skip to content

Instantly share code, notes, and snippets.

@keyz182
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save keyz182/00707ac10e3af0608027 to your computer and use it in GitHub Desktop.
Save keyz182/00707ac10e3af0608027 to your computer and use it in GitHub Desktop.
Grab tweets from Twitter and split them into geotagged and non-geotagged tables.
#!/usr/bin/python
__author__ = 'keyz'
# Launcher script: configure a PostgisStreamer and keep it running.
# The class defined in Streamer.py is PostgisStreamer; importing the old
# SentinelStreamer name left PostgisStreamer undefined below.
from Streamer import PostgisStreamer, StreamForever

# Twitter API credentials (placeholder values - substitute real keys).
twitter_auth = {
    'app_key': '1234',
    'app_secret': '5678',
    'oauth_token': '9012',
    'oauth_token_secret': '3456',
}

# Bounding box forwarded as the `locations` filter.
# NOTE(review): presumably "sw_lon,sw_lat,ne_lon,ne_lat" - confirm against
# the Twitter streaming API documentation.
location = '-10,45,10,55'

# Keyword filter forwarded as the `track` filter; None means no keyword filter.
track = None

# Destination tables for geotagged and non-geotagged tweets respectively.
table_name = "geo_tweets"
non_geo_table_name = "tweets"

# Connection string handed to psycopg2.
dbString = "dbname=localhost user=user password=password"

stream = PostgisStreamer(twitter_auth=twitter_auth,
                         table_name=table_name,
                         non_geo_table_name=non_geo_table_name,
                         dbString=dbString)

# StreamForever takes (stream, location, track); the original call omitted
# the third argument.
StreamForever(stream, location, track)
__author__ = 'keyz'
import sys
import psycopg2
import ppygis
import simplejson
from datetime import datetime
import time
from twython import TwythonStreamer
def StreamForever(stream, location, track=None, retry_delay=5):
    """Run ``stream.statuses.filter`` in an endless reconnect loop.

    Args:
        stream: Object exposing ``statuses.filter(**kwargs)``.
        location: Value for the ``locations`` filter; omitted when None.
        track: Value for the ``track`` filter; omitted when None. Optional,
            so callers that only filter by location can pass two arguments.
        retry_delay: Seconds to wait after a failure before reconnecting,
            so a persistent error does not become a tight retry loop.
            Values <= 0 disable the pause.

    Raises:
        SystemExit: When the user interrupts with Ctrl+C.
    """
    # Only forward the filters that were actually supplied.
    filters = {}
    if location is not None:
        filters['locations'] = location
    if track is not None:
        filters['track'] = track

    # KeyboardInterrupt is not an Exception subclass, so the inner handler
    # never swallows it; handling it outside the loop also covers a Ctrl+C
    # that arrives during the retry pause.
    try:
        while True:
            try:
                stream.statuses.filter(**filters)
            except Exception as ex:
                print(ex)
                print("Something broke! Restarting!")
                if retry_delay > 0:
                    time.sleep(retry_delay)
    except KeyboardInterrupt:
        print("Ctrl+C Caught, ByeBye!")
        sys.exit()
class PostgisStreamer(TwythonStreamer):
    """Twython streamer that stores incoming tweets in PostgreSQL/PostGIS.

    Tweets that carry coordinates are inserted into ``table_name`` together
    with a point geometry; every other tweet is inserted into
    ``non_geo_table_name``.
    """

    # SRID stamped on every point geometry. It must agree with the
    # GEOMETRY(Point, ...) column type and the CHECK constraint in init_db.
    srid = 4326

    # Schema bootstrap script. %(tbl)s is the geotagged table and %(tbl2)s
    # the non-geotagged table; both are substituted in __init__.
    init_db = '''-- Enable PostGIS (includes raster)
--CREATE EXTENSION IF NOT EXISTS postgis;
-- Enable Topology
--CREATE EXTENSION IF NOT EXISTS postgis_topology;
-- Create tweet table
CREATE TABLE IF NOT EXISTS %(tbl)s (
    id bigint PRIMARY KEY NOT NULL,
    tweet json NOT NULL,
    geometry GEOMETRY(Point,4326) NULL,
    timestamp TIMESTAMP WITH TIME ZONE,
    CONSTRAINT enforce_srid_geometry CHECK (st_srid(geometry) = 4326));
-- Create spatial index
CREATE INDEX %(tbl)s_gix ON %(tbl)s USING GIST(geometry);
CREATE INDEX %(tbl)s_timezone_ix ON %(tbl)s ("timestamp" ASC NULLS LAST);
-- Create table for tweets without coordinates
CREATE TABLE %(tbl2)s (
    id bigint PRIMARY KEY NOT NULL,
    tweet json NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE
);
CREATE INDEX %(tbl2)s_timezone_ix ON %(tbl2)s ("timestamp" ASC NULLS LAST);
'''

    def __init__(self, twitter_auth, table_name, non_geo_table_name, dbString):
        """Authenticate with Twitter, connect to Postgres and prepare tables.

        Args:
            twitter_auth (dict): Twitter API auth keys (app_key, app_secret,
                oauth_token, oauth_token_secret).
            table_name (str): Table that receives geotagged tweets.
            non_geo_table_name (str): Table that receives all other tweets.
            dbString (str): Connection string for psycopg2.

        Raises:
            SystemExit: With status 1 when a table name or dbString is None.
        """
        # The class was renamed from SentinelStreamer; super() must name the
        # class that actually exists or construction fails with NameError.
        super(PostgisStreamer, self).__init__(
            twitter_auth['app_key'], twitter_auth['app_secret'],
            twitter_auth['oauth_token'], twitter_auth['oauth_token_secret'])

        # Check the table name args before building any queries.
        if table_name is None or non_geo_table_name is None:
            self.log("No tablename found!")
            sys.exit(1)

        self.table_name = table_name
        self.non_geo_table_name = non_geo_table_name
        # Identifiers cannot be bound as query parameters, so the table
        # names are interpolated; they must come from trusted configuration.
        self.insert_coords_sql = ('INSERT INTO ' + self.table_name +
                                  ' (id,tweet,geometry,timestamp) VALUES (%s,%s,%s,%s);')
        self.insert_sql = ('INSERT INTO ' + self.non_geo_table_name +
                           ' (id,tweet,timestamp) VALUES (%s,%s,%s);')
        self.init_db = self.init_db % {'tbl': self.table_name,
                                       'tbl2': self.non_geo_table_name}

        # Check the dbString arg, then create a connection to the DB.
        if dbString is None:
            self.log("No dbString found!")
            sys.exit(1)

        self.dbString = dbString
        self.conn = psycopg2.connect(self.dbString)
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        # stringToDate() yields naive UTC datetimes; pin the session time
        # zone so the timestamptz columns interpret them as UTC.
        self.cur.execute("SET TIME ZONE 'UTC'")
        # If the geotagged table doesn't exist, run the bootstrap script.
        self.cur.execute("select exists(select * from information_schema.tables where table_name=%s)",
                         (self.table_name,))
        if not self.cur.fetchone()[0]:
            self.cur.execute(self.init_db)

    def log(self, line):
        """Print *line* prefixed with the current time.

        Quick logging method, replace later with proper logging api/library.
        """
        print('%s: %s' % (datetime.now(), line))

    def _insert(self, sql, params):
        """Execute one INSERT, logging (rather than raising) any failure."""
        # Deliberately best-effort: one bad tweet must not stop the stream.
        try:
            self.cur.execute(sql, params)
        except Exception as e:
            self.log("Exception encountered wrtiting tweet")
            self.log(e)

    def on_success(self, data):
        """Store a received tweet; called for each successfully received message.

        Messages lacking 'id' or 'created_at' are logged and dropped.
        """
        self.log("Incoming tweet!")
        # Check the tweet contains needed info
        if 'text' in data:
            self.log(data['text'].encode('utf-8'))
        if 'id' not in data:
            self.log("No ID found in tweet data, ignoring!")
            return
        if 'created_at' not in data:
            self.log("Created_at missing")
            return

        tweet_id = data['id']
        created = self.stringToDate(data['created_at'])
        # Dump the tweet to a json string for storage in Postgres' json type
        payload = simplejson.dumps(data)

        # If coords are populated in the tweet, write to the geotagged table.
        coords = data.get('coordinates')
        if coords is not None:
            self.log("It's Geotagged!")
            # Create the ppygis object and set its srid so that it gets
            # formatted for postgres properly.
            point = ppygis.Point(coords['coordinates'][0],
                                 coords['coordinates'][1])
            point.srid = self.srid
            self._insert(self.insert_coords_sql,
                         (tweet_id, payload, point, created))
            return

        # Write non geotagged tweet
        self._insert(self.insert_sql, (tweet_id, payload, created))

    def on_error(self, status_code, data):
        """Log any non-200 status code reported by the stream."""
        # Compare by value; `is not` tests object identity, not equality.
        if status_code != 200:
            self.log(status_code)
        # Want to stop trying to get data because of the error?
        # Uncomment the next line!
        # self.disconnect()

    def stringToDate(self, date_str):
        """Convert a tweet's 'created_at' string to a naive UTC datetime.

        Expects the form "Tue Apr 26 08:57:55 +0000 2011". Parsing directly
        avoids the time.mktime() round trip, which treated the UTC value as
        local time and could shift it around DST transitions.
        """
        return datetime.strptime(date_str, "%a %b %d %H:%M:%S +0000 %Y")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment