Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Last active June 25, 2021 07:19
Show Gist options
  • Save Ivlyth/d8a9780b146625526a54aa74c5e225c8 to your computer and use it in GitHub Desktop.
Save Ivlyth/d8a9780b146625526a54aa74c5e225c8 to your computer and use it in GitHub Desktop.
命令行邮件工具,用于快速发送邮件及录制 pcap 用于协议解析测试
# -*- coding:utf8 -*-
"""
Author : Myth
Date : 2021/3/9
Email : email4myth at gmail.com
"""
from __future__ import unicode_literals
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import mimetypes
from cStringIO import StringIO
import argparse
import smtplib
from email.mime.application import MIMEApplication
from email.mime.audio import MIMEAudio
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.quoprimime import header_encode
import os
import time
try:
import qrcode
except:
qrcode = None
try:
import magic
except ImportError:
print("you can install filemagic module by execute `pip2 install filemagic` for intelligent file type detect")
magic = None
options = None
INSTALL = '''\
Installation
The current stable version of python-magic is available on PyPI and can be installed by running `pip install python-magic`.
This module is a simple wrapper around the libmagic C library, and that must be installed as well:
# Debian/Ubuntu
>>> sudo apt-get install libmagic1
# Centos
>>> sudo yum -y install epel-release
>>> sudo
# Windows
## You'll need DLLs for libmagic. @julian-r maintains a pypi package with the DLLs, you can fetch it with:
>>> pip install python-magic-bin
# OSX
## When using Homebrew:
>>> brew install libmagic
## When using macports:
>>>port install file
'''
MIME_TYPES = '''\
application/font-woff woff
application/java-archive jar
application/javascript js
text/javascript js
application/ms-msi msi
application/msword doc
application/vnd.ms-powerpoint ppt pps pot
application/vnd.ms-excel xls xlm xla xlc xlt xlw
application/pdf pdf
application/pkcs7-signature p7
application/vnd.font-fontforge-sfd sfd
application/vnd.ms-cab-compressed cab
application/vnd.ms-fontobject eot
application/vnd.ms-opentype otf
application/vnd.openxmlformats-officedocument.presentationml.presentation pptx
application/vnd.openxmlformats-officedocument.spreadsheetml.sheet xlsx
application/vnd.openxmlformats-officedocument.wordprocessingml.document docx
application/vnd.tcpdump.pcap pcap
application/warc warc
application/x-7z-compressed 7z
application/x-arc arc
application/x-arj arj
application/x-compress z
application/x-coredump core
application/x-cpio cpio
application/x-dmg dmg
application/x-dosexec exe
application/x-eet eet
application/x-font-ttf ttf
application/x-gzip gz
application/x-java-jnlp-file jnlp
application/x-lha lha
application/x-lrzip lrz
application/x-lz4 lz4
application/x-lzh lzh
application/x-mif mif
application/x-pem pem
application/x-rar rar
application/x-rpm rpm
application/x-shockwave-flash swf
application/x-stuffit sif
application/x-tar tar
application/x-xar pkg
application/x-xz xz
application/x-zoo zoo
application/xml xml
application/zip zip
audio/mpeg mp3
image/gif gif
image/jpeg jpg
image/png png
image/tiff tiff
image/vnd.adobe.photoshop pnd
image/x-cursor cur
image/x-icon ico
image/x-ms-bmp bmp
text/html html
text/json json
text/plain txt
text/rss rss
text/rtf rtf
text/x-awk awk
text/x-lua lua
text/x-perl pl
text/x-php php
text/x-python py
text/x-ruby rb
text/x-shellscript sh
video/3gpp 3gp
video/h264 264
video/matroska mkv
video/mj2 mj2
video/mp4 mp4
video/mpeg mpg
video/mpv mpv
video/quicktime qt
video/webm webm
video/x-flc flc
video/x-fli fli
video/x-flv flv
video/x-jng jng
video/x-mng mng
video/x-sgi-movie sgi
'''
class ArgumentDefaultsHelpFormatter(argparse.HelpFormatter):
"""Help message formatter which adds default values to argument help.
Only the name of this class is considered a public API. All the methods
provided by the class are considered an implementation detail.
"""
def _get_help_string(self, action):
help = action.help
if '%(default)' not in action.help:
if action.default is not argparse.SUPPRESS:
defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
if action.option_strings or action.nargs in defaulting_nargs:
if action.default is not None:
help += ' (default: %(default)s)'
return help
def define_and_parse_args(args=None):
global options
parser = argparse.ArgumentParser(prog="PROG",
formatter_class=ArgumentDefaultsHelpFormatter)
############### Mail
mail_group = parser.add_argument_group(u'Mail')
mail_group.add_argument('-s', '--subject', default='【测试】邮件主题', help=u'测试的邮件主题, 可以包含占位符: xxx')
mail_group.add_argument('-f', '--from-user', help='发件人')
mail_group.add_argument('-t', '--to-list', required=True, nargs='+', help='收件人列表')
mail_group.add_argument('-c', '--cc-list', nargs='+', help='抄送人列表')
mail_group.add_argument('-b', '--bcc-list', nargs='+', help='秘送人列表')
mail_group.add_argument('-C', '--content', help='邮件正文, 可以指定一个文件路径, 以文件内容作为正文')
mail_group.add_argument('-a', '--attachments', nargs='+', help='附件路径, 可以指定多个')
mail_group.add_argument('-r', '--repeat-name-n-times', type=int, default=0, help='重复文件名称指定次数, 模拟长文件名称')
mail_group.add_argument('-q', '--qr-code-content', nargs='+', help='二维码形式附件的内容')
op_group = parser.add_argument_group(title='Operation')
op_group.add_argument('-i', '--interactive-mode', action='store_true', help='交互模式, 主要用于通过 wireshark 观测发包')
op_group.add_argument('-m', '--message-id', default='mk-id-%(pid)s-%(ts)s', help='message ID 格式')
op_group.add_argument('-T', '--reply-to', help='自定义的 reply-to 地址')
op_group.add_argument('-M', '--add-message-id', action='store_true', help='添加 message ID')
op_group.add_argument('-R', '--repeat-times', type=int, default=1, help='将当前邮件重复发送指定次数')
op_group.add_argument('-H', '--custom-headers', nargs='+', help='自定义 header')
################# SMTP Server
smtp_group = parser.add_argument_group(title=u'SMTP Server')
smtp_group.add_argument('-S', '--smtp-server', default='58.251.82.205', help=u'SMTP server 地址')
smtp_group.add_argument('-P', '--smtp-server-port', type=int, default=25, help=u'SMTP server 非加密通信端口')
smtp_group.add_argument('-u', '--user', default='monitor@example.com', help='发信人邮箱账号')
smtp_group.add_argument('-p', '--password', default='xxxxxxxxxxx', help='发信人邮箱密码')
tcpdump_group = parser.add_argument_group(title='tcpdump hint')
tcpdump_group.add_argument('--tcpdump', action='store_true', help='展示 tcpdump 命令提示, 且需二次确认后才开始真正执行')
options = parser.parse_args(args)
# check
if options.attachments:
for path in options.attachments:
if not os.path.isfile(path):
parser.error("attachment %s not found" % path)
return options
def interactive_prompt(prompt_msg):
if options.interactive_mode:
raw_input(prompt_msg)
def get_mail_content():
if options.content:
if os.path.isfile(options.content):
return open(options.content, 'rb').read()
else:
return options.content
else:
return '''\
中文邮件内容.
english email content.
---------------
\r\n.\r\n..
'''
def main():
global options
options = define_and_parse_args()
if options.tcpdump:
print u'打开新的 shell 窗口执行以下 tcpdump 命令进行抓包, 使用真实 网卡名称 替换 IFNAME'
print u'tcpdump -i IFNAME -w %s_%s.pcap "host %s and port %s"' % (
options.smtp_server, options.smtp_server_port,
options.smtp_server, options.smtp_server_port
)
raw_input('tcpdump 命令开始执行后, 回车继续')
if options.from_user:
from_user = options.from_user
else:
from_user = options.user[:options.user.index('@')]
frm = u'%s<%s>' % (from_user, options.user)
to_list = options.to_list[:]
content = MIMEText(get_mail_content(), _subtype=u'html', _charset=u'utf-8')
message = MIMEMultipart()
message['From'] = Header(frm, 'utf-8')
message['To'] = Header(u';'.join(options.to_list), 'utf-8')
message['Subject'] = Header(options.subject, 'utf-8')
if options.add_message_id:
message_id = options.message_id % {
'pid': os.getpid(),
'ts': int(time.time() * 1000000)
}
message["Message-ID"] = Header(message_id, "utf-8")
if options.reply_to:
message["Reply-To"] = Header(options.reply_to, "utf-8")
if options.custom_headers:
for header in options.custom_headers:
h_name, _, h_value = header.partition(':')
h_name = h_name.strip()
h_value = h_value.strip()
if not (h_name and h_value):
print "Warn: header name or value is empty, ignored: %s" % header
continue
message[h_name] = Header(h_value, 'utf-8')
if options.cc_list:
to_list += options.cc_list
message['Cc'] = Header(u';'.join(options.cc_list), 'utf-8')
if options.bcc_list:
to_list += options.bcc_list
message['Bcc'] = Header(u';'.join(options.bcc_list), 'utf-8')
# 正文
message.attach(content)
if options.attachments:
mimetypes.inited = True
mt = mimetypes.MimeTypes()
mt.readfp(StringIO(MIME_TYPES))
for attachment in options.attachments:
basename = os.path.basename(attachment)
name, ext = os.path.splitext(basename)
if options.repeat_name_n_times > 0:
name = name * options.repeat_name_n_times
basename = u'%s%s' % (name, ext)
if ext:
mime_type, _ = mt.guess_type(attachment)
elif magic is not None: # no ext
m = magic.Magic(flags=magic.MAGIC_MIME_TYPE)
mime_type = m.id_buffer(open(attachment, 'rb').read(2048))
else:
mime_type = 'application/octet-stream'
if not mime_type:
print("Error: can not determine mime type for file %s" % attachment)
sys.exit(1)
elif mime_type.startswith('text/'):
att = MIMEText(open(attachment, 'rb').read(), _subtype=mime_type.split('/')[-1], _charset='utf-8')
elif mime_type.startswith('application/'):
att = MIMEApplication(open(attachment, 'rb').read(), _subtype=mime_type.split('/')[-1])
elif mime_type.startswith('image/'):
att = MIMEImage(open(attachment, 'rb').read(), _subtype=mime_type.split('/')[-1])
elif mime_type.startswith('audio/'):
att = MIMEAudio(open(attachment, 'rb').read(), _subtype=mime_type.split('/')[-1])
else:
print("Error: unsupported file suffix: %s" % attachment)
sys.exit(1)
att.add_header('Content-Disposition', 'attachment',
filename=header_encode(basename.encode('utf-8'), charset='utf-8'))
message.attach(att)
if options.qr_code_content:
if qrcode is None:
print("Error: qrcode module not installed, please execute `pip2 install qrcode` to install it before use this script")
sys.exit(1)
for qr_content in options.qr_code_content:
img = qrcode.make(qr_content)
buf = StringIO()
img.save(buf)
basename = qr_content.replace('"', '_').replace("'", '_').replace(' ', '_').replace(':', '_').replace('/', '_').replace('.', '_')
att = MIMEText(buf.getvalue(), _charset='utf-8')
att.add_header('Content-Disposition', 'attachment',
filename=header_encode('%s.png' % basename, charset='utf-8'))
message.attach(att)
interactive_prompt("ready to construct SMTP instance")
client = smtplib.SMTP()
interactive_prompt("ready to connect to SMTP server %s:%s" % (options.smtp_server, options.smtp_server_port))
client.connect(host=options.smtp_server, port=options.smtp_server_port)
try:
interactive_prompt("ready to login for user: %s with password: %s" % (options.user, options.password))
client.login(options.user, options.password)
except Exception as e:
print "Error: login failed: %s" % e
return
for i in range(options.repeat_times):
try:
to_list = list(set(to_list))
interactive_prompt("ready to send mail for loop %s" % i)
client.sendmail(frm, to_list, message.as_string())
interactive_prompt("send mail for loop %s done" % i)
except Exception as e:
print "Error: send mail failed: %s" % e
return
if options.tcpdump:
print u'可以在另一个窗口中停止 tcpdump 命令'
print u'邮件发送结束'
if __name__ == '__main__':
# sys.argv = ['mail-maker.py', '--tcpdump', '-s', 'test subject', '-C', 'test content', '-t', 'myth.ren@example.com']
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment