Skip to content

Instantly share code, notes, and snippets.

@mvyskocil
Last active December 15, 2015 06:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mvyskocil/5220690 to your computer and use it in GitHub Desktop.
Save mvyskocil/5220690 to your computer and use it in GitHub Desktop.
funcmail.py: create tags for notmuch by evaluating Python expressions against each email. This is a workaround for the fact that notmuch does not index email headers. The script prints input intended for notmuch-tag --batch, so the usage is: python funcmail.py | notmuch-tag --batch
#!/usr/bin/python
from __future__ import print_function
from tokenize import generate_tokens, untokenize, STRING, NAME, OP, COMMA
#
# Generate tags for notmuch-tag --batch from simple rules
#
# Copyright (c) 2013 Michal Vyskocil <michal.vyskocil@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# README
# ======
#
# Intended usage is python funcmail.py | notmuch-tag --batch
#
# The rules file is simple text file in form
# # this is a comment (for now, every line without '--' is ignored as well)
# list-of-tags -- python expression
# where tags can be prefixed by +/-, see notmuch-tag(1) for details
#
# Example:
# # tags will be applied when string is in header
# +notmuch -inbox -- 'notmuch.notmuchmail.org' in 'List-Id'
# # procmail-like rule, the whole email is searched for a pattern
# +notmuch -inbox -- 'List-Id.*notmuch.notmuchmail.org'
#
#XXX: Python 2.7 also provides io.StringIO, but that one only accepts unicode objects,
#therefore try to import the old cStringIO first and fall back to io.StringIO
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
import collections
import email.message
import os
import sys
try:
from urllib.parse import quote as urllib_quote
from urllib.parse import unquote as urllib_unquote
except ImportError:
from urllib import quote as urllib_quote
from urllib import unquote as urllib_unquote
def func_globals_2(func):
    """Return the globals dict behind a method object (Python 2 flavour).

    Python 2 wraps functions looked up on a class in a method object, so the
    plain function is reached through ``im_func`` first.
    """
    plain_function = func.im_func
    return plain_function.func_globals


def func_globals_3(func):
    """Return the globals dict of a plain function (Python 3 flavour)."""
    return func.__globals__


# Pick the accessor matching the running interpreter once, at import time.
func_globals = func_globals_2 if sys.version_info[0] == 2 else func_globals_3
#
#TODO = """
# * search inside email ('foo' in EMAIL)
# * user-defined functions
# * user-defined variables (listid = ('List-Id', 'X-Mailinglist'))
# * stop-when-rule-is-passed
# * else branch: what to do if no rule above apply
# * and maybe some profiling
#"""
#
class FuncmailMesage(email.message.Message):
    """Email message that can be queried with the ``in`` operator.

    ``FuncmailString in message`` runs a regular-expression search over the
    serialized message, while ``str in message`` checks (case-insensitively)
    whether a header of that name is present.
    """

    # Instances are evaluated in a limited environment, so the modules the
    # class needs are kept as class attributes instead of module globals.
    email = __import__('email.message', fromlist=[True, ])
    parser = __import__('email.parser', fromlist=[True, ])

    def __init__(self):
        self.email.Message.__init__(self)

    @classmethod
    def from_file(cls, fd):
        """Parse the open file *fd* into an instance of this class."""
        mail_parser = cls.parser.Parser(_class=cls)
        return mail_parser.parse(fd)

    @classmethod
    def from_string(cls, string):
        """Parse the text *string* into an instance of this class."""
        mail_parser = cls.parser.Parser(_class=cls)
        return mail_parser.parsestr(string)

    def __contains__(self, pattern):
        """Implement ``<pattern> in EMAIL``.

        Raises TypeError when *pattern* is neither FuncmailString nor str.
        """
        # FuncmailString subclasses str, so it has to be recognised first.
        if isinstance(pattern, FuncmailString):
            return FuncmailString.__contains_m__(self, pattern)
        if not isinstance(pattern, str):
            raise TypeError("FuncmailString or str expected, got '{}'".format(type(pattern)))
        # Same header-name lookup email.message.Message performs itself.
        wanted = pattern.lower()
        return any(wanted == name.lower() for name, _ in self._headers)
class FuncmailString(str):
    """String whose ``in`` and ``==`` operators perform ``re.search``.

    ``pattern in FuncmailString(header)`` and
    ``FuncmailString(pattern) == header`` search the named header of the
    module-global ``EMAIL``; comparing against a message object searches the
    whole serialized message line by line.
    """

    # Instances are evaluated in a limited environment, so the modules the
    # class needs are kept as class attributes instead of module globals.
    re = __import__('re')
    email = __import__('email.message')

    def __repr__(self):
        return "FuncmailString('{}')".format(str(self))

    @classmethod
    def __contains_h__(cls, key, pattern):
        """Search *pattern* in header *key* of the global EMAIL message."""
        global EMAIL
        header_value = EMAIL.get(str(key), '')
        if header_value:
            return cls.re.search(pattern, header_value) is not None
        # A missing or empty header never matches.
        return False

    @classmethod
    def __contains_m__(cls, email, pattern):
        """Search *pattern* in every line of the serialized message *email*."""
        search = cls.re.search
        lines = email.as_string().split('\n')
        return any(search(pattern, line) is not None for line in lines)

    def __contains__(self, pattern):
        # self is the header name, the left operand of ``in`` is the pattern.
        return self.__contains_h__(self, str(pattern))

    def __eq__(self, otherstr):
        # self is the pattern; the other side is a header name or a message.
        if not isinstance(otherstr, self.email.message.Message):
            return self.__contains_h__(str(otherstr), self)
        return self.__contains_m__(otherstr, self)

    def __hash__(self):
        plain = str(self)
        return hash(plain)
def wrap_strings(s, wrapper_name='FuncmailString'):
    """Return expression *s* with every string literal wrapped in a call.

    Each STRING token ``'x'`` is replaced by the token sequence
    ``wrapper_name('x')``; all other tokens are passed through unchanged.
    The result is rebuilt with ``untokenize``, so spacing may differ from
    the input.
    """
    wrapped = []
    for token in generate_tokens(StringIO(s).readline):
        kind, text = token[0], token[1]
        if kind != STRING:
            wrapped.append((kind, text))
            continue
        wrapped += [
            (NAME, wrapper_name),
            (OP, '('),
            (STRING, text),
            (OP, ')'),
        ]
    return untokenize(wrapped)
class FuncmailEvaluator(object):
    """Callable that evaluates a rule expression against an email.

    The expression is evaluated with ``FuncmailString`` and
    ``FuncmailMesage`` available by name. A result that is a string is
    treated as a pattern and looked up in the ``EMAIL`` message; any other
    result is converted with ``bool``.
    """

    default_ns = {"FuncmailString" : FuncmailString, "FuncmailMesage" : FuncmailMesage}

    def __call__(self, inp, globals, locals=None):
        """Evaluate *inp* and return the outcome as a bool.

        inp     -- expression source (or code object) accepted by eval
        globals -- namespace for the evaluation; must contain 'EMAIL'
        locals  -- optional local namespace, defaults to the globals

        Raises AssertionError when 'EMAIL' is missing from the namespace.
        """
        # Work on a copy: neither our helper names nor the __builtins__
        # entry added by eval should leak into the caller's dictionary.
        ns = dict(globals)
        ns.update(self.default_ns)
        # Explicit raise instead of an assert statement so the check is
        # still performed under ``python -O``.
        if 'EMAIL' not in ns:
            raise AssertionError("'EMAIL' is missing from the evaluation namespace")
        if not locals:
            locals = ns
        # NOTE: eval runs arbitrary Python; only evaluate trusted rules.
        ret = eval(inp, ns, locals)
        if isinstance(ret, str):
            # A bare string is a pattern to be searched in the message.
            return bool(ns['EMAIL'].__contains__(ret))
        return bool(ret)


fm_eval = FuncmailEvaluator()
def get_test_email1():
    """Build the first self-test message, which carries a List-Id header."""
    raw = (
        "From: foo@example.com\n"
        "List-Id: Notmuch Mailinglist <notmuch@notmuchmail.org>\n"
        "this is the content\n"
    )
    return FuncmailMesage.from_string(raw)
def get_test_email2():
    """Build the second self-test message, which carries X-Mailinglist."""
    raw = (
        "From: foo@example.com\n"
        "X-Mailinglist: Notmuch Mailinglist <notmuch@notmuchmail.org>\n"
        "this is the content\n"
    )
    return FuncmailMesage.from_string(raw)
def test():
    """Run the built-in self tests; return 0 when every assertion holds.

    FuncmailString looks the current message up through the module-global
    EMAIL, so the message under test is injected into the globals of
    FuncmailString.__contains__ before each group of checks and removed
    again at the end.
    """
    first_mail = get_test_email1()
    second_mail = get_test_email2()
    address = 'notmuch@notmuchmail.org'
    whole_mail_pattern = 'List-Id.*notmuch@notmuchmail.org'

    # --- checks against the first message (List-Id header) ---------------
    # Ugly hack: publish the message as EMAIL in the __contains__ globals.
    func_globals(FuncmailString.__contains__)['EMAIL'] = first_mail
    ns = {'EMAIL' : first_mail}

    # how __contains__ and __eq__ behave when used directly
    assert FuncmailString(address) in FuncmailString('List-Id')
    assert FuncmailString(address) == FuncmailString('List-Id')

    # the same through wrap_strings with a custom wrapper name
    wrapped_in = wrap_strings("'notmuch@notmuchmail.org' in 'List-Id'", wrapper_name="W")
    assert fm_eval(wrapped_in, {'EMAIL': first_mail, 'W' : FuncmailString})

    wrapped_eq = wrap_strings("'notmuch@notmuchmail.org' == 'List-Id'")
    assert fm_eval(wrapped_eq, ns)

    # --- checks against the second message (X-Mailinglist header) --------
    # Same hack as above. Two cleaner alternatives would be:
    #  1. keep the class in a separate file and __import__ it with a
    #     prepared namespace
    #  2. keep it in a string and exec that string in the namespace
    func_globals(FuncmailString.__contains__)['EMAIL'] = second_mail
    ns = {'EMAIL' : second_mail}

    header_names = (FuncmailString('List-Id'), FuncmailString('X-Mailinglist'))
    assert FuncmailString(address) in header_names

    wrapped_tuple = wrap_strings("'notmuch@notmuchmail.org' in ('List-Id', 'X-Mailinglist')")
    assert fm_eval(wrapped_tuple, ns)

    # whole-message searches
    assert FuncmailString(whole_mail_pattern) in first_mail
    assert FuncmailString(whole_mail_pattern) == first_mail
    assert not (FuncmailString(whole_mail_pattern) in second_mail)

    # a bare string result is searched in EMAIL by the evaluator
    wrapped_literal = wrap_strings("'notmuch@notmuchmail.org in EMAIL'")
    assert not fm_eval(wrapped_literal, ns)

    wrapped_pattern = wrap_strings("'X-Mailinglist.*notmuch@notmuchmail'")
    assert fm_eval(wrapped_pattern, ns)

    # remove the injected global again
    del func_globals(FuncmailString.__contains__)['EMAIL']
    return 0
def unquote(s):
    """Strip surrounding whitespace from *s* and decode its %XX escapes."""
    trimmed = s.strip()
    return urllib_unquote(trimmed)
def quote(s):
    """Percent-encode *s* using the urllib quoting rules."""
    encoded = urllib_quote(s)
    return encoded
def load_rules_from_fd(fd):
    """Load rules from an opened file or any other iterable of lines.

    Each rule line has the form

        +tags -separated +by -space -- expr

    Lines starting with '#' and lines without '--' are ignored. Only the
    first '--' separates the tags from the expression, so the expression
    itself may contain '--'. A rule whose expression cannot be tokenized or
    compiled is skipped with a warning on stderr; stdout is never written
    to, because it carries the batch output consumed by notmuch-tag.

    Return a list of
        (code, ('+list', '-of', '+tags'))
    where code is the compiled, string-wrapped expression.
    """
    # tokenize reports unterminated expressions with its own exception type
    from tokenize import TokenError

    rules = []
    fd_name = getattr(fd, "name", "<input>")
    for lineno, line in enumerate(fd, 1):
        # startswith() is safe for empty items, unlike line[0]
        if line.startswith('#') or '--' not in line:
            continue
        tags, expr = line.split('--', 1)
        where = "{}:{}".format(fd_name, lineno)
        try:
            code = compile(wrap_strings(expr.strip()), where, "eval")
        except (SyntaxError, TokenError) as err:
            print("WARNING: ignoring {}".format(where), file=sys.stderr)
            print(err, file=sys.stderr)
            continue
        # split() without arguments copes with tabs and repeated spaces
        rules.append((code, tuple(unquote(tag) for tag in tags.split())))
    return rules
def load_rules_from_file(path):
    """Open *path* in text mode and return the rules parsed from it."""
    with open(path, 'rt') as rules_fd:
        parsed_rules = load_rules_from_fd(rules_fd)
    return parsed_rules
def get_msgid(msg):
    """Return the Message-ID of *msg* without the enclosing angle brackets.

    Returns None when the header is missing. Surrounding whitespace is
    removed first; an empty header yields an empty string instead of
    raising IndexError.
    """
    if 'Message-ID' not in msg:
        return None
    msgid = msg['Message-ID'].strip()
    # startswith/endswith are safe on the empty string, unlike msgid[0]
    if msgid.startswith('<') and msgid.endswith('>'):
        return msgid[1:-1]
    return msgid
def match_rules(rules, msg):
    """Evaluate every rule against *msg* and collect the tags that apply.

    rules -- iterable of (code, tags) pairs as built by load_rules_from_fd
    msg   -- the parsed message to test

    Returns (msgid, tags). Messages without a usable Message-ID are not
    evaluated at all and yield an empty tag list. A rule that raises while
    being evaluated is reported on stderr and skipped.
    """
    tags = []
    msgid = get_msgid(msg)
    if not msgid:
        return msgid, tags
    # This is an ugly hack - publish msg as EMAIL in the __contains__ globals
    func_globals(FuncmailString.__contains__)['EMAIL'] = msg
    ns = {"FuncmailString" : FuncmailString, "EMAIL" : msg}
    for code, ts in rules:
        try:
            # NOTE: eval runs arbitrary Python; the rules file must be trusted.
            ret = eval(code, ns)
            if isinstance(ret, str):
                # A bare string is a procmail-like pattern: search it in the
                # message (as FuncmailEvaluator does) instead of treating any
                # non-empty string as a match.
                ret = ret in msg
        # StandardError only exists on Python 2; Exception works on both.
        except Exception as e:
            print("NOTE: {}".format(e), file=sys.stderr)
            continue
        if ret:
            tags.extend(ts)
    return msgid, tags
def list_path(path):
    """Yield every file found below *path* in a cur, new or tmp directory."""
    maildir_parts = ("cur", "new", "tmp")
    for root, _subdirs, files in os.walk(path):
        if os.path.basename(root) in maildir_parts:
            for name in files:
                yield os.path.join(root, name)
def list_from_path(path):
    """Yield the file names listed, one per line, in the text file *path*.

    This is a generator on purpose: list_files is lazy, so returning its
    result from inside the ``with`` block would close the file before the
    first line is read. Yielding from here keeps the file open until the
    listing is exhausted.
    """
    with open(path, 'rt') as fd:
        for name in list_files(fd):
            yield name
def list_files(files):
    """Yield the file names contained in *files*, an iterable of lines.

    Lines read from a file or from stdin still carry their line ending,
    which would make a later open() fail, so it is removed here. Blank
    lines are skipped.
    """
    for entry in files:
        # strip only the line ending; other whitespace may belong to the name
        name = entry.rstrip('\r\n')
        if name:
            yield name
def match_files(rules, file_set_generator):
    """Apply *rules* to every mail file and group message ids by tag.

    rules              -- rules as returned by load_rules_from_fd
    file_set_generator -- iterable of paths of the mail files to read

    Returns a mapping of tag -> sorted tuple of the ids of the messages the
    tag applies to. Files that parse to an empty message, messages without
    an id and messages no rule matched are left out.
    """
    ids_by_tag = collections.defaultdict(list)
    for path in file_set_generator:
        with open(path, 'rt') as mail_fd:
            msg = FuncmailMesage.from_file(mail_fd)
        if not msg:
            continue
        msgid, tags = match_rules(rules, msg)
        if msgid and tags:
            for tag in tags:
                ids_by_tag[tag].append(msgid)
    # Freeze every id list into a sorted tuple so it can serve as a dict key.
    for tag, ids in list(ids_by_tag.items()):
        ids_by_tag[tag] = tuple(sorted(ids))
    return ids_by_tag
def optimize_tags(inp):
    """Group tags that apply to exactly the same set of message ids.

    *inp* maps tag -> tuple of ids. The mapping is inverted, so the result
    maps tuple of ids -> list of tags; tags sharing an identical id tuple
    end up in one entry and can be printed on a single line.
    """
    tags_by_ids = collections.defaultdict(list)
    for tag in inp:
        id_tuple = inp[tag]
        tags_by_ids[id_tuple].append(tag)
    return tags_by_ids
def _quote_tag(tag):
    """Hex-encode the name of *tag*, keeping a leading '+' or '-' literal."""
    if tag[:1] in ('+', '-'):
        return tag[0] + quote(tag[1:])
    return quote(tag)


def print_tags(tags, file=sys.stdout):
    """Write one batch line per entry of *tags* to *file*.

    tags -- mapping of tuple-of-message-ids -> list of tags
    file -- file object to write to

    Each line has the form ``<tags> -- id:<a> or id:<b> ...``. Tag names are
    percent-encoded so that names containing spaces survive; the +/- prefix
    is written as is.
    """
    for idlist, taglist in tags.items():
        query = ' or '.join("id:{}".format(_id) for _id in idlist)
        encoded_tags = ' '.join(_quote_tag(_tag) for _tag in taglist)
        # file= belongs to print(), not to str.format()
        print("{} -- {}".format(encoded_tags, query), file=file)
def get_dbpath(path=os.path.expanduser('~/.notmuch-config')):
    """Return the ``path`` option of the ``[database]`` section of *path*.

    Raises ValueError when *path* is not an existing file and KeyError when
    the section or the option is missing.
    """
    if not os.path.isfile(path):
        raise ValueError("'{}' does not exists".format(path))

    # The module was renamed between Python 2 and Python 3.
    try:
        import configparser
    except ImportError:
        import ConfigParser as configparser

    section, option = 'database', 'path'
    notmuch_config = configparser.ConfigParser()
    notmuch_config.read(path)
    if not notmuch_config.has_section(section):
        raise KeyError("[database] section missing in '{}'".format(path))
    if not notmuch_config.has_option(section, option):
        raise KeyError("path does not exists in section database in '{}'".format(path))
    return notmuch_config.get(section, option)
def mkparser():
    """Create the argparse parser describing the command-line options."""
    import argparse

    parser = argparse.ArgumentParser(description="tag email for notmuch")
    add = parser.add_argument
    add('-c', '--config',
        help="path to notmuch config used to read database.path (defaults to ~/.notmuch-config)")
    add('-r', '--rules',
        help="load rules from this file (defaults to $dbpath/.notmuch/funcmail.rules)")
    add('-d', '--dbpath', metavar="DBPATH",
        help="use directory instead of path read from notmuch config")
    add('--input', metavar='FILE',
        help="read list of files from FILE (- is stdin)")
    add('-t', '--test', action="store_true", default=False,
        help="run built-in unit tests")
    return parser
def check(val, check_f, err_msg, ret_code=1):
    """Exit the program unless ``check_f(val)`` is true.

    On failure *err_msg*, formatted with ``value=val``, is printed to
    stderr and the process exits with *ret_code*.
    """
    if check_f(val):
        return
    print(err_msg.format(value=val), file=sys.stderr)
    sys.exit(ret_code)
def check_file_exist(val, err_msg = "No such file '{value}'"):
    """Exit through check() unless *val* names an existing regular file."""
    outcome = check(val, os.path.isfile, err_msg)
    return outcome
def check_directory_exist(val, err_msg = "No such directory '{value}'"):
    """Exit through check() unless *val* names an existing directory."""
    outcome = check(val, os.path.isdir, err_msg)
    return outcome
def main(argv):
    """Command-line entry point.

    Parses *argv*, works out where the mail files and the rules come from,
    and prints the resulting tag batch to stdout. With --test the built-in
    self tests are run instead and their result becomes the exit status.
    """
    args = mkparser().parse_args(argv)

    if args.test:
        sys.exit(test())

    # Source of the file names: stdin, a listing file, or (decided further
    # down) a walk through the database directory.
    list_f = None
    if args.input == "-":
        list_f = lambda : list_files(sys.stdin)
    elif args.input:
        check_file_exist(args.input)
        list_f = lambda : list_from_path(args.input)

    # Database directory: the command line wins over the notmuch config.
    dbpath = None
    if args.dbpath:
        check_directory_exist(args.dbpath)
        dbpath = args.dbpath

    if not dbpath:
        try:
            if args.config:
                check_file_exist(args.config)
                dbpath = get_dbpath(args.config)
            else:
                dbpath = get_dbpath()
        # XXX: ConfigParser errors derive directly from Exception, so
        # nothing narrower can be caught here
        except Exception as err:
            print("ERROR: " + str(err), file=sys.stderr)
            sys.exit(2)

    check_directory_exist(dbpath)

    if not list_f:
        list_f = lambda : list_path(dbpath)

    rules = args.rules if args.rules else os.path.join(dbpath, ".notmuch", "funcmail.rules")
    check_file_exist(rules)

    loaded_rules = load_rules_from_file(rules)
    ids_by_tag = match_files(loaded_rules, list_f())
    print_tags(optimize_tags(ids_by_tag), file=sys.stdout)
# Run only when executed as a script, so the module can be imported
# (e.g. to reuse or test its helpers) without starting a tagging run.
if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment