PyCharm Test: packtpub-crawler scripts
drive.py
from os.path import exists
import webbrowser

import httplib2
import magic
from oauth2client.client import flow_from_clientsecrets, OOB_CALLBACK_URN
from oauth2client.file import Storage
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

from utils import thread_loader
from logs import *


class Drive(object):
    """
    Uploads files to Google Drive (API v2) and shares them with the configured account.
    """

    def __init__(self, config):
        self.__config = config
        self.__drive_service = None
        self.info = {}

    def __guess_info(self, file_path):
        # collects path, file name and MIME type of the file to upload
        if not exists(file_path):
            raise IOError('file not found!')
        self.info = {
            'path': file_path,
            'name': file_path.split('/')[-1],
            'mime_type': magic.from_file(file_path, mime=True),
        }
        log_info('[+] new file upload:')
        # log_dict(self.file_info)

    def __init_service(self):
        # builds the Drive service, running the OAuth2 flow first if no token is stored
        auth_token = self.__config.get('drive', 'drive.auth_token')
        if not exists(auth_token):
            self.__save_credentials(auth_token)
        storage = Storage(auth_token)
        credentials = storage.get()
        http = httplib2.Http()
        http = credentials.authorize(http)
        self.__drive_service = build('drive', 'v2', http=http)

    def __save_credentials(self, auth_token):
        # out-of-band OAuth2 flow: open the browser, ask for the code, persist credentials
        flow = flow_from_clientsecrets(
            self.__config.get('drive', 'drive.client_secrets'),
            self.__config.get('drive', 'drive.oauth2_scope'),
            OOB_CALLBACK_URN)
        authorize_url = flow.step1_get_authorize_url()
        print '[-] open browser...'
        webbrowser.open(authorize_url)
        code = raw_input('[*] Please enter the verification code: ').strip()
        credentials = flow.step2_exchange(code)
        storage = Storage(auth_token)
        storage.put(credentials)
        log_info('[+] new credentials saved')

    def __insert_file(self):
        print '[+] uploading file...'
        media_body = MediaFileUpload(
            self.info['path'], mimetype=self.info['mime_type'], resumable=True)
        body = {
            'title': self.info['name'],
            'description': 'uploaded with packtpub-crawler',
            'mimeType': self.info['mime_type']
        }
        file = self.__drive_service.files().insert(body=body, media_body=media_body).execute()
        # log_dict(file)
        print '\b[+] updating file permissions...'
        permissions = {
            'role': 'reader',
            'type': 'anyone',
            'value': self.__config.get('drive', 'drive.gmail')
        }
        self.__drive_service.permissions().insert(fileId=file['id'], body=permissions).execute()
        # self.__drive_service.files().get(fileId=file['id']).execute()
        self.info['id'] = file['id']
        self.info['download_url'] = file['webContentLink']

    def upload(self, file_path):
        self.__guess_info(file_path)
        self.__init_service()
        thread_loader(self.__insert_file)
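For orientation, a minimal usage sketch of the class above (mine, not part of the gist). It assumes a ConfigParser file, here called config.cfg, whose [drive] section provides the drive.client_secrets, drive.oauth2_scope, drive.auth_token and drive.gmail keys read by the code; the file name and upload path are placeholders:

# minimal sketch, assuming config.cfg defines the [drive] keys used above
import ConfigParser
from drive import Drive

config = ConfigParser.ConfigParser()
config.read('config.cfg')          # hypothetical config file
drive = Drive(config)
drive.upload('ebooks/sample.pdf')  # hypothetical local file
print drive.info                   # holds 'id' and 'download_url' once the upload finishes

Because thread_loader blocks until the worker thread exits, drive.info is fully populated when upload() returns.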
logs.py
from termcolor import cprint
import json
import sys, os, traceback


def log_error(message):
    cprint(message, 'red')


def log_warn(message):
    cprint(message, 'yellow')


def log_info(message):
    cprint(message, 'cyan')


def log_success(message):
    cprint(message, 'green')


def log_json(list_dict):
    print json.dumps(list_dict, indent=2)


def log_dict(d):
    # prints one "[key] value" line per entry
    for key, elem in d.items():
        print '\t[{0}] {1}'.format(key, elem)


def log_debug(e, stacktrace=True):
    # logs exception type, message, file name and line number of the current exception
    exc_type, exc_value, exc_traceback = sys.exc_info()
    fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
    log_warn('[-] {0} {1} | {2}@{3}'.format(exc_type, e, fname, exc_traceback.tb_lineno))
    if stacktrace:
        traceback.print_exc()
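One caveat worth a quick sketch (mine, not the gist's): log_debug reads sys.exc_info(), so it only produces useful output when called from inside an except block:

# sketch: log_debug must run inside an exception handler
from logs import log_debug, log_error

try:
    1 / 0
except Exception as e:
    log_debug(e)                     # yellow line with type, message, file and line number
    log_error('[-] division failed')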
packtpub.py
import requests
import re
from os.path import split

from utils import make_soup, wait, download_file, create_directory
from logs import *


class Packpub(object):
    """
    Claims and downloads the daily free eBook from www.packtpub.com.
    """

    def __init__(self, config, dev):
        self.__config = config
        self.__dev = dev
        self.__delay = float(self.__config.get('delay', 'delay.requests'))
        self.__url_base = self.__config.get('url', 'url.base')
        self.__headers = self.__init_headers()
        self.__session = requests.Session()
        self.info = {
            'paths': []
        }

    def __init_headers(self):
        return {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'
        }

    def __log_response(self, response, method='GET', detail=False):
        print '[-] {0} {1} | {2}'.format(method, response.url, response.status_code)
        if detail:
            print '[-] cookies:'
            log_dict(requests.utils.dict_from_cookiejar(self.__session.cookies))
            print '[-] headers:'
            log_dict(response.headers)

    def __GET_login(self):
        # fetches the login page and extracts the hidden form tokens
        url = self.__url_base
        if self.__dev:
            url += self.__config.get('url', 'url.loginGet')
        else:
            url += self.__config.get('url', 'url.login')
        response = self.__session.get(url, headers=self.__headers)
        self.__log_response(response)
        soup = make_soup(response)
        form = soup.find('form', {'id': 'packt-user-login-form'})
        self.info['form_build_id'] = form.find('input', attrs={'name': 'form_build_id'})['value']
        self.info['form_id'] = form.find('input', attrs={'name': 'form_id'})['value']

    def __POST_login(self):
        # submits credentials and scrapes the "deal of the day" block from the response
        data = self.info.copy()
        data['email'] = self.__config.get('credential', 'credential.email')
        data['password'] = self.__config.get('credential', 'credential.password')
        data['op'] = 'Login'
        # print '[-] data: {0}'.format(urllib.urlencode(data))

        url = self.__url_base
        response = None
        if self.__dev:
            url += self.__config.get('url', 'url.loginPost')
            response = self.__session.get(url, headers=self.__headers, data=data)
            self.__log_response(response)
        else:
            url += self.__config.get('url', 'url.login')
            response = self.__session.post(url, headers=self.__headers, data=data)
            self.__log_response(response, 'POST', True)

        soup = make_soup(response)
        div_target = soup.find('div', {'id': 'deal-of-the-day'})
        title = div_target.select('div.dotd-title > h2')[0].text.strip()
        self.info['title'] = title
        self.info['filename'] = title.encode('ascii', 'ignore').replace(' ', '_')
        self.info['description'] = div_target.select('div.dotd-main-book-summary > div')[2].text.strip()
        self.info['url_image'] = 'https:' + div_target.select('div.dotd-main-book-image img')[0]['src']
        self.info['url_claim'] = self.__url_base + div_target.select('a.twelve-days-claim')[0]['href']
        # remove useless info
        self.info.pop('form_build_id', None)
        self.info.pop('form_id', None)

    def __GET_claim(self):
        # claims the free eBook and reads its id, author and source code link
        if self.__dev:
            url = self.__url_base + self.__config.get('url', 'url.account')
        else:
            url = self.info['url_claim']
        response = self.__session.get(url, headers=self.__headers)
        self.__log_response(response)
        soup = make_soup(response)
        div_target = soup.find('div', {'id': 'product-account-list'})
        # the book just claimed is the first product line
        div_claimed_book = div_target.select('.product-line')[0]
        self.info['book_id'] = div_claimed_book['nid']
        self.info['author'] = div_claimed_book.find(class_='author').text.strip()
        source_code = div_claimed_book.find(href=re.compile('/code_download/*'))
        if source_code is not None:
            self.info['url_source_code'] = self.__url_base + source_code['href']

    def run(self):
        """
        Logs in and claims the daily free eBook, pausing between requests.
        """
        self.__GET_login()
        wait(self.__delay)
        self.__POST_login()
        wait(self.__delay)
        self.__GET_claim()
        wait(self.__delay)

    def download_ebooks(self, types):
        """
        Downloads the claimed eBook in every requested format.
        """
        downloads_info = [dict(type=type,
                               url=self.__url_base + self.__config.get('url', 'url.download').format(self.info['book_id'], type),
                               filename=self.info['filename'] + '.' + type)
                          for type in types]
        directory = self.__config.get('path', 'path.ebooks')
        for download in downloads_info:
            self.info['paths'].append(
                download_file(self.__session, download['url'], directory, download['filename']))

    def download_extras(self):
        """
        Downloads the book cover and, if present, the source code archive.
        """
        directory = self.__config.get('path', 'path.extras')
        url_image = self.info['url_image']
        filename = self.info['filename'] + '_' + split(url_image)[1]
        self.info['paths'].append(download_file(self.__session, url_image, directory, filename))
        if 'url_source_code' in self.info:
            self.info['paths'].append(download_file(self.__session, self.info['url_source_code'], directory,
                                                    self.info['filename'] + '.zip'))

    def download_ebooks_dir(self, types, directory):
        """
        Like download_ebooks, but into an explicit target directory.
        """
        downloads_info = [dict(type=type,
                               url=self.__url_base + self.__config.get('url', 'url.download').format(self.info['book_id'], type),
                               filename=self.info['filename'] + '.' + type)
                          for type in types]
        for download in downloads_info:
            self.info['paths'].append(
                download_file(self.__session, download['url'], directory, download['filename']))

    def download_extras_dir(self, directory):
        """
        Like download_extras, but into an explicit target directory.
        """
        url_image = self.info['url_image']
        filename = self.info['filename'] + '_' + split(url_image)[1]
        self.info['paths'].append(download_file(self.__session, url_image, directory, filename))
        if 'url_source_code' in self.info:
            self.info['paths'].append(download_file(self.__session, self.info['url_source_code'], directory,
                                                    self.info['filename'] + '.zip'))

    def get_library_list(self):
        # logs in and prints title, directory name, cover URL and download links
        # for every book in the account library
        self.__GET_login()
        wait(self.__delay)
        self.__POST_login()
        wait(self.__delay)
        url = self.__url_base + self.__config.get('url', 'url.myebooks')
        response = self.__session.get(url, headers=self.__headers)
        self.__log_response(response)
        soup = make_soup(response)
        for a in soup.findAll('div', attrs={'class': 'product-line unseen'}):
            print "Title: " + a.attrs.get('title')
            print "Directory: " + a.attrs.get('title')[:-8].replace(' ', '_')
            cover_url = a.find('img', attrs={'class': ' imagecache imagecache-thumbview'}).get('src').replace('thumbview', 'dotd_main_image')
            print "Cover URL: http:" + cover_url
            links = []
            for link in a.findAll('a', href=True):
                url = link.attrs.get('href')
                if '#' not in url:
                    links.append(url)
            # skip the first link (the book page itself) and ignore cart links
            for i in range(1, len(links)):
                if 'cart' not in links[i]:
                    if links[i].split('/')[-1] == 'pdf':
                        print "Download pdf: " + self.__url_base + links[i]
                    elif links[i].split('/')[-1] == 'epub':
                        print "Download epub: " + self.__url_base + links[i]
                    elif links[i].split('/')[-1] == 'mobi':
                        print "Download mobi: " + self.__url_base + links[i]
                    else:
                        print "Download extras: " + self.__url_base + links[i]

    def dump_all_library(self):
        # logs in and downloads cover, pdf, epub, mobi and extras for every book
        # in the account library, one directory per book under path.dumps
        self.__GET_login()
        wait(self.__delay)
        self.__POST_login()
        wait(self.__delay)
        url = self.__url_base + self.__config.get('url', 'url.myebooks')
        response = self.__session.get(url, headers=self.__headers)
        self.__log_response(response)
        soup = make_soup(response)
        dumps = self.__config.get('path', 'path.dumps')
        for a in soup.findAll('div', attrs={'class': 'product-line unseen'}):
            log_info("[+] Downloading: " + a.attrs.get('title'))
            directory = a.attrs.get('title')[:-8].replace(' ', '_')
            filename = directory
            cover_url = a.find('img', attrs={'class': ' imagecache imagecache-thumbview'}).get('src').replace('thumbview', 'dotd_main_image')
            download_file(self.__session, 'http:' + cover_url, dumps + '/' + directory, filename + '.jpg')
            links = []
            for link in a.findAll('a', href=True):
                url = link.attrs.get('href')
                if '#' not in url:
                    links.append(url)
            # skip the first link (the book page itself) and ignore cart links
            for i in range(1, len(links)):
                if 'cart' not in links[i]:
                    if links[i].split('/')[-1] == 'pdf':
                        download_file(self.__session, self.__url_base + links[i], dumps + '/' + directory, filename + '.pdf')
                    elif links[i].split('/')[-1] == 'epub':
                        download_file(self.__session, self.__url_base + links[i], dumps + '/' + directory, filename + '.epub')
                    elif links[i].split('/')[-1] == 'mobi':
                        download_file(self.__session, self.__url_base + links[i], dumps + '/' + directory, filename + '.mobi')
                    else:
                        download_file(self.__session, self.__url_base + links[i], dumps + '/' + directory, filename + '.zip')
            wait(self.__delay)
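A minimal driver for the class (my sketch, not the gist's). It assumes the hypothetical config.cfg sketched after spider.py below, providing the url.*, credential.*, delay.* and path.* keys read above:

# sketch: claim today's free eBook and fetch it as PDF and EPUB
from utils import config_file
from packtpub import Packpub

config = config_file('config.cfg')        # hypothetical config file
packpub = Packpub(config, dev=False)
packpub.run()                             # login, claim, collect book info
packpub.download_ebooks(['pdf', 'epub'])  # saved under path.ebooks
packpub.download_extras()                 # cover image and source code, if any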
spider.py
""" | |
// setup environment | |
sudo easy_install pip | |
// lists installed modules and version | |
pip freeze | |
// search | |
pip search module_name | |
sudo pip install termcolor | |
sudo pip install beautifulsoup4 | |
sudo pip install requests | |
sudo pip install requests[security] | |
sudo pip install clint | |
// Drive | |
sudo pip install httplib2 | |
sudo pip install --upgrade google-api-python-client | |
// fix error: AttributeError: 'Module_six_moves_urllib_parse' object has no attribute 'urlparse' | |
sudo pip install -I google-api-python-client==1.3.2 | |
sudo pip install apiclient | |
// run | |
python spider.py | |
python spider.py -e prod | |
python spider.py -h | |
""" | |
import argparse | |
from utils import ip_address, config_file | |
from packtpub import Packpub | |
from upload import Upload, SERVICE_DRIVE, SERVICE_DROPBOX | |
from logs import * | |
def parse_types(args): | |
if args.types is None: | |
return [args.type] | |
else: | |
return args.types | |
def main(): | |
parser = argparse.ArgumentParser( | |
description='Download FREE eBook every day from www.packtpub.com', | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
version='1.0') | |
parser.add_argument('-c', '--config', required=True, help='configuration file') | |
parser.add_argument('-d', '--dev', action='store_true', help='only for development') | |
parser.add_argument('-e', '--extras', action='store_true', help='download source code (if exists) and book cover') | |
parser.add_argument('-u', '--upload', choices=[SERVICE_DRIVE, SERVICE_DROPBOX], help='upload to cloud') | |
parser.add_argument('-a', '--archive', action='store_true', help='compress all file') | |
parser.add_argument('-n', '--notify', action='store_true', help='send confirmation email') | |
group = parser.add_mutually_exclusive_group() | |
group.add_argument('-t', '--type', choices=['pdf', 'epub', 'mobi'], | |
default='pdf', help='specify eBook type') | |
group.add_argument('--all', dest='types', action='store_const', | |
const=['pdf', 'epub', 'mobi'], help='all eBook types') | |
args = parser.parse_args() | |
try: | |
ip_address() | |
config = config_file(args.config) | |
types = parse_types(args) | |
packpub = Packpub(config, args.dev) | |
packpub.run() | |
log_json(packpub.info) | |
packpub.download_ebooks(types) | |
if args.extras: | |
packpub.download_extras() | |
if args.archive: | |
raise NotImplementedError('not implemented yet!') | |
if args.upload is not None: | |
Upload(config, args.upload).run(packpub.info['paths']) | |
if args.notify: | |
raise NotImplementedError('not implemented yet!') | |
except KeyboardInterrupt: | |
log_error('[-] interrupted manually') | |
except Exception as e: | |
log_debug(e) | |
log_error('[-] something weird occurred, exiting...') | |
if __name__ == '__main__': | |
main() |
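The gist never includes the configuration file itself. As a hedged sketch, a config.cfg covering the keys read throughout the code could look like the following; every section and key name is taken from a config.get() call above, while all the values (and the file name) are placeholders, not the project's real settings:

; hypothetical config.cfg -- all values are placeholders
[url]
url.base: https://www.packtpub.com
url.login: /
url.loginGet: /dev-login-page        ; dev-only stub, value guessed
url.loginPost: /dev-login-post       ; dev-only stub, value guessed
url.account: /account
url.myebooks: /account/my-ebooks
url.download: /ebook_download/{0}/{1}
[credential]
credential.email: user@example.com
credential.password: secret
[delay]
delay.requests: 2
[path]
path.ebooks: ebooks
path.extras: extras
path.dumps: dumps
[drive]
drive.client_secrets: client_secrets.json
drive.auth_token: auth_token.json
drive.oauth2_scope: https://www.googleapis.com/auth/drive
drive.gmail: user@gmail.com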
spider.py (variant that dumps the whole library)
""" | |
// setup environment | |
sudo easy_install pip | |
// lists installed modules and version | |
pip freeze | |
// search | |
pip search module_name | |
sudo pip install termcolor | |
sudo pip install beautifulsoup4 | |
sudo pip install requests | |
sudo pip install requests[security] | |
sudo pip install clint | |
// Drive | |
sudo pip install httplib2 | |
sudo pip install --upgrade google-api-python-client | |
// fix error: AttributeError: 'Module_six_moves_urllib_parse' object has no attribute 'urlparse' | |
sudo pip install -I google-api-python-client==1.3.2 | |
sudo pip install apiclient | |
// run | |
python spider.py | |
python spider.py -e prod | |
python spider.py -h | |
""" | |
import argparse | |
from utils import ip_address, config_file | |
from packtpub import Packpub | |
from upload import Upload, SERVICE_DRIVE, SERVICE_DROPBOX | |
from logs import * | |
def parse_types(args): | |
if args.types is None: | |
return [args.type] | |
else: | |
return args.types | |
def main(): | |
parser = argparse.ArgumentParser( | |
description='Download FREE eBook every day from www.packtpub.com', | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
version='1.0') | |
parser.add_argument('-c', '--config', required=True, help='configuration file') | |
parser.add_argument('-d', '--dev', action='store_true', help='only for development') | |
parser.add_argument('-e', '--extras', action='store_true', help='download source code (if exists) and book cover') | |
parser.add_argument('-u', '--upload', choices=[SERVICE_DRIVE, SERVICE_DROPBOX], help='upload to cloud') | |
parser.add_argument('-a', '--archive', action='store_true', help='compress all file') | |
parser.add_argument('-n', '--notify', action='store_true', help='send confirmation email') | |
group = parser.add_mutually_exclusive_group() | |
group.add_argument('-t', '--type', choices=['pdf', 'epub', 'mobi'], | |
default='pdf', help='specify eBook type') | |
group.add_argument('--all', dest='types', action='store_const', | |
const=['pdf', 'epub', 'mobi'], help='all eBook types') | |
args = parser.parse_args() | |
try: | |
ip_address() | |
config = config_file(args.config) | |
types = parse_types(args) | |
packpub = Packpub(config, args.dev) | |
#packpub.run() | |
#log_json(packpub.info) | |
packpub.dump_all_library() | |
#packpub.get_library_list() | |
if not os.path.exists("ebooks/" + packpub.info['filename']): | |
log_info("[+] Creating Directory: ebooks/"+packpub.info['filename']) | |
dirdownload = 'ebooks/' + packpub.info['filename'] | |
os.makedirs(dirdownload) | |
packpub.download_ebooks_dir(['pdf', 'epub', 'mobi'], dirdownload) | |
packpub.download_extras_dir(dirdownload) | |
else: | |
dirdownload = 'ebooks/' + packpub.info['filename'] | |
log_error('[-] Download already done or directory ' + dirdownload + ' exists') | |
# packpub.download_ebooks(types) | |
#if args.extras: | |
# packpub.download_extras() | |
#if args.archive: | |
# raise NotImplementedError('not implemented yet!') | |
#if args.upload is not None: | |
# Upload(config, args.upload).run(packpub.info['paths']) | |
if args.notify: | |
raise NotImplementedError('not implemented yet!') | |
except KeyboardInterrupt: | |
log_error('[-] interrupted manually') | |
except Exception as e: | |
log_debug(e) | |
log_error('[-] something weird occurred, exiting...') | |
if __name__ == '__main__': | |
main() |
upload.py
from drive import Drive
from logs import *

SERVICE_DRIVE = 'drive'
SERVICE_DROPBOX = 'dropbox'


class Upload(object):
    """
    TODO: interface or abstract class for upload services
    """

    def __init__(self, config, service_type):
        self.__config = config
        if service_type == SERVICE_DRIVE:
            self.service = Drive(config)
        elif service_type == SERVICE_DROPBOX:
            raise NotImplementedError('not implemented yet!')

    def run(self, paths):
        """
        Uploads every path with the configured service.
        """
        for path in paths:
            self.service.upload(path)
            log_dict(self.service.info)
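And a matching sketch of the dispatcher in use (again mine, reusing the hypothetical config.cfg and a placeholder path):

# sketch: push downloaded files to Google Drive via the dispatcher
from utils import config_file
from upload import Upload, SERVICE_DRIVE

config = config_file('config.cfg')                        # hypothetical config file
Upload(config, SERVICE_DRIVE).run(['ebooks/sample.pdf'])  # hypothetical path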
utils.py
import requests
import ConfigParser
from bs4 import BeautifulSoup
from time import sleep
from clint.textui import progress
import os, sys, itertools
from threading import Thread

from logs import *


def ip_address():
    """
    Gets and logs the current public IP address
    """
    response = requests.get('http://www.ip-addr.es')
    print '[-] GET {0} | {1}'.format(response.url, response.status_code)
    log_info('[+] ip address is: {0}'.format(response.text.strip()))


def config_file(path):
    """
    Reads the configuration file
    """
    if not os.path.exists(path):
        raise IOError('file not found!')
    log_info('[+] configuration file: {0}'.format(path))
    config = ConfigParser.ConfigParser()
    config.read(path)
    return config


def make_soup(response, debug=False):
    """
    Makes soup from a response
    """
    print '[*] fetching url... {0} | {1}'.format(response.status_code, response.url)
    #soup = BeautifulSoup(response.text, from_encoding=response.encoding)
    soup = BeautifulSoup(response.text, 'html.parser')
    if debug:
        print soup.prettify().encode('utf-8')
    return soup


def wait(delay):
    if delay > 0:
        print '[-] going to sleep {0} seconds'.format(delay)
        sleep(delay)


def download_file(r, url, directory, filename):
    """
    Downloads a file with a progress bar and returns its path
    """
    if not os.path.exists(directory):
        # creates directories recursively
        os.makedirs(directory)
        log_info('[+] created new directory: ' + directory)
    path = os.path.join(directory, filename)
    print '[-] downloading file from url: {0}'.format(url)
    response = r.get(url, stream=True)
    with open(path, 'wb') as f:
        total_length = int(response.headers.get('content-length'))
        for chunk in progress.bar(response.iter_content(chunk_size=1024), expected_size=(total_length / 1024) + 1):
            if chunk:
                f.write(chunk)
                f.flush()
    log_success('[+] new download: {0}'.format(path))
    return path


def thread_loader(function):
    """
    Runs function in a thread while spinning a loading indicator
    """
    thread = Thread(target=function)
    thread.start()
    spinner = itertools.cycle(['-', '/', '|', '\\'])
    # busy-waits on purpose: draw a spinner character, then erase it
    while thread.is_alive():
        sys.stdout.write(spinner.next())
        sys.stdout.flush()
        sys.stdout.write('\b')


def create_directory(path, name):
    target = path + '/' + name
    if not os.path.exists(target):
        log_info('[+] Creating directory: ' + target)
        os.makedirs(target)
    else:
        log_error('[-] Directory ' + target + ' already exists')
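To close, a small sketch (mine, not from the gist) exercising the two less obvious helpers; the URL and paths are placeholders, and download_file expects the server to send a Content-Length header:

# sketch: download with the progress-bar helper, then spin the loader around a slow task
import requests
from time import sleep
from utils import download_file, thread_loader

session = requests.Session()
download_file(session, 'https://example.com/sample.pdf', 'downloads', 'sample.pdf')  # placeholder URL

def slow_task():
    sleep(3)

thread_loader(slow_task)  # prints a rotating -, /, |, \ until the thread exits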