Twitter Influencers

Twitter user visibility by means of regular Markov chains.

This is an attempt to define user visibility on a specific topic. Briefly, tweets are collected via the Twitter streaming API, stored in SQLite databases and then processed to build a regular Markov chain. The steady state distribution of the chain defines a score on the set of Twitter users, which can be used to retrieve an ordered list of users.
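For intuition, pykov (the same library used below) makes the steady state computation a one-liner. A minimal toy chain, unrelated to the collected data:

>>> import pykov
>>> T = pykov.Chain({('A', 'B'): 0.3, ('A', 'A'): 0.7, ('B', 'A'): 1.0})
>>> T.steady()  # roughly {'A': 0.77, 'B': 0.23}: state 'A' is visited most often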

Have a look at this paper and this other paper for further details about the mathematical methods.

Be careful: the procedure described here is experimental and not meant to be used in production environments.

Fetching and storing tweets from the Twitter streaming API

Fill the file tweet_stream.py with your Twitter credentials and adjust the keywords and languages as you wish. Then start the script:

$ nohup python tweet_stream.py  # Python 3

The script saves a SQLite database containing information about the tweets: tweet id, language, text, creation date, user id, username, the mentions/hashtags/urls contained and, in case of retweets, the id and user id of the original tweet. The script keeps running; it is up to you to stop it.
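Once some tweets have been collected, the database can be inspected with the dataset library (a quick sanity check; replace the file name with the one produced by your run):

>>> import dataset
>>> db = dataset.connect('sqlite:///tweets-Thu-Oct--2-06:02:23-2014.db')
>>> tweets = db['tweet']
>>> len(tweets)        # number of stored tweets
>>> tweets.find_one()  # one row, with the columns described above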

Define an ordered list of Twitter users

Dependencies: before continuing, please install networkx, dataset and pykov.

Open a Python shell. The following commands create a directed graph G from the collected tweets:

>>> import analysis
>>> G, Q = analysis.getGraph('tweets-Thu-Oct--2-06:02:23-2014.db')

G is a networkx DiGraph object: nodes are Twitter user ids and edges are links among them (expressed as tuples).

>>> type(G)
<class 'networkx.classes.digraph.DiGraph'>
>>> G.edges()[:3]
[('231907053', '1117325342'), ('2567615524', '928342410'), ('2567615524', '18396319')]
>>> G.nodes()[:3]
['2456481724', '231907053', '2567615524']

Q is a Python dictionary whose keys are links among Twitter users and whose values are the number of times each link has been observed:

>>> Q[('525630064', '282695161')]
3.0

As expected, the directed graph turns out to be disconnected. The lack of connectivity is a straightforward consequence of the fact that each link is created independently of the others, so there is no reason to expect a connecting path between every pair of nodes.
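A quick check with networkx confirms it (assuming the graph G built above):

>>> import networkx as nx
>>> nx.is_strongly_connected(G)
False
>>> nx.number_strongly_connected_components(G)  # typically much larger than 1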

The largest strongly connected component can be found with the following code:

>>> import networkx as nx
>>> scc = list(nx.strongly_connected_component_subgraphs(G))
>>> scc = sorted(scc,key = lambda graph: len(graph.nodes()))
>>> F = scc[-1]
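Note that strongly_connected_component_subgraphs was deprecated and then removed in recent networkx releases (2.4 and later). On those versions, an equivalent way to obtain the largest strongly connected component is:

>>> comp = max(nx.strongly_connected_components(G), key=len)
>>> F = G.subgraph(comp).copy()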

The regular Markov chain is created on top of the largest component:

>>> T = analysis.getTransitionMatrix(F,Q)
>>> type(T)
<class 'pykov.Chain'>

The above chain is ergodic, so its steady state distribution is well defined and easy to compute. Users with the highest stationary probability are the most visible:

>>> s = T.steady().sort(True)
>>> s[:5]
[(u'14499829', 0.17468580328789371), (u'5402612', 0.037378364822550497), (u'427900496', 0.033809038083563836), (u'2195671183', 0.026123344415636889), (u'17899109', 0.022593710080353133)]
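The ranked user ids can be mapped back to usernames with the database created by tweet_stream.py (a sketch, assuming the same database file used above):

>>> import dataset
>>> db = dataset.connect('sqlite:///tweets-Thu-Oct--2-06:02:23-2014.db')
>>> [(db['tweet'].find_one(user=uid)['username'], p) for uid, p in s[:5]]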

Written with StackEdit.

# -*- coding: utf-8 -*-
import dataset
import pykov
import networkx as nx


def getGraph(path_dbfile):
    """
    Build the directed graph of retweet/mention links from a tweets database.

    Example:
    path_dbfile = '/path/to/tweets-Fri-Sep--5-12:47:34-2014.db'
    """
    tweetsSet = set()
    G = nx.DiGraph()
    Q = {}
    # read the database
    db = dataset.connect('sqlite:///' + path_dbfile)
    tweets = db[u'tweet']
    # create the objects
    for tweet in tweets:
        # avoid clones
        if tweet['id_str'] not in tweetsSet:
            tweetsSet.add(tweet['id_str'])
            # if the tweet is a retweet add a link (user, original user)
            if tweet["rtFrom"]:
                G.add_edge(tweet['user'], tweet["rtFrom"])
                Q[(tweet['user'], tweet["rtFrom"])] = Q.get((tweet['user'], tweet["rtFrom"]), 0.) + 1
            # if the tweet contains mentions add a link (user, mentioned user),
            # but only when user and mentioned user are not equal
            if tweet['user_mentions']:
                for user in decodeMentions(tweet['user_mentions']):
                    # if userA RTs userB, the Twitter API reports that userA mentions userB. Weird.
                    if (user != tweet["rtFrom"]) and (user != tweet['user']):
                        G.add_edge(tweet['user'], user)
                        Q[(tweet['user'], user)] = Q.get((tweet['user'], user), 0.) + 1
    return G, Q


def decodeMentions(data):
    """
    Parse the string representation of the mentions list stored in the
    database, e.g. "['123', '456']", and return the list of user ids.
    """
    return [d.split("'")[1] for d in data.split(",")]


def getTransitionMatrix(G, Q):
    """
    Build a pykov Chain whose entries are the observed link counts in Q,
    then normalize the rows so that the matrix is stochastic.
    """
    T = pykov.Chain()
    for edge in G.edges():
        T[edge] = Q[edge]
    T.stochastic()
    return T
# fork of https://github.com/adamdrake/twitterstreamtemplate
from threading import Thread
from queue import Queue
from twython import TwythonStreamer
from requests.exceptions import ChunkedEncodingError
import dataset
import datetime


class TwitterStream(TwythonStreamer):

    def __init__(self, consumer_key, consumer_secret, token, token_secret, tqueue):
        self.tweet_queue = tqueue
        super(TwitterStream, self).__init__(consumer_key, consumer_secret, token, token_secret)

    def on_success(self, data):
        if 'text' in data:
            self.tweet_queue.put(data)

    def on_error(self, status_code, data):
        print(status_code)
        # Want to stop trying to get data because of the error?
        # Uncomment the next line!
        # self.disconnect()


def stream_tweets(tweets_queue):
    # Input your credentials below
    consumer_key = ''
    consumer_secret = ''
    token = ''
    token_secret = ''
    try:
        stream = TwitterStream(consumer_key, consumer_secret, token, token_secret, tweets_queue)
        stream.statuses.filter(track='ebola, mers, antivirals, bigpharma antivirals, bigpharma pandemic flu, bird flu, H1N1, H5N1, H7N9, infectious outbreak, oseltamivir, pandemic, pandemic flu, pandemic vaccine, relenza, tamiflu, zanamivir, vaccine, vaccines, vaccinated, vaccinates, vaccinate, vaccination, vaccinations', language='it,en,fr,de,es')
    except ChunkedEncodingError:
        # Sometimes the API sends back one byte less than expected, which results in an
        # exception in the current version of the requests library
        stream_tweets(tweets_queue)


def process_tweets(tweets_queue):
    while True:
        data = tweets_queue.get()
        # keep the fields of interest from the raw tweet
        obs = ['retweet_count', 'favorite_count', 'lang', 'text', 'created_at', 'id_str']
        tmp = dict([(i, data[i]) for i in obs if i in data])
        # save coordinates
        try:
            if data['coordinates']:
                tmp['coordinates'] = str(data['coordinates']['coordinates'])
        except KeyError:
            pass
        # save user id and username
        try:
            if data['user']:
                tmp['user'] = data['user']['id_str']
                tmp['username'] = data['user']['name']
        except KeyError:
            pass
        # if RT, save user id and tweet id of the original tweet
        try:
            if data['retweeted_status']:
                tmp['rtOf'] = data['retweeted_status']['id_str']
                tmp['rtFrom'] = data['retweeted_status']['user']['id_str']
        except KeyError:
            pass
        # save mentions
        try:
            if data['entities']['user_mentions']:
                tmp['user_mentions'] = str([i['id_str'] for i in data['entities']['user_mentions']])
        except KeyError:
            pass
        # save hashtags
        try:
            if data['entities']['hashtags']:
                tmp['hashtags'] = str([i['text'] for i in data['entities']['hashtags']])
        except KeyError:
            pass
        # save urls
        try:
            if data['entities']['urls']:
                tmp['urls'] = str([i['expanded_url'] for i in data['entities']['urls']])
        except KeyError:
            pass
        table.insert(tmp)
        # Indicate that the above enqueued task is complete.
        tweets_queue.task_done()


if __name__ == '__main__':
    now = datetime.datetime.today().ctime().replace(' ', '-')
    db = dataset.connect('sqlite:///tweets-' + now + '.db')
    table = db['tweet']
    tweet_queue = Queue()
    # the streaming thread produces tweets; the main thread consumes and stores them
    Thread(target=stream_tweets, args=(tweet_queue,), daemon=True).start()
    process_tweets(tweet_queue)