# -*- coding: utf-8 -*-
import json
import re
# Import Scrapy stuff
import scrapy
import time
from scrapy import signals
from scrapy.http import HtmlResponse
from scrapy_splash import SplashRequest
from scrapy.linkextractors import LinkExtractor
import random
from pyproj import Proj, transform
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
try:
    from shop_info.items import ShopInfoItem
except ImportError:
    from items import ShopInfoItem
try:
    from shop_info import useragent
except ImportError:
    import useragent
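# Default scrape configuration. Each "format" entry maps an output field to an
# XPath, an optional regex ("regex_match", with the capture group selected by
# "regex_item", e.g. "$1"), and a tag-stripping mode ("remove_tag", see
# remove_tag() below). "target" lists the store pages to crawl, and
# "coordination" describes how to recover latitude/longitude from the store's
# map link.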
input_data = {
'format':
[
{
'item_name': 'name',
'xpath': '//*[@id="topicPath"]/li[2]/text()',
'regex_match': 'Null',
'regex_item': 'Null',
'remove_tag': ''},
{
'item_name': 'zip',
'xpath': '//*[@id="header"]/p',
        'regex_match': r'〒([\d\-]+)\s*',
'regex_item': '$1 ',
'remove_tag': ''
},
{
'item_name': 'address',
'xpath': '//*[@id="header"]/p',
        'regex_match': r'〒[\d\-]+\s*(.+)\s*TEL',
'regex_item': '$1 ',
'remove_tag': ''},
{
'item_name': 'tel',
'xpath': '//*[@id="header"]/p',
        'regex_match': r'〒[\d\-]+\s*.+\s*TEL\s*\:*\s*([\d\-\(\)\s]+)',
'regex_item': '$1',
'remove_tag': ''}
],
'target': [
{
'id': '1',
'url': 'https://www.takashimaya.co.jp/rakusai/index.html'
},
{
'id': '2',
'url': 'https://www.takashimaya.co.jp/okayama/index.html'
},
{
'id': '3',
'url': 'https://www.takashimaya.co.jp/tachikawa/index.html'},
{
'id': '4',
'url': 'https://www.takashimaya.co.jp/yokohama/index.html'}
],
'coordination':[
{
"item_name": "coordination",
"mapurl_xpath": '//*[@id="storeInfo"]/div/p/a',
"xpath": '//*[@id="mapDiv"]/div/div/div[10]/div/div/div/div[7]/div/a',
"regex_match": "https:\/\/maps\.google\.com\/maps\?.*ll=([\d\.]+),([\d\.]+)",
"lat": "$1",
"lon": "$2",
"mapurl_regex": "\/[^\/]+\/access\/index.html"
}
]
}
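# Lua scripts for Splash: load the page, wait for the JavaScript to finish
# rendering, and return the HTML. tiny_url_extract also returns the final URL
# so a redirected (shortened) map link can be matched against the coordinate
# regex.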
lua_first_page = """
function main(splash)
local url = splash.args.url
assert(splash:go(url))
assert(splash:wait(10))
return {
html = splash:html()
}
end
"""
map_url_page = """
function main(splash)
local url = splash.args.url
assert(splash:go(url))
assert(splash:wait(6))
return {
html = splash:html()
}
end
"""
tiny_url_extract = """
function main(splash)
local url = splash.args.url
assert(splash:go(url))
assert(splash:wait(15))
return {
    url = splash:url(),
    html = splash:html()
}
end
"""
useragent_lists = useragent.user_agent_list
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-GB,en-US;q=0.8,en;q=0.6',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
    'User-Agent': random.choice(useragent_lists),
}
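# Strip HTML tags from an extracted snippet. Mode 1 removes tags, mode 2
# replaces them with a space, mode 3 replaces them with the marker "__tag__".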
def remove_tag(text, remove_tag):
if int(remove_tag) == 1:
text = re.sub('<[^<]+?>', '', text)
return text
elif int(remove_tag) == 2:
text = re.sub('<[^<]+?>', ' ', text)
return text
elif int(remove_tag) == 3:
text = re.sub('<[^<]+?>', '__tag__', text)
return text
return text
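# Normalise config values: "¥" is replaced with a backslash (Japanese systems
# often render the backslash as a yen sign, so regex escapes may arrive as
# ¥d, ¥s, ...).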
def cleanup_data(data):
replaced_data = []
for d in data:
for k, v in d.items():
if "¥" in v:
v = v.replace("¥", "\\")
d[k] = v
replaced_data.append(d)
if replaced_data:
return replaced_data
return data
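# Spider: renders each target store page through Splash, extracts the fields
# described in the "format" config, and optionally resolves coordinates by
# following the store's map link via a remote Selenium Chrome session.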
class ShopSpider(scrapy.Spider):
name = "shop_info"
handle_httpstatus_list = [404, 500]
def __init__(self, *args, **kwargs):
super(ShopSpider, self).__init__(*args, **kwargs)
        try:
            self.driver = webdriver.Remote(command_executor='http://127.0.0.1:4444/wd/hub', desired_capabilities=DesiredCapabilities.CHROME)
        except Exception:
            self.driver = webdriver.Remote(command_executor='http://138.68.241.86:4444/wd/hub', desired_capabilities=DesiredCapabilities.CHROME)
self.input_data = kwargs.get('input_data', None)
if self.input_data:
self.input_data = json.loads(self.input_data)
if not self.input_data:
self.input_data = input_data
self.target_urls = cleanup_data(self.input_data["target"])
self.formats = cleanup_data(self.input_data["format"])
self.coordinates = cleanup_data(self.input_data["coordination"])
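    # One Splash request per target URL, each with a freshly chosen User-Agent.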
def start_requests(self):
for target in self.target_urls:
headers["User-Agent"] = useragent_lists[random.randrange(0, len(useragent_lists))]
yield SplashRequest(target["url"], self.parse, errback=self.handle_error, headers=headers, meta={
'id': target["id"], "url": target["url"], "http_status_from_error_code":True},
endpoint='execute', args={'lua_source': lua_first_page, 'timeout': 90}
)
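    # Main callback: pull every configured field out of the rendered page with
    # XPath + regex, recording failures in xpath_log / regex_log, then handle
    # the coordination config.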
def parse(self, response):
id = response.meta.get('id')
url = response.meta.get('url')
        if response.status in self.handle_httpstatus_list:
            yield scrapy.Request(url=url, callback=self.after_404, meta={
                'id': id, "url": url})
            return
self.output = ShopInfoItem()
self.output["id"] = id
self.output["URL"] = url
xpath_log = []
regex_log = []
strings_from_xpath = []
for fmt in self.formats:
x_log = {}
r_log = {}
item = fmt["item_name"]
item_val = None
if "regex_match" in fmt and fmt["regex_match"] != "Null" and fmt["xpath"] !="Null":
try:
# item_text = response.xpath('{0}'.format(fmt["xpath"])).extract_first()
item_text = response.xpath(fmt["xpath"]).extract_first()
if not item_text:
x_log["item"] = item
x_log["xpath"] = fmt["xpath"]
x_log["message"] = "Failed to get string from the Xpath"
xpath_log.append(x_log)
else:
try:
if "remove_tag" in fmt and fmt["remove_tag"] != "Null" and fmt["remove_tag"]:
item_text = remove_tag(item_text, fmt["remove_tag"])
line = re.search(r'{}'.format(fmt["regex_match"]), item_text.strip())
if line:
grp = re.findall(r'\d+', fmt["regex_item"])[0]
li = line.group(int(grp))
item_val = li
else:
r_log["item"] = item
r_log["item Text"] = item_text
r_log["regex"] = fmt["regex_match"]
r_log["message"] = "Failed to get item from Regex"
regex_log.append(r_log)
string_log = {"item": item, "string":item_text}
strings_from_xpath.append(string_log)
except Exception as e:
r_log["item"] = item
r_log["item Text"] = item_text
r_log["regex"] = fmt["regex_match"]
r_log["message"] = e
regex_log.append(r_log)
string_log = {"item": item, "string":item_text}
strings_from_xpath.append(string_log)
except Exception as e:
r_log["item"] = item
r_log["xpath"] = fmt["xpath"]
r_log["message"] = e
xpath_log.append(r_log)
pass
elif fmt["xpath"] !="Null":
item_val = response.xpath(fmt["xpath"]).extract_first()
if "remove_tag" in fmt and fmt["remove_tag"] != "Null" and fmt["remove_tag"]:
item_val = remove_tag(item_val, fmt["remove_tag"])
if not item_val:
x_log["item"] = item
x_log["xpath"] = fmt["xpath"]
x_log["message"] = "Failed to get string from the Xpath"
xpath_log.append(x_log)
elif "remove_tag" in fmt and fmt["remove_tag"] != "Null" and fmt["remove_tag"]:
item_text = response.xpath(fmt["xpath"]).extract_first()
item_text = remove_tag(item_text, fmt["remove_tag"])
item_val = item_text
self.output[item] = item_val
self.output["xpath_log"] = xpath_log if len(xpath_log) else None
self.output["regex_log"] = regex_log if len(regex_log) else None
if len(strings_from_xpath):
self.output["strings_from_xpath"] = strings_from_xpath
for cords in self.coordinates:
            if cords and ("mapurl_regex" not in cords or cords["mapurl_regex"] == "Null" or cords["mapurl_regex"] == ""):
if "tinymapurl_regex" in cords and cords["tinymapurl_regex"]:
match = re.search(r'{}'.format(cords["tinymapurl_regex"]), response.body.decode('utf-8'))
if match:
tiny_url = match.group()
yield SplashRequest(tiny_url, self.parse_tiny_url, errback=self.handle_error, headers=headers, meta={"http_status_from_error_code":True, "output":self.output, "cords":cords},
endpoint='render.json', args={'lua_source': tiny_url_extract, 'timeout': 90, 'iframes': 1, 'html': 1})
else:
                        regex_log = {"regex_log": {"message": "Failed to get item from this regex", "regex": cords["tinymapurl_regex"], 'strings_from_xpath': '* whole html document *'}, "xpath_log": ""}
self.output["coordination"] = regex_log
yield self.output
else:
item_name = "coordination"
try:
item_text = response.xpath(cords["xpath"]).extract_first()
match = None
if item_text:
match = re.search(r'{}'.format(cords["regex_match"]), item_text.strip())
else:
match = re.search(r'{}'.format(cords["regex_match"]), response.body.decode("utf-8"))
if match:
org_lat = match.group(1)
org_lon = match.group(2)
if org_lat and org_lon and "srid" in cords and cords["srid"]:
inProj = Proj("+init=EPSG:{}".format(cords["srid"]))
outProj = Proj("+init=EPSG:4326")
                            lon, lat = transform(inProj, outProj, float(org_lon), float(org_lat))
self.output[item_name] = {"lat": lat, "lon":lon, "org_lat":org_lat, "org_lon": org_lon}
else:
self.output[item_name] = {"lat": org_lat, "lon":org_lon}
yield self.output
else:
if item_text:
regex_log = {"regex_log":{"message":"Failed to get item from this regex", "regex":cords["regex_match"], 'strings_from_xpath':item_text}, "xpath_log":""}
else:
                            regex_log = {"regex_log": {"message": "Failed to get item from this regex", "regex": cords["regex_match"], 'strings_from_xpath': '* whole html document *'}, "xpath_log": ""}
self.output["coordination"] = regex_log
yield self.output
except Exception as e:
                    xpath_log = {"regex_log": "", "xpath_log": {"message": str(e), "xpath": cords["xpath"]}}
self.output["coordination"] = xpath_log
yield self.output
elif "mapurl_xpath" in cords and cords["mapurl_xpath"]:
try:
self.driver.get(url)
path = self.driver.find_element_by_xpath(cords["mapurl_xpath"]).get_attribute('outerHTML')
match = None
if path:
match = re.search(r'{}'.format(cords["mapurl_regex"]), path)
if match:
self.driver.find_element_by_xpath(cords["mapurl_xpath"]).click()
time.sleep(5)
yield SplashRequest(self.driver.current_url, self.parse_map_url, errback=self.handle_error, headers=headers, meta={"http_status_from_error_code":True, "output":self.output, "cords":cords},
endpoint='render.json', args={'lua_source': map_url_page, 'timeout': 90, 'iframes': 1, 'html': 1})
else:
try:
self.driver.find_element_by_xpath(cords["mapurl_xpath"]).click()
time.sleep(5)
yield SplashRequest(self.driver.current_url, self.parse_map_url, errback=self.handle_error, headers=headers, meta={"http_status_from_error_code":True, "output":self.output, "cords":cords},
endpoint='render.json', args={'lua_source': map_url_page, 'timeout': 90, 'iframes': 1, 'html': 1})
except Exception as e:
if path:
regex_log = {"mapurl_regex_log":{"message":"Failed to get item from this Mapurl regex", "regex":cords["mapurl_regex"], 'strings_from_xpath':path}, "xpath_log":"",}
self.output["coordination"] = regex_log
else:
                                xpath_log = {"regex_log": "", "xpath_log": {"message": str(e), "xpath": cords["mapurl_xpath"]}}
self.output["coordination"] = xpath_log
yield self.output
except Exception as e:
                    xpath_log = {"regex_log": "", "xpath_log": {"message": str(e), "xpath": cords["mapurl_xpath"]}}
self.output["coordination"] = xpath_log
yield self.output
else:
yield self.output
if not self.coordinates:
yield self.output
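    # Fallbacks: after_404 emits a "Page not Found" item for 404/500 responses,
    # and handle_error records the failure type against the request URL.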
def after_404(self, response):
id = response.meta.get('id')
url = response.meta.get('url')
output = ShopInfoItem()
output["id"] = id
output["URL"] = url
output["404"] = "Page not Found"
yield output
def handle_error(self, failure):
# id = response.meta.get('id')
# url = response.meta.get('url')
url = failure.request.url
Error = failure.type
output = ShopInfoItem()
output["URL"] = url
output["Error"] = Error
yield output
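    # Callback for the Google Maps page fetched via Splash render.json: rebuild
    # an HtmlResponse from the returned HTML, and fall back to the first child
    # iframe's HTML when the configured XPath matches nothing.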
def parse_map_url(self, response):
output = response.meta.get('output')
cords = response.meta.get('cords')
res = None
if "childFrames" in response.data and response.data["childFrames"]:
res = response.data["childFrames"][0]["html"]
response = response.data["html"]
response = HtmlResponse(url="Dummy URL", body=response, encoding='utf-8')
try:
match = None
item_text = response.xpath(cords["xpath"]).extract_first()
item_name = "coordination"
if item_text:
match = re.search(r'{}'.format(cords["regex_match"]), item_text.strip())
elif res:
match = re.search(r'{}'.format(cords["regex_match"]), res)
if match:
lon_r = re.findall(r'\d+', cords["lon"])[0]
lat_r = re.findall(r'\d+', cords["lat"])[0]
org_lat = match.group(int(lat_r))
org_lon = match.group(int(lon_r))
if org_lat and org_lon and "srid" in cords and cords["srid"]:
inProj = Proj("+init=EPSG:{}".format(cords["srid"]))
outProj = Proj("+init=EPSG:4326")
                    lon, lat = transform(inProj, outProj, float(org_lon), float(org_lat))
output[item_name] = {"lat": lat, "lon":lon, "org_lat":org_lat, "org_lon": org_lon}
else:
output[item_name] = {"lat": org_lat, "lon":org_lon}
else:
if item_text:
regex_log = {"regex_log":{"message":"Failed to get item from this regex", "regex":cords["regex_match"], 'strings_from_xpath':item_text}, "xpath_log":""}
else:
                    regex_log = {"regex_log": {"message": "Failed to get item from this regex", "regex": cords["regex_match"], 'strings_from_xpath': '* whole html document *'}, "xpath_log": ""}
output["coordination"] = regex_log
except Exception as e:
            regex_log = {"regex_log": "", "xpath_log": {"message": str(e), "xpath": cords["xpath"]}}
output["coordination"] = regex_log
yield output
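    # Callback for shortened map links: Splash follows the redirect and the
    # lat/lon is parsed straight out of the final URL.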
def parse_tiny_url(self, response):
output = response.meta.get('output')
cords = response.meta.get('cords')
url = response.data["url"]
regex = cords["regex_match"]
match = re.search(r'{}'.format(regex), url)
item_name = "coordination"
if match:
            lon_r = re.findall(r'\d+', cords["lon"])[0]
            lat_r = re.findall(r'\d+', cords["lat"])[0]
            org_lat = match.group(int(lat_r))
            org_lon = match.group(int(lon_r))
if org_lat and org_lon and "srid" in cords and cords["srid"]:
inProj = Proj("+init=EPSG:{}".format(cords["srid"]))
outProj = Proj("+init=EPSG:4326")
                lon, lat = transform(inProj, outProj, float(org_lon), float(org_lat))
output[item_name] = {"lat": lat, "lon":lon, "org_lat":org_lat, "org_lon": org_lon}
else:
output[item_name] = {"lat": org_lat, "lon":org_lon}
else:
            regex_log = {"regex_log": {"message": "Failed to get item from this Mapurl regex", "regex": cords["regex_match"], 'strings_from_xpath': '* whole html document *'}, "xpath_log": ""}
output["coordination"] = regex_log
yield output
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super(ShopSpider, cls).from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.spider_opened, signals.spider_opened)
crawler.signals.connect(spider.spider_closed, signals.spider_closed)
return spider
def spider_opened(self, spider):
print('Opening {} spider'.format(spider.name))
def spider_closed(self, spider):
self.driver.quit()
print('Closing {} spider'.format(spider.name))
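# Example usage (a sketch, assuming a standard Scrapy project named shop_info,
# a Splash instance reachable by scrapy-splash, and a Selenium hub at one of
# the addresses configured in __init__):
#
#   scrapy crawl shop_info
#   scrapy crawl shop_info -a input_data='{"format": [...], "target": [...], "coordination": [...]}'
#
# The second form overrides the built-in input_data with a JSON string, which
# __init__ parses with json.loads.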