import dateparser
import dask.dataframe as dd
from dask.distributed import Client


class DataFormatStep:
    """Reads a CSV from S3, reparses the given date columns, and writes the
    result back out using a single output date format."""

    def __init__(self, client: Client):
        self.client = client

    def run(self, datasetId: str, computationId: str, columnNames: list,
            inputDateFormats: list, outputDateFormat: str, **kwargs):
        # Try each of the accepted input formats when parsing a date cell.
        def date_parser(x):
            return dateparser.parse(x, date_formats=inputDateFormats)

        def execute():
            # TODO extract the s3 url from datasetId
            s3_url = datasetId
            df: dd.DataFrame = dd.read_csv(s3_url, parse_dates=columnNames,
                                           date_parser=date_parser)
            # storage_options={'key': '<AWS_ACCESS_KEY_ID>', 'secret': '<AWS_SECRET_ACCESS_KEY>'})
            print(f"df.columns={df.columns.values}")
            print(df.head())
            output_url = s3_url + '-transformed'
            # to_csv writes one file per partition under output_url.
            # (The original called to_csv(compute=False)[0].compute(), which
            # would only have written the first partition.)
            df.to_csv(output_url, index=False, date_format=outputDateFormat)
            return {"computationId": computationId,
                    "response": {"datasetId": output_url, "passed": True}}

        result = self.client.submit(execute).result()
        print(f"result={result}")
        return result
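# --- Usage sketch (not part of the original gist) ---------------------------
# A minimal, hedged example of how DataFormatStep might be driven. The bucket
# path, ids, column names, and date formats below are hypothetical
# placeholders; assumes a Dask scheduler is reachable from this process.
if __name__ == "__main__":
    client = Client()  # local cluster; point at a scheduler address in production
    step = DataFormatStep(client)
    step.run(datasetId="s3://example-bucket/orders.csv",  # hypothetical
             computationId="comp-123",                    # hypothetical
             columnNames=["order_date"],
             inputDateFormats=["%d/%m/%Y", "%Y-%m-%d"],
             outputDateFormat="%Y-%m-%d")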
# ============================================================================
# Second file in the gist: row-level JSON-schema validation.
# ============================================================================
import json

import jsonschema
import dask.dataframe as dd
from dask.distributed import Client

from dataset import DataSet
from utils.datasetError import DatasetError


class SchemaValidator:
    """Validates every row of a dataset against its JSON schema and records
    a DatasetError event for each row that fails."""

    def __init__(self, client: Client):
        self.client = client

    def run(self, dataset_host: str, computationId: str, inputs: dict,
            createdBy: str, clientId: str, **kwargs):
        def convert(org_value, types: list):
            """Coerce a raw cell value to the first concrete JSON-schema type
            listed for the column, so jsonschema sees the intended type."""
            if org_value is None:  # 'not org_value' would also drop 0, False, ""
                return None
            types = types.copy()
            if "null" in types:
                types.remove("null")
            if not types:  # the column only allowed "null"
                return None
            type_name = types.pop()
            if type_name == 'string':
                return org_value if isinstance(org_value, str) else str(org_value)
            elif type_name == 'number':
                return org_value if isinstance(org_value, float) else float(org_value)
            elif type_name == 'integer':
                return org_value if isinstance(org_value, int) else int(org_value)
            elif type_name == 'object':
                return org_value if isinstance(org_value, dict) else dict(org_value)
            elif type_name == 'array':
                return org_value if isinstance(org_value, list) else list(org_value)
            elif type_name == 'boolean':
                return org_value if isinstance(org_value, bool) else bool(org_value)
            elif type_name == 'null':
                return None
            else:
                raise ValueError(f"Unsupported type: {type_name}")
        def execute():
            dataset_id: str = inputs.get('datasetId')
            namespace: str = inputs.get('namespace')
            ds = DataSet.load(dataset_id=dataset_id, dataset_host=dataset_host,
                              created_by=createdBy, client_id=clientId)
            df: dd.DataFrame = ds.load_dataframe()
            print(f"df#columns={df.columns.values}")
            properties = ds.json_schema.get('properties')
            for index, row in df.iterrows():
                row_json = json.loads(row.to_json())
                perform_schema_validation = True
                # Coerce each cell to its declared schema type before validating.
                for k, v in properties.items():
                    declared = (v.get('type') if isinstance(v.get('type'), list)
                                else [v.get('type')])
                    try:
                        row_json[k] = convert(row_json.get(k), declared)
                    except Exception:
                        # Type coercion failed; skip schema validation for this row.
                        perform_schema_validation = False
                        break
                if perform_schema_validation:
                    try:
                        jsonschema.validate(instance=row_json, schema=ds.json_schema)
                    except jsonschema.exceptions.ValidationError as e:
                        # e.path is empty for schema-level (non-column) failures.
                        column_name = e.path.pop() if e.path else None
                        event = {"id": dataset_id, "pipelineId": namespace,
                                 "rowNumber": row_json.get(DataSet.ROW_NUMBER_KEY),
                                 "clientId": clientId,
                                 "errors": [{"errorCode": "Add one here",  # TODO assign a real error code
                                             "columnName": column_name,
                                             "errorMessage": e.args[0]}]}
                        # TODO: does the error table already exist in DynamoDB?
                        DatasetError(context=event, create=True)
            return {"computationId": computationId,
                    "response": {"datasetId": "output_url", "passed": True,
                                 "dsDetails": ds.serialize()}}

        result = self.client.submit(execute).result()
        print(f"result={result}")
        return result
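# --- Usage sketch (not part of the original gist) ---------------------------
# A minimal, hedged example of how SchemaValidator might be invoked. The host,
# ids, and namespace are hypothetical placeholders; DataSet and DatasetError
# come from the gist's own project modules and must be importable.
if __name__ == "__main__":
    client = Client()
    validator = SchemaValidator(client)
    validator.run(dataset_host="https://datasets.example.com",  # hypothetical
                  computationId="comp-456",                     # hypothetical
                  inputs={"datasetId": "ds-001", "namespace": "pipeline-a"},
                  createdBy="user@example.com",
                  clientId="client-1")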