Skip to content

Instantly share code, notes, and snippets.

@zxkane
Last active December 18, 2015 20:29
Show Gist options
  • Save zxkane/a61d9103be232c4f2cd9 to your computer and use it in GitHub Desktop.
Save zxkane/a61d9103be232c4f2cd9 to your computer and use it in GitHub Desktop.
A python program is intended to automatically order goods in second kill of Amazon.cn. Prerequisites: requests, lxml, selenium
*.pyc
.settings/
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>z-seckill</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?>
<pydev_project>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/z-seckill</path>
</pydev_pathproperty>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">python2.7</pydev_property>
</pydev_project>
## {{{ http://code.activestate.com/recipes/577187/ (r9)
from Queue import Queue
from threading import Thread
import datetime
def consoleOutput(message):
print "%s: %s" %(str(datetime.datetime.now()), message)
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, threadno, tasks, exceptions):
Thread.__init__(self)
self.threadno = threadno
self.tasks = tasks
self.exceptions = exceptions
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
consoleOutput("Thread #%s start to execute task" %(self.threadno))
try: func(*args, **kargs)
except KeyboardInterrupt: pass
except Exception, e:
self.exceptions.append(e)
consoleOutput("Thread #%s complete to execute task" %(self.threadno))
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
self.exceptions = list()
for _ in range(num_threads): Worker(_, self.tasks, self.exceptions)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def getErrors(self):
"""Get exceptions if some happened"""
return self.exceptions
## end of http://code.activestate.com/recipes/577187/ }}}
#! /bin/sh
export PYTHONUNBUFFERED=x
export DISPLAY=:0
cd /home/kane/git/z-seckill
git pull
cd /home/kane/.zseckill
python /home/kane/git/z-seckill/zseckill.py -d 1>>`date '+%F'`.txt 2>&1 &
#!/usr/bin/env python
# coding=utf-8
'''
zseckill -- second kill program for z.cn
zseckill is a program to automatically submit second kill of z.cn.
It defines classes_and_methods
@author: kane
@copyright: 2013 organization_name. All rights reserved.
@license: EPL
@contact: kane.mx@gmail.com
@deffield updated: Updated
'''
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from collections import deque
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException,\
ElementNotVisibleException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from smtplib import SMTPException
from threadpool import ThreadPool
import argparse
import datetime
import os
from os.path import expanduser
import re
import string
import sys
import time
import traceback
import ConfigParser
import requests
from lxml import etree
from requests.exceptions import RequestException
import json
import threading
import code
import signal
from selenium.webdriver.support.expected_conditions import presence_of_element_located
__all__ = []
__version__ = 0.1
__date__ = '2013-06-07'
__updated__ = '2013-06-07'
TESTRUN = 0
PROFILE = 0
def loginZcn(driver, email, password):
driver.get("http://www.amazon.cn/gp/yourstore/home/ref=pd_ys_home_signin?ie=UTF8&signIn=1")
emailElement = driver.find_element_by_xpath("//*[@id=\"ap_email\"]")
emailElement.send_keys(email)
passElement = driver.find_element_by_xpath("//*[@id=\"ap_password\"]")
passElement.send_keys(password)
driver.find_element_by_xpath("//*[@id=\"signInSubmit-input\"]").click()
StarttimePattern = re.compile("\d{1,2}:\d{2}")
defaultTimeout = 10
class SecKillGoods:
def __init__(self, driver, upcoming = True):
zkillLink = "//*[@id=\"nav-cross-shop-links\"]/li[2]/a"
WebDriverWait(driver, defaultTimeout).until(lambda s: s.find_element(By.XPATH, zkillLink).is_displayed())
self.driver = driver
driver.find_element_by_xpath(zkillLink).click()
try:
if upcoming:
upcomingTab = "//*[@id=\"upcoming_filter\"]/span"
WebDriverWait(driver, defaultTimeout).until(lambda s: s.find_element(By.XPATH, upcomingTab).is_displayed())
self.parent = driver.find_elements_by_xpath("//*[@class=\"ONETHIRTYFIVE-HERO\"]")[0]
self.parent.find_element_by_xpath(upcomingTab).click()
else:
ongoingTab = "//*[@id=\"available_filter\"]/span"
WebDriverWait(driver, defaultTimeout).until(lambda s: s.find_element(By.XPATH, ongoingTab).is_displayed())
self.parent = driver.find_elements_by_xpath("//*[@class=\"ONETHIRTYFIVE-HERO\"]")[0]
self.parent.find_element_by_xpath(ongoingTab).click()
except (NoSuchElementException, ElementNotVisibleException):
pass #fall back to all goods
time.sleep(2);
self.goodsInCurrentPage = None
self.goodsIndex = 0
self.totalPage = int(self.getSpanText("//div[@class='ONETHIRTYFIVE-HERO']/div[3]/div[3]/span[@id='dealTotalPages']"))
def getSpanText(self, xpath):
function = "var getElementByXpath = function (path) {return document.evaluate(path, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;};"
script = "%s return getElementByXpath(\"%s\").innerHTML;" %(function, xpath)
return self.driver.execute_script(script)
def __iter__(self):
return self
def next(self):
goods = None
if self.goodsInCurrentPage is None:
seckillXPath = "//*[@id=\"shove_01\"]/div[2]/ul/*"
try:
WebDriverWait(self.parent, defaultTimeout).until(lambda s: s.find_element(By.XPATH, seckillXPath).is_displayed())
#waiting for loading of item to avoid stale element exception
WebDriverWait(self.parent, defaultTimeout).until(lambda s: s.find_element(By.XPATH, "//*[@id=\"dealImage\"]").is_displayed())
self.goodsInCurrentPage = self.parent.find_elements_by_xpath(seckillXPath)
goods = self.goodsInCurrentPage[self.goodsIndex]
self.goodsIndex += 1
except TimeoutException, e:
if u'暂无分秒必争活动。' == self.parent.find_element_by_xpath("//*[@id=\"shove_01\"]/div[2]/div").text:
raise StopIteration
else:
raise e
else:
if self.goodsIndex == len(self.goodsInCurrentPage): #need go next page or already reach last one
currentPage = int(self.getSpanText("//div[@class='ONETHIRTYFIVE-HERO']/div[3]/div[3]/span[@id='dealCurrentPage']"))
if currentPage == self.totalPage:
raise StopIteration
else:
self.parent.find_element_by_xpath("//*[@id=\"rightShovelBg\"]").click()
self.goodsIndex = 0
self.goodsInCurrentPage = None
return next(self)
else:
goods = self.goodsInCurrentPage[self.goodsIndex]
try:
goods.find_element_by_xpath(TitleXPath)
except NoSuchElementException:
try:
goods.find_element_by_xpath(UpcomingTitleXPath)
except NoSuchElementException:
raise StopIteration
self.goodsIndex += 1
return goods
def getGoods(driver, goodsTitle):
def matchByTitle(goods, title):
try:
title = goods.find_element_by_xpath(TitleXPath).get_attribute("title")
if not title.startswith(goodsTitle):
return False
except NoSuchElementException:
try:
title = goods.find_element_by_xpath(UpcomingTitleXPath).get_attribute("title")
except NoSuchElementException:
return False
if not title.startswith(goodsTitle):
return False
return True
try:
return next(i for i in SecKillGoods(driver) if matchByTitle(i, goodsTitle))
except StopIteration:
try:
return next(i for i in SecKillGoods(driver, False) if matchByTitle(i, goodsTitle))
except StopIteration:
return None
TitleXPath = ".//*[@id=\"dealTitleLink\"]"
UpcomingTitleXPath = ".//div[3]"
PriceXPath = ".//*[@id=\"dealDealPrice\"]/b"
DealXPath = ".//*[@id=\"dealActionButton\"]/img"
UpcomingStartTimeXPath = ".//div[4]/div/b"
CloseAndContinueXPath = "//*[@id=\"ap_container\"]/div[@class='ap_popover ap_popover_sprited']/div[@class='ap_close']/a/span[1]"
DealStateXPath = ".//*[@id=\"dealStateContent\"]/div/b"
WaitingXPath = ".//*[@id=\"ap_container\"]/div/div[2]/div[2]/div/div/div/span"
CloseBtnXPath = "//*[@id=\"ap_container\"]/div/div[5]/a"
def getWaitingSeconds(goods):
try:
waitingText = goods.find_element_by_xpath(".//div[4]/div/span[span]").text
waiting = re.compile(u"(\d{2})小时.*(\d{2})分.*(\d{2})秒").search(waitingText)
interval = int(waiting.group(1))*60*60 + int(waiting.group(2))*60 + int(waiting.group(3))
return interval
except NoSuchElementException:
return 0
def notifyUser(loginuser, loginpass, smtpserver, fromaddr, to, msg, subject):
try:
import smtplib
# The actual mail send
server = smtplib.SMTP(smtpserver)
server.starttls()
if loginuser is not None:
server.login(loginuser,loginpass)
body = string.join((
u"From: \"%s\" <%s>" % (u"z秒杀robot", fromaddr),
u"To: %s" % to,
"Subject: [%s] %s".encode("utf-8") %("z秒杀".encode("utf-8"), subject.encode("utf8")),
"",
msg.encode("utf8"),
), "\r\n")
server.sendmail(fromaddr, to, body)
server.quit()
print "\tEmail notification was sent to %s." %(to)
except SMTPException, e:
sys.stderr.write(repr(e) + "\n")
traceback.print_exc()
def doKillByTitle(args):
driver = getWebDriver()
driver.set_window_size(1440, 900)
try:
goodsTitle = args.target
killgoods = KillGoodsThread(1, driver, args, [], threading.Lock())
goods = getGoods(driver, goodsTitle)
if goods is None:
return {'status': -1, 'msg': "Didn't find expected goods whose title starts with '%s'" % (goodsTitle)}
try:
title = killgoods.getTitleUntilStarted(goods)
killgoods.killGoods(title, goods, True)
except Exception:
return {'status': -1, 'msg': "Exception happened in killing process for goods '%s'" % (goodsTitle)}
finally:
driver.close()
def getWebDriver():
return webdriver.Firefox()
# chromedriver = "/home/kane/Downloads/chromedriver"
# return webdriver.Chrome(chromedriver)
def getSecKillList(args):
killList = []
driver = getWebDriver()
driver.set_window_size(1440, 900)
try:
loginZcn(driver, args.email, args.password)
seckillGoods = SecKillGoods(driver)
UpcomingStartTimeXPath = ".//div[4]/div/b"
for goods in seckillGoods:
# title = goods.find_element_by_xpath(UpcomingTitleXPath).get_attribute("title")
try:
startTime = goods.find_element_by_xpath(UpcomingStartTimeXPath).text
startTimeStamp = time.mktime(time.strptime(time.strftime("%d %b %Y", time.localtime()) + " " + StarttimePattern.findall(startTime)[0], "%d %b %Y %H:%M"))
if killList.count(startTimeStamp) == 0:
killList.append(startTimeStamp)
except NoSuchElementException:
# sometimes the already started one might appear in upcoming list
pass
except Exception, e:
print startTime
raise e
return killList
finally:
driver.close()
class KillGoodsThread(object):
defaultInterval = 180
def __init__(self, num, driver, args, ignoreList, lock):
self.args = args
self.num = num
self.ignoredList = ignoreList
self.lock = lock
if driver is None:
self.driver = getWebDriver()
self.driver.set_window_size(1440, 900)
self.driver.set_script_timeout(15)
else:
self.driver = driver
loginZcn(self.driver, args.email, args.password)
def getTitleUntilStarted(self, goods):
title = None
try:
title = goods.find_element_by_xpath(TitleXPath).get_attribute("title")
except NoSuchElementException:
title = goods.find_element_by_xpath(UpcomingTitleXPath).get_attribute("title")
startTime = goods.find_element_by_xpath(UpcomingStartTimeXPath).text
print "\t\tThread #%s: %s will be start at %s\n" % (self.num, title, startTime)
interval = getWaitingSeconds(goods)
if interval > self.defaultInterval:
print "\t\tThread #%s: Stop waiting '%s' that will not be started in %d minutes" % (self.num, title, self.defaultInterval / 60)
return
print "\t\tThread #%s: will wait for %d seconds since %s" % (self.num, interval + 30, datetime.datetime.fromtimestamp(time.time()).strftime('%m-%d %H:%M:%S'))
WebDriverWait(goods, interval + 30).until(lambda s: s.find_element(By.XPATH, PriceXPath).is_displayed())
#refresh goods after seconds killing starts
title = goods.find_element_by_xpath(TitleXPath).get_attribute("title")
return title
def killGoods(self, title, goods, force = False):
print "\tThread #%s: Processing goods with title '%s'..." %(self.num, title)
priceText = goods.find_element_by_xpath(PriceXPath).text
try:
state = goods.find_element_by_xpath(DealStateXPath).text
if state == u"您在排队列表中。" or state == u"这个商品在您的购物车中。":
print "\tThread #%s: Ignore '%s' whose state is '%s'" %(self.num, title, state)
return
except NoSuchElementException:
pass
price = re.compile("[0-9]{1,3}(?:,?[0-9]{3})*").search(priceText)
price = float(price.group(0).replace(',', ''))
discount = self.discount(goods)
quantity = self.getQuantity(goods)
itemURL = goods.find_element_by_xpath(".//*[@id=\"dealImageContent\"]/div[1]/a").get_attribute("href")
try:
lowestPrice = self.getHistoryLowestPrice(itemURL)
except (RequestException, NoSuchElementException):
lowestPrice = None
if lowestPrice is None:
goodsDetail = u'%s(%s)(%s折)(数量: %s) %s' %(title, priceText, discount, (u"unknown" if quantity is None else str(quantity)), itemURL)
else:
goodsDetail = u'%s(%s)(%s折)(数量: %s)(历史最低价: %s) %s' %(title, priceText, discount, (u"unknown" if quantity is None else str(quantity)), lowestPrice, itemURL)
niceDiscount = self.isNiceDiscount(price, lowestPrice)
if (force or niceDiscount or price <= 10.0 or
self.isWished(title) or (lowestPrice is None and discount is not None and discount <= 2.0) or self.isHot(quantity)):
dealAction = goods.find_element_by_xpath(DealXPath)
if u'选择商品规格' == dealAction.get_attribute("title"):
# TODO implement selecting options
msg = u"Ignore the goods %s that has more options to be selected, though it matches my rule." %(goodsDetail)
with self.lock:
if title not in self.ignoredList:
if self.isWished(title):
self.notify(msg, u"'%s'作为wish的商品由于过多的选项无法加入购物车" %(title))
elif niceDiscount:
self.notify(msg, u"'%s'作为折扣较大的商品由于过多的选项无法加入购物车" %(title))
else:
print u"\tThread #%s: %s" %(self.num, msg)
self.ignoredList.append(title)
else:
print u"\tThread #%s: [%s] %s" %(self.num, u"已通知", msg)
return
dealAction.click()
continueXPath = "//*[@id=\"ap_container\"]/div[@class='ap_popover ap_popover_sprited']/div[@class='ap_body']/div[2]/div/table/tbody/tr/td[2]/table/tbody/tr/td/a/img"
try:
WebDriverWait(self.driver, 30).until(lambda s: s.find_element(By.XPATH, continueXPath).is_displayed())
self.notify(u"%s was added to cart.\n" %(goodsDetail), title)
self.driver.find_element_by_xpath(CloseAndContinueXPath).click()
except TimeoutException:
try:
AlreadyInWaitingListXPath = "//*[@id=\"ap_container\"]/div[@class='ap_popover ap_popover_sprited']/div[2]/div[2]/div/table/tbody/tr[2]/td/table/tbody/tr[2]/td/a/img"
WebDriverWait(self.driver, defaultTimeout).until(lambda s: s.find_element(By.XPATH, AlreadyInWaitingListXPath).is_displayed())
self.notify(u"%s is in waiting list.\n" %(goodsDetail), title)
self.driver.find_element_by_xpath(AlreadyInWaitingListXPath).click()
except TimeoutException:
try:
state = goods.find_element_by_xpath(DealStateXPath).text
if state == u"您在排队列表中。" or state == u"这个商品在您的购物车中。":
print "\tThread #%s: Ignore '%s' whose state is '%s', because it has been processed by another thread" %(self.num, title, state)
return
except NoSuchElementException:
# refresh deal button
dealAction = goods.find_element_by_xpath(DealXPath)
if u'等待列表已满' == dealAction.get_attribute("title"):
print "\tThread #%s: Ignore '%s' whose waiting list has been full" %(self.num, title)
return
print "\tThread #%s: Goods is in a unknown state, can't handle with it."
raise Exception
else:
print u"\tThread #%s: Goods %s does not satisfy my rule, so ignoring it." %(self.num, goodsDetail)
def isNiceDiscount(self, price, lowestPrice):
return lowestPrice is not None and price <= (float(lowestPrice) * 0.6)
def getHistoryLowestPrice(self, itemURL):
lowest = self.getHistoryLowestPrice_huihui(itemURL)
if lowest is not None:
return lowest
return self.getHistoryLowestPrice_etao(itemURL)
'''use online service(etao) to check the lowest price in history'''
def getHistoryLowestPrice_etao(self, url):
session = requests.Session()
session.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:16.0) Gecko/20100101 Firefox/16.0', }
query = {"url": url}
r = session.get('http://ok.etao.com/item.htm', params=query, timeout = 10)
if r.status_code == requests.codes.ok:
tree = etree.HTML(r.text)
itemIds = tree.xpath("//*[@id=\"J_okSavingSku\"]/ul[1]/li[1]/div/@data-args")
if len(itemIds) > 0:
r = session.get('http://ok.etao.com/api/price_history.do', params = itemIds[0], timeout = 10)
if r.status_code == requests.codes.ok:
history = r.json()
return history['meta']['lowest']
else:
r.raise_for_status()
return None
else:
r.raise_for_status()
return None
'''use online service(huihui) to check the lowest price in history'''
def getHistoryLowestPrice_huihui(self, url):
session = requests.Session()
session.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:16.0) Gecko/20100101 Firefox/16.0', }
query = {"phu": url}
r = session.get('http://zhushou.huihui.cn/productSense', params=query, timeout = 10)
if r.status_code == requests.codes.ok:
values = re.findall(r'var.*?=\s*(.*?);', r.text, re.DOTALL | re.MULTILINE)
for value in values:
return json.loads(value)['min']
else:
r.raise_for_status()
return None
'''Go thorough each goods that will be started in %%defaultInterval seconds '''
def doKill(self):
title = None
try:
for goods in SecKillGoods(self.driver):
title = None
title = self.getTitleUntilStarted(goods)
# won't start in a short time
if title is None:
return
self.killGoods(title, goods)
except Exception, e:
sys.stderr.write(repr(e) + "\n")
traceback.print_exc()
if isinstance(e, (TimeoutException)) and e.message == u'Timed out waiting for page load.':
return
if title is None:
title = "unknown"
self.driver.get_screenshot_as_file(os.getcwd() + '/screenshot-%s-%s.png' %(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), title.encode("utf8")))
finally:
self.driver.close()
def notify(self, msg, subject):
print "\tThread #%s: %s" %(self.num, msg)
if self.args.smtp_server is not None:
notifyUser(self.args.smtp_username, self.args.smtp_password, self.args.smtp_server, self.args.smtp_frommail, self.args.toemail, msg, subject)
def isWished(self, title):
try:
next(keyword for keyword in self.args.wishlist if keyword.rstrip() in title.encode("utf8"))
return True
except StopIteration:
return False
def discount(self, goods):
try:
discountText = goods.find_element_by_xpath(".//*[@id=\"dealPercentOff\"]").text
discount = re.compile(u'\((.*)折\)').search(discountText)
return float(discount.group(1))
except:
return None
def getQuantity(self, goods):
try:
couponCountText = goods.find_element_by_xpath(".//*[@id=\"dealTotalCouponsCount\"]").text
couponCount = re.compile(r'\d+').search(couponCountText)
return int(couponCount.group(0))
except:
return None
def isHot(self, quantity):
if quantity is not None:
return quantity <= 20
return False
######################################################
class WishListAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, self.getContent(values))
def getContent(self, wishfile):
with open(wishfile) as f:
content = f.readlines()
return content
def readConfigFile(args, configpath):
if configpath is None:
try:
homeconfig = expanduser("~") + "/.zseckill/config"
with open(homeconfig): configpath = homeconfig
except IOError:
try:
globalconfig = "/etc/zseckill/config"
with open(globalconfig): configpath = globalconfig
except IOError:
pass
if configpath is not None:
args.configfile = configpath
with open(configpath) as f:
if sys.version_info[:2] == (2, 6): # ruuning python is 2.6
config = ConfigParser.RawConfigParser()
else:
config = ConfigParser.RawConfigParser(allow_no_value=True)
config.readfp(f)
options = config.items("zseckill")
for option in options:
value = option[1]
if option[0].lower() == 'wishlist':
wishAction = WishListAction(("-w"), "wishlist")
value = wishAction.getContent(option[1])
setattr(args, option[0].lower(), value)
def main(argv=None):
'''Command line options.'''
if argv is None:
argv = sys.argv
else:
sys.argv.extend(argv)
program_name = os.path.basename(sys.argv[0])
program_version = "v0.1"
program_build_date = "%s" % __updated__
program_version_string = '%%prog %s (%s)' % (program_version, program_build_date)
#program_usage = '''usage: spam two eggs''' # optional - will be autogenerated by optparse
program_longdesc = '''''' # optional - give further explanation about what the program does
program_license = "Copyright 2013 kane (organization_name)"
try:
parser = ArgumentParser(description=program_license, formatter_class=RawDescriptionHelpFormatter)
parser.add_argument("-u", "--username", dest="email", help="username/email of amazon.cn")
parser.add_argument("-p", "--password", dest="password", help="password of amazon.cn")
parser.add_argument("-xu", "--smtp_username", dest="smtp_username", help="username for smtp server authentication")
parser.add_argument("-xp", "--smtp_password", dest="smtp_password", help="password for smtp server authentication")
parser.add_argument("-ss", "--smtp_server", dest="smtp_server", default="smtp.gmail.com:587", help="smtp server for sending mail")
parser.add_argument("-fm", "--smtp_frommail", dest="smtp_frommail", default="Zseckill robot<no-reply@z.cn", help="from mail of smtp")
parser.add_argument("-e", "--toemail", dest="toemail", help="notify email")
parser.add_argument("-w", "--wishlist", dest="wishlist", help="wish list using for daemon mode", default=(), action=WishListAction)
parser.add_argument("-v", "--verbose", dest="verbose", action="count", help="set verbosity level [default: %(default)s]")
"""
Sample config file
[zseckill]
Email = kane.mx@gmail.com
Password = xxxxxx
Smtp_username = xxx@gmail.com
Smtp_password = yyyyyy
Smtp_server = smtp.gmail.com:587
Smtp_frommail = xxx@gmail.com
Toemail = zzz@gmail.com
Wishlist = /home/user/wishlist.txt
"""
parser.add_argument("-c", "--config", dest="configfile", default=None, help="use specified config file")
# working mode, must be either 'dameon' or 'target'
workingMode = parser.add_mutually_exclusive_group(required=True)
workingMode.add_argument("-t", "--target", dest="target", help="title of second kill target")
workingMode.add_argument("-d", "--dameon", dest="dameon", help="run program as daemon mode", action='store_true')
# Process arguments
args = parser.parse_args()
readConfigFile(args, args.configfile)
if args.configfile is None and (args.email is None or args.password is None):
parser.error("Must specify (username and password) or config file.")
# MAIN BODY #
if args.dameon:
killList = getSecKillList(args)
queue = deque(killList)
concurrentThread = 2
lock = threading.Lock()
pool = ThreadPool(concurrentThread)
try:
while True:
nextItemStartTime = queue.popleft()
interval = nextItemStartTime - time.time()
if interval > 60:
interval -= 60
print "Will sleep %d seconds for next killing at %s." %(interval, datetime.datetime.fromtimestamp(nextItemStartTime).strftime('%m-%d %H:%M:%S'))
if interval > 0:
time.sleep(interval)
ignoredList = []
for _ in range(concurrentThread):
thread = KillGoodsThread(_ + 1, None, args, ignoredList, lock)
pool.add_task(thread.doKill)
#wait for them to finish (or you could go and do something else)
pool.wait_completion()
except IndexError:
# no pending second kill in today
pass
else:
rt = doKillByTitle(args)
if rt is not None:
print rt['msg']
except Exception, e:
sys.stderr.write(program_name + ": " + repr(e) + "\n")
traceback.print_exc()
return 2
def debug(sig, frame):
"""Interrupt running process, and provide a python prompt for
interactive debugging."""
d={'_frame':frame} # Allow access to frame object.
d.update(frame.f_globals) # Unless shadowed by global
d.update(frame.f_locals)
i = code.InteractiveConsole(d)
message = "Signal recieved : entering python shell.\nTraceback:\n"
message += ''.join(traceback.format_stack(frame))
i.interact(message)
def listen():
signal.signal(signal.SIGUSR1, debug) # Register handler
if __name__ == "__main__":
reload(sys)
sys.setdefaultencoding("UTF-8")
if TESTRUN:
import doctest
doctest.testmod()
if PROFILE:
import cProfile
import pstats
profile_filename = 'zseckill_profile.txt'
cProfile.run('main()', profile_filename)
statsfile = open("profile_stats.txt", "wb")
p = pstats.Stats(profile_filename, stream=statsfile)
stats = p.strip_dirs().sort_stats('cumulative')
stats.print_stats()
statsfile.close()
sys.exit(0)
listen()
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment