Skip to content

Instantly share code, notes, and snippets.

@wesyoung
Last active March 16, 2019 15:05
Show Gist options
  • Save wesyoung/ee2f8fdd585fcbb9296e80d429e1643a to your computer and use it in GitHub Desktop.
Save wesyoung/ee2f8fdd585fcbb9296e80d429e1643a to your computer and use it in GitHub Desktop.
elastic spam
#!/usr/bin/env python3
"""
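Index a raw email read from stdin into a monthly Elasticsearch index.
Elasticsearch hosts come from the ES_NODES environment variable
(comma-separated, default 127.0.0.1:9200).

# docker containers are your friend!
$ pip install arrow chardet elasticsearch-dsl
$ cat test.eml | python3 spam_es.py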
"""
"""
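procmail recipe (in ~/.procmailrc): pipe a copy of each message to this
script (installed here as ~/bin/es.py)
:0 c
| ~/.virtualenvs/spam/bin/python ~/bin/es.py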
"""
from pprint import pprint
from datetime import datetime
import os, sys
import chardet
import arrow
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Date, Index
from elasticsearch_dsl.connections import get_connection, create_connection
# Elasticsearch hosts to connect to, read from the comma-separated ES_NODES
# environment variable. Whitespace around entries is stripped and empty
# entries are dropped so values such as "a:9200, b:9200," parse cleanly; if
# nothing usable remains, fall back to the local default node.
NODES = [
    node.strip()
    for node in os.getenv('ES_NODES', '127.0.0.1:9200').split(',')
    if node.strip()
] or ['127.0.0.1:9200']
class Email(Document):
    """Elasticsearch-DSL document used for the indexed email messages.

    No fields are declared here; the class only carries the index name
    pattern and the doc type used when the document is attached to an index.
    """

    class Meta:
        # Doc type name, matching the doc_type passed to es.index() in main().
        doc_type = 'email'

    class Index:
        # Wildcard pattern covering the 'email-spam-<YYYY>.<MM>' index names
        # produced by _current_index().
        doc_type = 'email'
        name = 'email-spam-*'
def _current_index(now=None):
    """Return the name of the monthly index, e.g. ``email-spam-2019.03``.

    Args:
        now: Optional ``datetime`` used to pick the year and month. When
            omitted, the current UTC time is used. A naive value is
            formatted as-is, without any timezone conversion.

    Returns:
        The index name formatted as ``'email-spam-<YYYY>.<MM>'``.
    """
    if now is None:
        # datetime.utcnow() is deprecated since Python 3.12; request an aware
        # UTC timestamp instead. Imported locally because the file only
        # imports the ``datetime`` class at module level.
        from datetime import timezone
        now = datetime.now(timezone.utc)
    return '{}-{}'.format('email-spam', now.strftime('%Y.%m'))
def _create_index():
    """Make sure the current monthly index exists and return its name.

    Registers the elasticsearch-dsl connection to NODES, then checks whether
    the index named by _current_index() exists. When it is missing, the index
    is created with the Email document attached and then flushed.

    Returns:
        The index name that was checked (and possibly created).
    """
    name = _current_index()
    create_connection(hosts=NODES)

    # Nothing to do when the index is already there.
    if get_connection().indices.exists(name):
        return name

    monthly = Index(name)
    monthly.document(Email)
    monthly.create()
    get_connection().indices.flush(name)
    return name
def main():
    """Read one raw email from stdin and index it into Elasticsearch.

    The message bytes are decoded using the encoding guessed by chardet and
    stored together with the chardet result and a UTC ISO-8601 timestamp in
    the current monthly index.

    Raises:
        SystemExit: when stdin is empty.
    """
    es = Elasticsearch(hosts=NODES, timeout=120, max_retries=10,
                       retry_on_timeout=True)
    m = sys.stdin.buffer.read()

    if not m:
        print('missing email..')
        raise SystemExit

    try:
        encode = chardet.detect(m)
        try:
            # chardet may report encoding=None when it cannot decide; treat
            # that as UTF-8 instead of failing on decode(None).
            message = m.decode(encode['encoding'] or 'utf-8')
        except (LookupError, UnicodeDecodeError):
            # Unknown codec name or a wrong guess: keep the message anyway,
            # replacing undecodable bytes rather than dropping the email.
            message = m.decode('utf-8', errors='replace')

        doc = {
            'message': message,
            'encoding': encode,
            '@timestamp': arrow.utcnow().isoformat('T'),
        }

        # Use the name returned by _create_index() so the index that was
        # checked/created is the one written to, even across a month rollover.
        idx = _create_index()
        es.index(index=idx, doc_type='email', body=doc)
    except Exception as e:
        # Best effort: report the failure without raising to the caller.
        print(e)
# Run main() only when executed as a script (e.g. piped to from the shell or
# procmail), not when the module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment