Last active
December 15, 2015 06:59
-
-
Save mvyskocil/5220690 to your computer and use it in GitHub Desktop.
funcmail.py: create tags for notmuch by evaluating Python expressions. This is a workaround for the fact that notmuch does not index email headers. The script prints a file intended for notmuch-tag --batch, so usage is python funcmail.py | notmuch-tag --batch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import print_function | |
from tokenize import generate_tokens, untokenize, STRING, NAME, OP, COMMA | |
# | |
# Generate tags for notmuch-tag --batch from simple rules | |
# | |
# Copyright (c) 2013 Michal Vyskocil <michal.vyskocil@gmail.com> | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# | |
# README | |
# ====== | |
# | |
# Intended usage is python funcmail.py | notmuch-tag --batch | |
# | |
# The rules file is simple text file in form | |
# #this is a comment (but all lines w/o -- are ignored yet) | |
# list-of-tags -- python expression | |
# where tags can be prefixed by +/-, see notmuch-tag(1) for details | |
# | |
# Example: | |
# # tags will be applied when string is in header | |
# +notmuch -inbox -- 'notmuch.notmuchmail.org' in 'List-Id' | |
# # procmail-like rule, the whole email is searched for a pattern | |
# +notmuch -inbox -- 'List-Id.*notmuch.notmuchmail.org' | |
# | |
# XXX: Python 2.7 also ships io.StringIO, but that one needs unicode objects,
# therefore try to import the old cStringIO first and fall back to io.
try: | |
from cStringIO import StringIO | |
except ImportError: | |
from io import StringIO | |
import collections | |
import email.message | |
import os | |
import sys | |
try: | |
from urllib.parse import quote as urllib_quote | |
from urllib.parse import unquote as urllib_unquote | |
except ImportError: | |
from urllib import quote as urllib_quote | |
from urllib import unquote as urllib_unquote | |
def func_globals_2(func):
    """Return the globals dict of a Python 2 method object.

    The underlying function is reached through ``im_func`` and its
    ``func_globals`` mapping is returned.
    """
    underlying = func.im_func
    return underlying.func_globals


def func_globals_3(func):
    """Return the globals dict of a Python 3 function (``__globals__``)."""
    return func.__globals__


# Pick the accessor matching the major version of the running interpreter.
func_globals = func_globals_2 if sys.version_info[0] == 2 else func_globals_3
# | |
#TODO = """ | |
# * search inside email ('foo' in EMAIL) | |
# * user-defined functions | |
# * user-defined variables (listid = ('List-Id', 'X-Mailinglist') | |
# * stop-when-rule-is-passed | |
# * else branch: what to do if no rule above apply | |
# * and maybe some profiling | |
#""" | |
# | |
class FuncmailMesage(email.message.Message):
    """``email.message.Message`` subclass with a pattern-aware ``in`` operator.

    ``pattern in message`` runs a per-line regular expression search when
    *pattern* is a ``FuncmailString`` and a case-insensitive header-name
    lookup when it is a plain ``str``.
    """

    # Instances are evaluated in a limited environment, so keep the modules
    # the class needs reachable as class attributes.
    email = __import__('email.message', fromlist=[True, ])
    parser = __import__('email.parser', fromlist=[True, ])

    def __init__(self):
        self.email.Message.__init__(self)

    @classmethod
    def from_file(cls, fd):
        """Parse the open file *fd* into an instance of this class."""
        message_parser = cls.parser.Parser(_class=cls)
        return message_parser.parse(fd)

    @classmethod
    def from_string(cls, string):
        """Parse the text *string* into an instance of this class."""
        message_parser = cls.parser.Parser(_class=cls)
        return message_parser.parsestr(string)

    def __contains__(self, pattern):
        """Implement ``<pattern> in EMAIL``.

        Raises ``TypeError`` when *pattern* is neither a ``FuncmailString``
        nor a ``str``.
        """
        # FuncmailString subclasses str, so it has to be tested first.
        if isinstance(pattern, FuncmailString):
            return FuncmailString.__contains_m__(self, pattern)
        if not isinstance(pattern, str):
            raise TypeError("FuncmailString or str expected, got '{}'".format(type(pattern)))
        # Same header-name test as email.message.Message.__contains__.
        wanted = pattern.lower()
        return any(name.lower() == wanted for name, _value in self._headers)
class FuncmailString(str):
    """``str`` subclass whose ``in`` and ``==`` operators run ``re.search``.

    ``pattern in header_name`` and ``pattern == header_name`` search the
    named header of the module-global ``EMAIL`` message for *pattern*;
    ``pattern == message`` searches every line of *message*.
    """

    # Instances are evaluated in a limited environment, so keep the modules
    # the class needs reachable as class attributes.
    re = __import__('re')
    email = __import__('email.message')

    def __repr__(self):
        return "FuncmailString('{}')".format(str(self))

    @classmethod
    def __contains_h__(cls, key, pattern):
        """Return True when header *key* of ``EMAIL`` matches *pattern*."""
        # EMAIL is injected into this module's globals by the caller.
        global EMAIL
        header_value = EMAIL.get(str(key), '')
        if header_value:
            return cls.re.search(pattern, header_value) is not None
        return False

    @classmethod
    def __contains_m__(cls, email, pattern):
        """Return True when any line of the message *email* matches *pattern*."""
        lines = email.as_string().split('\n')
        return any(cls.re.search(pattern, text) is not None for text in lines)

    def __contains__(self, pattern):
        # ``pattern in self``: self names the header to inspect.
        return self.__contains_h__(self, str(pattern))

    def __eq__(self, otherstr):
        # ``self == otherstr``: self is the pattern; the right-hand side is
        # either a whole message or the name of a header.
        message_type = self.email.message.Message
        if isinstance(otherstr, message_type):
            return self.__contains_m__(otherstr, self)
        return self.__contains_h__(str(otherstr), self)

    def __hash__(self):
        return hash(str(self))
def wrap_strings(s, wrapper_name='FuncmailString'):
    """Return expression *s* with every string literal wrapped in a call.

    Each STRING token ``'x'`` is replaced by the token sequence
    ``<wrapper_name>('x')``; all other tokens pass through unchanged.
    """
    tokens = []
    for token in generate_tokens(StringIO(s).readline):
        kind, text = token[0], token[1]
        if kind != STRING:
            tokens.append((kind, text))
            continue
        tokens += [
            (NAME, wrapper_name),
            (OP, '('),
            (STRING, text),
            (OP, ')'),
        ]
    return untokenize(tokens)
class FuncmailEvaluator(object):
    """Callable that evaluates one funcmail expression to a boolean.

    The expression is evaluated with ``eval`` in a namespace built from the
    caller's globals plus ``default_ns``.  A string result is treated as a
    pattern and looked up in ``EMAIL``; any other result is coerced with
    ``bool``.

    NOTE: ``eval`` executes arbitrary Python code; only evaluate expressions
    that come from a trusted rules file.
    """

    default_ns = {"FuncmailString" : FuncmailString, "FuncmailMesage" : FuncmailMesage}

    def __call__(self, inp, globals, locals=None):
        """Evaluate *inp* and return the match result as ``bool``.

        inp     -- expression source or a compiled code object
        globals -- mapping that must provide ``EMAIL``; it is not modified
        locals  -- optional local namespace; defaults to the globals used
                   for the evaluation

        Raises ``AssertionError`` when ``EMAIL`` is missing.
        """
        # Work on a copy so the caller's mapping is not polluted with the
        # default names (and the ``__builtins__`` entry eval() inserts).
        ns = dict(globals)
        ns.update(self.default_ns)
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if 'EMAIL' not in ns:
            raise AssertionError("'EMAIL' is missing from the evaluation namespace")
        if not locals:
            locals = ns
        ret = eval(inp, ns, locals)
        if isinstance(ret, str):
            # A bare string is a pattern to look up in the message itself.
            return bool(ns['EMAIL'].__contains__(ret))
        return bool(ret)


fm_eval = FuncmailEvaluator()
def get_test_email1():
    """Build the sample message that carries a ``List-Id`` header."""
    raw = (
        "From: foo@example.com\n"
        "List-Id: Notmuch Mailinglist <notmuch@notmuchmail.org>\n"
        "this is the content\n"
    )
    return FuncmailMesage.from_string(raw)
def get_test_email2():
    """Build the sample message that carries an ``X-Mailinglist`` header."""
    raw = (
        "From: foo@example.com\n"
        "X-Mailinglist: Notmuch Mailinglist <notmuch@notmuchmail.org>\n"
        "this is the content\n"
    )
    return FuncmailMesage.from_string(raw)
def test():
    """Run the built-in self checks and return 0 when all of them pass."""
    first = get_test_email1()
    second = get_test_email2()
    contains_globals = func_globals(FuncmailString.__contains__)

    # HACK: expose the message as EMAIL in the globals seen by __contains__
    contains_globals['EMAIL'] = first
    ns = {'EMAIL' : first}

    # test1: how __contains__ and __eq__ work when used directly
    assert FuncmailString('notmuch@notmuchmail.org') in FuncmailString('List-Id')
    assert FuncmailString('notmuch@notmuchmail.org') == FuncmailString('List-Id')

    # the same checks, going through wrap_strings and the evaluator
    wrapped_in = wrap_strings("""'notmuch@notmuchmail.org' in 'List-Id'""", wrapper_name="W")
    assert fm_eval(wrapped_in, {'EMAIL': first, 'W' : FuncmailString})

    wrapped_eq = wrap_strings("""'notmuch@notmuchmail.org' == 'List-Id'""")
    assert fm_eval(wrapped_eq, ns)

    # HACK again: switch EMAIL to the second message.  Two ways to avoid it:
    #  1. keep the classes in a separate file and __import__ it with a
    #     defined namespace
    #  2. keep them in a string and exec that string in the namespace
    contains_globals['EMAIL'] = second
    ns = {'EMAIL' : second}

    headers = (FuncmailString('List-Id'), FuncmailString('X-Mailinglist'))
    assert FuncmailString('notmuch@notmuchmail.org') in headers

    wrapped_tuple = wrap_strings("""'notmuch@notmuchmail.org' in ('List-Id', 'X-Mailinglist')""")
    assert fm_eval(wrapped_tuple, ns)

    # whole-message searches
    assert FuncmailString('List-Id.*notmuch@notmuchmail.org') in first
    assert FuncmailString('List-Id.*notmuch@notmuchmail.org') == first
    assert FuncmailString('List-Id.*notmuch@notmuchmail.org') not in second

    # a bare string result is searched for in the whole message
    wrapped_literal = wrap_strings("""'notmuch@notmuchmail.org in EMAIL'""")
    assert not fm_eval(wrapped_literal, ns)

    wrapped_pattern = wrap_strings("""'X-Mailinglist.*notmuch@notmuchmail'""")
    assert fm_eval(wrapped_pattern, ns)

    # clean up the injected definition
    del contains_globals['EMAIL']
    return 0
def unquote(s):
    """Strip surrounding whitespace from *s* and decode its %XX escapes."""
    trimmed = s.strip()
    return urllib_unquote(trimmed)
def quote(s):
    """Percent-encode *s* using the urllib ``quote`` defaults."""
    encoded = urllib_quote(s)
    return encoded
def load_rules_from_fd(fd):
    """Load rules from an opened file or any other iterable of lines.

    Each rule line has the form

        +tags -separated +by -whitespace -- expr

    Lines that start with ``#`` (after optional leading whitespace) or that
    contain no ``--`` are ignored.  Rules whose expression cannot be
    tokenized or compiled are skipped with a warning on stderr.

    Return a list of

        (compiled_expr, ('+tuple', '-of', '+tags'))
    """
    # Local import: the module only imports selected names from tokenize.
    from tokenize import TokenError

    rules = []
    fd_name = getattr(fd, "name", "<input>")
    for lineno, line in enumerate(fd, 1):
        # startswith() is also safe for empty strings from non-file iterables
        if line.lstrip().startswith('#') or '--' not in line:
            continue
        # Split on the first separator only, so the expression itself may
        # contain '--' (e.g. inside a string literal).
        tags, expr = line.split('--', 1)
        where = "{}:{}".format(fd_name, lineno)
        try:
            code = compile(wrap_strings(expr.strip()), where, "eval")
        except (SyntaxError, TokenError) as err:
            # stdout is reserved for the notmuch batch output, so
            # diagnostics must go to stderr.
            print("WARNING: ignoring {}".format(where), file=sys.stderr)
            print(err, file=sys.stderr)
            continue
        # split() without arguments also copes with tabs and repeated blanks
        rules.append((code, tuple(unquote(tag) for tag in tags.split())))
    return rules
def load_rules_from_file(path):
    """Open *path* in text mode and parse it with ``load_rules_from_fd``."""
    with open(path, 'rt') as rules_fd:
        rules = load_rules_from_fd(rules_fd)
    return rules
def get_msgid(msg):
    """Return the Message-ID of *msg* without its angle brackets.

    Returns ``None`` when the header is missing or empty.  Surrounding
    whitespace is removed before the ``<...>`` brackets are stripped.
    """
    if 'Message-ID' not in msg:
        return None
    msgid = msg['Message-ID']
    # An empty header value used to raise IndexError on msgid[0].
    if not msgid:
        return None
    msgid = msgid.strip()
    if msgid.startswith('<') and msgid.endswith('>'):
        return msgid[1:-1]
    return msgid
def match_rules(rules, msg):
    """Evaluate every rule against *msg* and collect the tags that apply.

    rules -- iterable of ``(compiled_expr, tags)`` pairs
    msg   -- the parsed message

    Returns ``(msgid, tags)``.  ``tags`` is empty when the message has no
    usable Message-ID or when no rule matched.  A rule that raises during
    evaluation is reported on stderr and skipped.

    NOTE: rules are run with ``eval``; only use trusted rules files.
    """
    tags = []
    msgid = get_msgid(msg)
    if not msgid:
        return msgid, tags
    # This is an ugly hack - import msg as EMAIL into __contains__ globals
    func_globals(FuncmailString.__contains__)['EMAIL'] = msg
    ns = {"FuncmailString" : FuncmailString, "EMAIL" : msg}
    for code, ts in rules:
        try:
            ret = eval(code, ns)
            if isinstance(ret, str):
                # Procmail-like rule: a bare string is a pattern searched
                # for in the message.  Without this, every non-empty string
                # would be truthy and the rule would match all mail.
                ret = ret in msg
        except Exception as e:
            # StandardError does not exist on Python 3; Exception works on
            # both major versions.
            print("NOTE: {}".format(e), file=sys.stderr)
            continue
        if ret:
            tags.extend(ts)
    return msgid, tags
def list_path(path):
    """Yield every file found below *path* in a cur/new/tmp directory."""
    wanted_dirs = ("cur", "new", "tmp")
    for dirpath, _dirnames, filenames in os.walk(path):
        if os.path.basename(dirpath) in wanted_dirs:
            for name in filenames:
                yield os.path.join(dirpath, name)
def list_from_path(path):
    """Yield the file names listed, one per line, in the text file *path*.

    This is a generator, so the list file stays open while it is being
    consumed.  Line terminators are stripped and blank lines are skipped.
    """
    # The original returned a lazy generator from inside the ``with`` block,
    # so the file was already closed when iteration started.
    with open(path, 'rt') as fd:
        for line in fd:
            name = line.rstrip('\r\n')
            if name:
                yield name
def list_files(files):
    """Yield the file names contained in the iterable *files*.

    *files* is typically an open text file or ``sys.stdin`` with one path
    per line, so the line terminator is stripped from every item and empty
    items are skipped.
    """
    for x in files:
        # A trailing newline would make the later open() call fail.
        name = x.rstrip('\r\n')
        if name:
            yield name
def match_files(rules, file_set_generator):
    """Apply *rules* to every mail file and group message ids by tag.

    rules              -- list produced by ``load_rules_from_fd``
    file_set_generator -- iterable of paths of mail files

    Returns a ``defaultdict`` mapping each tag to a sorted tuple of the
    message ids it applies to.  Each id appears once per tag, even when
    several rules yield the same tag or the same message is stored in more
    than one file.
    """
    ids_by_tag = collections.defaultdict(set)
    for path in file_set_generator:
        with open(path, 'rt') as f:
            msg = FuncmailMesage.from_file(f)
        if not msg:
            continue
        msgid, tags = match_rules(rules, msg)
        if not msgid or not tags:
            continue
        for t in tags:
            ids_by_tag[t].add(msgid)

    # Tuples are hashable, which lets optimize_tags() use them as dict keys.
    ret = collections.defaultdict(list)
    for tag, ids in ids_by_tag.items():
        ret[tag] = tuple(sorted(ids))
    return ret
def optimize_tags(inp):
    """Group tags that apply to exactly the same set of message ids.

    The mapping is inverted: the values of *inp* become keys and every key
    of *inp* sharing that value is collected into one list, so such tags
    can later be printed on a single line.
    """
    grouped = collections.defaultdict(list)
    for tag in inp:
        grouped[inp[tag]].append(tag)
    return grouped
def print_tags(tags, file=sys.stdout):
    """Write one ``<tags> -- id:<id> or id:<id> ...`` line per entry.

    tags -- mapping of a sequence of message ids to the list of tags that
            apply to all of them
    file -- file-like object the lines are written to

    NOTE(review): tags are written as-is.  The original built a quoted copy
    of the tags but never used it; quoting the whole tag would also
    percent-encode a leading ``+`` prefix.
    """
    for idlist, taglist in tags.items():
        query = ' or '.join("id:{}".format(_id) for _id in idlist)
        # ``file`` must be given to print(); the original passed it to
        # str.format(), so output always went to sys.stdout.
        print("{} -- {}".format(' '.join(taglist), query), file=file)
def get_dbpath(path=os.path.expanduser('~/.notmuch-config')):
    """Return the ``path`` option of the ``[database]`` section of *path*.

    Raises ``ValueError`` when *path* is not an existing file and
    ``KeyError`` when the section or the option is missing.
    """
    if not os.path.isfile(path):
        raise ValueError("'{}' does not exists".format(path))

    # Python 3 name first, Python 2 name as the fallback
    try:
        import configparser
    except ImportError:
        import ConfigParser as configparser

    section, option = 'database', 'path'
    cfg = configparser.ConfigParser()
    cfg.read(path)
    if not cfg.has_section(section):
        raise KeyError("[database] section missing in '{}'".format(path))
    if not cfg.has_option(section, option):
        raise KeyError("path does not exists in section database in '{}'".format(path))
    return cfg.get(section, option)
def mkparser():
    """Build and return the command line parser of the script."""
    import argparse

    parser = argparse.ArgumentParser(description="tag email for notmuch")
    add = parser.add_argument
    add('-c', '--config',
        help="path to notmuch config used to read database.path (defaults to ~/.notmuch-config)")
    add('-r', '--rules',
        help="load rules from this file (defaults to $dbpath/.notmuch/funcmail.rules)")
    add('-d', '--dbpath', metavar="DBPATH",
        help="use directory instead of path read from notmuch config")
    add('--input', metavar='FILE',
        help="read list of files from FILE (- is stdin)")
    add('-t', '--test', default=False, action="store_true",
        help="run built-in unit tests")
    return parser
def check(val, check_f, err_msg, ret_code=1):
    """Exit the program unless ``check_f(val)`` is true.

    On failure *err_msg*, formatted with ``value=val``, is printed to
    stderr and the process exits with status *ret_code*.
    """
    if check_f(val):
        return
    print(err_msg.format(value=val), file=sys.stderr)
    sys.exit(ret_code)
def check_file_exist(val, err_msg = "No such file '{value}'"):
    """Exit via ``check`` unless *val* is the path of an existing file."""
    return check(val, check_f=os.path.isfile, err_msg=err_msg)
def check_directory_exist(val, err_msg = "No such directory '{value}'"):
    """Exit via ``check`` unless *val* is the path of an existing directory."""
    return check(val, check_f=os.path.isdir, err_msg=err_msg)
def main(argv):
    """Parse *argv*, match all mail files against the rules, print the tags.

    With ``--test`` the built-in tests are run and the process exits with
    their result.  Otherwise the database path is resolved (``--dbpath`` or
    the notmuch config), the rules are loaded, and the resulting tag lines
    are written to stdout.
    """
    args = mkparser().parse_args(argv)

    if args.test:
        sys.exit(test())

    # where the list of mail files comes from
    list_f = None
    if args.input == "-":
        list_f = lambda : list_files(sys.stdin)
    elif args.input:
        check_file_exist(args.input)
        list_f = lambda : list_from_path(args.input)

    # the database directory: explicit option first, notmuch config otherwise
    dbpath = None
    if args.dbpath:
        check_directory_exist(args.dbpath)
        dbpath = args.dbpath

    if not dbpath:
        try:
            if args.config:
                check_file_exist(args.config)
                dbpath = get_dbpath(args.config)
            else:
                dbpath = get_dbpath()
        # XXX: ConfigParser is broken and all errors are derived from
        # Exception, instead of StandardError
        except Exception as err:
            print("ERROR: " + str(err), file=sys.stderr)
            sys.exit(2)
        check_directory_exist(dbpath)

    if not list_f:
        list_f = lambda : list_path(dbpath)

    if args.rules:
        rules = args.rules
    else:
        rules = os.path.join(dbpath, ".notmuch", "funcmail.rules")
    check_file_exist(rules)

    # rules are loaded before the file list is created, as before
    loaded_rules = load_rules_from_file(rules)
    matched = match_files(loaded_rules, list_f())
    print_tags(optimize_tags(matched), file=sys.stdout)
# Run only when executed as a script, so that importing the module (e.g. to
# reuse its functions) does not parse sys.argv or exit the interpreter.
if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment