Skip to content

Instantly share code, notes, and snippets.

@Segerberg
Last active September 15, 2018 11:00
Show Gist options
  • Save Segerberg/3877ced5d16171e8902860816b9f82d5 to your computer and use it in GitHub Desktop.
Save Segerberg/3877ced5d16171e8902860816b9f82d5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
This utility extracts media URLs from a tweet jsonl.gz file and saves them as WARC records.
Warcio (https://github.com/webrecorder/warcio) is a dependency and before you can use it you need to:
% pip install warcio
You run it like this:
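% python media2warc.py /mnt/tweets/ferguson/tweets-0001.jsonl.gz /mnt/tweets/ferguson/media-archive
The second argument is a directory: the WARC is written to warc.warc inside it, next to media_harvest.log.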
The input file is checked for duplicate urls so that each url is fetched only once per run. Subsequent runs
are deduplicated using a sqlite db: if an identical payload digest is found, a revisit record is created.
The script can fetch media resources in multiple threads (maximum 2) by passing --threads <int> (defaults to a single thread).
Please be careful modifying this script to use more than two threads since it can be interpreted as a DoS-attack.
"""
import queue
import threading
import requests
import time
from datetime import timedelta
from warcio.warcwriter import WARCWriter
from warcio.statusandheaders import StatusAndHeaders
import json
import gzip
import sqlite3
import hashlib
import os
import logging
import argparse
# Media URLs waiting to be downloaded; filled by main() and drained by the
# getResource threads.
q = queue.Queue()
# Fetched responses waiting to be archived; filled by getResource and drained
# by the write2Warc thread.
out_queue = queue.Queue()
# Byte count passed to raw_stream.read() when hashing a record's payload for
# deduplication.
BLOCK_SIZE = 25600
class getResource(threading.Thread):
    """Worker thread that downloads the media URLs placed on a queue.

    Every URL taken from ``q`` is requested with ``stream=True`` and the
    unread response is handed over to the module-level ``out_queue`` as the
    list ``[header items, raw body stream, url, status code, reason]``.

    :q: queue of URL strings to fetch
    """

    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
        # rlock and d are not used by run(); they are kept so the attributes
        # of existing instances stay the same.
        self.rlock = threading.Lock()
        self.out_queue = out_queue
        self.d = dedup()

    def run(self):
        """Fetch URLs forever; the thread is expected to run as a daemon."""
        while True:
            host = self.q.get()
            try:
                # 'identity' asks the server not to compress the body.
                r = requests.get(host, headers={'Accept-Encoding': 'identity'}, stream=True)
                data = [r.raw.headers.items(), r.raw, host, r.status_code, r.reason]
                print(host)
                self.out_queue.put(data)
            except requests.exceptions.RequestException as e:
                # Report the URL that actually failed. ``data`` may be unbound
                # here (first request failed) or still hold the previous URL.
                logging.error('%s for %s', e, host)
                print(e)
            finally:
                # Always mark the item as handled, otherwise q.join() in the
                # main thread would block forever.
                self.q.task_done()
class write2Warc(threading.Thread):
    """Writer thread that turns fetched responses into WARC records.

    Items are the lists produced by getResource:
    ``[header items, raw body stream, url, status code, reason]``.
    A payload whose SHA-1 digest is already in the dedup database is written
    as a 'revisit' record, otherwise as a full 'response' record.

    :out_queue: queue of fetched responses
    :warcfile: path of the WARC file, opened in binary append mode
    """

    def __init__(self, out_queue, warcfile):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.lock = threading.Lock()
        self.warcfile = warcfile
        self.dedup = dedup()

    def run(self):
        """Write records forever; the thread is expected to run as a daemon."""
        with open(self.warcfile, 'ab') as output:
            # One writer for the whole file instead of one per record.
            writer = WARCWriter(output, gzip=False)
            while True:
                data = self.out_queue.get()
                try:
                    with self.lock:
                        self._write_item(writer, data)
                except Exception:
                    # Thread boundary: log and carry on so one broken
                    # download cannot stop the remaining records from
                    # being written.
                    logging.exception('Could not write WARC record for %s', data[2])
                finally:
                    # Always mark the item as handled, otherwise
                    # out_queue.join() in the main thread blocks forever.
                    self.out_queue.task_done()

    def _write_item(self, writer, data):
        """Write one fetched response as a 'response' or 'revisit' record."""
        headers_list, payload, url, status_code, reason = data
        http_headers = StatusAndHeaders('{} {}'.format(status_code, reason),
                                        headers_list, protocol='HTTP/1.0')
        record = writer.create_warc_record(url, 'response', payload=payload,
                                           http_headers=http_headers)
        # Digest the complete payload in BLOCK_SIZE chunks. Hashing only the
        # first chunk would treat different files that share a prefix as
        # duplicates. NOTE(review): digests stored by earlier runs for
        # payloads larger than BLOCK_SIZE covered only the first chunk and
        # will no longer match - confirm this is acceptable.
        h = hashlib.sha1()
        for chunk in iter(lambda: record.raw_stream.read(BLOCK_SIZE), b''):
            h.update(chunk)
        digest = h.hexdigest()
        if self.dedup.lookup(digest):
            record = writer.create_warc_record(url, 'revisit',
                                               http_headers=http_headers)
        else:
            self.dedup.save(digest, url)
            # Rewind so the writer sees the whole payload again.
            record.raw_stream.seek(0)
        writer.write_record(record)
class dedup:
    """
    Stolen from warcprox
    https://github.com/internetarchive/warcprox/blob/master/warcprox/dedup.py

    Small sqlite-backed table mapping a payload digest (key) to the url it
    was first stored for (value). Every method opens its own connection and
    closes it again, even when the statement fails.
    """

    def __init__(self, file='./dedup.db'):
        self.file = file

    def start(self):
        """Create the dedup table if it does not exist yet."""
        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                'create table if not exists dedup ('
                '  key varchar(300) primary key,'
                '  value varchar(4000)'
                ');')
            conn.commit()
        finally:
            conn.close()

    def save(self, digest_key, url):
        """Store (or overwrite) the url recorded for digest_key."""
        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                'insert or replace into dedup (key, value) values (?, ?)',
                (digest_key, url))
            conn.commit()
        finally:
            conn.close()

    def lookup(self, digest_key, url=None):
        """Return True if digest_key is already stored, else False.

        :url: accepted for call compatibility; not used
        """
        conn = sqlite3.connect(self.file)
        try:
            cursor = conn.execute('select value from dedup where key = ?', (digest_key,))
            return cursor.fetchone() is not None
        finally:
            conn.close()
def parse_extended_entities(extended_entities_dict):
    """Collect media file URLs from a tweet's extended_entities.

    For every media item the static image URL is taken; when the item
    carries video_info, the video/mp4 variant with the highest bitrate is
    added as well.

    :extended_entities_dict: the "extended_entities" dict of a tweet
    :returns: list of media file urls
    """
    found = []
    if "media" not in extended_entities_dict:
        return found

    for media in extended_entities_dict["media"]:
        # the still image is always present
        found.append(media["media_url_https"])

        if "video_info" not in media:
            continue

        # start below zero so a variant with bitrate=0 still qualifies
        best_rate = -1
        best_url = None
        for variant in media["video_info"]["variants"]:
            if "bitrate" not in variant or "content_type" not in variant:
                continue
            if variant["content_type"] != "video/mp4":
                continue
            rate = int(variant["bitrate"])
            if rate > best_rate:
                best_rate, best_url = rate, variant["url"]

        if best_url:
            found.append(best_url)
        else:
            print("Error: No bitrate / content_type")
            print(media["video_info"])

    return found
def parse_binlinks_from_tweet(tweetdict):
    """Parse binary file urls from a single tweet.

    Collects the user's profile image and profile background image, followed
    by the media found in the tweet's extended_entities. Profile fields that
    are missing, null or empty are skipped so that no unusable value ends up
    on the download queue.

    :tweetdict: json data dict for tweet
    :returns: list of urls for media files
    """
    urls = []
    user = tweetdict.get("user")
    if user:
        for key in ("profile_image_url_https", "profile_background_image_url_https"):
            url = user.get(key)
            if url:
                urls.append(url)
    if "extended_entities" in tweetdict:
        urls.extend(parse_extended_entities(tweetdict["extended_entities"]))
    return urls
def main():
    """Harvest all media referenced by the tweets in args.tweet_file.

    Reads the module-level ``args`` namespace (tweet_file, archive_dir,
    threads), queues every distinct media url once, starts the download
    threads and the WARC writer thread, and waits until both queues have
    been fully processed.
    """
    start = time.time()
    # The log file lives inside the archive directory, so the directory has
    # to exist before logging is configured.
    os.makedirs(args.archive_dir, exist_ok=True)
    logging.basicConfig(filename=os.path.join(args.archive_dir, "media_harvest.log"),
                        level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logging.info("Logging media harvest for %s",args.tweet_file)

    d = dedup()
    d.start()

    # A set keeps the "already queued?" check constant-time per url.
    seen_urls = set()
    uniqueUrlCount = 0
    duplicateUrlCount = 0
    with gzip.open(args.tweet_file, 'r') as tweetfile:
        logging.info("Checking for duplicate urls")
        for line in tweetfile:
            tweet = json.loads(line)
            for url in parse_binlinks_from_tweet(tweet):
                if url in seen_urls:
                    duplicateUrlCount += 1
                else:
                    seen_urls.add(url)
                    q.put(url)
                    uniqueUrlCount += 1
    logging.info("Found %s total media urls %s unique and %s duplicates", uniqueUrlCount+duplicateUrlCount, uniqueUrlCount,duplicateUrlCount)

    # Never more than two fetchers (politeness towards the media hosts) and
    # never fewer than one, otherwise q.join() below would wait forever.
    threads = max(1, min(int(args.threads), 2))
    for _ in range(threads):
        t = getResource(q)
        t.daemon = True
        t.start()

    wt = write2Warc(out_queue, os.path.join(args.archive_dir,'warc.warc'))
    wt.daemon = True
    wt.start()

    # Wait for all downloads first, then for all records to be written.
    q.join()
    out_queue.join()
    logging.info("Finished media harvest in %s",str(timedelta(seconds=(time.time() - start))))
if __name__ == '__main__':
    # ``args`` is deliberately module-level: main() reads it as a global.
    parser = argparse.ArgumentParser("archive")
    parser.add_argument("tweet_file", action="store", help="a twitter jsonl.gz input file")
    parser.add_argument("archive_dir", action="store", help="a directory where the resulting warc is stored")
    parser.add_argument("--threads", action="store", type=int, default=1,
                        help="number of download threads (default 1, capped at 2)")
    args = parser.parse_args()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment