Skip to content

Instantly share code, notes, and snippets.

@itsPG
Created May 21, 2012 19:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itsPG/2764080 to your computer and use it in GitHub Desktop.
Save itsPG/2764080 to your computer and use it in GitHub Desktop.
auto anime-list creator. v0.4. by PG @ NCTU SenseLab. it's a part of Orihime(C) project.
# -*- coding: utf-8 -*-
import os, re, copy, time, sys, io, datetime
import http.server
import socketserver
G_path_list = []
G_scan_dir_list = []
G_search_dir_list = []
#####################################################################################
##
PORT = 12345
G_URL = "http://127.0.0.1:%d/" % PORT
G_logfile_path = "E:/Anime-New/2012Q2"
G_scan_dir_list.append("E:/Anime-New/2012Q2")
G_search_dir_list.append("E:/download_tmp")
##
#####################################################################################
if sys.platform == "win32":
import os, sys, io, win32api, win32console, pywintypes
if sys.platform != "win32":
G_logfile_path = "/Users/PG"
G_scan_dir_list.append("/Users/PG/Dropbox/code/anime/anime_test")
def change_file_encoding(f, encoding):
"""
TextIOWrapper is missing a way to change the file encoding, so we have to
do it by creating a new one.
"""
errors = f.errors
line_buffering = f.line_buffering
# f.newlines is not the same as the newline parameter to TextIOWrapper.
# newlines = f.newlines
buf = f.detach()
# TextIOWrapper defaults newline to \r\n on Windows, even though the underlying
# file object is already doing that for us. We need to explicitly say "\n" to
# make sure we don't output \r\r\n; this is the same as the internal function
# create_stdio.
return io.TextIOWrapper(buf, encoding, errors, "\n", line_buffering)
class ConsoleFile:
class FileNotConsole(Exception): pass
def __init__(self, handle):
handle = win32api.GetStdHandle(handle)
self.screen = win32console.PyConsoleScreenBufferType(handle)
try:
self.screen.GetConsoleMode()
except pywintypes.error as e:
raise ConsoleFile.FileNotConsole
def write(self, s):
self.screen.WriteConsole(s)
def close(self): pass
def flush(self): pass
def isatty(self): return True
@staticmethod
def wrap_standard_handles():
sys.stdout.flush()
try:
# There seems to be no binding for _get_osfhandle.
sys.stdout = ConsoleFile(win32api.STD_OUTPUT_HANDLE)
except ConsoleFile.FileNotConsole:
sys.stdout = change_file_encoding(sys.stdout, "utf-8")
sys.stderr.flush()
try:
sys.stderr = ConsoleFile(win32api.STD_ERROR_HANDLE)
except ConsoleFile.FileNotConsole:
sys.stderr = change_file_encoding(sys.stderr, "utf-8")
if sys.platform == "win32":
ConsoleFile.wrap_standard_handles()
class PG_anime_unit:
def __init__(self):
self.sub = ""
self.name = ""
self.path = ""
self.filename = ""
self.vol = 0
def add(self, path, filename, search_mode = False, search_list = {}):
re_a = "\[([^\[\]]+)\]" # match a [(any_char)] form
if re.search(".rar", filename, re.I):
return False
m1 = re.search("%s%s" % (re_a, re_a), filename)
m2 = re.search("\[(([0-9]+)(v[0-9])?( end)?)\]", filename, re.I)
if m1 and m2:
if search_mode:
tmp = m1.group(1) + " / " + m1.group(2)
if tmp not in search_list:
return False
self.path = path
self.filename = filename
self.sub = m1.group(1)
self.name = m1.group(2)
self.vol = int(m2.group(2))
return True
else:
return False
class PG_log:
def add_to_logfile(self, filename, In):
global G_logfile_path
filename = os.path.join(G_logfile_path, filename)
logfile = open(filename, "a")
logfile.write(In)
logfile.close()
def add(self, In):
self.add_to_logfile("download_log.txt", In)
class PG_anime:
def __init__(self):
self.List = []
self.vol_list = {}
self.path_list = []
self.path_list_hash = {}
global G_URL
self.URL = G_URL
def end_session(self):
del self.List[0:len(self.List)]
del self.path_list[0:len(self.path_list)]
self.vol_list.clear()
self.path_list_hash.clear()
def scan_dir(self, path, search_mode = False):
for root, dirs, files in os.walk(path):
for name in files:
anime_tmp = PG_anime_unit()
if anime_tmp.add(root, name, search_mode, self.vol_list):
#print("add %s" % name)
self.List.append(anime_tmp)
else:
pass
#print("Failed on %s" % name)
"""
a anime is encode with this form:
sub_name / anime_name[]vol
"""
def add_to_path_list(self, path, filename):
self.path_list.append((path, filename))
def refresh_list(self):
#del old_list[ 0:len(old_list) ]
self.vol_list.clear()
del self.path_list[0:len(self.path_list)]
self.add_to_path_list("FB","test")
self.path_list_hash.clear()
count = 0
for i in self.List:
count += 1
new_name = i.sub + " / " + i.name
new_name_with_vol = new_name + "[]" + str(i.vol)
if new_name not in self.vol_list:
self.vol_list[new_name] = []
self.vol_list[new_name].append(i.vol)
self.add_to_path_list(i.path, i.filename)
self.path_list_hash[new_name_with_vol] = count
#print(self.path_list)
def html_print_list(self):
r = []
r.append("<body style='background-color:#EEEEEE'>")
r.append("<table>\n")
for key, value in self.vol_list.items():
r.append(" <tr>\n")
r.append(" <td>[%d] [%s]</td>\n" % (max(value), key))
r.append(" <td>\n")
for i in range(1, max(value)+1):
tmp = key + "[]" + str(i)
tmp2 = "0" + str(i) if i <= 10 else str(i)
if i in value:
now_time = datetime.datetime.now()
file_path = self.path_list[self.path_list_hash[tmp]]
file_path = os.path.join(file_path[0], file_path[1])
file_time = datetime.datetime.fromtimestamp(os.stat(file_path).st_ctime)
time_delta = ((now_time - file_time).seconds + (now_time - file_time).days * 86400) // 3600
#r.append(str((now_time - file_time).seconds))
if time_delta <= 1:
time_color = "#FF0000"
elif time_delta <= 24*2:
time_color = "#FFAA00"
elif time_delta <= 24*7:
time_color = "#009900"
else :
time_color = "#0000FF"
span = "<span style='color:%s;'>" % time_color
r.append(" <a href=\"%srixia/%d\">%s %s</span></a>\n" \
% (self.URL, self.path_list_hash[tmp], span, tmp2))
else:
r.append(" <b>%s</b>\n" \
% tmp2)
r.append(" </td>\n")
r.append(" </tr>\n")
r.append("</table>\n")
r.append("<!--\n")
for key, value in self.vol_list.items():
str1 = " ".join(str(i) for i in value)
list_tmp = []
for i in value:
list_tmp.append(self.path_list_hash[key + "[]" + str(i)])
str2 = " ".join(str(i) for i in list_tmp)
r.append("%s[=======]%s[=======]%s\n" % (key, str1, str2))
r.append("-->\n")
global G_path_list
G_path_list = self.path_list
return r
def show(self):
print("<h1>動畫闕漏清單</h1>");
for key, value in self.List.items():
vol_max = max(value)
for i in range(1, vol_max):
if i not in value:
print("!!! [缺] !!! [%d] [%s] <br/>" % (i, key) )
print("<h1>動畫最新進度清單</h1>");
for key, value in self.List.items():
print("<h3>[%d] [%s]</h3>" % (max(value), key) )
class PG_httpd(http.server.SimpleHTTPRequestHandler):
PG = PG_anime()
Log = PG_log()
def do_GET(self):
f = self.send_head2()
if f:
#print(self.translate_path(self.path))
self.copyfile(f, self.wfile)
f.close()
def list_dir(self, r):
#enc = sys.getfilesystemencoding()
enc = "utf-8"
encoded = bytes("".join(r), enc)
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(200)
self.send_header("Content-type", "text/html; charset=%s" % enc)
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
return f
def send_head2(self):
global G_path_list
log_tmp = "GET " + self.path
log_tmp.ljust(40)
self.Log.add_to_logfile("download_log_detail.txt", "[%s] => %s [%s]\n" % (self.client_address[0], log_tmp, \
self.log_date_time_string() ))
if self.path == "/":
self.Log.add("[LIST] <-- <%s>\n" % self.client_address[0])
self.PG.end_session()
global G_scan_dir_list, G_search_dir_list
for i in G_scan_dir_list:
self.PG.scan_dir(i)
self.PG.refresh_list()
for i in G_search_dir_list:
self.PG.scan_dir(i, True)
self.PG.refresh_list()
self.send_response(200)
tmp = self.PG.html_print_list()
return self.list_dir(tmp)
m = re.search("rixia/([0-9]+)", self.path)
if not m :
self.send_error(404, "0.0.0.0.....")
return None
#print("processing path : %s" % self.path)
#print("%d" % int(m.group(1)))
#print(G_path_list)
tmp = G_path_list[int(m.group(1))]
path = os.path.join(tmp[0], tmp[1])
filename = tmp[1]
f = None
ctype = self.guess_type(path)
try:
#print("trying %s" % path)
f = open(path, 'rb')
except IOError:
self.send_error(404, "File not found")
return None
self.send_response(200)
self.send_header("Content-type", ctype)
fs = os.fstat(f.fileno())
self.send_header("Content-Length", str(fs[6]))
self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
self.send_header("Content-Disposition", "attachment; filename=%s" % filename)
self.end_headers()
self.Log.add("[DL] --> %s <-- <%s>\n" %(filename, self.client_address[0]))
#self.PG.end_session()
return f
#Handler = http.server.SimpleHTTPRequestHandler
Handler = PG_httpd
httpd = socketserver.TCPServer(("", PORT), Handler)
print("serving at port", PORT)
httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment