Skip to content

Instantly share code, notes, and snippets.

@wujiyu115
Last active January 13, 2016 08:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wujiyu115/707691f95713c69fde01 to your computer and use it in GitHub Desktop.
Save wujiyu115/707691f95713c69fde01 to your computer and use it in GitHub Desktop.
python:downblogimg
# -*- coding:utf-8 -*-
import os
import os.path
import re
import logging
import json
import requests
import errno
postFileSuffix = ".md"

# ---------------------- logging setup ---------------------------
# Root logger: DEBUG and above is appended to blogimg.log.
logging.basicConfig(
    filename='blogimg.log',
    filemode='a',
    level=logging.DEBUG,
    format='%(asctime)s [line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
)

# Second handler on the root logger: echo the same records through a
# StreamHandler, using its own (slightly different) line format.
formatter = logging.Formatter('%(asctime)s:[line:%(lineno)d] %(levelname)-8s %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.DEBUG)
logging.getLogger('').addHandler(console)
#-------------------FileUtil------------------------------------------------
class FileUtil:
    """Small helper for reading and writing plain-text and JSON files."""

    def __init__(self):
        pass

    def read(self, path):
        """Return the whole text content of ``path``.

        Side effect: when ``path`` does not exist yet, an empty file is
        created first, so the call returns ``""`` instead of raising.
        """
        if not os.path.exists(path):
            # Create the missing file so the read below succeeds.
            with open(path, 'w'):
                pass
        # ``with`` guarantees the handle is closed even if read() raises.
        with open(path, "r") as handle:
            return handle.read()

    def read_json(self, path):
        """Parse ``path`` as JSON and return the decoded object.

        An empty or whitespace-only file yields an empty dict. Invalid JSON
        still raises the ``json.loads`` error.
        """
        content = self.read(path)
        if not content or not content.strip():
            return {}
        return json.loads(content)

    def write(self, path, content):
        """Overwrite ``path`` with the string ``content``."""
        with open(path, 'w') as handle:
            handle.write(content)

    def write_json(self, path, json_object):
        """Serialize ``json_object`` with ``json.dumps`` and write it to ``path``."""
        self.write(path, json.dumps(json_object))
#-------------------代理类------------------------------------------------
class BlogFileUtil:
    """Keeps the image download list and persists it to ``blogimg.json``.

    ``config_info["img_list"]`` holds ``[img_path, downloaded]`` pairs, where
    ``downloaded`` is a boolean flag.
    """

    def __init__(self):
        self.config_file = "blogimg.json"
        self.file_util = FileUtil()
        self.config_info = self.file_util.read_json(self.config_file)
        self.init_proxy_list()

    def init_proxy_list(self):
        """Make sure ``config_info`` has an ``img_list`` entry (images already matched)."""
        known = self.config_info.get("img_list")
        if not known:
            known = []
        self.config_info["img_list"] = known

    def add_imgs_to_list(self, images):
        """Replace the list with ``images``, all marked not downloaded, and save it."""
        self.config_info["img_list"] = [[img_path, False] for img_path in images]
        self.write_json()

    def down_success(self, img_path):
        """Flag every entry whose path equals ``img_path`` as downloaded and log it."""
        matching = (entry for entry in self.config_info["img_list"]
                    if entry[0] == img_path)
        for entry in matching:
            entry[1] = True
            logging.info("download success:" + img_path)

    def get_imgs(self):
        """Return the list of ``[img_path, downloaded]`` pairs."""
        return self.config_info["img_list"]

    def write_json(self):
        """Write the current ``config_info`` to the config file."""
        self.file_util.write_json(self.config_file, self.config_info)
class FindBlogImg(object):
    """Find image URLs in hexo posts and download them to local folders.

    Uses the module-level ``requests`` import for HTTP, ``BlogFileUtil`` to
    track which images were downloaded, and ``postFileSuffix`` to select the
    post files that are scanned.
    """

    # Matches an http(s) URL ending in .jpg/.gif/.png. Whitespace, quotes,
    # angle brackets and parentheses are excluded so that a match cannot run
    # from an earlier link into a later image URL on the same line.
    _IMG_URL_RE = re.compile(r"https?://[^\s\"'<>()]+?\.(?:jpg|gif|png)")

    def __init__(self):
        super(FindBlogImg, self).__init__()
        self.hexo_post_location = None  # hexo "source" directory
        self.post_dict_name = None      # directory that holds the posts
        self.wp_root = None             # root folder of the WordPress uploads
        self.always_find_img = True     # rescan the posts vs. reuse the saved list
        self.down_same_dict = False     # put every download directly in wp_root
        self.init_path()
        self.file_proxy = BlogFileUtil()

    def down_all_images(self):
        """Download every listed image that is not yet marked as downloaded.

        When ``always_find_img`` is true the posts are rescanned first and
        the stored list is replaced. A failing download is logged and does
        not stop the remaining ones.
        """
        if self.always_find_img:
            self.file_proxy.add_imgs_to_list(self.__find_img())
        try:
            for img_obj in self.file_proxy.get_imgs():
                img_path, had_down = img_obj[0], img_obj[1]
                if not had_down:
                    self.__try_down_img(img_path)
        finally:
            # Persist the per-image status even if the run is interrupted.
            self.file_proxy.write_json()

    def down_all_images_by_urls(self, full_path):
        """Download every image listed in a URL file.

        @param full_path text file with one image URL per line
        """
        for img_path in self.__read_line(full_path):
            self.__try_down_img(img_path)

    def init_path(self, hexo_post_location="E:/git/hexo/source/", wp_root="wp-content"):
        """Set the working paths.

        @param hexo_post_location hexo ``source`` directory
        @param wp_root root folder name of the WordPress images
        """
        self.hexo_post_location = hexo_post_location
        # os.path.join works with or without a trailing separator.
        self.post_dict_name = os.path.join(hexo_post_location, "_posts")
        self.wp_root = wp_root

    def init_option(self, always_find_img=True, down_same_dict=False):
        """Set the behavior switches.

        @param always_find_img always scan the post files for image links,
               instead of reusing the list saved in the JSON file
        @param down_same_dict download every file directly into ``wp_root``
        """
        self.always_find_img = always_find_img
        self.down_same_dict = down_same_dict

    def find_img(self, full_path):
        """Return all image URLs found in one file.

        @param full_path file to scan
        @return list of URL strings (empty when the file could not be read)
        """
        return self.__check_image_link(self.__read_file(full_path))

    def __find_img(self):
        """Walk the posts directory and collect image URLs from every post file."""
        all_imgs = []
        for parent, _dirnames, filenames in os.walk(self.post_dict_name):
            for filename in filenames:
                full_path = os.path.join(parent, filename)
                if self.__check_file_suffix(full_path, postFileSuffix):
                    all_imgs.extend(self.find_img(full_path))
        return all_imgs

    def __try_down_img(self, img_path):
        """Download one image; print and log any failure instead of raising."""
        try:
            self.__down_img(img_path)
        except Exception as e:
            print(e)
            logging.error("download failure:" + img_path)

    def __down_img(self, url):
        """Download ``url`` unless the target file already exists.

        The body is streamed into a ``.part`` file that is renamed only after
        the download finished. An interrupted transfer therefore never leaves
        a truncated file that a later run would treat as already downloaded.
        """
        full_filename = self.__get_file_name(url)
        if os.path.exists(full_filename):
            self.file_proxy.down_success(url)
            return
        r = requests.get(url, stream=True)
        try:
            if r.status_code != 200:
                logging.error(" download status_code failure: " + url)
                return
            part_filename = full_filename + ".part"
            try:
                with open(part_filename, 'wb') as f:
                    for chunk in r.iter_content(1024):
                        f.write(chunk)
                os.rename(part_filename, full_filename)
            except Exception:
                # Drop the incomplete file, then let the caller see the error.
                if os.path.exists(part_filename):
                    os.remove(part_filename)
                raise
        finally:
            # Release the streamed connection.
            r.close()
        self.file_proxy.down_success(url)

    def __get_file_name(self, url):
        """Return the local target path for ``url`` and create its directory.

        URLs that do not contain ``wp_root`` (for example images hosted on
        other sites) are stored directly in the ``wp_root`` folder. So is
        every URL when ``down_same_dict`` is set.
        """
        dirname = self.wp_root
        dir_end_index = url.rfind("/")
        file_name = url[dir_end_index + 1:]
        if not self.down_same_dict and self.wp_root in url:
            # Mirror the URL's folder structure from wp_root downwards.
            dir_start_index = url.index(self.wp_root)
            dirname = url[dir_start_index:dir_end_index]
        full_path = os.path.join(self.hexo_post_location, dirname)
        self.__mkdir_p(full_path)
        return os.path.join(full_path, file_name)

    def __mkdir_p(self, path):
        """Create ``path`` with parents; an already existing directory is fine."""
        try:
            os.makedirs(path)
        except OSError as exc:
            if exc.errno == errno.EEXIST and os.path.isdir(path):
                pass
            else:
                raise

    def __check_file_suffix(self, fileName, suffix):
        """Return True when ``fileName`` ends with ``suffix``."""
        return fileName.endswith(suffix)

    def __check_image_link(self, html):
        """Return every image URL found in ``html`` (``[]`` for empty/None input)."""
        if not html:
            return []
        return self._IMG_URL_RE.findall(html)

    def __read_file(self, path):
        """Return the content of ``path``, or None when reading fails.

        Errors from ``open`` itself propagate to the caller.
        """
        file_object = open(path)
        try:
            return file_object.read()
        except Exception as e:
            print(str(e))
            return None
        finally:
            file_object.close()

    def __read_line(self, full_path):
        """Return the non-blank lines of ``full_path`` with whitespace stripped."""
        with open(full_path, 'r') as f:
            stripped = (line.strip() for line in f)
            return [line for line in stripped if line]
if __name__ == '__main__':
    # Script entry point: scan the posts and download every image found.
    #
    # Alternatives:
    #   * call findImg.init_option(False) first to reuse the image list saved
    #     in the JSON file instead of rescanning the posts;
    #   * call findImg.down_all_images_by_urls("wordpress_pic.txt") to
    #     download from a file that lists one URL per line.
    findImg = FindBlogImg()
    findImg.down_all_images()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment