Skip to content

Instantly share code, notes, and snippets.

@yuezhu
Created January 14, 2018 21:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuezhu/fd9cf73e4997a3bda2c1d76d9d42f18a to your computer and use it in GitHub Desktop.
Save yuezhu/fd9cf73e4997a3bda2c1d76d9d42f18a to your computer and use it in GitHub Desktop.
Lenovo Outlet Scanner
import urllib
import re
import logging
import sys
import time
import webbrowser
import pprint
import argparse
import subprocess
import shlex
from HTMLParser import HTMLParser
# Log INFO and above; every record is prefixed with a timestamp.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# File read by ignored(): item ids separated by commas. Matching items listed
# here are logged by check_item() but do not open a browser tab.
IGNORED_ITEMS_FILE = 'lenovo_outlet_scanner_ignored_items'
# (regex, field names) pairs used by parse_item_details(). Each line of the
# page text is tried against the regexes in order; on the first hit, capture
# group N is stored in the info dict under the N-th field name.
INFO_FIELD_DEF = (
(r'(ThinkPad .+) - (.+)', ('model', 'condition')),
(r'Part number: (.+)', ('pn', )),
(r'(Limited quantity available|Out of stock|In stock)', ('inventory', )),
(r'List price: (.+)', ('price', )),
(r'Processor: (.+)', ('processor', )),
(r'Operating system: (.+)', ('os', )),
(r'Display: (.+)', ('display', )),
(r'Graphics: (.+)', ('graphics', )),
(r'Memory: (.+)', ('memory', )),
(r'Hard Drive: (.+)', ('hdd', )),
(r'Optical Drive: (.+)', ('optical', ))
)
# Value sent as the User-Agent header on every curl request.
UA_HDR = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36'
# Outlet landing page; also sent as the Referer header when fetching an item.
URL = 'http://outlet.lenovo.com/outlet_us/laptops/'
# Item-details URL template; %s is replaced with the part number ('pn' field).
PN_URL = 'http://outlet.lenovo.com/outlet_us/itemdetails/%s/445'
# Default value of --interval: seconds to wait between two scans.
DEF_INTERVAL = 180
# Number of tries check() makes for each fetch before giving up on it.
NUM_ATTEMPTS = 3
# Pages are fetched by running curl in a subprocess:
# Using urllib2 would cause HTTP 403 errors.
def notify(title, subtitle, info_text, delay=0, sound=False, userInfo=None):
    """Schedule a macOS user notification; do nothing on other platforms.

    Args:
        title: Value passed to ``setTitle_``.
        subtitle: Value passed to ``setSubtitle_``.
        info_text: Value passed to ``setInformativeText_``.
        delay: Time interval, relative to the current date, at which the
            notification is scheduled for delivery.
        sound: When true, the default notification sound is requested.
        userInfo: Optional dict passed to ``setUserInfo_``. A fresh empty
            dict is used when omitted, so calls never share one object.

    Returns:
        None.
    """
    if sys.platform != 'darwin':
        return
    if userInfo is None:
        userInfo = {}

    # Imported here, after the platform check, so that the script can still
    # be loaded on systems where these macOS bridge modules do not exist.
    import Foundation
    import objc
    import AppKit  # not referenced below; kept because the original imports it

    NSUserNotification = objc.lookUpClass('NSUserNotification')
    NSUserNotificationCenter = objc.lookUpClass('NSUserNotificationCenter')

    notification = NSUserNotification.alloc().init()
    notification.setTitle_(title)
    notification.setSubtitle_(subtitle)
    notification.setInformativeText_(info_text)
    notification.setUserInfo_(userInfo)
    if sound:
        notification.setSoundName_("NSUserNotificationDefaultSoundName")
    notification.setDeliveryDate_(
        Foundation.NSDate.dateWithTimeInterval_sinceDate_(delay, Foundation.NSDate.date())
    )
    NSUserNotificationCenter.defaultUserNotificationCenter().scheduleNotification_(notification)
def progressbar(it, prefix='', length=40, dismiss=True, percentage=True):
    """Yield the items of *it* while drawing a progress bar on stderr.

    The bar is redrawn once before the first item and once after each item
    has been handed to (and processed by) the consumer.

    Args:
        it: A sized iterable (``len(it)`` must work).
        prefix: Text written in front of the bar.
        length: Width of the bar in characters.
        dismiss: When true, the bar line is erased once iteration finishes;
            otherwise a newline is written so the finished bar stays visible.
        percentage: When true, the completed percentage follows the bar.

    Yields:
        Every item of *it*, in order.
    """
    count = float(len(it))

    def _display(index):
        if count:
            progress = int(length * index / count)
            percent = index / count * 100
        else:
            # Nothing to iterate: show a completed bar instead of dividing
            # by zero (this happens, for example, with a zero-second wait).
            progress = length
            percent = 100.0
        line = u'\r{}|{}{}| '.format(
            prefix, u'\u2588' * progress, u' ' * (length - progress)
        )
        if percentage:
            line += u'{:.2f}% '.format(percent)
        sys.stderr.write(line)

    _display(0)
    for index, item in enumerate(it):
        yield item
        _display(index + 1)

    if dismiss:
        # ANSI "erase entire line", then return the cursor to column 0.
        sys.stderr.write('\x1b[2K\r')
    else:
        sys.stderr.write('\n')
class ItemPageParser(HTMLParser):
    """HTML parser that keeps only a page's text and tidies it into lines.

    Feed the page with ``feed()``, call ``parse()``, then read ``data``.
    """

    def __init__(self):
        HTMLParser.__init__(self)
        # All text seen so far; parse() rewrites it in place.
        self.data = ''

    def handle_data(self, data):
        """Append a chunk of text reported by the underlying HTML parser."""
        self.data = self.data + data

    def parse(self):
        """Normalise the collected text so each "label: value" sits on one line."""
        rewrites = (
            # Remove tab characters.
            (r'\t', r''),
            # Collapse every run of line breaks into a single newline.
            (r'((\r\n)+|\n+)', r'\n'),
            # Join a label that ends in ':' with the value on the next line.
            (r':\n+', r': '),
            # When "List price:" is followed by two values on consecutive
            # lines, keep only the second one.
            (r'List price: (.+?)\n(.+?)\n', r'List price: \2\n'),
        )
        text = self.data
        for pattern, replacement in rewrites:
            text = re.sub(pattern, replacement, text)
        self.data = text
def match(info, criteria):
    """Tell whether a parsed item satisfies the criteria set for its model.

    Args:
        info: Dict of fields parsed from an item page; ``info['model']``
            selects which criteria entry applies.
        criteria: Dict mapping a model name to a dict of constraints:
            ``'price'`` is the highest acceptable price (a number), while
            ``'processor'``, ``'os'``, ``'inventory'`` and ``'condition'``
            are sequences of substrings, at least one of which must occur in
            the corresponding field. Any other key is ignored.

    Returns:
        True when every recognised constraint is met. False when the model
        has no criteria entry, a constraint fails, or a constrained field is
        absent from *info*.
    """
    try:
        wanted = criteria[info['model']]
    except KeyError:
        return False

    text_fields = ('processor', 'os', 'inventory', 'condition')
    for key, val in wanted.items():
        if key != 'price' and key not in text_fields:
            continue
        if key not in info:
            # The page did not provide this field, so the constraint cannot
            # be verified; report "no match" rather than raising KeyError.
            return False
        if key == 'price':
            # Prices look like "$1,299.00": drop the symbol and separators.
            if float(re.sub(r'[$,]', r'', info[key])) > val:
                return False
            continue
        text = info[key]
        if isinstance(text, bytes):
            # Byte strings are decoded for comparison; text that is already
            # decoded is used as it is.
            text = text.decode('utf-8')
        if not any(v in text for v in val):
            return False
    return True
def parse_item_details(page):
    """Extract the item fields described by INFO_FIELD_DEF from an item page.

    Args:
        page: HTML of a single item page.

    Returns:
        A dict of the fields found, or None (after logging an error) when
        any of 'model', 'condition', 'price', 'inventory' or 'pn' is missing.
    """
    parser = ItemPageParser()
    parser.feed(page)
    parser.parse()

    info = {}
    for line in parser.data.splitlines():
        for regex, fields in INFO_FIELD_DEF:
            found = re.search(regex, line)
            if found is not None:
                for group_no, field in enumerate(fields, 1):
                    info[field] = found.group(group_no)
                # Only the first pattern that matches a line is applied.
                break

    try:
        summary = (
            info['model'], info['condition'], info['price'], info['inventory'],
            PN_URL % info['pn'],
        )
    except KeyError as ex:
        logging.error('Page: %s\n, missing: %s', parser.data, ex)
        return None

    logging.info('%s, %s, %s, %s, P/N URL: %s', *summary)
    return info
def fetch_outlet_info():
    """Download the outlet landing page and read two script variables from it.

    Returns:
        A ``(category_id, result_url)`` tuple: the value of ``f_categoryid``
        and the value of ``f_resultsUrl`` prefixed with ``'http:'``.

    Raises:
        RuntimeError: curl exited with a non-zero status, or one of the two
            variables was not found in the page.
    """
    cmd = "curl -s -k -H 'User-Agent: %s' -X GET %s" % (UA_HDR, URL)
    proc = subprocess.Popen(
        shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to GET: %r' % cmd)

    category_found = re.search(r'var f_categoryid = "(.+?)";', page, re.DOTALL)
    if category_found is None:
        raise RuntimeError('Could not find category ID in the page')
    category_id = category_found.group(1)
    logging.debug('Category ID: %s', category_id)

    url_found = re.search(r'var f_resultsUrl = "(.+?)";', page, re.DOTALL)
    if url_found is None:
        raise RuntimeError('Could not find result URL in the page')
    # The page gives the URL without a scheme, so one is prepended.
    result_url = 'http:' + url_found.group(1)
    logging.debug('Result URL: %s', result_url)

    return category_id, result_url
def fetch_items(result_url, facet_data):
    """POST a facet selection to the results URL and list the matching items.

    Args:
        result_url: URL to post to (as returned by fetch_outlet_info()).
        facet_data: Dict of form fields; it is URL-encoded into the body.

    Returns:
        An ``(item_url_prefix, items)`` tuple: the value of the page's
        ``infiniteUrl`` variable prefixed with ``'http:'``, and the list of
        item ids found in its ``fitems`` array (empty when there are none).

    Raises:
        RuntimeError: curl exited with a non-zero status, or ``infiniteUrl``
            was not found in the response.
    """
    # The argument vector is built directly. result_url comes from a
    # downloaded page; formatting it into a command string and splitting that
    # string again would fail on a stray quote and would let embedded
    # whitespace add extra curl arguments.
    argv = [
        'curl', '-s', '-k',
        '-H', 'User-Agent: %s' % UA_HDR,
        '-X', 'POST', result_url,
        '-d', urllib.urlencode(facet_data),
    ]
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to POST: %r' % ' '.join(argv))

    items = re.findall(r'var fitems = \[\s*(.+?)\s*\];', page, re.DOTALL)
    if not items or not items[0].strip():
        items = []
    else:
        # Entries are single-quoted ids separated by a comma and whitespace.
        items = [x.strip("'") for x in re.split(r',\s+', items[0])]

    item_url_prefix = re.findall(r'var infiniteUrl = \'(.+?)\';', page, re.DOTALL)
    if not item_url_prefix:
        raise RuntimeError('Could not find item URL prefix in the page')
    # The page gives the URL without a scheme, so one is prepended.
    item_url_prefix = 'http:' + item_url_prefix[0]
    logging.debug('Item URL prefix: %s', item_url_prefix)

    return item_url_prefix, items
def ignored(item):
    """Tell whether *item* is listed in the ignored-items file.

    Each line of the file holds item ids separated by commas. A file that is
    missing or cannot be read is treated as listing nothing.
    """
    try:
        with open(IGNORED_ITEMS_FILE, 'r') as fptr:
            for line in fptr:
                if item in re.split(r',\s*', line.strip()):
                    return True
    except (OSError, IOError):
        pass
    return False
def check_item(item, item_url, criteria):
    """Fetch one item page and open its details page when it matches.

    Args:
        item: Item id, used in log messages and for the ignore-list lookup.
        item_url: URL of the item page to download.
        criteria: Per-model constraints, as accepted by match().

    Raises:
        RuntimeError: curl exited with a non-zero status, the response shows
            rate limiting or anti-bot detection, or the page could not be
            parsed.
    """
    # The argument vector is built directly. item_url is assembled from
    # downloaded page content; formatting it into a command string and
    # splitting that string again would fail on a stray quote and would let
    # embedded whitespace add extra curl arguments.
    argv = [
        'curl', '-s', '-k',
        '-H', 'User-Agent: %s' % UA_HDR,
        '-H', 'Referer: %s' % URL,
        '-X', 'GET', item_url,
    ]
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to GET: %r' % ' '.join(argv))

    if 'HTTP/1.1 429 Too Many Requests' in page:
        raise RuntimeError('HTTP/1.1 429 Too Many Requests')

    if 'Our system has identified this session as possible bot. ' \
       'You are being redirected to requested page. Wait a second.' in page:
        raise RuntimeError('Triggered anti-bot detection')

    info = parse_item_details(page)
    if info is None:
        logging.error('Failed to parse item page\n%s', page)
        raise RuntimeError('Failed to parse item page')

    if not match(info, criteria):
        return

    logging.info('Item %s matched: \n%s', item, pprint.pformat(info))
    pn_url = PN_URL % info['pn']
    if ignored(item):
        logging.info('Item %s is ignored. P/N URL: %s', item, pn_url)
        return

    # NOTE(review): the nesting of the lines below could not be recovered
    # from the file; it is assumed that only the one-second pause is
    # macOS-specific and the browser is opened on every platform -- confirm.
    if sys.platform == 'darwin':
        time.sleep(1)
    try:
        webbrowser.open_new_tab(pn_url)
    except webbrowser.Error as ex:
        logging.error(ex)
    # notify('Lenovo Outlet Scanner', info['model'], 'Price: ' + info['price'], sound=True)
def check(criteria):
    """Run one scan: list the outlet items and check each against *criteria*.

    Every fetch is tried up to NUM_ATTEMPTS times. The scan ends early when
    the landing page cannot be read or when no items are found.

    Args:
        criteria: Per-model constraints, passed through to check_item().
    """
    landing = None
    for attempt in xrange(1, NUM_ATTEMPTS + 1):
        try:
            landing = fetch_outlet_info()
            break
        except RuntimeError as ex:
            logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
    if landing is None:
        return
    category_id, result_url = landing

    # One facet selection per search; the facet meanings are noted inline.
    searches = {
        'T series': {
            'category-id': category_id,
            'sort-criteria': '1',
            'page-size': '100',
            'facet-1':'1', # New
            'facet-2':'1', # ThinkPad
            'facet-3':'14', # T series
            'facet-5':'4', # 14 inch screen
            # 'facet-11':'2' # no touchscreen
        },
        # 'P series': {
        # 'category-id': category_id,
        # 'sort-criteria': '2',
        # 'page-size': '100',
        # 'facet-1':'1', # New
        # 'facet-2':'1', # ThinkPad
        # 'facet-3':'10', # P series
        # 'facet-4':'1,2,3', # price < 1199
        # 'facet-5':'5', # 15 inch screen
        # 'facet-11':'2' # no touchscreen
        # }
    }

    items = []
    for series, facets in searches.items():
        for attempt in xrange(1, NUM_ATTEMPTS + 1):
            try:
                item_url_prefix, series_items = fetch_items(result_url, facets)
            except RuntimeError as ex:
                logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
            else:
                logging.info('%d item(s) found for %s', len(series_items), series)
                items.extend(series_items)
                break

    if not items:
        return

    for item in items:
        item_url = item_url_prefix + '&page=1&itemid=' + item
        logging.debug('Fetch item URL: %s', item_url)
        checked = False
        for attempt in xrange(1, NUM_ATTEMPTS + 1):
            try:
                check_item(item, item_url, criteria)
            except RuntimeError as ex:
                logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
                # Pause before the next try.
                time.sleep(3)
            else:
                checked = True
                break
        if not checked:
            logging.error('Failed to check item: %s', item)
            time.sleep(3)
        # Short pause between consecutive item requests.
        time.sleep(0.5)
def args():
    """Parse the command line.

    Returns:
        The argparse namespace; ``interval`` holds the integer given with
        ``--interval`` (DEF_INTERVAL when the option is omitted).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--interval',
        type=int,
        default=DEF_INTERVAL,
        dest='interval',
    )
    options = cli.parse_args()
    return options
def main():
    """Scan the outlet forever, waiting ``--interval`` seconds between scans."""
    available = ('Limited quantity available', 'In stock')
    brand_new = ('New', )
    criteria = {
        'ThinkPad T460s': {
            'inventory': available,
            'condition': brand_new,
            'processor': ('6300', '6600'),
            'os': ('Professional', 'Pro', 'professional', 'pro'),
            'price': 800
        },
        'ThinkPad P50': {
            'inventory': available,
            'condition': brand_new,
            'price': 1000
        }
    }

    options = args()
    interval = options.interval
    if interval < 0:
        # A negative wait makes no sense; use the default instead.
        logging.info('Adjusted polling interval to %d seconds',
                     DEF_INTERVAL)
        interval = DEF_INTERVAL

    while True:
        check(criteria)
        # Sleep one second per step so the bar shows the remaining wait.
        for _ in progressbar(xrange(interval)):
            time.sleep(1)
# Script entry point: run the scanner until the user interrupts it.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; leave with a success status.
        logging.info('Quit')
        sys.exit(0)
@erickfung
Copy link

Not working
It needs markupbase, but when I run pip install, it says there is no matching distribution found for markupbase.
Any update?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment