Skip to content

Instantly share code, notes, and snippets.

@scuddalo
Last active May 19, 2019 18:00
Show Gist options
  • Save scuddalo/7a438e3dba768b8af57580dfdba2b9ff to your computer and use it in GitHub Desktop.
Save scuddalo/7a438e3dba768b8af57580dfdba2b9ff to your computer and use it in GitHub Desktop.
from dask import dataframe
from configs.config import Config
import dask.dataframe as dd
from dask.distributed import Client
import dateparser
class DataFormatStep:
    """Dask pipeline step that rewrites the date columns of a CSV dataset.

    The requested columns are parsed with ``dateparser`` (trying the caller's
    input formats first) and the whole dataset is written back out as CSV next
    to the input, using a single output date format.
    """

    def __init__(self, client: "Client"):
        # dask.distributed client used to run the step as a single task.
        self.client = client

    def run(self, datasetId: str, computationId: str, columnNames: list,
            inputDateFormats: list, outputDateFormat: str, **kwargs):
        """Reformat the date columns of ``datasetId`` and write the result.

        Args:
            datasetId: Location of the input CSV(s); currently passed straight
                to ``dd.read_csv`` as the URL.
            computationId: Echoed back in the result.
            columnNames: Columns to parse as dates.
            inputDateFormats: Formats handed to ``dateparser.parse``.
            outputDateFormat: Format passed to ``to_csv(date_format=...)``.
            **kwargs: Ignored.

        Returns:
            ``{"computationId": computationId,
            "response": {"datasetId": <output url>, "passed": True}}``.
        """
        def date_parser(value):
            # dateparser tries each of ``inputDateFormats`` before its own
            # heuristics.
            return dateparser.parse(value, date_formats=inputDateFormats)

        def execute():
            # TODO extract the s3 url from datasetId
            s3_url = datasetId
            # S3 credentials must come from the environment / instance role;
            # never hard-code access keys in source.
            df: dd.DataFrame = dd.read_csv(s3_url, parse_dates=columnNames,
                                           date_parser=date_parser)
            print(f"df.columns={df.columns.values}")
            print(df.head())
            output_url = s3_url + '-transformed'
            # Let to_csv compute eagerly so EVERY partition is written (one file
            # per partition under output_url). Computing only element [0] of
            # the compute=False result would write partition 0 and drop the rest.
            df.to_csv(output_url, index=False, date_format=outputDateFormat)
            return {"computationId": computationId,
                    "response": {"datasetId": output_url, "passed": True}}

        result = self.client.submit(execute).result()
        print(f"result ={result}")
        return result
import jsonschema
import json
from dask.distributed import Client
from dask import dataframe
from dataset import DataSet
from utils.datasetError import DatasetError
class SchemaValidator:
    """Validate every row of a dataset against the dataset's JSON schema.

    Each failing row is reported through ``DatasetError``; the work runs as a
    single task submitted to the dask cluster reachable through ``client``.
    """

    def __init__(self, client: "Client"):
        # dask.distributed client used to run the validation task.
        self.client = client

    @staticmethod
    def _declared_types(spec) -> list:
        """Return a property's JSON-schema ``type`` as a list (``[None]`` if absent)."""
        declared = spec.get('type') if isinstance(spec, dict) else None
        return declared if isinstance(declared, list) else [declared]

    @staticmethod
    def _coerce(value, types: list):
        """Coerce a JSON-decoded cell towards its schema type without guessing.

        The target is the last entry of ``types`` that is not ``"null"``.
        ``None`` is returned as ``None``. Falsy values such as ``0`` or
        ``False`` are kept. When a value cannot be converted losslessly, it is
        returned unchanged so that jsonschema reports the mismatch instead of
        the coercion hiding it. ``types`` is not modified.
        """
        if value is None:
            return None
        targets = [t for t in types if t != "null"]
        if not targets:
            return value
        target = targets[-1]

        if target == 'string':
            return value if isinstance(value, str) else str(value)
        if target == 'number':
            if isinstance(value, float):
                return value
            try:
                return float(value)
            except (TypeError, ValueError):
                return value
        if target == 'integer':
            if isinstance(value, int):
                return value
            if isinstance(value, float):
                # Integer columns containing nulls arrive as floats (3.0), but a
                # non-integral float must not be truncated into a "valid" int.
                return int(value) if value.is_integer() else value
            try:
                return int(value)
            except (TypeError, ValueError):
                return value
        if target == 'boolean':
            if isinstance(value, bool):
                return value
            if isinstance(value, str):
                # bool("false") is True, so map the spellings explicitly.
                return {"true": True, "false": False}.get(value.strip().lower(), value)
            return bool(value) if value in (0, 1) else value
        # 'object' / 'array': JSON decoding already yields dict / list, and
        # dict()/list() on any other JSON value fails or produces garbage
        # (list("ab") == ["a", "b"]). Unknown or missing types are left alone too.
        return value

    def run(self, dataset_host: str, computationId: str, inputs: dict,
            createdBy: str, clientId: str, **kwargs):
        """Validate dataset ``inputs['datasetId']`` row by row.

        Args:
            dataset_host: Passed to ``DataSet.load``.
            computationId: Echoed back in the result.
            inputs: Reads ``'datasetId'`` and ``'namespace'`` (reported as
                ``pipelineId`` in error events).
            createdBy: Passed to ``DataSet.load`` as ``created_by``.
            clientId: Passed to ``DataSet.load`` and included in error events.
            **kwargs: Ignored.

        Returns:
            ``{"computationId": ..., "response": {"datasetId": <input dataset
            id>, "passed": <True iff no row failed>, "dsDetails": ...}}``.
        """
        def execute():
            dataset_id: str = inputs.get('datasetId')
            namespace: str = inputs.get('namespace')
            ds = DataSet.load(dataset_id=dataset_id, dataset_host=dataset_host,
                              created_by=createdBy, client_id=clientId)
            df = ds.load_dataframe()
            print(f"df#columns={df.columns.values}")

            schema = ds.json_schema
            # Normalise each property's declared type once, not once per row.
            column_types = {
                name: SchemaValidator._declared_types(spec)
                for name, spec in (schema.get('properties') or {}).items()
            }
            # Same semantics as jsonschema.validate() (schema check, then the
            # best_match error), but the schema is checked and the validator
            # built once instead of on every row.
            validator_cls = jsonschema.validators.validator_for(schema)
            validator_cls.check_schema(schema)
            validator = validator_cls(schema)

            failed_rows = 0
            for _, row in df.iterrows():
                row_json = json.loads(row.to_json())
                # Convert every column; a value that cannot be converted is
                # left as-is for the validator to report.
                for column, types in column_types.items():
                    row_json[column] = SchemaValidator._coerce(row_json.get(column), types)

                error = jsonschema.exceptions.best_match(validator.iter_errors(row_json))
                if error is None:
                    continue
                failed_rows += 1
                # Root-level errors (e.g. "required") have an empty path.
                column_name = error.path[-1] if error.path else None
                event = {"id": dataset_id, "pipelineId": namespace,
                         "rowNumber": row_json.get(DataSet.ROW_NUMBER_KEY),
                         "clientId": clientId,
                         # TODO: real error codes.
                         "errors": [{"errorCode": "Add one here",
                                     "columnName": column_name,
                                     "errorMessage": error.message}]}
                DatasetError(context=event, create=True)

            return {"computationId": computationId,
                    "response": {"datasetId": dataset_id,
                                 "passed": failed_rows == 0,
                                 "dsDetails": ds.serialize()}}

        result = self.client.submit(execute).result()
        print(f"result ={result}")
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment