Skip to content

Instantly share code, notes, and snippets.

@jashsu
Last active January 29, 2020 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jashsu/0d4f7fb0eebbf7f1ee9c9cae7046165e to your computer and use it in GitHub Desktop.
Save jashsu/0d4f7fb0eebbf7f1ee9c9cae7046165e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import requests, smtplib, sys, argparse, re
from time import sleep, ctime
from random import random
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from twilio.rest import Client
from requests.exceptions import ConnectionError
class ProgressBar(object):
    """Minimal console progress indicator that writes one tick per step.

    Every ``TICKS_PER_LINE`` steps a new line is started and prefixed with
    the current time (``time.ctime()``), so a long-running loop leaves a
    timestamped trail on stdout.
    """

    # Number of steps drawn on one line before a new timestamped line starts.
    TICKS_PER_LINE = 50

    def __init__(self):
        """Start with a step counter of zero."""
        self.counter = 0

    def increment(self, n=1):
        """Advance the step counter by ``n`` (default 1)."""
        self.counter += n

    def draw(self, tick='.'):
        """Write ``tick`` to stdout for the current step and flush.

        When the counter is a multiple of ``TICKS_PER_LINE`` (including the
        initial value 0), a newline and a ``[timestamp] `` prefix are written
        before the tick.
        """
        pieces = []
        if self.counter % self.TICKS_PER_LINE == 0:
            pieces.append('\n[{}] '.format(ctime()))
        # The original code special-cased every 10th step "for optional
        # fancier gfx" but emitted the same tick in both branches, so a
        # single unconditional append is equivalent.
        pieces.append(tick)
        sys.stdout.write(''.join(pieces))
        # Flush so ticks appear immediately even when stdout is buffered.
        sys.stdout.flush()
class PageMonitor(object):
    """Poll a web page until a target pattern appears, then send notifications.

    The instance is driven entirely by an ``args`` namespace. The attributes
    read by this class are: ``task_name``, ``target_url``, ``target_string``,
    ``target_string_min``, ``preface_url``, ``monitor_sleep_time``,
    ``monitor_sleep_rand``, ``retry_cooldown``, ``email_recipient``,
    ``email_server_url``, ``email_server_port``, ``email_server_user``,
    ``email_server_pswd``, ``twilio_account_sid``, ``twilio_auth_token``,
    ``twilio_dest_number`` and ``twilio_src_number``.
    """

    # Seconds to pause after a notification before polling resumes.
    NOTIFY_COOLDOWN = 300
    # Upper bound (seconds) for any single HTTP request, so that a stalled
    # server cannot block the monitor indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, args):
        """Store the parsed argument namespace used by every other method."""
        print('[{}] Initializing...'.format(ctime()))
        self.args = args

    def _count_matches(self, text):
        """Return the number of non-overlapping matches of the target in ``text``.

        NOTE: ``args.target_string`` is handed to ``re.finditer``, so it is
        interpreted as a regular expression rather than a literal string.
        """
        return sum(1 for _ in re.finditer(self.args.target_string, text))

    def _target_found(self, text):
        """Return True when ``text`` holds at least ``target_string_min`` matches."""
        return self._count_matches(text) >= self.args.target_string_min

    def _monitor(self):
        """Poll ``args.target_url`` until the target is found, then return.

        Optionally visits ``args.preface_url`` first using the same session.
        Between unsuccessful checks it sleeps ``monitor_sleep_time`` seconds
        plus a random fraction of ``monitor_sleep_rand`` seconds.

        Exceptions raised by ``requests`` (connection errors, timeouts)
        propagate to the caller.
        """
        pb = ProgressBar()
        # The context manager closes the session when polling stops, whether
        # by success or by an exception.
        with requests.Session() as session:
            if self.args.preface_url is not None:
                print('[{}] Visiting preface URL...'.format(ctime()))
                session.get(self.args.preface_url, timeout=self.REQUEST_TIMEOUT)
            print('[{}] Starting monitoring... (sleep {}+~{}sec)'.format(
                ctime(), self.args.monitor_sleep_time, self.args.monitor_sleep_rand))
            while True:
                pb.draw()
                pb.increment()
                response = session.get(self.args.target_url, timeout=self.REQUEST_TIMEOUT)
                # Example targets left by the author:
                # u'カートに入れる' ("add to cart"), u'お品切れ' ("sold out").
                if self._target_found(response.text):
                    return
                sleep(self.args.monitor_sleep_time + random() * self.args.monitor_sleep_rand)

    def monitor_with_connection_retry(self):
        """Run the monitor forever, retrying on network errors.

        On a connection error or timeout, waits ``args.retry_cooldown``
        seconds and restarts the monitor. When the monitor returns (target
        found), sends the notifications, waits ``NOTIFY_COOLDOWN`` seconds and
        then resumes monitoring. This method does not return normally.
        """
        while True:
            try:
                self._monitor()
            # Timeout is listed explicitly because requests' read timeouts do
            # not derive from its ConnectionError.
            except (ConnectionError, requests.exceptions.Timeout) as exc:
                print('\n')
                print('[{}] Encountered {}, retrying monitor after {} sec cooldown...'.format(
                    ctime(), type(exc).__name__, self.args.retry_cooldown))
                sleep(self.args.retry_cooldown)
            else:
                self.notify()
                print('Sleeping {} minutes...'.format(self.NOTIFY_COOLDOWN // 60))
                sleep(self.NOTIFY_COOLDOWN)

    def notify(self):
        """Place the phone call, then send the email notification."""
        print('\n')
        self._call_phone()
        self._send_email()

    def _send_email(self):
        """Send an HTML notification email through the configured SMTP server.

        Connects to ``email_server_url:email_server_port``, upgrades the
        connection with STARTTLS, logs in and sends one message to
        ``email_recipient``. The connection is closed even if a step fails;
        smtplib exceptions propagate to the caller.
        """
        print('[{}] Composing email...'.format(ctime()))
        # Take the timestamp once so subject and body agree.
        found_at = ctime()
        message = MIMEMultipart()
        message['From'] = 'noreply'
        message['To'] = self.args.email_recipient
        message['Subject'] = 'PageMonitor found the target string! ({})[{}]'.format(
            self.args.task_name, found_at)
        html = """<html><head></head><body><p>PageMonitor found the target string! ({tn})[{ct}]<br><br><a href="{target_url}">{target_url}</a></p></body></html>""".format(
            tn=self.args.task_name, ct=found_at, target_url=self.args.target_url)
        message.attach(MIMEText(html, 'html'))
        print('[{}] Sending email...'.format(ctime()))
        smtp = smtplib.SMTP()
        try:
            smtp.connect(self.args.email_server_url, self.args.email_server_port)
            smtp.ehlo()
            smtp.starttls()
            smtp.login(self.args.email_server_user, self.args.email_server_pswd)
            smtp.sendmail('noreply', self.args.email_recipient, message.as_string())
        finally:
            # Release the socket even when connect/login/send raised.
            smtp.close()
        print('[{}] Sent email!'.format(ctime()))

    def _call_phone(self):
        """Place a Twilio voice call from ``twilio_src_number`` to ``twilio_dest_number``.

        The call is created with Twilio's demo voice XML URL as its ``url``.
        """
        print('[{}] Calling phone...'.format(ctime()))
        client = Client(self.args.twilio_account_sid, self.args.twilio_auth_token)
        client.calls.create(url='http://demo.twilio.com/docs/voice.xml', to=self.args.twilio_dest_number, from_=self.args.twilio_src_number)
        print('[{}] Called phone!'.format(ctime()))
def _utf8_text(value):
    """Return ``value`` as text, decoding it from UTF-8 if it is a byte string.

    On Python 2 command-line arguments arrive as byte strings and are decoded;
    on Python 3 they are already text and are returned unchanged.
    """
    if isinstance(value, bytes):
        return value.decode('utf8')
    return value


def parse_args(argv=None):
    """Parse the monitor's command-line arguments.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, as the original code did.

    Returns:
        The ``argparse.Namespace`` holding all positional and optional values.

    Raises:
        SystemExit: raised by argparse on invalid arguments or ``--help``.

    NOTE(review): the SMTP password and Twilio auth token are positional
    arguments, so they are supplied on the command line -- confirm this is
    acceptable for the deployment environment.
    """
    parser = argparse.ArgumentParser(description='Monitor a webpage for a target string')
    parser.add_argument('task_name', help='convenient task name for ps and notifications')
    parser.add_argument('target_url', help='URL of webpage to monitor (URL encoded)')
    parser.add_argument('target_string', type=_utf8_text, help='string to find in the webpage (UTF-8 encoded)')
    parser.add_argument('--target_string_min', type=int, default=1, help='minimum number of instances of target string to trigger a notification (default: 1)')
    parser.add_argument('email_recipient', help='email address to receive notification')
    parser.add_argument('email_server_user', help='username for sending email server')
    parser.add_argument('email_server_pswd', help='password for sending email server')
    parser.add_argument('--email_server_url', default='smtp.gmail.com', help='url for sending email server (default: smtp.gmail.com)')
    parser.add_argument('--email_server_port', type=int, default=587, help='port for sending email server (default: 587)')
    parser.add_argument('twilio_dest_number', help='twilio number to receive notification (+1XXXYYYZZZZ format)')
    parser.add_argument('twilio_src_number', help='twilio number to send call from (+1XXXYYYZZZZ format)')
    parser.add_argument('twilio_account_sid', help='twilio account SID')
    parser.add_argument('twilio_auth_token', help='twilio auth token')
    # The help texts below previously advertised defaults of 30 and 10, which
    # did not match the actual default values of 60 and 30.
    parser.add_argument('--monitor_sleep_time', type=int, default=60, help='seconds to sleep between checks (default: 60)')
    parser.add_argument('--monitor_sleep_rand', type=int, default=30, help='seconds to additionally sleep upto between checks (default: 30)')
    parser.add_argument('--preface_url', help='URL of webpage to visit before beginning monitoring (default: None, URL encoded)')
    parser.add_argument('--retry_cooldown', type=int, default=60, help='seconds to cooldown after a ConnectionError before restarting monitor')
    return parser.parse_args(argv)
def main():
    """Script entry point: parse the CLI arguments and run the monitor loop."""
    monitor = PageMonitor(parse_args())
    # NOTE(review): an earlier, commented-out variant called notify() here
    # instead -- presumably to exercise the notification channels without
    # polling; confirm before reviving it.
    monitor.monitor_with_connection_retry()
if __name__ == '__main__':
    # Run only when executed as a script; main()'s return value is handed to
    # sys.exit as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment