Skip to content

Instantly share code, notes, and snippets.

@amarynets
Created January 10, 2017 19:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amarynets/d364405e6357278d749782be5f932a31 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.request import build_opener, Request, HTTPCookieProcessor
from http.cookiejar import CookieJar
import traceback
import time
import socket
from pyvirtualdisplay import Display
import scrapy
from scrapy.http import Request
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from scraper.items import AdsItem
from ads.models import URL, Site
class AdsSpider(scrapy.Spider):
    """Spider that renders each stored URL with Selenium (Chrome behind a
    virtual display) and yields AdsItem objects for ads found on the page.

    URLs come from the Django ``URL`` model; extraction itself is delegated
    to ``Finder`` (top-level document) and ``IFrameFinder`` (iframes).
    """

    name = "ads"

    def __init__(self):
        # Headless X display so Chrome can run on a machine with no screen.
        self.display = Display()
        self.display.start()
        super(AdsSpider, self).__init__()

    def __del__(self):
        # Best-effort cleanup; __del__ is not guaranteed to run, but this
        # mirrors the display lifecycle started in __init__.
        self.display.stop()

    def start_requests(self):
        """Yield one Request per URL row, skipping rows that fail."""
        urls = URL.objects.all()
        for url_obj in urls:
            try:
                yield Request(url_obj.name, meta={'url': url_obj})
            except Exception:
                # Bug fix: the original had this print AFTER `continue`,
                # making it unreachable — log before skipping the row.
                print('Something wrong in custom Request')
                continue

    def parse(self, response):
        """Re-render the page in Chrome, then yield items that have an image.

        The Selenium driver is always quit in ``finally`` (the original
        leaked it when an unexpected exception was raised, and also read
        ``page_source`` from an already-quit driver after a socket timeout).
        """
        driver = None
        try:
            # Original fixed delay kept — presumably to throttle requests
            # or let ad networks load; TODO confirm it is still needed.
            time.sleep(10)
            driver = webdriver.Chrome()
            driver.set_page_load_timeout(15)
            socket.setdefaulttimeout(15)
            try:
                driver.get(response.url)
            except socket.timeout:
                # Keep going with whatever the browser managed to render.
                print("SOCKET TIMEOUT")
            rendered = driver.page_source.encode('utf-8')
            response = response.replace(body=rendered)
            url = response.meta['url']
            img_finder = Finder(response, url)
            i_finder = IFrameFinder(response, url, driver)
            result = img_finder.get_result() + i_finder.get_result()
            for item in result:
                # Only items that actually resolved an image are useful.
                if item['img']:
                    yield item
        except TimeoutException:
            # Page-load timeout: nothing to extract, driver quit in finally.
            pass
        except Exception as e:
            # Original printed the return value of print_exc() (None);
            # print the exception and the traceback separately instead.
            print(e)
            traceback.print_exc()
            print("Something wrong in function parse")
        finally:
            if driver is not None:
                driver.quit()
class Finder(object):
    """Collects an AdsItem for every ``<a target="_blank">`` link found in
    *response* (a heuristic for ad links), resolving relative hrefs and
    image sources against the response URL.

    Results accumulate in ``self.result`` during construction and are read
    back via ``get_result()``.
    """

    def __init__(self, response, url):
        self.response = response
        # *url* is the URL model row this response was crawled for.
        self.url = url
        self.result = list()
        self.image_processing(self.response)

    def get_all_href(self, response):
        """Return selectors for every <a> element in the document."""
        return response.xpath("..//a")

    def get_all_external_href(self, data):
        """Keep only links that open in a new tab (target="_blank")."""
        return [a_tag for a_tag in data if self.has_target(a_tag)]

    def image_processing(self, response):
        """Walk all external links and convert each into an AdsItem."""
        for a_tag in self.get_all_external_href(self.get_all_href(response)):
            self.push_ads(a_tag)

    def push_ads(self, data):
        """Build an AdsItem from one <a> selector and append it to result."""
        item = AdsItem()
        href = data.xpath("./@href").extract_first()
        img = data.xpath("./img/@src").extract_first()
        href = self.join_url(href, self.response.url)
        img = self.join_url(img, self.response.url)
        item['href'] = self.get_url_from_href(href)
        item['img'] = img
        item['url'] = self.url
        self.result.append(item)

    def get_result(self):
        """Return the accumulated list of AdsItem objects."""
        return self.result

    def has_target(self, a_tag):
        """True when the link opens in a new browser tab."""
        return a_tag.xpath("./@target").extract_first() == '_blank'

    def join_url(self, url, base_url):
        """Resolve a relative *url* against *base_url*'s scheme + host.

        ``./path`` and ``/path`` forms are made absolute; anything else
        (absolute URLs, None, empty strings) is returned unchanged.
        """
        if self.url_start_point(url):
            address = urlparse(base_url)
            domain = address.scheme + '://' + address.netloc
            return domain + url[1:]
        if self.url_start_slash(url):
            address = urlparse(base_url)
            domain = address.scheme + '://' + address.netloc
            return domain + url
        return url

    def url_start_point(self, url):
        """True when *url* is a dot-relative path like './x'.

        False for None, empty, or non-subscriptable input (the original
        caught BaseException for this; narrowed to the actual failures).
        """
        try:
            return url[0] == '.'
        except (TypeError, IndexError, KeyError):
            return False

    def url_start_slash(self, url):
        """True when *url* is a root-relative path like '/x'."""
        try:
            return url[0] == '/'
        except (TypeError, IndexError, KeyError):
            return False

    def get_url_from_href(self, href):
        # More info why need use it there ->
        # http://stackoverflow.com/questions/32569934/urlopen-returning-redirect-error-for-valid-links
        """Follow redirects on *href* and return the final URL.

        Falls back to *href* itself on any failure. Bug fix: the original
        never closed the urlopen response, leaking a connection per link;
        the context manager closes it deterministically.
        """
        try:
            with urlopen(href, timeout=8) as resp:
                return resp.url
        except Exception:
            return href
class IFrameFinder(object):
    """Runs ``Finder`` over the rendered HTML of every <iframe> in
    *response*, using the shared Selenium *driver* to switch into each
    frame by its id attribute.
    """

    def __init__(self, response, url, driver):
        self.response = response
        self.url = url
        self.driver = driver
        self.result = list()
        self.iframe_processing(self.response)

    def iframe_processing(self, response):
        """Extract ads from every iframe whose HTML could be captured."""
        frames = self.get_all_html_from_iframe(self.get_all_iframe(response))
        for frame_response in frames:
            # Bug fix: get_html_from_iframe returns False for frames it
            # cannot enter; the original passed that False straight to
            # Finder, which then crashed on `False.xpath(...)`.
            if not frame_response:
                continue
            self.result += Finder(frame_response, self.url).get_result()

    def get_result(self):
        """Return the accumulated list of AdsItem objects."""
        return self.result

    def get_all_iframe(self, response):
        """Return selectors for every <iframe> element in the document."""
        return response.xpath("..//iframe")

    def get_all_html_from_iframe(self, data):
        """Capture rendered HTML for each iframe selector in *data*.

        Entries are False where the frame could not be entered; callers
        must filter those out (see iframe_processing).
        """
        return [self.get_html_from_iframe(i) for i in data]

    def get_html_from_iframe(self, iframe):
        """Switch the driver into *iframe* (located by its id) and return a
        copy of self.response whose body is the frame's rendered HTML.

        Returns False when the frame cannot be located or entered. Bug fix:
        the element lookup now sits inside the try — the original raised an
        uncaught exception for a missing element despite having a
        "Not found iframe" handler right below it.
        """
        frame_id = str(iframe.xpath("./@id").extract_first())
        try:
            # NOTE(review): find_element_by_id was removed in Selenium 4 —
            # confirm the pinned selenium version before upgrading.
            frame = self.driver.find_element_by_id(frame_id)
            self.driver.switch_to.frame(frame)
            frame_response = self.response.replace(
                body=self.driver.page_source.encode('utf-8'))
            self.driver.switch_to.default_content()
        except Exception:
            print("Not found iframe")
            return False
        return frame_response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment