Skip to content

Instantly share code, notes, and snippets.

@Stiivi
Created September 9, 2011 08:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Stiivi/1205736 to your computer and use it in GitHub Desktop.
Save Stiivi/1205736 to your computer and use it in GitHub Desktop.
Brewery leak
import brewery.streams
import brewery.nodes
import time
class FunctionSubstituteNode(brewery.nodes.Node):
    """Manipulate a field using a function.

    This is a simpler version of DeriveNode(); a single field value is
    passed to the function rather than the entire record.
    """

    # Node metadata: the node type, a label/description, and a description
    # of the two attributes (`field`, `func`) accepted by the constructor.
    __node_info__ = {
        "type": "field",
        "label" : "Function Substitute",
        "description" : "Modify a field using a given function.",
        "attributes" : [
            {
                "name": "field",
                "label": "modified field",
                "description": "Field containing a string or text value where substition will "
                               "be applied"
            },
            {
                "name": "func",
                "label": "function",
                "description": "Modifier function."
            }
        ]
    }

    def __init__(self, field=None, func=None):
        """Create a node that replaces one field's value with `func(value)`.

        The function is called without any exception handling, so it is up
        to the caller to make sure that it doesn't raise an exception.

        :Attributes:
            * `field`: field to be used for substitution (should contain a
              string)
            * `func`: some callable that will manipulate the data in `field`.
        """
        super(FunctionSubstituteNode, self).__init__()
        self.field = field
        self.func = func

    def run(self):
        """Apply `func` to `field` in every input row and pass the row on.

        Each row is modified in place (the value at the field's position is
        overwritten with the function's result) and then handed to
        `self.put`.
        """
        pipe = self.input
        # Look up the field's position once; it is reused for every row.
        index = self.input_fields.index(self.field)
        # Bind the callable locally so the attribute is not re-read per row.
        func = self.func
        for row in pipe.rows():
            row[index] = func(row[index])
            self.put(row)
def lcase(val):
    """Return `val` lower-cased, or `val` itself if it cannot be lower-cased.

    The result of `val.lower()` is returned when that call succeeds. If it
    raises `AttributeError` (e.g. `val` is `None` or some other object
    without a `lower` method), the value is returned unchanged instead of
    propagating the error.
    """
    try:
        return val.lower()
    except AttributeError:  # e.g. None, objects etc.
        # Not string-like: hand the value back untouched.
        return val
def sleep(val, delay=0.001):
    """Pause for `delay` seconds, then return `val` unchanged.

    :param val: value to pass through; it is neither inspected nor modified.
    :param delay: number of seconds to sleep. Defaults to 0.001, the pause
        that was previously hard-coded, so existing single-argument callers
        behave as before.
    :returns: `val`, exactly as given.
    """
    time.sleep(delay)
    return val
# Assemble a single-branch stream: a CSV source, a chain of string_strip and
# function_substitute nodes, and a CSV target -- then run it.
stream = brewery.streams.Stream()
trunk = stream.fork()
trunk.csv_source('source.csv', read_header=True)

# One string_strip node per entry, added in this exact order.
# NOTE(review): 'f4' is listed four times, mirroring the original node chain
# -- presumably to lengthen the pipeline for the leak reproduction; confirm
# before de-duplicating.
for strip_field in ('id', 'f2', 'f3', 'f4', 'f4', 'f4', 'f4'):
    trunk.string_strip(fields=[strip_field])

# Lower-case f1..f4, one node per field.
for lower_field in ('f1', 'f2', 'f3', 'f4'):
    trunk.function_substitute(field=lower_field, func=lcase)

# Final substitute node passes f1 through `sleep`, which returns the value
# unchanged after a short pause.
trunk.function_substitute(field='f1', func=sleep)

trunk.csv_target('out.csv')
stream.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment