Created
October 17, 2021 21:21
-
-
Save vicotrbb/0aa53c54fa804ee7a0d1b4c026f53019 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request | |
import urllib.parse | |
import http.cookiejar | |
import json | |
import time | |
def to_decimal(string):
    """Parse a number that uses '.' for thousands and ',' for decimals.

    Every '.' and newline is removed, every ',' becomes the decimal point and
    surrounding whitespace is trimmed. When the cleaned text ends with '%',
    the sign is dropped and the parsed value is divided by 100.

    Args:
        string: Text such as ``"1.234,56"`` or ``"12,5%"``.

    Returns:
        The parsed value as a ``float``.

    Raises:
        ValueError: If the cleaned text is not accepted by ``float``.
    """
    # One pass: drop '.', drop newlines, and turn ',' into the decimal point.
    separators = str.maketrans({'.': None, ',': '.', '\n': None})
    cleaned = string.translate(separators).strip()

    if not cleaned.endswith('%'):
        return float(cleaned)
    return float(cleaned[:-1]) / 100
def convert_pandas_to_json(dataframe, attributes,
                           has_data_index=True, orientation="records"):
    """Convert a DataFrame into plain Python data via its JSON representation.

    The first index level is moved into a regular column before serializing.
    The caller's DataFrame is never modified: all work happens on a new frame.

    Args:
        dataframe: The DataFrame to convert.
        attributes: Column selection applied as ``dataframe[attributes]``;
            a falsy value keeps every column.
        has_data_index: When true, the ``'Date'`` column (expected to exist
            after the index reset) is cast to ``str`` before serializing.
        orientation: Passed to ``DataFrame.to_json`` as ``orient``.

    Returns:
        The result of ``json.loads`` on the frame's JSON text (a list of
        row dicts for the default ``"records"`` orientation).

    Raises:
        KeyError: If ``has_data_index`` is true and there is no ``'Date'``
            column after the index reset.
    """
    selected = dataframe[attributes] if attributes else dataframe

    # Non-inplace reset returns a new frame, so the input stays untouched even
    # when no column selection was made.
    frame = selected.reset_index(level=0)

    if has_data_index:
        frame['Date'] = frame['Date'].astype(str)

    return json.loads(frame.to_json(orient=orientation))
def get_url_opener_config():
    """Build a urllib opener that keeps cookies and sends fixed request headers.

    Returns:
        An opener created by ``urllib.request.build_opener`` with an
        ``HTTPCookieProcessor`` backed by a fresh ``CookieJar``, and with its
        ``addheaders`` set to a fixed ``User-agent`` / ``Accept`` pair.
    """
    default_headers = [
        ('User-agent', 'Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201'),
        ('Accept', 'text/html, text/plain, text/css, text/sgml, */*;q=0.01'),
    ]

    cookie_handler = urllib.request.HTTPCookieProcessor(http.cookiejar.CookieJar())
    url_opener = urllib.request.build_opener(cookie_handler)
    url_opener.addheaders = default_headers
    return url_opener
def add_error_message_to_pipeline(stock, error_msg, step):
    """Append an error record to ``stock['error']``, creating the list if needed.

    Args:
        stock: Mutable mapping that collects the errors; modified in place.
        error_msg: Value stored under the record's ``'message'`` key.
        step: Value stored under the record's ``'step'`` key.
    """
    record = {'message': error_msg, 'step': step}
    stock.setdefault('error', []).append(record)
class Pipeline():
    """Run an ordered list of named callables against one shared object.

    Steps are registered with ``add_step`` (or in bulk through
    ``make_pipeline``) and run in insertion order by ``execute``. A failing
    step can be retried, skipped, or turned into a ``RuntimeError`` depending
    on the options given to the constructor.
    """

    def __init__(self, skip_step_when_fail=False, retry=False, retry_count=1,
                 create_report=False, report_to=None, async_pipeline=False,
                 wait_for_next_iteration=False, wait_seconds=0.25):
        """Store the pipeline options; no step is registered yet.

        Args:
            skip_step_when_fail: When true, ``execute`` moves on to the next
                step instead of raising when a step fails.
            retry: When true, a failing step is re-run up to ``retry_count``
                times before it is considered failed.
            retry_count: Maximum number of retries per failing step.
            create_report: Stored on the instance; not read by this class.
            report_to: Recipient checked by ``create_and_send_report``.
            async_pipeline: Flag checked by ``teardown``.
            wait_for_next_iteration: When true, ``execute`` sleeps after each
                step that did not fail.
            wait_seconds: Duration of that sleep, in seconds.
        """
        self.steps = []
        self.skip_step_when_fail = skip_step_when_fail
        self.retry = retry
        self.retry_count = retry_count
        self.create_report = create_report
        self.report_to = report_to
        self.execution_report = {}
        self.async_pipeline = async_pipeline
        self.wait_for_next_iteration = wait_for_next_iteration
        self.wait_seconds = wait_seconds

    def add_step(self, name, function):
        """Register ``function`` under ``name`` and return ``self`` for chaining."""
        self.steps.append({'name': name, 'callable': function})
        return self

    def create_and_send_report(self, execution_report):
        """Validate that a report recipient is configured.

        Only the recipient check is implemented; nothing is sent.

        Raises:
            RuntimeError: If ``report_to`` is empty.
        """
        if not self.report_to:
            raise RuntimeError('A report has been requested but no recipient email was provided.')

    def evaluate(self):
        """Placeholder; currently does nothing."""
        pass

    def teardown(self):
        """Reject teardown of a pipeline that is not flagged as async.

        Raises:
            RuntimeError: If ``async_pipeline`` is false.
        """
        if not self.async_pipeline:
            raise RuntimeError("Non async pipelines can't be turned off.")

    def get_steps(self):
        """Return the list of registered step records (the live list, not a copy)."""
        return self.steps

    def _retry_step(self, step, memoized):
        """Re-run a failed step; return True as soon as one attempt succeeds."""
        if not self.retry:
            return False
        for attempt in range(self.retry_count):
            print(f"Running retry number {attempt + 1}")
            try:
                step['callable'](memoized)
            except Exception:
                # This attempt failed too; try again while attempts remain.
                continue
            return True
        return False

    def execute(self, memoized):
        """Run every registered step in order, passing ``memoized`` to each.

        A step that raises is retried when ``retry`` is enabled; a retry that
        succeeds counts as the step succeeding. A step that still fails is
        either skipped (``skip_step_when_fail``) or aborts the run.

        Args:
            memoized: Object handed to every step callable.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            RuntimeError: If a step fails, cannot be recovered by retrying and
                skipping is disabled. The original exception is chained as
                the cause.
        """
        for step in self.steps:
            name = step['name']
            print(f"Running {name} step")
            try:
                step['callable'](memoized)
            except Exception as error:
                if not self._retry_step(step, memoized):
                    if self.skip_step_when_fail:
                        print(
                            f"Failed at {name} step, skipping this step since the option skip_step_when_fail is enabled.")
                        print(f'Error: {str(error)}')
                        continue
                    print(f"Failed at {name} step")
                    raise RuntimeError(f'A step has failed with error: {str(error)}') from error
            else:
                print(f"Step {name} runned well")

            if self.wait_for_next_iteration:
                time.sleep(self.wait_seconds)
        return self

    @staticmethod
    def make_pipeline(steps, skip_step_when_fail=False, retry=False, create_report=True,
                      report_to=None, async_pipeline=False, wait_for_next_iteration=False,
                      wait_seconds=0.25, retry_count=1):
        """Build a pipeline from a list of callables, named after ``__name__``.

        Args:
            steps: List of callables, registered in order.
            skip_step_when_fail, retry, create_report, report_to,
            async_pipeline, wait_for_next_iteration, wait_seconds,
            retry_count: Forwarded to the ``Pipeline`` constructor.

        Returns:
            The configured ``Pipeline``.

        Raises:
            TypeError: If ``steps`` is not a list or contains a non-callable.
        """
        if not isinstance(steps, list):
            raise TypeError("The steps input must be list like object.")

        # Keyword arguments keep every option bound to the right constructor
        # parameter regardless of the constructor's positional order.
        pipeline = Pipeline(
            skip_step_when_fail=skip_step_when_fail,
            retry=retry,
            retry_count=retry_count,
            create_report=create_report,
            report_to=report_to,
            async_pipeline=async_pipeline,
            wait_for_next_iteration=wait_for_next_iteration,
            wait_seconds=wait_seconds,
        )
        for function in steps:
            if not callable(function):
                raise TypeError("All the pipeline steps must be callables.")
            pipeline.add_step(function.__name__, function)
        return pipeline
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment