Skip to content

Instantly share code, notes, and snippets.

@belyaev-pa
Created January 10, 2019 15:01
Show Gist options
  • Save belyaev-pa/126a249a413c17ee5c0adda5327c521b to your computer and use it in GitHub Desktop.
Save belyaev-pa/126a249a413c17ee5c0adda5327c521b to your computer and use it in GitHub Desktop.
email filter
# !/usr/bin/python
# -*- coding: utf-8 -*-
from email.parser import Parser
import syslog
import email
import base64
import ldap
import sys
import argparse
import os
import subprocess
import time
# WHITE_LIST_TYPE = 'file' #'json'
ENCODING = 'utf-8'
LDAP_ALIASES = '/etc/postfix/ldap_aliases.cf'
# LDAP_ADDR = '10.32.99.163'
# BASEDN = 'CN=accounts, DC=che1, DC=ru'
NAME_ATTR = 'gecos'
WHITE_LIST_PATH = '/opt/white_list'
SEND_COMMAND = '/usr/sbin/sendmail -G -i'
# results codes
EX_OK = 0
EX_TEMPFAIL = 75
EX_UNAVAILABLE = 69
def get_ldap_host_and_dn():
"""Parse ldap config from LDAP_ALIASES file
:return: host string and base dn
"""
host = None
base_dn = None
if not os.path.exists(LDAP_ALIASES):
syslog.syslog(syslog.LOG_INFO, 'Error: File ldap_aliases not found')
return None, None
with open(LDAP_ALIASES, 'r') as ldap_conf:
for line in ldap_conf:
param, tune_up = line.split('=', 1)
if param.strip() == 'server_host':
host = tune_up.strip()
if param.strip() == 'search_base':
base_dn = tune_up.strip()
return host, base_dn
def get_name_from_ldap(finding_email):
"""Get name from LDAP for requested email
:param finding_email:
:return: EX_TEMPFAIL, None | if no config file or parameters not found
1, None | if email not found in ldap
0, 'base64 encoded string with name' | if all OK
"""
host, basedn = get_ldap_host_and_dn()
if (host is None) or (basedn is None):
syslog.syslog(syslog.LOG_INFO, 'Error: File ldap_aliases not found or have no server_host/search_base param')
return EX_TEMPFAIL, None
ad = ldap.initialize('ldap://' + host)
ad.simple_bind_s() # here we need to provide user and psswd
scope = ldap.SCOPE_SUBTREE
filter_exp = 'mail={}'.format(finding_email)
attr_list = [NAME_ATTR]
result = ad.search_s(basedn, scope, filter_exp, attr_list)
syslog.syslog(syslog.LOG_INFO, 'result: {0}, {1}'.format(result, type(result)))
if not result:
syslog.syslog(syslog.LOG_INFO, 'Error: Email {0} not found in LDAP'.format(email))
return 1, None
else:
s = result[0][1][NAME_ATTR][0]
syslog.syslog(syslog.LOG_INFO, 's: {0}'.format(s))
# print('s = {0}'.format(s));
# return base64.b64encode(s.encode(ENCODING))
return 0, base64.b64encode(s)
def check_white_list(mail_from, mail_to_list):
"""
check email with white list file
can user send email to following emails
:param mail_from: email from address - string
:param mail_to_list: email to addresses - list
:return: result code
"""
err_count = 0
if not os.path.exists(WHITE_LIST_PATH):
syslog.syslog(syslog.LOG_INFO, 'Error: File white_list not found')
return EX_TEMPFAIL
# TODO: make parse json instead of file
with open(WHITE_LIST_PATH, 'r') as white_list:
for line in white_list:
line = line.split(' ', 1)
if mail_from == line[0]:
allowed_to_list = line[1].split(',')
for mail_to_addr in mail_to_list:
if mail_to_addr not in allowed_to_list:
syslog.syslog(syslog.LOG_INFO, 'Deny from {0} to {1}'.format(mail_from, mail_to_addr))
print('Deny from {0} to {1}'.format(mail_from, mail_to_addr))
err_count += 1
if err_count:
return EX_UNAVAILABLE
return EX_OK
def send_email(message, mail_from, mail_to):
"""send mail with initiate subprocess with SEND_COMMAND
:param message: message as sting with email.parser example *msg.as_string(unixfrom=True)*
:param mail_from: address from - string
:param mail_to: addresses to - string ' ' space separated
:return: None
"""
cmd_str = SEND_COMMAND + ' -f ' + mail_from + ' -- ' + mail_to
proc = subprocess.Popen([cmd_str], shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# proc.communicate(input=message.encode("utf-8"))
proc.communicate(input=message)
result = proc.wait()
syslog.syslog(syslog.LOG_INFO, 'Result of sendmail: {0}.'.format(result))
def header_handler(msg, field_name):
"""Handle parsed message attributes
:param msg: Parsed message with email.parsec Parser
:param field_name: name of handling field with email 'From', 'To' for example
:return: result code
"""
field = msg.get(field_name)
if field is not None:
addresses = email.utils.getaddresses([field])
new_field = ''
for addr in addresses:
ldap_res = get_name_from_ldap(addr[1])
if ldap_res[0] == 0:
name = str(ldap_res[1]).lstrip('b').strip('\'')
new_field += '=?' + ENCODING + '?B?' + name + '?=' + ' <' + addr[1] + '>, '
elif ldap_res[0] == 1:
new_field += addr[1] + ', '
else:
return ldap_res[0]
msg.replace_header(field_name, new_field)
return 0
def main(message, mail_from, mail_to_list):
"""handling algorithm:
1) Check mails in white list
2) Replace To field
3) Replace Cc field
4) Replace From field
5) Send mail
:param message: full message as file
:param mail_from: parameter 'from' from console command
:param mail_to_list: parameter 'to' from console command
:return: result code
"""
if (mail_from is None) or (not mail_to_list):
syslog.syslog(syslog.LOG_INFO, 'Error: mail_from({0}) or mail_to({1}) is empty'.format(mail_from, mail_to))
return EX_UNAVAILABLE
if check_white_list(mail_from, mail_to_list) != EX_OK:
return EX_UNAVAILABLE
email_parser = Parser()
msg = email_parser.parsestr(message.read())
result = header_handler(msg, 'To')
if result == EX_TEMPFAIL:
return EX_TEMPFAIL
result = header_handler(msg, 'Cc')
if result == EX_TEMPFAIL:
return EX_TEMPFAIL
result = header_handler(msg, 'From')
if result == EX_TEMPFAIL:
return EX_TEMPFAIL
send_email(msg.as_string(unixfrom=True), mail_from, ' '.join(mail_to_list))
return EX_OK
if __name__ == '__main__':
syslog.openlog('mail_filter')
syslog.syslog(syslog.LOG_INFO, 'Start filter.py ')
syslog.syslog(syslog.LOG_INFO, 'Full args: {0}'.format(' '.join(sys.argv)))
std_from, std_to, std_in = sys.argv[2], sys.argv[4:], sys.stdin
syslog.syslog(syslog.LOG_INFO, 'From: {0}'.format(std_from))
syslog.syslog(syslog.LOG_INFO, 'To: {0}'.format(std_to))
res = main(std_in, std_from, std_to)
syslog.syslog(syslog.LOG_INFO, 'Stop filter.py. Result: {0}'.format(res))
syslog.closelog()
sys.exit(res)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment