@lotabout
Created December 12, 2015 12:47
For Raspberry Pi: watch a Maildir for new mail and execute the (download) tasks each message describes.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Threads: a Monitor that polls for new mail, a Runner that turns each mail
# into a job, and short-lived Executor threads that run the sub-jobs.
import os
import Queue
from os.path import isfile, join
from urlparse import urlparse
import email
import re
import subprocess
import threading
import time

mail_dir = '/media/nas/pi/mail/Pending/cur'     # Maildir with pending task mails
done_mail_dir = '/media/nas/pi/mail/Ended/cur'  # processed mails are moved here
you_get_down_dir = '/share/samba/DLNA/Video/'   # download target directory

DEFAULT_JOB_TYPE = 'download'
ALL_JOBS = {}

# 1. Scanner: search the mail directory for new mails.
new_mail_queue = Queue.Queue()  # Queue.Queue is already thread-safe
class Monitor(threading.Thread):
    """Poll mail_dir and queue the name of every newly arrived mail."""
    def __init__(self, mail_dir, new_mail_queue):
        threading.Thread.__init__(self)
        self.mail_dir = mail_dir
        self.mails = set()
        self.mail_queue = new_mail_queue

    def run(self):
        while True:
            for mail in self.check_new_mail():
                print 'Got new mail: ' + mail
                self.mail_queue.put(mail)
            time.sleep(120)  # poll every two minutes

    def check_new_mail(self):
        mails = set(f for f in os.listdir(self.mail_dir)
                    if isfile(join(self.mail_dir, f)))
        new_mails = mails - self.mails  # files we have not seen before
        self.mails = mails              # forget deleted mails, remember the rest
        return new_mails
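# Illustration of the scan logic (hypothetical file names): if the first scan
# sees {'mail-a', 'mail-b'}, both names are queued; if the next scan sees
# {'mail-b', 'mail-c'}, only 'mail-c' is queued and the deleted 'mail-a' is
# dropped from the remembered set.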
monitor = Monitor(mail_dir, new_mail_queue)
#===============================================================================
# Executor: Download. Given a URL, download the resource.
#------------------------------------------------------------------------------
# Sub-downloader: you-get
def you_get_download(url):
    # hand the URL to you-get, downloading into the DLNA video directory
    subprocess.call(['you-get', url], cwd=you_get_down_dir)
    return 'Done'
def bilibili_av_number_download(url):
    # 'av1234 5678' -> download bilibili videos av1234 and av5678
    url = url.strip()
    av_numbers = re.findall('[0-9]+', url)
    actual_urls = ['www.bilibili.com/video/av' + av for av in av_numbers]
    subprocess.call(['you-get'] + actual_urls, cwd=you_get_down_dir)
    return 'Done'
def universal_download(url):
    # placeholder for sites you-get does not support
    print 'universal download not implemented yet:', url
    return 'Done'
YOU_GET_SITES = set([
    '163', '56', 'acfun', 'archive', 'baidu', 'bandcamp', 'baomihua',
    'bilibili', 'cntv', 'cbs', 'dailymotion', 'dilidili', 'dongting',
    'douban', 'douyutv', 'ehow', 'facebook', 'flickr', 'freesound', 'fun',
    'google', 'heavy-music', 'iask', 'ifeng', 'in', 'instagram', 'interest',
    'iqilu', 'iqiyi', 'isuntv', 'joy', 'jpopsuki', 'kankanews', 'khanacademy',
    'ku6', 'kugou', 'kuwo', 'letv', 'lizhi', 'magisto', 'metacafe', 'miomio',
    'mixcloud', 'mtv81', 'musicplayon', '7gogo', 'nicovideo', 'pinterest',
    'pixnet', 'pptv', 'qianmo', 'qq', 'sina', 'smgbb', 'sohu', 'soundcloud',
    'ted', 'theplatform', 'tucao', 'tudou', 'tumblr', 'twitter', 'vidto',
    'vimeo', 'weibo', 'veoh', 'vine', 'vk', 'xiami', 'xiaokaxiu', 'yinyuetai',
    'miaopai', 'youku', 'youtu', 'youtube', 'zhanqi',
])
def exec_download(url):
    url = url.strip()
    # bare av numbers like 'av1234 5678' are treated as bilibili videos
    if re.search('^(av)?[0-9][0-9 ]+$', url) is not None:
        return bilibili_av_number_download(url)
    # otherwise dispatch on the site name found in the hostname
    hostname = urlparse(url).hostname
    if hostname is None:
        print "Invalid URL: ", url
        return
    if set(hostname.split('.')) & YOU_GET_SITES:
        return you_get_download(url)
    else:
        return universal_download(url)
#===============================================================================
# Executor: execute a job, given a (job_type, job_content) pair.
executor_map = {'download': exec_download}
callback_map = {}  # optional per-type callbacks, called with the executor's result

def exec_do_nothing(arg):
    # fallback executor for unknown job types
    return 'Pass...'
class Executor(threading.Thread):
    def __init__(self, job):
        threading.Thread.__init__(self)
        self.job_type, self.job_content = job

    def run(self):
        executor = executor_map.get(self.job_type, exec_do_nothing)
        try:
            print "Start job (%s, %s)" % (self.job_type, self.job_content)
            ret = executor(self.job_content)
            print "Done running job (%s, %s)" % (self.job_type, self.job_content)
        except Exception as err:
            print "Error during job (%s, %s): %s" % (self.job_type, self.job_content, err)
            return
        if self.job_type in callback_map:
            callback_map[self.job_type](ret)
        return ret
#===============================================================================
# Job: run all sub-jobs of one mail, then archive the mail.
class Job(threading.Thread):
    def __init__(self, parsed_mail):
        threading.Thread.__init__(self)
        self.name, self.jobs = parsed_mail  # (mail file name, list of sub-jobs)

    def run(self):
        # start one Executor per sub-job and wait for all of them
        executors = [Executor(j) for j in self.jobs]
        print "[Debug] starting executors for job", self.jobs
        for e in executors:
            e.start()
        try:
            for e in executors:
                e.join()
            # TODO: check the executors' status before declaring success
            self.done()
        except Exception as err:
            print "[Error] job %s failed: %s" % (self.name, err)

    def done(self):
        # archive the processed mail
        print 'Moving Mail from: ', join(mail_dir, self.name), ' -> ', join(done_mail_dir, self.name)
        os.rename(join(mail_dir, self.name), join(done_mail_dir, self.name))
# 5. Define executors for each job type. These must be defined before the
# dispatcher below uses them, since Python requires a name to exist before
# it is called.
def execute_command(args):
    """Run the command with subprocess.Popen(args) and remember the process."""
    proc = subprocess.Popen(args)
    ALL_JOBS[proc.pid] = proc  # keep a handle so the process can be tracked
    return proc
#==============================================================================
# Runner: fetch mails and create jobs.
# 3. Parse email contents.
#
# Format of the mail body: records are separated by blank lines, and every
# line of a record is one entry:
#
#   type: <default job type for this record>
#   <job #1's arguments>
#   <job #2's arguments>
#   <blank line between records>
#   type: <type 2>
#   <job #1's arguments>
#
# or, with the type given inline:
#
#   <type>: <arguments>
#
# For example:
#   type: you-get
#   www.youku.com/xxxx/xxxx/xxx
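# A fuller illustration (hypothetical URLs): the body
#
#   type: download
#   http://www.bilibili.com/video/av12345
#   av67890
#
#   download: http://example.com/some/video
#
# parses into three jobs:
#   [('download', 'http://www.bilibili.com/video/av12345'),
#    ('download', 'av67890'),
#    ('download', 'http://example.com/some/video')]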
def parse_entry(entry, default_type):
    # 'type: args' -> ('type', 'args'); anything else -> (default_type, entry)
    re_type_args = re.compile(r'^([a-zA-Z0-9_-]+)(?:\s*:\s+|\s+:\s*)(.+)')
    match = re_type_args.search(entry)
    if match is not None:
        return match.group(1).lower(), match.group(2)
    else:
        return default_type, entry
def parse_entries(entries):
    jobs = []
    default_job_type = DEFAULT_JOB_TYPE
    for entry in entries:
        entry = entry.strip()
        if not entry:
            continue  # skip blank lines inside a record
        job_type, args = parse_entry(entry, default_job_type)
        if job_type == 'type':
            # a 'type:' entry changes the default for the rest of this record
            default_job_type = args
        else:
            jobs.append((job_type, args))
    return jobs
def parse_records(records):
    jobs = []
    for record in records:
        jobs.extend(parse_entries(record.split('\n')))
    return jobs

def parse_content(content):
    # records are separated by one or more blank (or whitespace-only) lines;
    # the group must be non-capturing, or re.split would return the separators too
    sep = re.compile('\n(?:[ \t\r\f]*\n)+')
    records = sep.split(content.strip())
    return parse_records(records)
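# For instance (hypothetical input):
#   parse_content('type: download\nav123\n\nhttp://example.com/v')
#   => [('download', 'av123'), ('download', 'http://example.com/v')]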
# Read the mail body (one string per MIME part).
def get_email_contents(mail_dir, mail):
    with open(join(mail_dir, mail)) as f:
        msg = email.message_from_file(f)
    content = []
    if msg.is_multipart():
        for part in msg.get_payload():
            content.append(part.get_payload())
    else:
        content.append(msg.get_payload())
    return content
def parse_mail(mail_dir, mail):
    # returns (mail file name, [(job_type, args), ...])
    contents = get_email_contents(mail_dir, mail)
    print '[Debug] parse_mail: contents = ', contents
    jobs = []
    for content in contents:
        jobs.extend(parse_content(content))
    return (mail, jobs)
class Runner(threading.Thread):
    def __init__(self, mail_dir, new_mail_queue):
        threading.Thread.__init__(self)
        self.mail_dir = mail_dir
        self.mail_queue = new_mail_queue

    def run(self):
        while True:
            try:
                mail = self.mail_queue.get()  # blocks until a mail arrives
                print "[Debug] Runner get mail:", mail
                job = Job(parse_mail(self.mail_dir, mail))
                # run() rather than start(): mails are processed one at a time
                job.run()
            except Exception as err:
                print "[Error] Runner failed on mail:", err
runner = Runner(mail_dir, new_mail_queue)
monitor.start()
runner.start()
runner.join() # will not stop
monitor.join() # will not stop
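# To try it out (a sketch, assuming something like fetchmail/procmail already
# delivers mail into the Maildir above), drop a message into Pending/cur by
# hand and wait for the next scan; the file name 'test-mail' is hypothetical:
#
#   $ cat > /media/nas/pi/mail/Pending/cur/test-mail <<'EOF'
#   From: someone@example.com
#   Subject: tasks
#
#   type: download
#   http://www.bilibili.com/video/av12345
#   EOF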