Skip to content

Instantly share code, notes, and snippets.

@pascalwhoop
Created October 20, 2019 14:30
Show Gist options
  • Save pascalwhoop/0ba7ea5f54f90cd8e0ca00215b06958d to your computer and use it in GitHub Desktop.
Save pascalwhoop/0ba7ea5f54f90cd8e0ca00215b06958d to your computer and use it in GitHub Desktop.
def run():
    """Build the cleanup pipeline: read rows, clean each one, write them out.

    ``options``, ``source`` and ``target`` are module-level names that are
    not defined in this snippet and must be provided by the surrounding file.
    """
    with beam.Pipeline(options=options) as pipeline:
        # Each step is labelled with the same transform name as before; the
        # steps are only split up so every intermediate collection is named.
        rows = pipeline | "ReadTable" >> beam.io.Read(source)
        cleaned = rows | "cleanup" >> beam.ParDo(ElementCleanup())
        # The result of the final write step is not needed afterwards.
        _ = cleaned | "writeTable" >> beam.io.Write(target)
# Pipeline outline:
# - a ParDo runs `process` on every value in the PCollection
# - for each element, every field has a target datatype and a set of cleanup functions
class ElementCleanup(beam.DoFn):
    """Clean up every field of a row before it is written to the target.

    Tasker uses the ``%VAR_NAME`` syntax to construct JSON. Sometimes a
    variable is not replaced, in which case the value still starts with a
    "%"; such values are replaced with ``None``. Known fields are also
    stripped of surrounding whitespace and, where configured, converted to
    ``int`` or ``float``.
    """

    def __init__(self):
        super().__init__()
        # Field name -> ordered list of cleanup functions, built once per instance.
        self.transforms = self.make_transform_map()

    def make_transform_map(self):
        """Return the mapping of field name to its ordered cleanup functions."""
        return {
            "battery_status": [self.trim, self.percent_cleaner, self.to_int],
            "bluetooth_status": [self.trim, self.percent_cleaner],
            "cell_id": [self.trim, self.percent_cleaner],
            "cell_strength": [self.trim, self.percent_cleaner, self.to_int],
            "gps_status": [self.trim, self.percent_cleaner],
            "last_app": [self.trim, self.percent_cleaner],
            # Keeping the encoding as "LAT,LON" as Data Studio likes this.
            "location_gps": [self.trim, self.percent_cleaner],
            # Keeping the encoding as "LAT,LON" as Data Studio likes this.
            "location_net": [self.trim, self.percent_cleaner],
            "location_accuracy": [self.trim, self.percent_cleaner, self.to_float],
            "altitude": [self.trim, self.percent_cleaner, self.to_float],
            "speed": [self.trim, self.percent_cleaner, self.to_float],
            "location_seconds": [self.trim, self.percent_cleaner],
            "timestamp": [self.trim, self.percent_cleaner, self.to_int],
        }

    def process(self, row):
        """Clean one row and return it wrapped in a list.

        ``process`` receives one element and must return an iterable (an
        element could be broken up into several); here exactly one cleaned
        row is produced per input row.
        """
        return [self.handle_row(row, self.transforms)]

    def handle_row(self, row, transforms):
        """Apply the configured cleanup functions to every field of ``row``.

        Args:
            row: mapping of field name to raw value.
            transforms: mapping of field name to the functions to apply,
                in order, to that field's value.

        Returns:
            A new dict with the same keys as ``row`` and the cleaned values.
            Fields without an entry in ``transforms`` are copied unchanged
            rather than raising ``KeyError``.
        """
        fixed = {}
        for key, val in row.items():
            # Unknown fields get no transforms and pass through untouched.
            for func in transforms.get(key, ()):
                val = func(val)
            fixed[key] = val
        return fixed

    def percent_cleaner(self, value: str):
        """Return ``None`` for strings starting with "%", else ``value`` itself."""
        if isinstance(value, str) and value.startswith("%"):
            return None
        return value

    def trim(self, val: str):
        """Strip surrounding whitespace from strings.

        Non-string values (for example ``None``) have no ``strip`` method,
        so they are returned unchanged instead of raising ``AttributeError``.
        """
        return val.strip() if isinstance(val, str) else val

    def to_int(self, val: str):
        """Convert ``val`` with ``int``; ``None`` stays ``None``.

        Raises:
            ValueError: if ``val`` is a string that ``int`` cannot parse.
        """
        return int(val) if val is not None else None

    def to_float(self, val: str):
        """Convert ``val`` with ``float``; ``None`` stays ``None``.

        Raises:
            ValueError: if ``val`` is a string that ``float`` cannot parse.
        """
        return float(val) if val is not None else None
# Script entry point: start the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment