Skip to content

Instantly share code, notes, and snippets.

@mikewaters
Created September 8, 2011 21:11
Show Gist options
  • Save mikewaters/1204741 to your computer and use it in GitHub Desktop.
Save mikewaters/1204741 to your computer and use it in GitHub Desktop.
Custom nodes
# My Custom Nodes
#
# in nodes/field_nodes.py
class FunctionSubstituteNode(base.Node):
    """Replace one field of every row with ``func(old_value)``.

    A narrower cousin of DeriveNode(): the function receives only the value
    of the configured field, not the whole record.
    """

    __node_info__ = {
        "type": "field",
        "label": "Function Substitute",
        "description": "Modify a field using a given function.",
        "attributes": [
            {
                "name": "field",
                "label": "modified field",
                "description": ("Field containing a string or text value "
                                "where substition will be applied"),
            },
            {
                "name": "func",
                "label": "function",
                "description": "Modifier function.",
            },
        ],
    }

    def __init__(self, field=None, func=None):
        """Create the node.

        :Attributes:
            * `field`: name of the input field whose value gets replaced.
            * `func`: one-argument callable applied to that value; whatever
              it returns is written back into the row.

        Exceptions raised by `func` are not caught here; callers must supply
        a function that copes with every value the field can hold.
        """
        super(FunctionSubstituteNode, self).__init__()
        self.field, self.func = field, func

    def run(self):
        """Rewrite `field` in each input row (in place) and emit the row."""
        source = self.input
        column = self.input_fields.index(self.field)
        for record in source.rows():
            record[column] = self.func(record[column])
            self.put(record)
# in nodes/target_nodes.py
class MongoDBTargetNode(base.TargetNode):
    """Feed data rows into a MongoDB collection.

    Every constructor keyword argument other than ``collection`` is stored
    verbatim and forwarded to ``ds.MongoDBDataTarget`` in initialize().
    """

    __node_info__ = {
        "label": "MongoDB Target",
        "icon": "sql_table_target",
        "description": "Feed data rows into a schemaless mongodb database",
        "attributes": [
            {
                "name": "collection",
                "description": "mongo collection name"
            },
            # NOTE(review): the caller in import.py passes ``database=``, not
            # ``database_name=``; confirm which keyword ds.MongoDBDataTarget
            # accepts and align this attribute name with it.
            {
                "name": "database_name",
                "description": "database name",
            },
            {
                "name": "host",
                "description": "mongo database server host, default is ``localhost``"
            },
            {
                "name": "port",
                "description": "mongo port, default is ``27017``"
            },
            {
                "name": "expand",
                "description": "expand dictionary values and treat children as top-level keys "
                               "with dot '.' separated key"
            },
            {
                "name": "truncate",
                "description": "delete existing data in the collection. Default: False"
            },
            {
                "name": "mongo_args",
                # Each implicitly-concatenated piece ends with a separator so
                # the joined text does not run words together.
                "description": "custom args passed to mongodb instance (eg. *args, **kwargs). "
                               "Optional parameters (from pymongo docs): "
                               "max_pool_size (optional): The maximum size limit for the connection pool. "
                               "network_timeout (optional): timeout (in seconds) to use for socket "
                               "operations - default is no timeout. "
                               "document_class (optional): default class to use for documents returned "
                               "from queries on this connection. "
                               "tz_aware (optional): if True, datetime instances returned as values in a "
                               "document by this Connection will be timezone aware (otherwise they will be naive). "
                               "slave_okay or slaveok: Is it OK to perform queries if this connection is "
                               "to a secondary? "
                               "safe: Use getlasterror for each write operation? "
                               "j or journal: Block until write operations have been committed to the journal. "
                               "Ignored if the server is running without journaling. Implies safe=True. "
                               "w: If this is a replica set the server won't return until write operations "
                               "have replicated to this many set members. Implies safe=True. "
                               "wtimeout: Used in conjunction with j and/or w. Wait this many milliseconds "
                               "for journal acknowledgement and/or write replication. Implies safe=True. "
                               "fsync: Force the database to fsync all files before returning. When used "
                               "with j the server awaits the next group commit before returning. Implies safe=True. "
                               "replicaset: The name of the replica set to connect to. "
                               "The driver will verify that the replica set it connects to matches this name. "
                               "Implies that the hosts specified are a seed list and the driver should "
                               "attempt to find all members of the set."
            },
        ]
    }

    def __init__(self, collection, **kwargs):
        """
        :Parameters:
            * `collection`: name of the target collection.
            * `kwargs`: passed unchanged to ``ds.MongoDBDataTarget``.
        """
        super(MongoDBTargetNode, self).__init__()
        self.kwargs = kwargs
        self.collection = collection
        self.stream = None  # created lazily in initialize()

    def initialize(self):
        """Create and initialize the MongoDB data target for the input fields."""
        self.stream = ds.MongoDBDataTarget(self.collection, **self.kwargs)
        self.stream.fields = self.input_fields
        self.stream.initialize()

    def run(self):
        """Append every input row to the data target."""
        for row in self.input.rows():
            self.stream.append(row)

    def finalize(self):
        """Finalize the data target created by initialize(), if any.

        Without this the target opened in initialize() is never finalized.
        Assumes ds.MongoDBDataTarget provides finalize(), as brewery data
        streams do -- confirm against the installed brewery version.
        """
        if self.stream is not None:
            self.stream.finalize()
######################################################
# The main body of code
# import.py
import util
class MailerDataProcessor(object):
    """Build and run the brewery stream that loads a mailer CSV into MongoDB.

    Stages, in order (see ``_PIPELINE``):

    * ``source``       - CSVSourceNode reading `source_csv` (no header row)
    * ``complete``     - RecordCompleteNode over the required fields
    * ``strip``        - StringStripNode over email/state/gender/dob
    * ``validate``     - FunctionSelectNode applying ``util.email_vrfy`` to email
    * ``lcase_email``  - ``util.lcase`` on email
    * ``fmt_gender``   - ``util.parse_gender`` on gender
    * ``fmt_optin_ts`` - ``util.parse_timestamp`` on optin_ts
    * ``fmt_dob``      - ``util.parse_timestamp`` on dob
    * ``fmt_statecd``  - ``util.parse_statecd`` on state
    * ``mongo``        - MongoDBTargetNode writing to `database`/`collection`
    """

    # Node names in pipeline order; each adjacent pair becomes a connection.
    _PIPELINE = ("source", "complete", "strip", "validate", "lcase_email",
                 "fmt_gender", "fmt_optin_ts", "fmt_dob", "fmt_statecd",
                 "mongo")

    def __init__(self, source_csv, field_list, database, collection,
                 **repo_args):
        """Create and configure the node graph.

        :Parameters:
            * `source_csv`: path of the headerless CSV file to import.
            * `field_list`: (field name, storage type) pairs describing the
              CSV columns in file order; wrapped in ``metadata.FieldList``.
            * `database`: MongoDB database name.
            * `collection`: MongoDB collection name.
            * `repo_args`: extra keyword arguments (e.g. ``host``, ``port``,
              ``truncate``) forwarded to MongoDBTargetNode instead of being
              ignored.
        """
        # Helpers are referenced through the ``util`` module: ``import util``
        # binds only the module name, so bare names such as parse_timestamp
        # would raise NameError.
        nodes = {
            "source": CSVSourceNode(source_csv, read_header=False),
            "complete": RecordCompleteNode(),
            "strip": StringStripNode(),
            "validate": FunctionSelectNode(),
            # util.lcase instead of unicode.lower: the latter raises TypeError
            # for byte strings and None.
            "lcase_email": FunctionSubstituteNode(field="email",
                                                  func=util.lcase),
            "fmt_gender": FunctionSubstituteNode(field="gender",
                                                 func=util.parse_gender),
            "fmt_optin_ts": FunctionSubstituteNode(field="optin_ts",
                                                   func=util.parse_timestamp),
            # Mongo workaround: dob becomes a full datetime rather than a
            # date (util.parse_date), presumably because BSON has no
            # date-only type and pymongo will not encode datetime.date.
            "fmt_dob": FunctionSubstituteNode(field="dob",
                                              func=util.parse_timestamp),
            "fmt_statecd": FunctionSubstituteNode(field="state",
                                                  func=util.parse_statecd),
            "mongo": MongoDBTargetNode(collection, database=database,
                                       **repo_args),
        }

        nodes["source"].fields = metadata.FieldList(field_list)
        nodes["complete"].reqd_fields = ["email", "first", "last", "state",
                                         "gender", "dob", "optin_ts",
                                         "optin_ip", "optin_src"]
        nodes["strip"].fields = ["email", "state", "gender", "dob"]
        # Only the email address is validated for now.
        nodes["validate"].function = util.email_vrfy
        nodes["validate"].fields = ["email"]

        self.nodes = nodes
        self.connections = list(zip(self._PIPELINE, self._PIPELINE[1:]))

    def process(self):
        """Build a brewery Stream from the configured nodes and run it."""
        stream = Stream(self.nodes, self.connections)
        stream.run()
if __name__ == "__main__":
    # Column layout of the headerless test CSV: (field name, storage type),
    # in the order the columns appear in the file.
    field_list = [
        ("email", "string"),
        ("first", "string"),
        ("last", "string"),
        ("address", "string"),
        ("city", "string"),
        ("state", "string"),
        ("zip", "string"),
        ("tel", "string"),
        ("dob", "date"),
        ("gender", "string"),
        ("optin_ip", "string"),
        ("optin_ts", "time"),
        ("optin_src", "string"),
    ]
    # For verbose output, enable: logging.basicConfig(level=logging.DEBUG)
    processor = MailerDataProcessor("test/bulktest2.csv", field_list,
                                    "testdb", "bulktest")
    processor.process()
# Functions used in the FunctionSubstituteNodes
# util.py
#
#!/usr/bin/env python
"""Utility functions for import module"""
import dateutil.parser
from datetime import datetime, date
from lepl.apps.rfc3696 import Email
import re
import statestyle
statecd_re = re.compile(r'^[A-Za-z]{2}$')  # a bare two-letter postal code


def parse_statecd(statecd):
    """Normalize a US state value to its two-letter postal code.

    * Empty values and None yield None.
    * A two-letter value (after stripping whitespace) is treated as a postal
      code and upper-cased.
    * Anything else is looked up with ``statestyle.get`` and that object's
      ``postal`` attribute is returned; None if the lookup fails.

    Never raises: non-string input also yields None.
    """
    if not statecd:
        return None
    try:
        value = statecd.strip()
        if statecd_re.match(value):
            return value.upper()
    except (AttributeError, TypeError):
        # Not a string (or a bytes/str pattern mismatch on Python 3).
        return None
    try:
        return statestyle.get(value).postal
    except Exception:
        # Best effort: statestyle's failure modes for unknown names are not
        # pinned down here, so any lookup error means "unknown state".
        return None
def parse_gender(gender):
    """Normalize a free-form gender value to u'M', u'F' or None.

    Only the leading character matters, case-insensitively: values starting
    with 'M'/'m' map to u'M', values starting with 'F'/'f' map to u'F', and
    everything else -- empty strings, None, non-string objects -- maps to None.
    """
    try:
        if gender.startswith(('M', 'm')):
            return u'M'
        if gender.startswith(('F', 'f')):
            return u'F'
    except (AttributeError, TypeError):
        # AttributeError: None / numbers have no startswith.
        # TypeError: bytes vs. str prefix mismatch on Python 3.
        # (Previously a bare except, which also swallowed KeyboardInterrupt.)
        pass
    return None
def parse_date(dt):
    """Parse a date string in any format dateutil understands into a ``date``.

    Any time-of-day part is discarded.

    :returns: a ``datetime.date``, or None for an empty or unparseable string.
    :raises TypeError: if `dt` has no length (e.g. None), via ``len``.
    """
    if len(dt) == 0:  # raises TypeError for non-sized input such as None
        return None
    try:
        parsed = dateutil.parser.parse(dt)
    except (ValueError, OverflowError):
        # ValueError: unknown format or impossible date;
        # OverflowError: a component too large for a C integer.
        return None
    return parsed.date()
def parse_timestamp(ts):
    """Parse a date/time string in any format dateutil understands.

    :returns: a ``datetime.datetime`` (not a unix timestamp), or None for an
              empty or unparseable string.
    :raises TypeError: if `ts` has no length (e.g. None), via ``len``.
    """
    if len(ts) == 0:  # raises TypeError for non-sized input such as None
        return None
    try:
        return dateutil.parser.parse(ts)
    except (ValueError, OverflowError):
        # ValueError: unknown format or impossible date;
        # OverflowError: a component too large for a C integer.
        return None
def lcase(val):
    """Return ``val.lower()``, or `val` itself when it has no ``lower``.

    Lets None, numbers and other non-string objects pass through untouched.
    """
    try:
        result = val.lower()
    except AttributeError:
        result = val
    return result
# E-mail validator built by lepl.apps.rfc3696.Email; import.py installs it as
# the ``function`` of its "validate" FunctionSelectNode (presumably returning a
# truthy value for acceptable addresses -- confirm against lepl docs).
email_vrfy = Email()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment