-
-
Save amarynets/d364405e6357278d749782be5f932a31 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from urllib.parse import urlparse | |
from urllib.request import urlopen | |
from urllib.request import build_opener, Request, HTTPCookieProcessor | |
from http.cookiejar import CookieJar | |
import traceback | |
import time | |
import socket | |
from pyvirtualdisplay import Display | |
import scrapy | |
from scrapy.http import Request | |
from selenium import webdriver | |
from selenium.common.exceptions import TimeoutException | |
from scraper.items import AdsItem | |
from ads.models import URL, Site | |
class AdsSpider(scrapy.Spider):
    """Spider that renders each stored URL in Chrome (inside a virtual
    display) and yields ad items found in the page and its iframes."""
    name = "ads"

    def __init__(self):
        # A virtual X display lets (non-headless) Chrome run on a server
        # without a real screen.
        self.display = Display()
        self.display.start()
        super(AdsSpider, self).__init__()

    def __del__(self):
        # NOTE(review): __del__ is a fragile cleanup hook (not guaranteed to
        # run); Scrapy's closed() callback would be more reliable. Kept to
        # preserve the original interface.
        self.display.stop()

    def start_requests(self):
        """Yield one request per stored URL, skipping invalid entries
        instead of aborting the whole crawl."""
        urls = URL.objects.all()  # .order_by('?')[:2]
        for url_obj in urls:
            try:
                yield Request(url_obj.name, meta={'url': url_obj})
            except Exception:
                # Bug fix: the original `continue` came before the print,
                # making the diagnostic unreachable.
                print('Something wrong in custom Request')
                continue

    def parse(self, response):
        """Re-render the page with Chrome, then collect ad items from
        anchors and iframes. Only items that have an image are yielded."""
        driver = None
        try:
            time.sleep(10)  # throttle between page renders
            driver = webdriver.Chrome()
            driver.set_page_load_timeout(15)
            socket.setdefaulttimeout(15)
            try:
                driver.get(response.url)
            except socket.timeout:
                # Bug fix: the original quit the driver here but then fell
                # through and used it anyway; abandon this page instead.
                print("SOCKET TIMEOUT")
                return
            # Replace the raw body with the JavaScript-rendered DOM.
            rendered = driver.page_source.encode('utf-8')
            response = response.replace(body=rendered)
            url = response.meta['url']
            img_finder = Finder(response, url)
            i_finder = IFrameFinder(response, url, driver)
            result = img_finder.get_result()
            result += i_finder.get_result()
            for item in result:
                # Items without an image are not useful ads — drop them.
                if item['img']:
                    yield item
        except TimeoutException:
            pass  # page-load timeout; cleanup happens in finally
        except Exception as e:
            # Narrowed from BaseException: catching BaseException inside a
            # generator also swallows GeneratorExit.
            print(e)
            traceback.print_exc()  # fix: print_exc() writes itself; wrapping it in print() printed None
            print("Something wrong in function parse")
        finally:
            # Bug fix: the original leaked the Chrome process on most
            # error paths; always release it.
            if driver is not None:
                driver.quit()
class Finder(object):
    """Collects AdsItem entries from every <a target="_blank"> anchor in a
    response (ad links conventionally open in a new tab)."""

    def __init__(self, response, url):
        self.response = response
        self.url = url        # URL model instance this page belongs to
        self.result = list()  # accumulated AdsItem objects
        self.image_processing(self.response)

    def get_all_href(self, response):
        """Return every <a> selector in the document."""
        return response.xpath("..//a")

    def get_all_external_href(self, data):
        """Keep only anchors that open in a new tab (target="_blank")."""
        return [a for a in data if self.has_target(a)]

    def image_processing(self, response):
        """Find candidate ad anchors and convert each into an item."""
        all_href = self.get_all_href(response)
        external_href = self.get_all_external_href(all_href)
        print(external_href)
        for anchor in external_href:
            self.push_ads(anchor)

    def push_ads(self, data):
        """Build an AdsItem from one anchor selector and store it."""
        item = AdsItem()
        print("DATA IS: ", data.extract())
        href = data.xpath("./@href").extract_first()
        img = data.xpath("./img/@src").extract_first()
        # Make both URLs absolute, then resolve the href's redirect chain.
        href = self.join_url(href, self.response.url)
        img = self.join_url(img, self.response.url)
        href = self.get_url_from_href(href)
        item['href'] = href
        item['img'] = img
        item['url'] = self.url
        self.result.append(item)

    def get_result(self):
        """Return the accumulated list of AdsItem objects."""
        return self.result

    def has_target(self, a_tag):
        """True when the anchor opens in a new tab (target="_blank")."""
        return a_tag.xpath("./@target").extract_first() == '_blank'

    def join_url(self, url, base_url):
        """Resolve relative hrefs ('./x' or '/x') against base_url's
        scheme and host; absolute URLs pass through unchanged."""
        if self.url_start_point(url):
            # './path' -> '<scheme>://<host>/path' (drop the leading dot)
            return self._domain_of(base_url) + url[1:]
        elif self.url_start_slash(url):
            # '/path' -> '<scheme>://<host>/path'
            return self._domain_of(base_url) + url
        else:
            return url

    def _domain_of(self, base_url):
        """Return '<scheme>://<netloc>' of base_url (shared by join_url)."""
        address = urlparse(base_url)
        return address.scheme + '://' + address.netloc

    def url_start_point(self, url):
        """True when url starts with '.'; False for None/empty input."""
        if not url:
            return False
        return url[0] == '.'

    def url_start_slash(self, url):
        """True when url starts with '/'; False for None/empty input."""
        if not url:
            return False
        return url[0] == '/'

    def get_url_from_href(self, href):
        """Follow redirects to get the final landing URL; on any failure
        fall back to the href itself (best effort, 8 s timeout).
        More info why this is needed ->
        http://stackoverflow.com/questions/32569934/urlopen-returning-redirect-error-for-valid-links
        """
        try:
            # Bug fix: close the HTTP connection instead of leaking it.
            with urlopen(href, timeout=8) as resp:
                return resp.url
        except Exception:
            return href
class IFrameFinder(object):
    """Extracts ad items from every <iframe> on a page by switching the
    Selenium driver into each frame and running Finder on its HTML."""

    def __init__(self, response, url, driver):
        self.response = response
        self.url = url        # URL model instance this page belongs to
        self.driver = driver  # live Selenium driver, already on the page
        self.result = list()  # accumulated AdsItem objects
        self.iframe_processing(self.response)

    def iframe_processing(self, response):
        """Run Finder over the rendered HTML of every reachable iframe."""
        print("IFRAME Processing")
        all_iframe = self.get_all_iframe(response)
        all_html_from_iframe = self.get_all_html_from_iframe(all_iframe)
        for frame_response in all_html_from_iframe:
            # Bug fix: get_html_from_iframe returns False for frames it
            # cannot enter; the original passed that False into Finder,
            # which then crashed calling .xpath on a bool.
            if not frame_response:
                continue
            finder = Finder(frame_response, self.url)
            self.result += finder.get_result()

    def get_result(self):
        """Return the accumulated list of AdsItem objects."""
        return self.result

    def get_all_iframe(self, response):
        """Return every <iframe> selector in the document."""
        return response.xpath("..//iframe")

    def get_all_html_from_iframe(self, data):
        """Map each iframe selector to its rendered HTML response; False
        entries mark frames that could not be read."""
        return [self.get_html_from_iframe(iframe) for iframe in data]

    def get_html_from_iframe(self, iframe):
        """Switch the driver into the iframe (looked up by id) and return
        a response whose body is the frame's rendered HTML, or False when
        the frame cannot be entered."""
        frame_id = str(iframe.xpath("./@id").extract_first())  # renamed: 'id' shadowed the builtin
        frame = self.driver.find_element_by_id(frame_id)
        try:
            self.driver.switch_to.frame(frame)
            response = self.response.replace(
                body=self.driver.page_source.encode('utf-8'))
            self.driver.switch_to.default_content()
        except Exception:
            # Narrowed from a bare except; a missing/cross-origin frame is
            # expected and non-fatal.
            print("Not found iframe")
            return False
        return response
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment