Created
February 6, 2018 09:48
-
-
Save SylvanasSun/260e958b31c7ddd87eec5ee5bf8a7dfb to your computer and use it in GitHub Desktop.
Some utils in common use of python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import logging | |
METHOD_GET = 'GET' | |
METHOD_POST = 'POST' | |
RETURN_TEXT = 'TEXT' | |
RETURN_BINARY = 'BINARY' | |
RETURN_JSON = 'JSON' | |
RETURN_RAW = 'RAW' | |
logging = logging.getLogger(__name__) | |
def before_logging(func): | |
def wrapper(*args, **kwargs): | |
func_name = func.__name__ | |
separator_idx = func_name.find('_') | |
func_type = func_name[separator_idx + 1:] | |
logging.info('request[%s]: %s ....' % (func_type, args[0])) | |
return func(*args, **kwargs) | |
return wrapper | |
@before_logging | |
def request_get(url, data, retry_times=3, timeout=0.5): | |
for i in range(retry_times): | |
try: | |
result = requests.get(url, timeout=timeout, params=data) | |
except Exception as e: | |
logging.warning('request[get]: %s - failure, retry times %s ' % (url, i + 1)) | |
continue | |
return result | |
@before_logging | |
def request_post(url, data, retry_times=3, timeout=0.5): | |
for i in range(retry_times): | |
try: | |
result = requests.post(url, timeout=timeout, data=data) | |
except Exception as e: | |
logging.warning('request[post]: %s - failure, retry times %s ' % (url, i + 1)) | |
continue | |
return result | |
def request(url, data=None, method_type=METHOD_GET, retry_times=3, timeout=0.5, return_type=RETURN_TEXT): | |
result = None | |
if method_type == METHOD_GET: | |
result = request_get(url, data, retry_times, timeout) | |
if method_type == METHOD_POST: | |
result = request_post(url, data, retry_times, timeout) | |
if not result or result == None: return result | |
if return_type == RETURN_TEXT: | |
return result.text | |
if return_type == RETURN_JSON: | |
try: | |
return result.json() | |
except Exception as e: | |
logging.warning('request[%s]: parse json failure %s ' % (method_type.lower(), str(e))) | |
if return_type == RETURN_BINARY: | |
return result.content | |
if return_type == RETURN_RAW: | |
return result.raw |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtplib | |
from email import encoders | |
from email.header import Header | |
from email.mime.base import MIMEBase | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from email.utils import parseaddr, formataddr | |
class SmtpKeys(object): | |
SMTP_HOST = 'smtp_host' | |
SMTP_PORT = 'smtp_port' | |
SMTP_SENDER = 'smtp_sender' | |
SMTP_RECEIVER = 'smtp_receiver' | |
SMTP_SUBJECT = 'smtp_subject' | |
SMTP_USERNAME = 'smtp_username' | |
SMTP_PASSWORD = 'smtp_password' | |
SMTP_ATTACHMENTS = 'smtp_attachments' | |
SMTP_IS_THIRD_PARTY = 'smtp_third_party' | |
SMTP_IS_SEND_HTML = 'smtp_send_html' | |
SMTP_IS_SSL = 'smtp_ssl' | |
SMTP_IS_VERBOSE_LOG = 'smtp_verbose_log' | |
class SmtpHelper(object): | |
""" | |
The class represents an object for simplifying use operate in | |
python email module and provides chaining invoke style for config. | |
Usage: | |
a = Attachment('E:\\test.jpg', 'test.jpg', 'image', 'jpg') | |
helper = SmtpHelper() \ | |
.with_server('smtp.xx.com', 25) \ | |
.with_third_party_service() \ | |
.with_ssl() \ | |
.with_server_login('test@xxx.com', 'xxxxxxxxxxx') \ | |
.with_sender('SylvanasSun', 'test@xxx.com') \ | |
.with_receiver('Claude Shannon', ['123456@xxx.com']) \ | |
.with_subject('Hello') \ | |
.with_attachment(a) | |
helper.send('Hello, World') | |
Author: SylvanasSun <sylvanas.sun@gmail.com> | |
Licence: MIT | |
""" | |
def __init__(self): | |
self.params = {} | |
def with_server(self, host, port=25): | |
if not isinstance(host, str) or not isinstance(port, int): | |
raise ValueError('The type of host and port must be a string and integer') | |
self.params[SmtpKeys.SMTP_HOST] = host | |
self.params[SmtpKeys.SMTP_PORT] = port | |
return self | |
def with_server_login(self, username, password): | |
if not isinstance(username, str) or not isinstance(password, str): | |
raise ValueError('The type of username and password must be a string') | |
self.params[SmtpKeys.SMTP_USERNAME] = username | |
self.params[SmtpKeys.SMTP_PASSWORD] = password | |
return self | |
def with_sender(self, sender, addr): | |
if not isinstance(sender, str) or not isinstance(addr, str): | |
raise ValueError('The type of sender and address must be a string') | |
self.params[SmtpKeys.SMTP_SENDER] = [sender, addr] | |
return self | |
def with_receiver(self, receiver, addr): | |
if not isinstance(receiver, str): | |
raise ValueError('The type of receiver must be a string') | |
if not isinstance(addr, list): | |
raise ValueError('The type of receiver address must be a list') | |
self.params[SmtpKeys.SMTP_RECEIVER] = [receiver, addr] | |
return self | |
def with_subject(self, subject): | |
if not isinstance(subject, str): | |
raise ValueError('The type of subject must be a string') | |
self.params[SmtpKeys.SMTP_SUBJECT] = subject | |
return self | |
def with_third_party_service(self): | |
self.params[SmtpKeys.SMTP_IS_THIRD_PARTY] = True | |
return self | |
def with_send_html(self): | |
self.params[SmtpKeys.SMTP_IS_SEND_HTML] = True | |
return self | |
def with_ssl(self): | |
self.params[SmtpKeys.SMTP_IS_SSL] = True | |
return self | |
def with_verbose_log(self): | |
self.params[SmtpKeys.SMTP_IS_VERBOSE_LOG] = True | |
return self | |
def with_attachment(self, attachments): | |
if not isinstance(attachments, Attachment) and not isinstance(attachments, list): | |
raise ValueError('The type of attachments must be a class Attachment or list') | |
elif isinstance(attachments, list): | |
for a in attachments: | |
if not isinstance(a, Attachment): | |
raise ValueError('The type of attachments must be a class Attachment') | |
self.params[SmtpKeys.SMTP_ATTACHMENTS] = attachments | |
return self | |
def send(self, message_body, encode_format='utf-8'): | |
self._validate_necessary_metadata([SmtpKeys.SMTP_HOST, | |
SmtpKeys.SMTP_PORT, | |
SmtpKeys.SMTP_SUBJECT, | |
SmtpKeys.SMTP_SENDER, | |
SmtpKeys.SMTP_RECEIVER]) | |
message, from_addr, to_addr = self._build_message(message_body, encode_format) | |
smtp_server = self._build_smtp_server() | |
try: | |
smtp_server.sendmail(from_addr, to_addr, message.as_string()) | |
finally: | |
smtp_server.quit() | |
def _build_message(self, message_body, encode_format): | |
msg = MIMEMultipart() | |
sender = self.params[SmtpKeys.SMTP_SENDER] | |
msg['From'] = self._format_addr('%s <%s>' % (sender[0], sender[1]), encode_format) | |
receiver = self.params[SmtpKeys.SMTP_RECEIVER] | |
if len(receiver[1]) == 1: | |
msg['To'] = self._format_addr('%s <%s>' % (receiver[0], receiver[1]), encode_format) | |
else: | |
msg['To'] = self._format_addr(receiver[0], encode_format) | |
msg['Subject'] = Header(self.params[SmtpKeys.SMTP_SUBJECT], encode_format).encode() | |
if self._validated(SmtpKeys.SMTP_IS_SEND_HTML): | |
msg.attach(MIMEText(message_body, 'html', encode_format)) | |
else: | |
msg.attach(MIMEText(message_body, 'plain', encode_format)) | |
if SmtpKeys.SMTP_ATTACHMENTS in self.params: | |
attachments = self.params[SmtpKeys.SMTP_ATTACHMENTS] | |
if isinstance(attachments, list): | |
for a in attachments: | |
msg.attach(a.to_MIME()) | |
else: | |
msg.attach(attachments.to_MIME()) | |
return msg, sender[1], receiver[1] | |
def _build_smtp_server(self): | |
if self._validated(SmtpKeys.SMTP_IS_SSL): | |
smtp = smtplib.SMTP_SSL(self.params[SmtpKeys.SMTP_HOST], self.params[SmtpKeys.SMTP_PORT]) | |
else: | |
smtp = smtplib.SMTP(self.params[SmtpKeys.SMTP_HOST], self.params[SmtpKeys.SMTP_PORT]) | |
if self._validated(SmtpKeys.SMTP_IS_THIRD_PARTY): | |
if SmtpKeys.SMTP_USERNAME in self.params and SmtpKeys.SMTP_PASSWORD in self.params: | |
smtp.login(self.params[SmtpKeys.SMTP_USERNAME], self.params[SmtpKeys.SMTP_PASSWORD]) | |
if self._validated(SmtpKeys.SMTP_IS_VERBOSE_LOG): | |
smtp.set_debuglevel(1) | |
return smtp | |
def _format_addr(self, s, encode_format='utf-8'): | |
name, addr = parseaddr(s) | |
return formataddr((Header(name, encode_format).encode(), addr)) | |
def _validated(self, key): | |
return key in self.params and self.params[key] | |
def _validate_necessary_metadata(self, keys): | |
for key in keys: | |
if key not in self.params: | |
raise ValueError('The value %s must be exist' % key) | |
class Attachment(object): | |
""" | |
The class represent an object that comprises necessary metadata of | |
one attachment and provides a function for convert to MIMEBase | |
""" | |
def __init__(self, path, filename, maintype, subtype, id=None): | |
self.path = path | |
self.filename = filename | |
self.maintype = maintype | |
self.subtype = subtype | |
self.id = id | |
def to_MIME(self): | |
mime = MIMEBase(self.maintype, self.subtype, filename=self.filename) | |
mime.add_header('Content-Disposition', 'attachment', filename=self.filename) | |
if self.id is not None: | |
mime.add_header('Content-ID', '<%s>' % self.id) | |
mime.add_header('X-Attachment-Id', str(self.id)) | |
with open(self.path, 'rb') as f: | |
mime.set_payload(f.read()) | |
encoders.encode_base64(mime) | |
return mime |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment