Last active
January 13, 2016 08:13
-
-
Save wujiyu115/707691f95713c69fde01 to your computer and use it in GitHub Desktop.
python:downblogimg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf-8 -*- | |
import os | |
import os.path | |
import re | |
import logging | |
import json | |
import requests | |
import errno | |
postFileSuffix = ".md"

# ---------------------- logging configuration ----------------------
# Root logger: append every record (DEBUG and above) to blogimg.log.
logging.basicConfig(
    filename='blogimg.log',
    filemode='a',
    level=logging.DEBUG,
    datefmt='%a, %d %b %Y %H:%M:%S',
    format='%(asctime)s [line:%(lineno)d] %(levelname)s %(message)s',
)

# Second handler: echo the same records to the console, using its own layout.
formatter = logging.Formatter('%(asctime)s:[line:%(lineno)d] %(levelname)-8s %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.DEBUG)
logging.getLogger('').addHandler(console)
#-------------------FileUtil------------------------------------------------ | |
class FileUtil:
    """Small helper for reading and writing plain-text and JSON files."""

    def __init__(self):
        pass

    def read(self, path):
        """Return the text stored in *path*.

        A missing file is created empty first, so the call returns ``""``
        instead of raising for a path that does not exist yet.
        """
        if not os.path.exists(path):
            # Create the file so that a first run starts from an empty state.
            with open(path, 'w'):
                pass
        # The context manager closes the handle even if read() raises.
        with open(path, "r") as handle:
            return handle.read()

    def read_json(self, path):
        """Return the JSON document stored in *path*.

        An empty (or whitespace-only) file yields ``{}``. Malformed JSON
        still raises the decoder's error.
        """
        content = self.read(path)
        if not content or not content.strip():
            return {}
        return json.loads(content)

    def write(self, path, content):
        """Replace the content of *path* with the string *content*."""
        with open(path, 'w') as handle:
            handle.write(content)

    def write_json(self, path, json_object):
        """Serialize *json_object* as JSON and store it in *path*."""
        self.write(path, json.dumps(json_object))
# ------------------- download-state store -------------------
class BlogFileUtil:
    """Holds the image list and persists it to ``blogimg.json``.

    The list is kept under the ``"img_list"`` key as ``[url, downloaded]``
    pairs.
    """

    def __init__(self):
        self.config_file = "blogimg.json"
        self.file_util = FileUtil()
        self.config_info = self.file_util.read_json(self.config_file)
        self.init_proxy_list()

    def init_proxy_list(self):
        """Default a missing or empty ``img_list`` entry to an empty list."""
        # "img_list" holds the images that have already been matched.
        if not self.config_info.get("img_list"):
            self.config_info["img_list"] = []

    def add_imgs_to_list(self, images):
        """Replace the stored list with *images*, all marked not downloaded, and save."""
        entries = self.config_info["img_list"] = []
        entries.extend([url, False] for url in images)
        self.write_json()

    def down_success(self, img_path):
        """Flag every stored entry whose URL equals *img_path* as downloaded."""
        matching = (entry for entry in self.config_info["img_list"]
                    if entry[0] == img_path)
        for entry in matching:
            entry[1] = True
            logging.info("download success:"+img_path)

    def get_imgs(self):
        """Return the stored ``[url, downloaded]`` list."""
        return self.config_info["img_list"]

    def write_json(self):
        """Save the current state to the config file."""
        self.file_util.write_json(self.config_file, self.config_info)
class FindBlogImg(object):
    """Collect image URLs from Hexo markdown posts and download them.

    URLs are taken from every post below ``<hexo source>/_posts`` (or from a
    URL list file) and the images are saved below the Hexo source directory.
    Download state is tracked through ``BlogFileUtil``.
    """

    # Seconds to wait on the server before a download is abandoned;
    # requests applies no timeout unless one is passed explicitly.
    _REQUEST_TIMEOUT = 30

    # An absolute http(s) URL ending in .jpg/.gif/.png. Whitespace, quotes,
    # angle brackets and parentheses terminate a URL, so a match cannot start
    # at one link and swallow the text up to a later image on the same line.
    _IMAGE_URL_RE = re.compile(r"https?://[^\s\"'<>()]+?\.(?:jpg|gif|png)")

    def __init__(self):
        super(FindBlogImg, self).__init__()
        self.hexo_post_location = None  # Hexo "source" directory
        self.post_dict_name = None  # directory that holds the posts
        self.wp_root = None  # root of the WordPress upload directory
        # True: rescan the posts on every run; False: reuse the saved list.
        self.always_find_img = True
        # True: save every image directly into wp_root.
        self.down_same_dict = False
        self.init_path()
        self.file_proxy = BlogFileUtil()

    def down_all_images(self):
        """Download every image that is not yet marked as downloaded.

        When ``always_find_img`` is set, the posts are rescanned first and
        the stored list is replaced by the result.
        """
        if self.always_find_img:
            self.file_proxy.add_imgs_to_list(self.__find_img())
        try:
            for img_obj in self.file_proxy.get_imgs():
                img_path = img_obj[0]
                had_down = img_obj[1]
                if not had_down:
                    self.__try_down_img(img_path)
        finally:
            # Persist the progress even when the run is interrupted.
            self.file_proxy.write_json()

    def down_all_images_by_urls(self, full_path):
        """Download every image listed in a URL file.

        @param full_path  text file with one image URL per line
        """
        for img_path in self.__read_line(full_path):
            self.__try_down_img(img_path)

    def init_path(self, hexo_post_location="E:/git/hexo/source/", wp_root="wp-content"):
        """Set the directories to work with.

        @param hexo_post_location  Hexo ``source`` directory
        @param wp_root             root directory of the WordPress images
        """
        self.hexo_post_location = hexo_post_location
        # os.path.join copes with a source path given without a trailing slash.
        self.post_dict_name = os.path.join(hexo_post_location, "_posts")
        self.wp_root = wp_root

    def init_option(self, always_find_img=True, down_same_dict=False):
        """Set the behaviour switches.

        @param always_find_img  always scan the markdown files, instead of
                                reusing the list saved in the JSON file
        @param down_same_dict   download everything into the single
                                directory ``wp_root``
        """
        self.always_find_img = always_find_img
        self.down_same_dict = down_same_dict

    def find_img(self, full_path):
        """Return all image URLs found in one file.

        @param full_path  file to scan (HTML or markdown text)
        @return           list of URL strings, in order of appearance
        """
        return self.__check_image_link(self.__read_file(full_path))

    def __find_img(self):
        """Return the image URLs of every post below ``post_dict_name``."""
        all_imgs = []
        for parent, _dirnames, filenames in os.walk(self.post_dict_name):
            for filename in filenames:
                full_path = os.path.join(parent, filename)
                if self.__check_file_suffix(full_path, postFileSuffix):
                    all_imgs.extend(self.find_img(full_path))
        return all_imgs

    def __try_down_img(self, img_path):
        """Download one image; log a failure instead of aborting the batch."""
        try:
            self.__down_img(img_path)
        except Exception as e:
            logging.error("download failure:%s (%s)", img_path, e)

    def __down_img(self, url):
        """Download *url* unless the target file already exists."""
        full_filename = self.__get_file_name(url)
        if os.path.exists(full_filename):
            self.file_proxy.down_success(url)
            return
        response = requests.get(url, stream=True, timeout=self._REQUEST_TIMEOUT)
        try:
            if response.status_code != 200:
                logging.error(" download status_code failure: " + url)
                return
            # Write to a temporary name first: a partially written file under
            # the final name would be taken for a finished download next run.
            part_filename = full_filename + ".part"
            completed = False
            try:
                with open(part_filename, 'wb') as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                os.rename(part_filename, full_filename)
                completed = True
            finally:
                if not completed and os.path.exists(part_filename):
                    os.remove(part_filename)
        finally:
            # A streamed response keeps its connection until it is closed.
            response.close()
        self.file_proxy.down_success(url)

    def __get_file_name(self, url):
        """Return the local path for *url*, creating its directory.

        URLs that do not contain ``wp_root`` (e.g. images hosted on other
        sites) are stored directly in the ``wp_root`` directory.
        """
        dir_end_index = url.rfind("/")
        file_name = url[dir_end_index + 1:]
        dirname = self.wp_root
        if not self.down_same_dict and self.wp_root in url:
            dir_start_index = url.index(self.wp_root)
            # The slice is empty when wp_root only occurs in the file name;
            # fall back to wp_root in that case.
            dirname = url[dir_start_index:dir_end_index] or self.wp_root
        full_path = os.path.join(self.hexo_post_location, dirname)
        self.__mkdir_p(full_path)
        return os.path.join(full_path, file_name)

    def __mkdir_p(self, path):
        """Create *path* with all parents; an existing directory is fine."""
        try:
            os.makedirs(path)
        except OSError as exc:
            if exc.errno == errno.EEXIST and os.path.isdir(path):
                pass
            else:
                raise

    def __check_file_suffix(self, fileName, suffix):
        """Return True when *fileName* ends with *suffix*."""
        return fileName.endswith(suffix)

    def __check_image_link(self, html):
        """Return every image URL contained in the text *html*."""
        return self._IMAGE_URL_RE.findall(html)

    def __read_file(self, path):
        """Return the content of *path*, or ``""`` when reading it fails.

        Errors raised while opening the file are not caught.
        """
        with open(path) as file_object:
            try:
                return file_object.read()
            except (IOError, OSError, UnicodeError) as e:
                logging.error("read failure:%s (%s)", path, e)
                return ""

    def __read_line(self, full_path):
        """Return the non-empty lines of *full_path* without line endings."""
        with open(full_path, 'r') as f:
            stripped = (line.strip() for line in f)
            return [line for line in stripped if line]
if __name__ == '__main__':
    # Scan the posts using the default paths and download every image found.
    downloader = FindBlogImg()
    # Other ways to run it:
    #   downloader.init_option(False)  # reuse the list saved in blogimg.json
    #   downloader.down_all_images_by_urls("wordpress_pic.txt")
    downloader.down_all_images()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment