Created September 8, 2011 21:11
Save mikewaters/1204741 to your computer and use it in GitHub Desktop.
Custom nodes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# My Custom Nodes | |
# | |
# in nodes/field_nodes.py | |
class FunctionSubstituteNode(base.Node):
    """Manipulate a single field of every record using a function.

    This is a simpler version of DeriveNode(); only the value of one field is
    passed to the function rather than the entire record, and the function's
    return value replaces that field's value in the emitted row.
    """

    __node_info__ = {
        "type": "field",
        "label" : "Function Substitute",
        "description" : "Modify a field using a given function.",
        "attributes" : [
            {
                "name": "field",
                "label": "modified field",
                "description": "Field containing a string or text value where substition will "
                               "be applied"
            },
            {
                "name": "func",
                "label": "function",
                "description": "Modifier function."
            }
        ]
    }

    def __init__(self, field=None, func=None):
        """Create the node.

        Both attributes may be left as ``None`` here and assigned later; they
        must be set before ``run()`` is called.

        It is up to the caller to make sure that `func` doesn't raise; any
        exception raised by `func` propagates out of ``run()``.

        :Attributes:
            * `field`: name of the field to be modified (should contain a string)
            * `func`: callable that receives the field's value and returns the
              replacement value
        """
        super(FunctionSubstituteNode, self).__init__()
        self.field = field
        self.func = func

    def run(self):
        """Apply `func` to `field` in every input row and emit the new rows.

        :Raises:
            * `TypeError`: if `func` is not callable (e.g. never assigned).
        """
        if not callable(self.func):
            raise TypeError("FunctionSubstituteNode: 'func' must be callable, got %r"
                            % (self.func,))
        # Resolve the column position once, not per row.
        index = self.input_fields.index(self.field)
        func = self.func
        for row in self.input.rows():
            # Build a new row instead of assigning into the incoming one: the
            # incoming row may be immutable (tuple) or referenced elsewhere,
            # and must not be modified behind its owner's back.
            new_row = list(row)
            new_row[index] = func(new_row[index])
            self.put(new_row)
# in nodes/target_nodes.py | |
class MongoDBTargetNode(base.TargetNode):
    """Feed data rows into a MongoDB collection.

    The actual writing is delegated to ``ds.MongoDBDataTarget``, which is
    created in ``initialize()``, fed in ``run()`` and finalized in
    ``finalize()``.
    """

    __node_info__ = {
        "label": "MongoDB Target",
        "icon": "sql_table_target",
        "description" : "Feed data rows into a schemaless mongodb database",
        "attributes" : [
            {
                "name": "collection",
                "description": "mongo collection name"
            },
            {
                "name": "database_name",
                "description": "database name",
            },
            {
                "name": "host",
                "description": " mongo database server host, default is ``localhost``"
            },
            {
                "name": "port",
                "description": "mongo port, default is ``27017``"
            },
            {
                "name": "expand",
                "description": "expand dictionary values and treat children as top-level keys "
                               "with dot '.' separated key"
            },
            {
                "name": "truncate",
                "description": "delete existing data in the collection. Default: False"
            },
            {
                "name": "mongo_args",
                # Adjacent literals are concatenated, so each fragment must end
                # with its own separating space.
                "description": "custom args passed to mongodb instance (eg. *args, **kwargs). "
                               "Optional parameters (from pymongo docs): "
                               "max_pool_size (optional): The maximum size limit for the connection pool. "
                               "network_timeout (optional): timeout (in seconds) to use for socket "
                               "operations - default is no timeout. "
                               "document_class (optional): default class to use for documents returned "
                               "from queries on this connection. "
                               "tz_aware (optional): if True, datetime instances returned as values in a "
                               "document by this Connection will be timezone aware (otherwise they will be naive). "
                               "slave_okay or slaveok: Is it OK to perform queries if this connection is "
                               "to a secondary? "
                               "safe: Use getlasterror for each write operation? "
                               "j or journal: Block until write operations have been commited to the journal. "
                               "Ignored if the server is running without journaling. Implies safe=True. "
                               "w: If this is a replica set the server won't return until write operations "
                               "have replicated to this many set members. Implies safe=True. "
                               "wtimeout: Used in conjunction with j and/or w. Wait this many milliseconds "
                               "for journal acknowledgement and/or write replication. Implies safe=True. "
                               "fsync: Force the database to fsync all files before returning. When used "
                               "with j the server awaits the next group commit before returning. Implies safe=True. "
                               "replicaset: The name of the replica set to connect to. "
                               "The driver will verify that the replica set it connects to matches this name. "
                               "Implies that the hosts specified are a seed list and the driver should "
                               "attempt to find all members of the set."
            },
        ]
    }

    def __init__(self, collection, **kwargs):
        """Create the node.

        :Parameters:
            * `collection`: name of the MongoDB collection to write to
            * `kwargs`: keyword options forwarded unchanged to
              ``ds.MongoDBDataTarget`` when the node is initialized

        NOTE(review): ``__node_info__`` documents ``database_name`` while
        callers may pass ``database=`` -- confirm which keyword
        ``MongoDBDataTarget`` actually accepts.
        """
        super(MongoDBTargetNode, self).__init__()
        self.kwargs = kwargs
        self.collection = collection
        self.stream = None

    def initialize(self):
        """Create and initialize the data target using the input's fields."""
        self.stream = ds.MongoDBDataTarget(self.collection, **self.kwargs)
        self.stream.fields = self.input_fields
        self.stream.initialize()

    def run(self):
        """Append every input row to the MongoDB data target."""
        for row in self.input.rows():
            self.stream.append(row)

    def finalize(self):
        """Finalize the data target created in ``initialize()``.

        Without this override the target's own ``finalize()`` is never
        invoked once the stream finishes.
        """
        if self.stream is not None:
            self.stream.finalize()
###################################################### | |
# The main body of code | |
# import.py | |
import util | |
class MailerDataProcessor(object):
    """Load mailer records from a CSV file into a MongoDB collection.

    The nodes are chained into one linear pipeline, in the order given by
    ``_PIPELINE``. The stages are:

    * read the CSV
    * check record completeness
    * strip fields
    * validate the email
    * lower-case the email
    * normalize gender, opt-in timestamp, date of birth and state
    * write to MongoDB
    """

    # Node names in pipeline order; each node feeds the next one.
    _PIPELINE = ("source", "complete", "strip", "validate", "lcase_email",
                 "fmt_gender", "fmt_optin_ts", "fmt_dob", "fmt_statecd",
                 "mongo")

    def __init__(self, source_csv, field_list, database, collection, **repo_args):
        """Build and configure the processing nodes.

        :Parameters:
            * `source_csv`: path of the CSV file to read (it has no header row)
            * `field_list`: ``(name, storage_type)`` pairs describing the CSV
              columns, in file order
            * `database`: name of the MongoDB database to write to
            * `collection`: name of the MongoDB collection to write to
            * `repo_args`: accepted but currently unused
        """
        # Prepare nodes
        nodes = {
            "source": CSVSourceNode(source_csv, read_header=False),
            "complete": RecordCompleteNode(),
            "strip": StringStripNode(),
            "validate": FunctionSelectNode(),
            "lcase_email": FunctionSubstituteNode(),
            "fmt_gender": FunctionSubstituteNode(),
            "fmt_optin_ts": FunctionSubstituteNode(),
            "fmt_dob": FunctionSubstituteNode(),
            "fmt_statecd": FunctionSubstituteNode(),
            "mongo": MongoDBTargetNode(collection, database=database),
        }

        # "source": the file has no header, so field names/types come from
        # field_list.
        nodes['source'].fields = metadata.FieldList(field_list)
        # "complete": fields RecordCompleteNode requires to be present.
        nodes['complete'].reqd_fields = ['email', 'first', 'last', 'state', 'gender',
                                         'dob', 'optin_ts', 'optin_ip', 'optin_src']
        # "strip": fields handed to StringStripNode.
        nodes['strip'].fields = ["email", "state", "gender", "dob"]
        # "validate" - only email for now
        nodes['validate'].function = util.email_vrfy
        nodes['validate'].fields = ["email"]
        # "lcase_email": util.lcase also accepts byte strings and None, unlike
        # the unbound unicode.lower (which rejects str on Python 2 and does
        # not exist on Python 3).
        nodes["lcase_email"].field = "email"
        nodes["lcase_email"].func = util.lcase
        # "fmt_gender"
        nodes['fmt_gender'].field = "gender"
        nodes['fmt_gender'].func = util.parse_gender
        # "fmt_optin_ts"
        nodes['fmt_optin_ts'].field = "optin_ts"
        nodes['fmt_optin_ts'].func = util.parse_timestamp
        # "fmt_dob": parse_timestamp (a datetime) rather than parse_date is a
        # mongo workaround -- BSON has no date-only type, so a plain
        # datetime.date cannot be stored.
        nodes['fmt_dob'].field = 'dob'
        nodes['fmt_dob'].func = util.parse_timestamp
        # "fmt_statecd"
        nodes['fmt_statecd'].field = "state"
        nodes['fmt_statecd'].func = util.parse_statecd

        self.nodes = nodes
        # Connect each node to its successor: [(a, b), (b, c), ...].
        self.connections = list(zip(self._PIPELINE, self._PIPELINE[1:]))

    def process(self):
        """Run the configured pipeline to completion."""
        stream = Stream(self.nodes, self.connections)
        stream.run()
if __name__ == "__main__":
    # (name, storage type) of each column in the header-less test CSV,
    # listed in file order.
    fields = [
        ("email", "string"),
        ("first", "string"),
        ("last", "string"),
        ("address", "string"),
        ("city", "string"),
        ("state", "string"),
        ("zip", "string"),
        ("tel", "string"),
        ("dob", "date"),
        ("gender", "string"),
        ("optin_ip", "string"),
        ("optin_ts", "time"),
        ("optin_src", "string"),
    ]
    # For verbose pipeline output, enable:
    # logging.basicConfig(level=logging.DEBUG)
    MailerDataProcessor(
        'test/bulktest2.csv', fields, 'testdb', 'bulktest'
    ).process()
# Functions used in the FunctionSubstituteNodes | |
# util.py | |
# | |
#!/usr/bin/env python | |
"""Utility functions for import module""" | |
import dateutil.parser | |
from datetime import datetime, date | |
from lepl.apps.rfc3696 import Email | |
import re | |
import statestyle | |
# Matches a value that is already a two-letter postal code (any case).
statecd_re = re.compile(r'^[A-Za-z]{2}$')


def parse_statecd(statecd):
    """Normalize a state code, if any, or convert a state name to 2-letter code.

    :Parameters:
        * `statecd`: a two-letter state code (any case) or a state name

    :Returns:
        the upper-cased two-letter code, or ``None`` when `statecd` is empty,
        ``None``, or cannot be resolved via ``statestyle``.
    """
    if not statecd:
        return None
    if statecd_re.match(statecd) is not None:
        # Already shaped like a postal code; only normalize the case.
        return statecd.upper()
    try:
        # Assumes statestyle.get(name) returns an object exposing ``.postal``
        # (as the previously disabled code expected) -- confirm against the
        # statestyle version in use.
        return statestyle.get(statecd).postal
    except Exception:
        # Best effort: an unrecognized state name must not abort the whole
        # import, it just yields no state.
        return None
def parse_gender(gender):
    """Determine what the gender is, and convert it to 'M/F/Null' as appropriate.

    :Returns:
        ``u'M'`` if `gender` starts with M/m, ``u'F'`` if it starts with F/f,
        otherwise ``None`` (including for empty or non-string input).
    """
    try:
        is_male = gender.startswith(('M', 'm'))
        is_female = gender.startswith(('F', 'f'))
    except (AttributeError, TypeError):
        # Not a text value (e.g. None for a missing cell): gender unknown.
        # Catching only these keeps KeyboardInterrupt/SystemExit intact.
        return None
    if is_male:
        return u'M'
    if is_female:
        return u'F'
    return None
def parse_date(dt):
    """Try to coerce a date string into one format; we like YYYY-MM-DD.

    :Returns:
        a `date` object (any time part is dropped), or ``None`` when `dt` is
        empty/``None`` or cannot be parsed by ``dateutil``.
    """
    if not dt:
        return None
    try:
        parsed = dateutil.parser.parse(dt)
    except (ValueError, OverflowError):
        # ValueError covers unparseable strings (dateutil's ParserError is a
        # subclass); OverflowError is raised for out-of-range values.
        return None
    return parsed.date()
def parse_timestamp(ts):
    """Try to coerce a date/time string into a `datetime` object.

    Despite the name, no unix (epoch) timestamp is produced.

    :Returns:
        the parsed `datetime`, or ``None`` when `ts` is empty/``None`` or
        cannot be parsed by ``dateutil``.
    """
    if not ts:
        return None
    try:
        return dateutil.parser.parse(ts)
    except (ValueError, OverflowError):
        # ValueError covers unparseable strings (dateutil's ParserError is a
        # subclass); OverflowError is raised for out-of-range values.
        return None
def lcase(val):
    """Return `val` lower-cased when it has a ``lower()`` method, else `val` itself."""
    try:
        lowered = val.lower()
    except AttributeError:
        # None and other non-string objects pass through unchanged.
        lowered = val
    return lowered
# Shared email validator built from lepl's RFC 3696 support. The import
# pipeline installs it as the FunctionSelectNode "validate" function for the
# email field. NOTE(review): presumably a callable returning True for valid
# addresses -- confirm against the lepl.apps.rfc3696 documentation.
email_vrfy = Email()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.