@athoune
Last active August 28, 2018 22:20
Pushing mails to Elastic Search for a Kibana analysis.
#!/usr/bin/env python
import sys

# Lamson is an application, but also the best way to read email without
# struggling with the "batteries included" libraries.
from lamson.encoding import from_string as parse_mail
from pyelasticsearch import ElasticSearch
from pyelasticsearch.exceptions import ElasticHttpNotFoundError


def parse_date(txt):
    """Mails use this format:
    Fri, 10 Feb 2012 08:48:52 +0100 (CET)
    Elastic Search needs this one:
    2009-11-15T14:12:12
    Just use a naive translation.
    """
    MONTH = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    parts = txt.encode('ASCII').split(',', 2)[-1].strip().split(' ')
    resp = '%(year)s-%(month)02d-%(day)02dT%(hms)s' % {
        'year': parts[2],
        'month': MONTH.index(parts[1]) + 1,
        'day': int(parts[0]),
        'hms': parts[3]}
    return resp
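
# Example: parse_date('Fri, 10 Feb 2012 08:48:52 +0100 (CET)') returns
# '2012-02-10T08:48:52'; the timezone offset is simply dropped.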


def mbox(path):
    """My Thunderbird mail crashes the stdlib mailbox library.
    Here is a violent way to read the mbox format: yield each mail as text
    and parse it with lamson.
    """
    buff = None
    with open(path, 'r') as box:
        for line in box:
            if line.startswith('From '):
                if buff is None:
                    buff = []
                else:
                    yield parse_mail("".join(buff))
                    buff = []
            buff.append(line)
    yield parse_mail("".join(buff))
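
# Each value yielded by mbox() is a parsed lamson mail; documents_from_mails()
# below reads its .headers mapping and .body text.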


def bulk_iterate(collection, bulk_size):
    """Agnostic bulk iteration: yield lists of at most bulk_size items."""
    stack = []
    for item in collection:
        stack.append(item)
        if len(stack) >= bulk_size:
            yield stack
            stack = []
    if len(stack) > 0:
        yield stack
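
# For example, bulk_iterate(range(5), 2) yields [0, 1], then [2, 3], then [4].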


def documents_from_mails(mails):
    """Build a document from each mail."""
    for mail in mails:
        if 'Date' in mail.headers:  # Some mails seem broken.
            yield {
                '@source': 'stuff://',
                '@type': 'mailadmin',
                '@tags': [mail.headers['From']],
                '@fields': mail.headers,
                '@timestamp': parse_date(mail.headers['Date']),
                '@source_host': 'localhost',
                '@source_path': 'mail/admin ',
                '@message': mail.body,
                'id': mail.headers['Message-Id']
            }


if __name__ == '__main__':
    # Instantiate it with a URL
    es = ElasticSearch(sys.argv[1])
    # Kibana needs this kind of index name
    NAME = 'logstash-2013.06.13'
    try:
        es.delete_index(NAME)
    except ElasticHttpNotFoundError:
        pass  # Nobody cares
    emails = mbox(sys.argv[2])
    for n, docs in enumerate(bulk_iterate(documents_from_mails(emails), 100)):
        es.bulk_index(NAME, 'mailadmin', docs)
        print(n)
    print(es.refresh(NAME))
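
# Usage sketch (the script name is whatever you saved this gist as):
#   python this_gist.py http://localhost:9200 /path/to/mailbox
# i.e. sys.argv[1] is the Elasticsearch URL and sys.argv[2] the mbox file.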

@Quix0r commented Jun 18, 2017

This does not work with the official client (Debian package python-elasticsearch), and there is a static date, 2013.06.13, hard-coded in it. Maybe it is worth fixing.
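
For reference, a rough, untested sketch of what the indexing part might look like with the official client, reusing mbox() and documents_from_mails() from the gist above and deriving the index name from the current day instead of hard-coding it:

    import sys
    from datetime import datetime
    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch([sys.argv[1]])
    NAME = datetime.utcnow().strftime('logstash-%Y.%m.%d')
    es.indices.delete(index=NAME, ignore=404)  # ignore "index not found"
    actions = ({'_index': NAME, '_type': 'mailadmin', '_source': doc}
               for doc in documents_from_mails(mbox(sys.argv[2])))
    helpers.bulk(es, actions)
    es.indices.refresh(index=NAME)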

@Quix0r commented Jun 18, 2017

Getting this:

    es = Elasticsearch(sys.argv[1])
IndexError: list index out of range
