Skip to content

Instantly share code, notes, and snippets.

@oneamitj
Last active April 20, 2018 11:47
Show Gist options
  • Save oneamitj/14de90923ade47339eb93d61514935fd to your computer and use it in GitHub Desktop.
Save oneamitj/14de90923ade47339eb93d61514935fd to your computer and use it in GitHub Desktop.
Crawl a public Facebook page for its posts and their comments and save them to a SQLite database, using Python and the Facebook Graph API.
import time
import datetime as dt
import sqlite3
import json
import urllib.request, urllib.error, urllib.parse
# Access token interpolated into the `access_token` query parameter of the
# Graph API URLs built below; replace the placeholder before running.
FB_PAGE_ACCESS_API = "YOUR FACEBOOK PAGE ACCESS API KEY HERE"
class Database:
    """SQLite-backed store for crawled posts and comments.

    Constructing an instance opens ``fbdata.db`` in the current working
    directory and runs the ``init.sql`` script against it. The cursor is
    kept in ``self.db`` and the connection in ``self.connection``.
    """

    _INSERT_POST = (
        'INSERT OR IGNORE INTO posts (page, id, message, created) '
        'VALUES (?, ?, ?, ?)'
    )
    _INSERT_COMMENT = (
        'INSERT OR IGNORE INTO comments (id, post_id, page, message, created) '
        'VALUES (?, ?, ?, ?, ?)'
    )

    def __init__(self):
        """Open the local SQLite database and keep its cursor in ``self.db``."""
        self.db = self.local_db_instance()

    def local_db_instance(self):
        """Connect to ``fbdata.db``, run ``init.sql`` and return a cursor.

        Raises:
            OSError: if ``init.sql`` cannot be opened.
            sqlite3.Error: if the database cannot be opened or the script fails.
        """
        self.connection = sqlite3.connect('fbdata.db')
        cursor = self.connection.cursor()
        with open('init.sql', mode='r') as f:
            cursor.executescript(f.read())
        self.connection.commit()
        return cursor

    def close(self):
        """Close the underlying SQLite connection."""
        self.connection.close()

    def _insert_many(self, statement, rows):
        """Run one batch insert and commit; return True on success.

        On any error the exception args are printed, the pending transaction
        is rolled back so a half-applied batch is not committed by a later
        call, and False is returned.
        """
        try:
            self.db.executemany(statement, rows)
            self.connection.commit()
            return True
        except Exception as e:
            print(e.args)
            self.connection.rollback()
            return False

    def upload_post(self, data):
        """Insert post rows, skipping conflicting ones (INSERT OR IGNORE).

        Args:
            data: iterable of ``[page, id, message, created]`` sequences.

        Returns:
            True if the batch was committed, False if it failed.
        """
        return self._insert_many(self._INSERT_POST, data)

    def upload_comment(self, data):
        """Insert comment rows, skipping conflicting ones (INSERT OR IGNORE).

        Args:
            data: iterable of ``[id, post_id, page, message, created]``
                sequences.

        Returns:
            True if the batch was committed, False if it failed.
        """
        return self._insert_many(self._INSERT_COMMENT, data)

    def query_db(self, query, args=(), one=False):
        """Execute an arbitrary query and return its rows.

        Args:
            query: SQL text, with ``?`` placeholders for ``args``.
            args: parameters bound to the placeholders.
            one: when true, return only the first row (or None if there is
                none) instead of the full list.
        """
        # The original guarded this with ``self.local``, an attribute that is
        # never assigned, so every call raised AttributeError. This class only
        # has a local SQLite backend, so the query always runs.
        self.db.execute(query, args)
        rows = self.db.fetchall()
        if one:
            return rows[0] if rows else None
        return rows

    def _select(self, base_query, params, last_row_only):
        """Run ``base_query`` ordered by ``created``.

        Returns the row with the greatest ``created`` value (or None) when
        ``last_row_only`` is true, otherwise all rows in ascending order.
        """
        if last_row_only:
            self.db.execute(base_query + ' ORDER BY created DESC LIMIT 1 ', params)
            return self.db.fetchone()
        self.db.execute(base_query + ' ORDER BY created', params)
        return self.db.fetchall()

    def fetch_posts(self, last_row_only=False):
        """Return all posts ordered by ``created``, or only the latest one."""
        return self._select('SELECT * FROM posts', [], last_row_only)

    def fetch_post_comments(self, post_id, last_row_only=False):
        """Return the comments of ``post_id`` ordered by ``created``, or only the latest."""
        return self._select(
            'SELECT * FROM comments WHERE post_id = ?', [post_id], last_row_only
        )

    def fetch_page_comments(self, page, last_row_only=False):
        """Return the comments of ``page`` ordered by ``created``, or only the latest."""
        return self._select(
            'SELECT * FROM comments WHERE page = ?', [page], last_row_only
        )
def get_fb_data(url, timeout=60):
    """Fetch ``url`` and return its body parsed as JSON.

    Args:
        url: the URL to request.
        timeout: seconds to wait on the connection before giving up; the
            original call had no timeout and could block indefinitely.

    Returns:
        The decoded JSON document, or None if the URL could not be opened
        or read (in which case "URL Open error." is printed).

    Raises:
        ValueError: if the body is not valid UTF-8 encoded JSON.
    """
    user_agent = 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_4; en-US) AppleWebKit/534.3 (KHTML, like Gecko) Chrome/6.0.472.63 Safari/534.3'
    headers = {'User-Agent': user_agent}
    try:
        req = urllib.request.Request(url, None, headers)
        # The context manager closes the response even if reading fails.
        with urllib.request.urlopen(req, timeout=timeout) as response:
            raw = response.read()
    except Exception:
        # Best-effort fetch: any request failure is reported as None. Unlike
        # the former bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        print("URL Open error.")
        return None
    return json.loads(raw.decode('utf8'))
def get_posts(page, last_date):
    """Crawl the posts of ``page`` and store them, with their comments, in the DB.

    Result pages are followed through ``paging.next`` until there is no next
    page, a fetch yields no usable data, or the last message/story post seen
    was created before ``last_date``. Each fetched page is processed in full,
    so posts older than ``last_date`` on the final page are still stored.

    Args:
        page: page name or id placed in the Graph API URL.
        last_date: naive ``datetime`` cutoff, compared against the naive UTC
            creation time parsed from the posts.
    """
    db = Database()
    try:
        url = "https://graph.facebook.com/v2.12/{}/posts?access_token={}&debug=all&format=json&method=get&pretty=0&suppress_http_code=1&limit=100".format(page, FB_PAGE_ACCESS_API)
        posts = get_fb_data(url)
        # Naive UTC "now", equivalent to the deprecated datetime.utcnow().
        date = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None, microsecond=0)
        created = None
        while date >= last_date and url:
            # A failed fetch returns None and an API error payload has no
            # "data" key; either used to crash the crawl with an exception.
            if not isinstance(posts, dict) or "data" not in posts:
                print("No post data returned for page {}".format(page))
                break
            data = []
            for post in posts["data"]:
                print("Page {} Post {}".format(page, post["id"]))
                # Prefer the post's message; fall back to its story text.
                text_key = "message" if "message" in post else "story" if "story" in post else None
                if text_key is not None:
                    data.append([page, post["id"], post[text_key], post["created_time"]])
                    created = post["created_time"]
                get_comments(post["id"], page)
            db.upload_post(data)
            if created is not None:
                try:
                    # Drop the "+hhmm" offset suffix before parsing.
                    date = dt.datetime.strptime(created.split("+")[0], "%Y-%m-%dT%H:%M:%S")
                except ValueError:
                    print("Unparseable created_time {!r}; stopping crawl".format(created))
                    break
            url = posts.get("paging", {}).get("next", "")
            if not url:
                break
            posts = get_fb_data(url)
            print("Next Page URL {}".format(url))
    finally:
        # Release the SQLite connection opened for this crawl.
        db.connection.close()
def get_comments(post_id, page=""):
    """Fetch every comment of ``post_id`` and store it in the DB.

    Result pages are followed through ``paging.next`` until there is no next
    page or a fetch yields no usable data. Comments without a ``message``
    field are skipped.

    Args:
        post_id: id of the post whose comments are fetched.
        page: page name stored alongside each comment row.
    """
    db = Database()
    try:
        url = "https://graph.facebook.com/v2.12/{}/comments?access_token={}&debug=all&format=json&method=get&pretty=0&suppress_http_code=1&limit=100".format(post_id, FB_PAGE_ACCESS_API)
        comments = get_fb_data(url)
        print("Comments for post {}".format(post_id))
        while url:
            # A failed fetch returns None and an API error payload has no
            # "data" key; either used to raise out of this function.
            if not isinstance(comments, dict) or "data" not in comments:
                print("No comment data returned for post {}".format(post_id))
                break
            data = [
                [comment["id"], post_id, page, comment["message"], comment["created_time"]]
                for comment in comments["data"]
                if "message" in comment
            ]
            db.upload_comment(data)
            url = comments.get("paging", {}).get("next", "")
            if url:
                comments = get_fb_data(url)
    finally:
        # This function opens its own connection on every call, so close it
        # here rather than leaking one per post.
        db.connection.close()
if __name__ == "__main__":
    # Script entry point: crawl the configured page, passing 1 January 2017
    # as the last_date cutoff.
    cutoff = dt.datetime(2017, 1, 1)
    get_posts("PAGE NAME HERE", cutoff)
-- One row per crawled post, keyed by the post id.
-- The crawler inserts rows with INSERT OR IGNORE, so a post whose id is
-- already present is skipped rather than overwritten.
CREATE TABLE IF NOT EXISTS posts (
page TEXT NOT NULL,        -- page name the post was crawled from
id TEXT NOT NULL,          -- post id as returned by the API
message TEXT NOT NULL,     -- the post's "message", or its "story" when there is no message
created DATETIME NOT NULL, -- the API "created_time" value, stored as received
PRIMARY KEY (id)
);
-- One row per crawled comment, keyed by the comment id.
-- The crawler inserts rows with INSERT OR IGNORE, so a comment whose id is
-- already present is skipped rather than overwritten.
CREATE TABLE IF NOT EXISTS comments (
id TEXT NOT NULL,          -- comment id as returned by the API
post_id TEXT NOT NULL,     -- id of the post the comment was fetched for
page TEXT,                 -- page name passed by the crawler (may be empty)
message TEXT NOT NULL,     -- comment text
created DATETIME NOT NULL, -- the API "created_time" value, stored as received
PRIMARY KEY (id)
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment