
@Sesim
Last active August 29, 2015 14:21
Geolocation Scraper for Twitter
#!/usr/bin/python
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from requests.packages.urllib3.exceptions import ProtocolError
from sheetsync import Sheet
import xml.etree.ElementTree as ElementTree
import json
import sys
import time
import traceback
from threading import Thread, RLock
import datetime
from dateutil import parser

class DataHandler(Thread):
    """
    Thread class that handles provided data and commits it
    to a google spreadsheet.
    """

    def __init__(self, element):
        """
        Default constructor. Calls parent constructor
        and parses the given element to retrieve
        google drive configuration.

        Keyword arguments:
        element --- XML root element that contains drive configuration.
        """
        Thread.__init__(self)
        self.lock = RLock()
        self.pool = {}
        # Parse XML configuration.
        user = element.find("user").text
        password = element.find("password").text
        document = element.find("document").text
        # Creates sheet reference.
        self.sheet = Sheet(username=user, password=password, document_key=document)

    def push(self, key, value):
        """
        Pushes the given entry to the pool.

        Keyword arguments:
        key --- Tweet identifier used as entry key.
        value --- Dictionary of tweet attributes to commit.
        """
        with self.lock:
            self.pool[key] = value

    def shutdown(self):
        """ Terminate current thread execution. """
        with self.lock:
            self.isRunning = False

    def run(self):
        """
        Run method implementation. Ensures the pool
        is not empty and pops data from it to
        commit to the sheet.
        """
        self.isRunning = True
        while self.isRunning:
            with self.lock:
                if len(self.pool) != 0:
                    print("DataHandler.run :: inject " + str(len(self.pool)) + " tweets.")
                    self.sheet.inject(self.pool)
                    self.pool.clear()
            # Sleep between polls to avoid busy waiting on an empty pool.
            time.sleep(1)

class Scraper:
    """
    This class allows scraping twitter live content using
    the Streaming API. Tweets are selected by search tag and
    exported to a target google document file.
    """

    class Listener(StreamListener):
        """
        Custom listener implementation that writes received
        data to a google document file.
        """

        def __init__(self, element, parent):
            """
            Default constructor. Calls parent constructor
            and parses the given element to retrieve
            google drive configuration.

            Keyword arguments:
            element --- XML root element that contains drive configuration.
            parent --- Owning Scraper instance, used to reconnect on disconnection.
            """
            self.handler = DataHandler(element)
            self.handler.start()
            StreamListener.__init__(self)
            self.parent = parent

        def shutdown(self):
            """ Terminate the internal thread. """
            self.handler.shutdown()
            self.handler.join()

        def setFilters(self, filters):
            """
            Initializes the filter lists, splitting them into three
            categories: hashtags, references, and simple search terms (tokens).

            Keyword arguments:
            filters --- List of filters to use.
            """
            # Parsing parameters.
            self.rawfilters = filters
            self.hashtags = [f for f in filters if "#" in f]
            self.references = [f for f in filters if "@" in f]
            self.tokens = []
            print("Scraper.Listener.setFilters :: hashtag filters : " + str(self.hashtags))
            print("Scraper.Listener.setFilters :: reference filters : " + str(self.references))
            keywords = []
            for filter in filters:
                if filter in self.hashtags or filter in self.references:
                    # Strip the leading # or @ for the streaming track parameter.
                    keywords.append(filter[1:])
                else:
                    self.tokens.append(filter)
                    keywords.append(filter)
            print("Scraper.Listener.setFilters :: keywords filter : " + str(keywords))
            return keywords

        def filter(self, tweet, filters):
            """
            Predicate that filters the given tweet using the given list of
            filtering tokens.

            Keyword arguments:
            tweet --- Target tweet content to filter.
            filters --- List of filter tokens to use.
            """
            for filter in filters:
                if filter in tweet:
                    return True
            return False

        def on_data(self, rawdata):
            """
            Callback method that is used for each filtered tweet.

            Keyword arguments:
            rawdata --- Data that has been parsed by the streaming API.
            """
            data = json.loads(rawdata)
            if "limit" in data:
                return
            tweet = data["text"]
            # Ensure tweet is valid.
            if self.filter(tweet, self.hashtags) or self.filter(tweet, self.references) or self.filter(tweet, self.tokens):
                filtered = {}
                filtered["text"] = data["text"].encode("utf-8")
                filtered["created_at"] = (parser.parse(data["user"]["created_at"].encode("utf-8")) + datetime.timedelta(hours=1)).strftime("%d/%m/%y")
                filtered["tweets"] = data["user"]["statuses_count"]
                filtered["user.name"] = data["user"]["name"].encode("utf-8")
                # The user location may be missing from the payload.
                filtered["user.location"] = (data["user"]["location"] or "").encode("utf-8")
                filtered["followers"] = data["user"]["followers_count"]
                filtered["friends"] = data["user"]["friends_count"]
                filtered["date"] = data["created_at"].encode("utf-8")
                filtered["coordinates"] = str(data["coordinates"]).encode("utf-8")
                print("Scraper.Listener.on_data :: adding data to queue #" + str(data["id"]))
                self.handler.push(str(data["id"]), filtered)

        def on_error(self, status):
            """
            Callback method that is used when an API error is thrown.

            Keyword arguments:
            status --- Error code.
            """
            print("Error status : " + str(status))

        def on_timeout(self):
            """ Callback method that is used when a connection timeout occurs. """
            print("Scraper.Listener.on_timeout :: Timeout caught.")

        def on_disconnect(self, notice):
            """
            Callback method that is used when a disconnection occurs.

            Keyword arguments:
            notice --- Notice message received when disconnecting.
            """
            print("Scraper.Listener.on_disconnect :: Disconnection occurred : " + str(notice) + ". Resets connection.")
            self.parent.connect()
            self.parent.scrap(self.rawfilters)

    def __init__(self, file):
        """
        Default constructor. Initializes the connection handler
        using the given configuration file and sets up the stream.

        Keyword arguments:
        file --- Path of the configuration file to use for creating the handler.
        """
        # Loads XML configuration file.
        tree = ElementTree.parse(file)
        root = tree.getroot()
        # Creates listener.
        print("Scraper.__init__ :: Creates stream listener.")
        self.listener = Scraper.Listener(root.find("drive"), self)
        # Parse twitter parameters.
        self.consumer = root.find("consumer")
        self.access = root.find("access")
        self.connect()

    def connect(self):
        """ Establishes a connection to the twitter streaming API. """
        print("Scraper.connect :: Creates OAuth handler.")
        self.handler = OAuthHandler(self.consumer.find("key").text, self.consumer.find("secret").text)
        self.handler.set_access_token(self.access.find("token").text, self.access.find("secret").text)
        self.stream = Stream(self.handler, self.listener)

    def scrap(self, filters):
        """
        Starts scraping phase using live streaming API.

        Keyword arguments:
        filters --- List of target keywords to match.
        """
        while True:
            try:
                self.stream.filter(track=self.listener.setFilters(filters))
            except Exception:
                self.stream.disconnect()
                self.connect()
                print("Scraper.scrap :: Error caught, restart.")
                continue
            # except Exception as err:
            #     print traceback.format_exc()
            #     print sys.exc_info()[0]
            #     print("Scraper.scrap :: Exception caught, abort")
            #     self.stream.disconnect()
            #     self.listener.shutdown()
            #     break

# Main entry point.
if __name__ == "__main__":
    print(sys.argv)
    scraper = Scraper("configuration.xml")
    filters = sys.argv
    filters.pop(0)
    print("Received parameters : " + str(filters))
    scraper.scrap(filters)

Sesim commented May 13, 2015

It retrieves and packages all dependencies required for the script.

It is a packaged version that contains all required libraries. Nothing else needs to be installed; it only requires a terminal and a working Python installation.
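
As an illustration only, here is a minimal sketch of driving the scraper programmatically rather than from the command line; the module name scraper.py and the filter terms are assumptions, while "configuration.xml" is the file name hard-coded in the script above.

# Hypothetical driver, assuming the gist above is saved as scraper.py.
from scraper import Scraper

if __name__ == "__main__":
    # Example filters only: hashtags and @references have their prefix
    # stripped by setFilters(); plain terms are tracked as-is.
    example_filters = ["#python", "@twitter", "geolocation"]
    scraper = Scraper("configuration.xml")
    scraper.scrap(example_filters)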

The scraper injects data into the spreadsheet directly.
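
Both the spreadsheet credentials and the Twitter OAuth keys are read from configuration.xml. The sketch below writes a template with the element names the script above expects; the root element name and all placeholder values are assumptions, to be replaced with real credentials.

#!/usr/bin/python
# Writes a template configuration.xml matching the elements read by
# Scraper and DataHandler above. Replace the placeholders before running.
import xml.etree.ElementTree as ElementTree

root = ElementTree.Element("configuration")
drive = ElementTree.SubElement(root, "drive")
ElementTree.SubElement(drive, "user").text = "GOOGLE_ACCOUNT_USER"
ElementTree.SubElement(drive, "password").text = "GOOGLE_ACCOUNT_PASSWORD"
ElementTree.SubElement(drive, "document").text = "SPREADSHEET_DOCUMENT_KEY"
consumer = ElementTree.SubElement(root, "consumer")
ElementTree.SubElement(consumer, "key").text = "TWITTER_CONSUMER_KEY"
ElementTree.SubElement(consumer, "secret").text = "TWITTER_CONSUMER_SECRET"
access = ElementTree.SubElement(root, "access")
ElementTree.SubElement(access, "token").text = "TWITTER_ACCESS_TOKEN"
ElementTree.SubElement(access, "secret").text = "TWITTER_ACCESS_SECRET"
ElementTree.ElementTree(root).write("configuration.xml")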
