Usage:

    python scrapy-simple-processor.py https://example.com --output-file=/path/to/file.csv
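With the default _process_selector below, each CSV row has the shape (tag, page URL, embed URL). A hypothetical row (values are illustrative only, not real output):

    iframe,https://example.com/page,https://player.example.net/embed/abc123

scrapy-simple-processor.py: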
import argparse
import csv
import os
from tempfile import mkstemp
from urllib.parse import urlsplit

import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.http.response.html import HtmlResponse
from scrapy.selector.unified import Selector


class MySpider(scrapy.Spider):
    name = 'spider'
    writer = None

    def __init__(self, name=None, writer=None, **kwargs):
        super().__init__(name, **kwargs)
        self.writer = writer
    def _get_url_to_follow(self, selector, response):
        """
        Get a URL from a selector, and return a URL to follow, or None to skip.

        Arguments:
            selector (Selector): The tag to evaluate.
            response (HtmlResponse): The Scrapy response object.

        Returns:
            None or str: The extracted URL if it should be followed; otherwise
            None, to prevent following the link.
        """
        # TODO Modify with your own logic for whatever is appropriate.
        # Only follow 'a' tags (the xpath in parse() guarantees they have an href).
        if selector.root.tag == 'a':
            url_to_follow = response.urljoin(selector.attrib.get('href'))
            # Follow only if the destination host matches the origin host.
            if urlsplit(response.url).hostname == urlsplit(url_to_follow).hostname:
                return url_to_follow
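
    # The TODO above is the extension point. One hedged variant (the '/docs/'
    # prefix is purely an assumption for illustration): additionally require
    # that followed links stay under a given path, e.g.
    #
    #     if urlsplit(url_to_follow).path.startswith('/docs/'):
    #         return url_to_follow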

    def _process_selector(self, selector, response):
        """
        This is where you would investigate a YouTube link or the like.
        """
        # TODO This function receives the selector (an a, iframe, or img tag).
        # Extract what you need and write it out to the CSV.
        if selector.root.tag == 'iframe':
            self.writer.writerow([
                'iframe',
                response.url,
                selector.attrib.get('src'),
            ])
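
    # The TODO above invites handling the other selected tags too. A hedged
    # sketch: since the xpath in parse() already matches //img[@src], image
    # references could be recorded the same way (the column layout is assumed):
    #
    #     if selector.root.tag == 'img':
    #         self.writer.writerow(['img', response.url, selector.attrib.get('src')])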

    def parse(self, response):
        """
        Arguments:
            response (HtmlResponse): The response from Scrapy.
        """
        # Only parse HTML responses; skip binaries, feeds, and responses
        # without a Content-Type header.
        if response.headers.get('Content-Type', b'').decode().startswith('text/html'):
            for selector in response.xpath('//a[@href]|//iframe[@src]|//img[@src]'):
                self._process_selector(selector, response)
                url_to_follow = self._get_url_to_follow(selector, response)
                if url_to_follow is not None:
                    # No callback is given, so Scrapy routes the response
                    # back through this parse() method.
                    yield scrapy.Request(url_to_follow)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('start_url', type=str)
    parser.add_argument('--output-file', type=str, default='')
    args = parser.parse_args()

    output_file = args.output_file
    if output_file == '':
        # mkstemp() creates the temporary file securely (mktemp is deprecated).
        fd, output_file = mkstemp(suffix='.csv', prefix='scrapy-')
        os.close(fd)

    process = CrawlerProcess({
        'USER_AGENT': 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    })

    # Write to the file selected above; newline='' keeps the csv module
    # from emitting blank rows on Windows.
    stream = open(output_file, 'w', newline='')
    try:
        process.crawl(MySpider, writer=csv.writer(stream), start_urls=[args.start_url])
        process.start()
    finally:
        stream.close()
    print("Output saved to: %s" % output_file)
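
The _process_selector docstring mentions investigating a YouTube link. As a hedged sketch of that idea (the host check and the 'youtube-embed' label are assumptions, not part of the original), the method could be swapped for something like:

    def _process_selector(self, selector, response):
        # Record only iframes whose src points at YouTube.
        if selector.root.tag == 'iframe':
            src = response.urljoin(selector.attrib.get('src', ''))
            host = urlsplit(src).hostname or ''
            if host == 'youtu.be' or host.endswith('youtube.com'):
                self.writer.writerow(['youtube-embed', response.url, src])

Resolving the src against response.url first means relative embed URLs are recorded in full, matching how _get_url_to_follow already normalizes hrefs.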