Crawler
from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.http import Request
import re

from spoton_challenge.items import Event_Url
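# Event_Url comes from the project's items module, which is not included in
# this gist. A minimal sketch of what it presumably looks like, assuming the
# item only carries the crawled url (Scrapy 0.x-style declaration):
#
#   from scrapy.item import Item, Field
#
#   class Event_Url(Item):
#       url = Field()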
class SpotonSpider(Spider):
    name = "SpotonSpider"

    def __init__(self, url=None, *args, **kwargs):
        super(SpotonSpider, self).__init__(*args, **kwargs)
        self.URL_CAP = 10  # Maximum number of urls to return
        self.start_urls = [url] if url else []  # Pass a url in as argument
        self.keywords = ['related', 'upcoming', 'nearby', 'recommended', 'hot', 'new']
        self.additionals = ['tickets', 'events', 'activities', 'classes', 'sessions', 'performances', 'shows', 'festivals', 'plays', 'pages']
        # Singular forms of the words above, e.g. 'tickets' -> 'ticket'
        self.singulars = [word[:-1] for word in self.additionals]
    def parse(self, response):
        if not self.start_urls:  # Needs a url argument
            return
        sel = Selector(response)
        # Itemize every event link found on the start page itself
        items_on_page = [self.itemize(url) for url in self.search_pagelinks(sel, response)]
        if len(items_on_page) < self.URL_CAP:  # Need more URLs
            # parse_eventspage counts the carried items before applying the
            # cap, so URL_CAP keeps its full value here
            events_url = self.search_eventspage(sel, response)
            if not events_url:  # No events/calendar page found
                return items_on_page
            request = Request(url=events_url, callback=self.parse_eventspage)
            request.meta['items'] = items_on_page  # Pass meta data to next request
            return request
        return items_on_page[:self.URL_CAP]  # Meets or is above URL cap
    def itemize(self, url):
        """ Make a Scrapy item out of a url """
        item = Event_Url()
        item['url'] = url
        return item
    def remove_css_js(self, hrefs):
        """ Filter out stylesheet and script links """
        kept = []  # Build a new list rather than removing while iterating
        for href in hrefs:
            regex_parts = re.split("[^a-zA-Z]+", href.lower())
            slash_parts = href.lower().split("/")
            if any("css" in part or "js" in part for part in regex_parts + slash_parts):
                continue  # Skip .css/.js assets
            kept.append(href)
        return kept
    def parse_eventspage(self, response):
        """ Parse a page of events instead of one event page
        Returns a list of links to events
        """
        sel = Selector(response)
        outputs = list(response.meta['items'])  # Start from the items carried over
        hrefs = self.remove_css_js(sel.xpath('//@href').extract())
        root_domain = response.url.split("/")[2]
        input_parts = response.url.lower().split("/")  # Split up argument url
        chosen_links = []
        for href in hrefs:
            href = self.make_relative_absolute(href, response)
            match1 = False  # Make sure it comes from the root domain
            match2 = False  # Make sure it matches at least one keyword
            href_parts = href.lower().split("/")  # Split up to compare to argument url
            for href_part in href_parts:
                if href_part == root_domain:  # Just need one to match root domain
                    match1 = True
                elif href_part in self.additionals:
                    match2 = True
                else:
                    for word in re.split("[^a-zA-Z]+", href_part):
                        if word in self.additionals or word in self.singulars:
                            match2 = True
            # Keep urls from the root domain that carry a keyword and are
            # roughly as deep as the events page url itself
            if (match1 and match2) and (len(href_parts) in range(len(input_parts) - 2, len(input_parts) + 2)):
                if href not in chosen_links:  # Don't add redundant urls
                    chosen_links.append(href)
        for url in chosen_links:
            if len(outputs) >= self.URL_CAP:
                break
            outputs.append(self.itemize(url))
        return outputs
    def search_eventspage(self, sel, response):
        """ Search for and return a url to an events or calendar page on the domain
        Returns None if no such page is found
        """
        keywords = self.keywords
        additionals = self.additionals
        clickables = sel.xpath("//a").extract()
        # For those pesky sites that use images as menu items
        image_clickables = sel.xpath("//a/img/@src").extract()
        valid_img_link_lists = []
        for img_src in image_clickables:  # Search through image sources
            img_words = re.split("[^a-zA-Z]+", img_src.lower())  # Only want alphabetical characters
            for word in img_words:
                if word in additionals:  # Matches a keyword
                    valid_img_link_lists.append(img_words)  # Probably valid
        for clickable in clickables:
            clickable_words = re.split("[^a-zA-Z]+", clickable.lower())
            if valid_img_link_lists:  # Events url is hrefed by image
                valid = all(valid_img_link in clickable_words
                            for valid_img_links in valid_img_link_lists
                            for valid_img_link in valid_img_links)
                if valid:
                    return self.extract_href(clickable, response)
            else:  # Events url in plain text
                for clickable_word in clickable_words:
                    if clickable_word in keywords:
                        return self.extract_href(clickable, response)

    def extract_href(self, clickable, response):
        """ Pull the href value out of a raw <a ...> string as an absolute url """
        for word in clickable.split(" "):
            if 'href' in word:
                # Strip the leading 'href="' and the trailing quote
                return self.make_relative_absolute(word[6:-1], response)
    def search_pagelinks(self, sel, response):
        """ Search the current page for links to other events
        Returns an empty list if the page contains no links resembling the argument url
        """
        # (A disabled pre-check here once scanned h1/h2/h3/div/li/a/p text for
        # keyword matches before collecting hrefs; the original comment
        # questioned whether it was necessary.)
        hrefs = self.remove_css_js(sel.xpath('//@href').extract())
        prepend = "http://" + response.url.split("/")[2]  # Prepend to relative urls
        input_parts = response.url.lower().split("/")  # Split up argument url
        chosen_links = []
        for href in hrefs:
            match = False  # Assume link is a false positive
            href_parts = href.lower().split("/")  # Split up to compare to argument url
            for href_part in href_parts:
                # Some relevant part of the url matches the argument, beyond the domain itself
                if href_part in input_parts and href_part not in prepend.split("/"):
                    match = True
            # Url should be roughly the same length as the argument url
            if match and (len(href_parts) in range(len(input_parts) - 2, len(input_parts))):
                href = self.make_relative_absolute(href, response)
                if href not in chosen_links:  # Don't add redundant urls
                    chosen_links.append(href)
        return chosen_links
    def make_relative_absolute(self, href, response):
        """ Prepend http://[domain] to relative urls
        Returns href unchanged if it is already absolute
        """
        if href.lower().startswith(("http://", "https://")):
            return href  # Already absolute
        prepend = "http://" + response.url.split("/")[2]
        if not href.startswith("/"):
            href = "/" + href  # Keep domain and path separated
        return prepend + href
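# Example usage (a sketch, not part of the original gist): assuming this file
# lives in a Scrapy project named "spoton_challenge" with the spider on the
# project path, it can be run from the project directory with a url argument:
#
#   scrapy crawl SpotonSpider -a url=http://example.com/events -o events.json
#
# "-a url=..." is forwarded to __init__ above as the start url, and "-o"
# writes the returned Event_Url items out as a JSON feed.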