@Greyvend
Last active September 14, 2017 10:42
Blog scraping and analysis code snippets. Refer to the corresponding repo for the full working example: https://github.com/Databrawl/blog_analysis.
from itertools import groupby
from operator import itemgetter

from utils import outlier_threshold


def filter_view_deviations(data):
    # Group the records by search query and drop view-count outliers
    # within each group.
    query_sorted_data = sorted(data, key=itemgetter('query'))
    result = []
    for k, group in groupby(query_sorted_data, key=itemgetter('query')):
        group = list(group)
        daily_page_views = [elem['daily_page_views'] for elem in group]
        threshold = outlier_threshold(daily_page_views)
        filtered_data = filter(lambda p: p['daily_page_views'] <= threshold,
                               group)
        filtered_data = list(filtered_data)
        for elem in filtered_data:
            if elem['daily_page_views'] > 1000000:
                print(elem['url'], 'is huge!')
        result.extend(filtered_data)
    return result
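Note that outlier_threshold is imported from the repo's utils module and is not included in this gist. A minimal sketch of such a helper, assuming it computes the upper Tukey fence (Q3 + 1.5 × IQR) over the view counts, could look like this:

def outlier_threshold(values):
    # Assumed implementation: approximate quartiles by index on the sorted
    # values; anything above Q3 + 1.5 * IQR is treated as an outlier.
    values = sorted(int(v) for v in values)
    q1 = values[len(values) // 4]
    q3 = values[(3 * len(values)) // 4]
    return q3 + 1.5 * (q3 - q1)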
def get_languages_popularity(data):
    # Sum the daily page views per query (language) and sort the languages
    # by total views, descending.
    query_sorted_data = sorted(data, key=itemgetter('query'))
    result = {'languages': [], 'views': []}
    popularity = []
    for k, group in groupby(query_sorted_data, key=itemgetter('query')):
        group = list(group)
        daily_page_views = map(lambda r: int(r['daily_page_views']), group)
        total_page_views = sum(daily_page_views)
        popularity.append((group[0]['query'], total_page_views))
    sorted_popularity = sorted(popularity, key=itemgetter(1), reverse=True)
    languages, views = zip(*sorted_popularity)
    result['languages'] = languages
    result['views'] = views
    return result
def get_ranking_and_views(data, languages):
    # For each requested language, collect (search rank, daily page views)
    # pairs. Ranks are stored zero-based by the spider, so shift them to 1.
    filtered_data = filter(lambda elem: elem['query'] in languages, data)
    query_sorted_data = sorted(filtered_data, key=itemgetter('query'))
    result = {}
    for k, group in groupby(query_sorted_data, key=itemgetter('query')):
        group = list(group)
        ranks_views_data = [(r['rank'] + 1, int(r['daily_page_views']))
                            for r in group]
        ranks, views = zip(*ranks_views_data)
        result[group[0]['query']] = ranks, views
    return result
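The dictionaries returned by these helpers are already shaped for plotting. As a hedged illustration (not part of this gist), language popularity could be charted with matplotlib roughly like this, where the popularity dict stands in for real output of get_languages_popularity:

import matplotlib.pyplot as plt

# Hypothetical data in the shape returned by get_languages_popularity().
popularity = {'languages': ('python', 'javascript', 'go'),
              'views': (120000, 90000, 40000)}

positions = range(len(popularity['languages']))
plt.bar(positions, popularity['views'])
plt.xticks(positions, popularity['languages'])
plt.ylabel('total daily page views')
plt.show()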
import urllib.parse

import scrapy
from scrapy_splash import SplashRequest


class BlogsSpider(scrapy.Spider):
    name = 'blogs'
    allowed_domains = ['cse.google.com']

    def __init__(self, queries):
        super(BlogsSpider, self).__init__()
        self.queries = queries

    def start_requests(self):
        # Query parameters of the blogsearchengine.org custom search engine;
        # 'query' and 'page_num' act as placeholders that are filled in below.
        params_dict = {
            'cx': ['partner-pub-9634067433254658:5laonibews6'],
            'cof': ['FORID:10'],
            'ie': ['ISO-8859-1'],
            'q': ['query'],
            'sa.x': ['0'],
            'sa.y': ['0'],
            'sa': ['Search'],
            'ad': ['n9'],
            'num': ['10'],
            'rurl': [
                'http://www.blogsearchengine.org/search.html?cx=partner-pub'
                '-9634067433254658%3A5laonibews6&cof=FORID%3A10&ie=ISO-8859-1&'
                'q=query&sa.x=0&sa.y=0&sa=Search'
            ],
            'siteurl': ['http://www.blogsearchengine.org/']
        }
        params = urllib.parse.urlencode(params_dict, doseq=True)
        url_template = urllib.parse.urlunparse(
            ['https', self.allowed_domains[0], '/cse',
             '', params, 'gsc.tab=0&gsc.q=query&gsc.page=page_num'])
        # Request the first 10 result pages for every query, rendered through
        # Splash so that the JavaScript-built result list is present.
        for query in self.queries:
            for page_num in range(1, 11):
                url = url_template.replace('query', urllib.parse.quote(query))
                url = url.replace('page_num', str(page_num))
                yield SplashRequest(url, self.parse, endpoint='render.html',
                                    args={'wait': 0.5})

    def parse(self, response):
        # Extract the result links and recover the query and page number from
        # the URL fragment to compute each blog's absolute search rank.
        urls = response.css('div.gs-title.gsc-table-cell-thumbnail') \
            .xpath('./a/@href').extract()
        gsc_fragment = urllib.parse.urlparse(response.url).fragment
        fragment_dict = urllib.parse.parse_qs(gsc_fragment)
        page_num = int(fragment_dict['gsc.page'][0])
        query = fragment_dict['gsc.q'][0]
        page_size = len(urls)
        for i, url in enumerate(urls):
            parsed_url = urllib.parse.urlparse(url)
            rank = (page_num - 1) * page_size + i
            yield {
                'rank': rank,
                'url': parsed_url.netloc,
                'query': query
            }
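Running the spider requires a Splash instance plus the scrapy_splash middleware settings from that project's documentation. A sketch of a standalone run, assuming Splash listens on localhost:8050 and the scraped items should land in blogs.json:

from scrapy.crawler import CrawlerProcess

splash_settings = {
    'SPLASH_URL': 'http://localhost:8050',
    'DOWNLOADER_MIDDLEWARES': {
        'scrapy_splash.SplashCookiesMiddleware': 723,
        'scrapy_splash.SplashMiddleware': 725,
        'scrapy.downloadermiddlewares.httpcompression.'
        'HttpCompressionMiddleware': 810,
    },
    'SPIDER_MIDDLEWARES': {
        'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,
    },
    'DUPEFILTER_CLASS': 'scrapy_splash.SplashAwareDupeFilter',
}

process = CrawlerProcess(dict(splash_settings,
                              FEED_FORMAT='json', FEED_URI='blogs.json'))
process.crawl(BlogsSpider, queries=['python', 'javascript', 'go'])
process.start()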
import urllib.parse

import scrapy
from scrapy_splash import SplashRequest


class TrafficSpider(scrapy.Spider):
    name = 'traffic'
    allowed_domains = ['www.statshow.com']

    def __init__(self, blogs_data):
        super(TrafficSpider, self).__init__()
        self.blogs_data = blogs_data

    def start_requests(self):
        # Look up every collected blog domain on statshow.com, rendering the
        # page through Splash; the originating blog record travels in meta.
        url_template = urllib.parse.urlunparse(
            ['http', self.allowed_domains[0], '/www/{path}', '', '', ''])
        for blog in self.blogs_data:
            url = url_template.format(path=blog['url'])
            request = SplashRequest(url, endpoint='render.html',
                                    args={'wait': 0.5}, meta={'blog': blog})
            yield request

    def parse(self, response):
        # The "box_1" block lists page views and visitors; entries containing
        # '$' are revenue estimates and are skipped.
        site_data = response.xpath('//div[@id="box_1"]/span/text()').extract()
        views_data = list(filter(lambda r: '$' not in r, site_data))
        if views_data:
            blog_data = response.meta.get('blog')
            traffic_data = {
                'daily_page_views': int(views_data[0].translate({ord(','): None})),
                'daily_visitors': int(views_data[1].translate({ord(','): None}))
            }
            blog_data.update(traffic_data)
            yield blog_data
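TrafficSpider is fed the items collected by BlogsSpider (it reads blog['url'] and blog['query'] from each record) and yields the same records enriched with view counts, which is the shape the analysis helpers at the top of the gist expect. A sketch of that last step, assuming the traffic run wrote its feed to a file named traffic.json:

import json

# Hypothetical glue code: load the traffic spider's JSON feed and run the
# analysis helpers defined at the top of this gist.
with open('traffic.json') as f:
    scraped = json.load(f)

cleaned = filter_view_deviations(scraped)
popularity = get_languages_popularity(cleaned)
for language, views in zip(popularity['languages'], popularity['views']):
    print(language, views)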