Created
December 12, 2015 12:47
-
-
Save lotabout/47537cdecf92001c9043 to your computer and use it in GitHub Desktop.
For Raspberry Pi: checks mail and executes (download) tasks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# We will need 3 threads | |
import os | |
import Queue | |
from os.path import isfile, join | |
import email | |
import re | |
import subprocess | |
import threading | |
import time | |
import signal | |
# --- Configuration ---------------------------------------------------------
# Maildir "cur" folder scanned for incoming task mails.
mail_dir = '/media/nas/pi/mail/Pending/cur'
# Maildir folder a mail is moved to once all of its jobs have finished.
done_mail_dir = '/media/nas/pi/mail/Ended/cur'
# Working directory handed to `you-get`; downloaded files land here.
you_get_down_dir = '/share/samba/DLNA/Video/'
# Job type assumed for a mail entry that does not name one explicitly.
DEFAULT_JOB_TYPE = 'download'
# Bookkeeping for spawned subprocesses (see execute_command below).
ALL_JOBS = {}
# 1. scanner, to search for new mails
# Queue through which the Monitor thread hands newly-seen mail file names
# to the Runner thread.
# NOTE(review): new_mail_queue_lock appears unused in this file;
# Queue.Queue is already thread-safe by itself.
new_mail_queue = Queue.Queue()
new_mail_queue_lock = threading.Lock()
class Monitor(threading.Thread):
    """Thread that polls a Maildir directory for new mail files.

    Every file name not seen on a previous scan is pushed onto the queue
    given at construction time, for the Runner thread to consume.
    """

    # Seconds to sleep between two directory scans.
    POLL_INTERVAL = 120

    def __init__(self, mail_dir, new_mail_queue):
        threading.Thread.__init__(self)
        self.mail_dir = mail_dir
        # File names present during the previous scan.
        self.mails = set()
        self.mail_queue = new_mail_queue

    def run(self):
        # Poll forever; the main script never asks this thread to stop.
        while True:
            for mail in self.check_new_mail():
                print('Got new mail: ' + mail)
                self.mail_queue.put(mail)
            time.sleep(self.POLL_INTERVAL)

    def check_new_mail(self):
        """Rescan the directory and return the set of not-yet-seen file names."""
        current = set(f for f in os.listdir(self.mail_dir)
                      if isfile(join(self.mail_dir, f)))
        new_mails = current - self.mails
        # Remember exactly what is on disk now.  (The original computed an
        # unused `deleted` set and the equivalent but obscure expression
        # "(mails & self.mails) | new_mails", which always equals `mails`.)
        self.mails = current
        return new_mails
# Watches mail_dir and feeds new mail file names into new_mail_queue.
monitor = Monitor(mail_dir, new_mail_queue)
#=============================================================================== | |
# Executor: Download, given an URL, download the resources | |
#------------------------------------------------------------------------------ | |
# Sub downloader: you-get | |
def you_get_download(url):
    """Download *url* with the `you-get` tool into you_get_down_dir."""
    # Run you-get synchronously; its output lands in the cwd we pass.
    command = ['you-get', url]
    subprocess.call(command, cwd=you_get_down_dir)
    return 'Done'
def bilibili_av_number_download(url):
    """Download bilibili videos identified by bare av numbers.

    *url* is a string containing one or more numeric av ids (e.g.
    "av123 456"); every run of digits is expanded to a full bilibili video
    URL and all of them are handed to `you-get` in a single invocation.
    """
    url = url.strip()
    av_numbers = re.findall(r'[0-9]+', url)
    actual_urls = ['www.bilibili.com/video/av' + av for av in av_numbers]
    subprocess.call(['you-get'] + actual_urls, cwd=you_get_down_dir)
    # Return 'Done' for consistency with you_get_download, so Executor can
    # hand a result to an optional callback (the original returned None).
    return 'Done'
def universal_download(url):
    """Fallback downloader for URLs you-get does not support.

    Currently a stub: it only logs that the universal path was taken.
    (Fixed the Python-2-only print statement.)
    """
    print('universal')
    return 'Done'
# Hostname components (dot-separated tokens) of sites you-get can handle.
YOU_GET_SITES = {
    '163', '56', '7gogo', 'acfun', 'archive', 'baidu', 'bandcamp',
    'baomihua', 'bilibili', 'cbs', 'cntv', 'dailymotion', 'dilidili',
    'dongting', 'douban', 'douyutv', 'ehow', 'facebook', 'flickr',
    'freesound', 'fun', 'google', 'heavy-music', 'iask', 'ifeng', 'in',
    'instagram', 'interest', 'iqilu', 'iqiyi', 'isuntv', 'joy', 'jpopsuki',
    'kankanews', 'khanacademy', 'ku6', 'kugou', 'kuwo', 'letv', 'lizhi',
    'magisto', 'metacafe', 'miaopai', 'miomio', 'mixcloud', 'mtv81',
    'musicplayon', 'nicovideo', 'pinterest', 'pixnet', 'pptv', 'qianmo',
    'qq', 'sina', 'smgbb', 'sohu', 'soundcloud', 'ted', 'theplatform',
    'tucao', 'tudou', 'tumblr', 'twitter', 'veoh', 'vidto', 'vimeo', 'vine',
    'vk', 'weibo', 'xiami', 'xiaokaxiu', 'yinyuetai', 'youku', 'youtu',
    'youtube', 'zhanqi',
}
from urlparse import urlparse | |
def exec_download(url):
    """Dispatch *url* to the most suitable downloader.

    Accepts either a bare bilibili av-number string (e.g. "av123" or
    "123 456"), a URL whose hostname contains a you-get-supported site
    token, or anything else (handled by universal_download).  Returns the
    chosen downloader's result, or None when the URL cannot be parsed.
    """
    url = url.strip()
    # Bare bilibili av numbers: optional "av" prefix, then digits/spaces.
    # BUG FIX: the original used "[0-9][0-9 ]+", which required at least
    # two characters and so rejected single-digit ids like "av5".
    if re.search(r'^(av)?[0-9][0-9 ]*$', url) is not None:
        return bilibili_av_number_download(url)
    # Otherwise decide by hostname.
    hostname = urlparse(url).hostname
    if hostname is None:
        print('Invalid URL: ' + url)
        return None
    # Prefer you-get when any dot-separated hostname component is a known
    # you-get site token (equivalent to the original length comparison).
    if set(hostname.split('.')) & YOU_GET_SITES:
        return you_get_download(url)
    return universal_download(url)
#=============================================================================== | |
# Executor: execute a job, with (job-type, job content) | |
# Dispatch table: job type -> executor function run by an Executor thread.
executor_map = {'download': exec_download}
# Optional per-type callbacks invoked with the executor's return value.
callback_map = {}
def exec_do_nothing(arg):
    """Fallback executor used when a job's type has no registered handler."""
    return 'Pass...'
class Executor(threading.Thread):
    """Thread that runs a single (job_type, job_content) pair.

    The handler is looked up in executor_map by job type; unknown types
    fall back to exec_do_nothing.  If callback_map has an entry for the
    type, it is invoked with the handler's return value.
    """

    def __init__(self, job):
        # The original used a Python-2-only tuple parameter
        # "(job_type, job_content)" (a syntax error on Python 3); unpack
        # explicitly.  Callers still pass one (type, content) tuple.
        threading.Thread.__init__(self)
        job_type, job_content = job
        self.job_type = job_type
        self.job_content = job_content

    def run(self):
        executor = executor_map.get(self.job_type, exec_do_nothing)
        try:
            print('Start job (%s, %s)' % (self.job_type, self.job_content))
            ret = executor(self.job_content)
            print('Done running job (%s, %s)' % (self.job_type, self.job_content))
        except Exception as e:
            # Report the failure (including the exception) instead of the
            # original bare except; also fixes the "durning" typo.
            print('Error during job (%s, %s): %s'
                  % (self.job_type, self.job_content, e))
            return None
        if self.job_type in callback_map:
            callback_map[self.job_type](ret)
        return ret
#=============================================================================== | |
# Job: maintaining jobs | |
class Job(threading.Thread):
    """Thread that runs all sub-jobs parsed from one mail, then archives it.

    A Job owns a list of (job_type, job_content) pairs; each pair runs in
    its own Executor thread.  When every executor has finished, the mail
    file is moved from mail_dir to done_mail_dir.
    """

    def __init__(self, job_spec):
        # job_spec is a (mail_name, sub_jobs) tuple; the original used a
        # Python-2-only tuple parameter (a py3 syntax error), so unpack
        # explicitly instead.
        threading.Thread.__init__(self)
        mail_name, sub_jobs = job_spec
        # NOTE: assigning self.name overrides threading.Thread's own name.
        self.name = mail_name
        self.jobs = sub_jobs

    def run(self):
        executors = [Executor(job) for job in self.jobs]
        print('[Debug] starting executors for job %s' % (self.jobs,))
        for ex in executors:
            ex.start()
        try:
            for ex in executors:
                ex.join()
            # TODO: check each executor's result before declaring success.
            self.done()
        except Exception as e:
            # Log instead of silently swallowing the failure (the original
            # bare "except: pass" hid every error here).
            print('Error while running jobs for %s: %s' % (self.name, e))

    def done(self):
        """Archive the processed mail by moving it to done_mail_dir."""
        print('Moving Mail from: %s -> %s'
              % (join(mail_dir, self.name), join(done_mail_dir, self.name)))
        os.rename(join(mail_dir, self.name), join(done_mail_dir, self.name))
# 5. Define executors for each job type | |
# need to come before the dispatcher because python needs definition comes | |
# before use | |
def execute_command(args):
    """Execute *args* with subprocess.Popen and record the running process.

    The Popen object is registered in the module-level ALL_JOBS dict keyed
    by its pid — the original computed the process but never stored it,
    despite its docstring — and is returned so callers can wait on it.
    """
    proc = subprocess.Popen(args)
    global ALL_JOBS
    ALL_JOBS[proc.pid] = proc
    return proc
#============================================================================== | |
# Runner: fething mails and create jobs | |
# 3. parse email contents | |
# the format of the mail: | |
# | |
# type: the type of command | |
# #1 job's arguments | |
# #2 job's arguments | |
# <newline needed> | |
# type: type 2 | |
# #1 job's argument | |
# | |
# or | |
# | |
# <type>: arguments needed | |
# For example: | |
# type: you-get | |
# www.youku.com/xxxx/xxxx/xxx | |
# Pre-compiled once: "<type> : <args>" with whitespace required on at least
# one side of the colon, so URLs like "http://x" are NOT treated as typed
# entries.  Raw string so the regex escapes are explicit.
RE_TYPE_ARGS = re.compile(r'^([a-zA-Z0-9-_]+)(?:\s*:\s+|\s+:\s*)(.+)')

def parse_entry(entry, default_type):
    """Parse one mail line into a (job_type, args) pair.

    A line of the form "type : args" yields (lowercased type, args); any
    other line is treated as the arguments of *default_type*.
    """
    match = RE_TYPE_ARGS.search(entry)
    if match is None:
        return default_type, entry
    return match.group(1).lower(), match.group(2)

def parse_entries(entries, default_type=None):
    """Turn one record's lines into a list of (job_type, args) jobs.

    A "type: X" entry switches the default job type for the following
    entries of the same record.  *default_type* overrides the module-level
    DEFAULT_JOB_TYPE when given (backward-compatible generalization).
    Blank lines are skipped instead of becoming bogus jobs.
    """
    if default_type is None:
        default_type = DEFAULT_JOB_TYPE
    jobs = []
    current_type = default_type
    for entry in entries:
        if not entry.strip():
            continue  # ignore empty / whitespace-only lines
        job_type, args = parse_entry(entry, current_type)
        if job_type == 'type':
            current_type = args
        else:
            jobs.append((job_type, args))
    return jobs

def parse_records(records, default_type=None):
    """Parse each record (paragraph); the type switch applies per record."""
    jobs = []
    for record in records:
        jobs.extend(parse_entries(record.split('\n'), default_type))
    return jobs

def parse_content(content, default_type=None):
    """Split a mail body into blank-line-separated records and parse them.

    BUG FIX: the separator regex previously used a CAPTURING group, and
    re.split returns the text of capturing groups as extra list elements,
    so separator whitespace leaked into the record list.  Non-capturing
    groups keep only the real records.
    """
    sep = re.compile(r'(?:\n)(?:[ \t\r\f]*\n)+', re.MULTILINE)
    return parse_records(sep.split(content.strip()), default_type)
# read mail content | |
def get_email_contents(mail_dir, mail): | |
with open(join(mail_dir, mail)) as f: | |
msg = email.message_from_file(f) | |
content = [] | |
if msg.is_multipart(): | |
for part in msg.get_payload(): | |
content.append(part.get_payload()) | |
else: | |
content.append(msg.get_payload()) | |
return content | |
def parse_mail(mail_dir, mail):
    """Read one mail file and return (mail_name, [(job_type, args), ...]).

    Each MIME part's body is parsed independently and the resulting job
    lists are concatenated.  (Fixed the Python-2-only print statement.)
    """
    contents = get_email_contents(mail_dir, mail)
    print('[Debug] parse_mail: contents = %s' % (contents,))
    jobs = []
    for content in contents:
        jobs.extend(parse_content(content))
    return (mail, jobs)
class Runner(threading.Thread):
    """Thread that consumes mail names from the queue and runs their jobs.

    Mails are processed one at a time: Job.run() is invoked synchronously
    on this thread rather than started as a separate thread, so ordering
    is preserved.
    """

    def __init__(self, mail_dir, new_mail_queue):
        threading.Thread.__init__(self)
        self.mail_dir = mail_dir
        self.mail_queue = new_mail_queue

    def run(self):
        while True:
            try:
                mail = self.mail_queue.get()  # blocks until Monitor enqueues
                print('[Debug] Runner get mail: %s' % mail)
                job = Job(parse_mail(self.mail_dir, mail))
                # Run synchronously so mails are handled strictly in order.
                job.run()
            except Exception as e:
                # A broken mail must not kill the loop; report and keep
                # consuming (the original bare "except: pass" hid errors).
                print('Error handling mail: %s' % e)
# Wire everything together: Monitor finds mails, Runner executes them.
runner = Runner(mail_dir, new_mail_queue)
monitor.start()
runner.start()
# Both threads loop forever, so these joins never return; they simply keep
# the main thread (and thus the process) alive.
runner.join() # will not stop
monitor.join() # will not stop
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment