Created
September 9, 2011 08:11
-
-
Save Stiivi/1205736 to your computer and use it in GitHub Desktop.
Brewery leak
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import brewery.streams | |
import brewery.nodes | |
import time | |
class FunctionSubstituteNode(brewery.nodes.Node): | |
"""Manipulate a field using a function. | |
This is a simpler version of DeriveNode(); a single field is passed in | |
rather than the entire record. | |
""" | |
__node_info__ = { | |
"type": "field", | |
"label" : "Function Substitute", | |
"description" : "Modify a field using a given function.", | |
"attributes" : [ | |
{ | |
"name": "field", | |
"label": "modified field", | |
"description": "Field containing a string or text value where substition will " | |
"be applied" | |
}, | |
{ | |
"name": "func", | |
"label": "function", | |
"description": "Modifier function." | |
} | |
] | |
} | |
def __init__(self, field=None, func=None): | |
"""Modifies a node. | |
It is up to the caller to make sure that the function doesn't raise an Exception. ? | |
:Attributes: | |
* `field`: field to be used for substitution (should contain a string) | |
* `func`: some callable that will manipulate the data in `field`. | |
""" | |
super(FunctionSubstituteNode, self).__init__() | |
self.field = field | |
self.func = func | |
def run(self): | |
pipe = self.input | |
index = self.input_fields.index(self.field) | |
for row in pipe.rows(): | |
value = self.func(row[index]) | |
row[index] = value | |
self.put(row) | |
def lcase(val): | |
try: | |
return val.lower() | |
except AttributeError: # e.g. None, objects etc. | |
return val # right back at ya, motherfucker! | |
def sleep(val): | |
time.sleep(0.001) | |
return val | |
stream = brewery.streams.Stream() | |
trunk = stream.fork() | |
trunk.csv_source('source.csv', read_header=True) | |
trunk.string_strip(fields = ['id']) | |
trunk.string_strip(fields = ['f2']) | |
trunk.string_strip(fields = ['f3']) | |
trunk.string_strip(fields = ['f4']) | |
trunk.string_strip(fields = ['f4']) | |
trunk.string_strip(fields = ['f4']) | |
trunk.string_strip(fields = ['f4']) | |
trunk.function_substitute(field='f1', func=lcase) | |
trunk.function_substitute(field='f2', func=lcase) | |
trunk.function_substitute(field='f3', func=lcase) | |
trunk.function_substitute(field='f4', func=lcase) | |
trunk.function_substitute(field='f1', func=sleep) | |
trunk.csv_target('out.csv') | |
stream.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment