Last active
May 2, 2017 07:57
-
-
Save wwj718/095c661a11c6f55ce64332f0fb91be52 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import os | |
import datetime | |
import shutil | |
from shutil import copytree, ignore_patterns | |
from shutil import copyfile | |
import time | |
import subprocess | |
import hashlib | |
# English rendering of the note below: "TODO: rewrite this using watchdog".
'''
todo 使用watchdog来写
'''
# Earlier location of the target script, kept for reference:
#PI_CODETEST ="/home/pi/mylab/raspberrypi_lab/codetest.py"
# Path the script found on the USB disk is copied to, and then executed from,
# by run_it().
# TODO: copy into the home directory, ~/udist_codetest/ or /home/pi/codetest.py
PI_CODETEST ="/home/pi/codetest.py"
# MD5 hex digest of the file most recently handled by run_it(); the empty
# string means nothing has been handled yet.
last_md5_udisk_codetest = ""
def generate_file_md5(file, chunk_size=65536):
    """Return the hexadecimal MD5 digest of the file at path *file*.

    The file is opened in binary mode and hashed in pieces of ``chunk_size``
    bytes, so the whole content is never held in memory at once, and the
    ``with`` block guarantees the handle is closed even if reading fails.

    The digest is also printed as ``{"file_md5": <digest>}`` for tracing.

    Args:
        file: Path of the file to hash.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The MD5 digest as a lowercase hexadecimal string.

    Raises:
        IOError / OSError: if the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(file, 'rb') as handle:
        # iter(callable, sentinel) keeps reading until read() returns b''.
        for chunk in iter(lambda: handle.read(chunk_size), b''):
            digest.update(chunk)
    md5 = digest.hexdigest()
    print({"file_md5": md5})
    return md5
def run_it(filepath):
    """Copy *filepath* to ``PI_CODETEST`` and run it, unless it is unchanged.

    The file's MD5 digest is compared with the digest remembered from the
    previous call (module global ``last_md5_udisk_codetest``).  When the
    digest is the same, nothing happens.  Otherwise the file is copied to
    ``PI_CODETEST``, any process whose command line matches ``codetest`` is
    killed, and the copied script is executed with ``/usr/bin/python``; this
    call blocks until that script exits.

    Errors raised while copying or launching are printed and swallowed so the
    caller's polling loop keeps going.  The remembered digest is updated even
    when the attempt fails, so a broken file is not retried until it changes.

    Args:
        filepath: Path of the candidate script on the USB disk.

    Raises:
        IOError / OSError: if *filepath* cannot be read for hashing (this
            happens before the guarded section).
    """
    # TODO: replace the print calls with logging
    global last_md5_udisk_codetest
    current_file_md5 = generate_file_md5(filepath)
    if current_file_md5 == last_md5_udisk_codetest:
        return
    try:
        copyfile(filepath, PI_CODETEST)
        # Wait for pkill to finish before launching: a fire-and-forget pkill
        # could still be scanning when the new interpreter starts and kill it
        # too (its command line also contains "codetest").  pkill exiting
        # non-zero when nothing matched is fine; call() does not raise on it.
        subprocess.call(["pkill", "-f", 'codetest'])
        print("running!")
        # Argument list instead of a shell string: no shell is needed here.
        subprocess.call(["/usr/bin/python", PI_CODETEST])
        print("run complate!")
    except Exception as e:
        print(e)
    finally:
        last_md5_udisk_codetest = current_file_md5
def order_by_mtime(file):
    """Return the last-modification time of *file* as a ``time.ctime`` string.

    NOTE(review): the result is a human-readable string such as
    ``'Tue May  2 07:57:00 2017'``; strings compare lexicographically, so
    confirm this is really what is wanted before using it as a sort key.
    """
    modified_at = os.path.getmtime(file)
    return time.ctime(modified_at)
def get_newest_codetest(u_root, media_root='/media/pi'):
    """Return the most recently modified ``codetest*`` file on a USB disk.

    Only the first directory level of the disk is searched.  The newest file
    is wanted because students may drag several files onto the disk.

    Candidates are compared by their numeric modification time.  (Comparing
    ``time.ctime`` strings would order them alphabetically by weekday and
    month name instead of chronologically.)  Directories whose name happens
    to start with ``codetest`` are ignored, since they cannot be run.

    Args:
        u_root: Name of the disk's mount directory under *media_root*.
        media_root: Directory that contains the mount directories.

    Returns:
        Full path of the newest matching file, or ``None`` when the disk has
        no regular file whose name starts with ``codetest``.

    Raises:
        OSError: if the disk directory cannot be listed.
    """
    udisk_dir = os.path.join(media_root, u_root)
    candidates = []
    for filename in os.listdir(udisk_dir):
        if not filename.startswith("codetest"):
            continue
        path = os.path.join(udisk_dir, filename)
        if os.path.isfile(path):
            candidates.append(path)
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)
# Poll /media/pi/ every 3 seconds, forever.  For each entry found there, look
# for the newest codetest* file and hand it to run_it(), which runs it only
# when its content has changed since the last run.
while True:
    print('sleep 3s')
    time.sleep(3)
    print("watching /media/pi/ ...")
    if not os.path.exists('/media/pi/'):
        print("can not find u disk")
        continue
    try:
        media_pi = os.listdir('/media/pi/')
    except OSError as e:
        # The directory can vanish or be unreadable between the check above
        # and the listing; report it and retry on the next poll instead of
        # letting the watcher die.
        print(e)
        continue
    for u_root in media_pi:
        # Guard each mount separately so one failing entry (unreadable
        # directory, disk pulled mid-scan, ...) does not stop the remaining
        # mounts from being checked in this poll.
        try:
            udisk_codetest = get_newest_codetest(u_root)
            if not udisk_codetest:
                print("can not find the codetest(.*).py in the u disk")
                continue
            print("find the codetest(.*).py in the u disk")
            run_it(udisk_codetest)
        except Exception as e:
            print(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment