-
-
Save hute37/0d2f42c8cc898cf7a2c46b1888b3a160 to your computer and use it in GitHub Desktop.
My Pig script and Python Streaming Stuff
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/opt/local/bin/python | |
import email
import email.utils
import getopt
import imaplib
import inspect
import os
import pprint
import re
import signal
import sys
import time

from avro import datafile, io, schema
from lepl.apps.rfc3696 import Email
class EmailUtils(object):
    """Utilities for parsing and extracting structure from raw emails.

    The helpers turn an ``email.message.Message`` (plus the IMAP fetch
    prefix that carries the Gmail thread id) into a plain dict of fields.
    Helpers return ``None`` for header values that are missing or cannot
    be parsed instead of raising.
    """

    # Matches an IMAP fetch prefix and captures the Gmail thread id, e.g.
    # '1011 (X-GM-THRID 1292412648635976421 RFC822 {6499}' --> 1292412648635976421
    _THREAD_ID_RE = re.compile(r'\d+ \(X-GM-THRID (.+) RFC822.*')

    def __init__(self):
        """Create the address validator used by ``parse_addrs``."""
        self.is_email = Email()

    def strip_brackets(self, message_id):
        """Return ``message_id`` without surrounding ``<`` / ``>`` characters.

        Returns ``None`` when the header is absent, rather than the literal
        string ``'None'``.
        """
        if message_id is None:
            return None
        return str(message_id).strip('<>')

    def parse_date(self, date_string):
        """Convert an RFC 2822 date header to ``YYYY-MM-DDTHH:MM:SS``.

        The timezone offset in the header is not applied: the wall-clock
        time is formatted exactly as written. Returns ``None`` when the
        header is missing or cannot be parsed.
        """
        if not date_string:
            return None
        tuple_time = email.utils.parsedate(date_string)
        if tuple_time is None:
            # parsedate signals an unparseable date with None; strftime
            # would raise TypeError on it.
            return None
        return time.strftime("%Y-%m-%dT%H:%M:%S", tuple_time)

    def get_charset(self, raw_email):
        """Return the first declared charset of an email, or ``None``.

        ``raw_email`` may be a raw message string or an already parsed
        message object.
        """
        if isinstance(raw_email, str):
            raw_email = email.message_from_string(raw_email)
        return self._first_charset(raw_email)

    def get_thread_id(self, thread_string):
        """Extract the Gmail thread id from an IMAP fetch prefix.

        Returns ``None`` when ``thread_string`` is empty or does not match
        the expected ``<n> (X-GM-THRID <id> RFC822 ...`` form.
        """
        match = self._THREAD_ID_RE.match(thread_string or '')
        return match.group(1) if match else None

    def parse_addrs(self, addr_string):
        """Parse an address header into a list of name/address dicts.

        Each entry has the keys ``'real_name'`` and ``'address'``; a key is
        ``None`` when the name is empty or the address fails validation.
        Entries with neither a name nor a valid address are dropped.
        Returns ``None`` when the header is missing/empty or nothing usable
        remains.
        """
        if not addr_string:
            return None
        validated = []
        for real_name, address in email.utils.getaddresses([addr_string]):
            is_valid = self.is_email(address)
            if not real_name and not is_valid:
                continue
            validated.append({
                'real_name': real_name or None,
                'address': address if is_valid else None,
            })
        return validated or None

    def process_email(self, msg, thread_id):
        """Build the dict of record fields for one parsed message.

        Args:
            msg: parsed ``email.message.Message``.
            thread_id: IMAP fetch prefix containing the ``X-GM-THRID`` value.

        Returns:
            ``(avro_parts, charset)``. When the message declares no charset
            the result is ``({}, charset)``: without a known charset the text
            cannot be decoded reliably and corrupt records would be written.
        """
        charset = self._first_charset(msg)
        if not charset:
            return {}, charset

        avro_parts = {
            'message_id': self.strip_brackets(msg['Message-ID']),
            'thread_id': self.get_thread_id(thread_id),
            'in_reply_to': self.strip_brackets(msg['In-Reply-To']),
            'subject': self._to_text(msg['Subject'], charset),
            'date': self.parse_date(msg['Date']),
            'body': self._to_text(self.get_body(msg), charset),
            'froms': self.parse_addrs(msg['From']),
            'tos': self.parse_addrs(msg['To']),
            'ccs': self.parse_addrs(msg['Cc']),
            'bccs': self.parse_addrs(msg['Bcc']),
            'reply_tos': self.parse_addrs(msg['Reply-To']),
        }
        return avro_parts, charset

    def get_body(self, msg):
        """Concatenate the payloads of all ``text/plain`` parts of ``msg``.

        Returns an empty string when ``msg`` is falsy or has no such part.
        """
        if not msg:
            return ''
        return ''.join(
            part.get_payload()
            for part in msg.walk()
            if part.get_content_type() == 'text/plain'
        )

    def _first_charset(self, msg):
        """Return the first non-None charset reported by ``msg``, else None."""
        return next((c for c in msg.get_charsets() if c is not None), None)

    @staticmethod
    def _to_text(value, charset):
        """Decode byte strings with ``charset``; pass anything else through.

        On Python 2 ``bytes`` is ``str``, so header and payload strings are
        decoded as before. Text strings and ``None`` (missing header) are
        returned unchanged instead of failing on ``.decode``.
        """
        if isinstance(value, bytes):
            return value.decode(charset)
        return value
#if not avro_parts.has_key('froms'): | |
# return 'FROM', {}, charset | |
#msg = email.message_from_string(raw_email) | |
#avro_parts, charset = process_email(msg, thread_id) | |
def main():
    """Echo the thread id of every record streamed in on stdin.

    Each input line is expected to hold a thread id followed by the raw
    message, separated by whitespace. Only the first whitespace run is
    used as the separator, so a raw message that itself contains spaces
    or tabs no longer breaks the unpacking. Lines without two fields are
    reported on stderr and skipped.
    """
    for line in sys.stdin:
        fields = line.strip().split(None, 1)
        if len(fields) != 2:
            sys.stderr.write("Skipping malformed input line: %r\n" % line)
            continue
        thread_id = fields[0]
        # sys.stdout.write behaves the same on Python 2 and 3, unlike the
        # print statement.
        sys.stdout.write("Message ID: %s\n" % thread_id)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Avro and the libraries it needs at runtime */
REGISTER /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
REGISTER /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
REGISTER /me/pig/build/ivy/lib/Pig/joda-time-1.6.jar

/* Piggybank (provides AvroStorage) */
REGISTER /me/pig/contrib/piggybank/java/piggybank.jar

/* MongoDB driver and the mongo-hadoop connector */
REGISTER /me/mongo-hadoop/mongo-2.7.3.jar
REGISTER /me/mongo-hadoop/core/target/mongo-hadoop-core-1.0.0.jar
REGISTER /me/mongo-hadoop/pig/target/mongo-hadoop-pig-1.0.0.jar

/* Speculative execution must be off for mappers and reducers,
   otherwise MongoDB will get duplicate records */
SET mapred.map.tasks.speculative.execution false
SET mapred.reduce.tasks.speculative.execution false

/* Short aliases for the storage functions */
DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
DEFINE MongoStorage com.mongodb.hadoop.pig.MongoStorage();

/* Read the Avro-encoded emails */
emails = LOAD '/me/tmp/small_inbox' USING AvroStorage();

/* Ship the Python script to the cluster and stream every record through it */
DEFINE email_utils `email_utils.py` SHIP('email_utils.py');
email_parts = STREAM emails THROUGH email_utils AS (message_id:chararray);

/* Remove any previous output, then write the streamed result */
rmf /tmp/test_mail
STORE email_parts INTO '/tmp/test_mail';
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment