Skip to content

Instantly share code, notes, and snippets.

@vicotrbb
Created October 17, 2021 21:21
Show Gist options
  • Save vicotrbb/0aa53c54fa804ee7a0d1b4c026f53019 to your computer and use it in GitHub Desktop.
Save vicotrbb/0aa53c54fa804ee7a0d1b4c026f53019 to your computer and use it in GitHub Desktop.
import urllib.request
import urllib.parse
import http.cookiejar
import json
import time
def to_decimal(string):
    """Convert a number written with '.' thousands and ',' decimal separators to float.

    Example: '1.234,56' -> 1234.56.  A trailing '%' marks a percentage and
    the value is divided by 100 ('12,5%' -> 0.125).  Newlines anywhere and
    surrounding whitespace are ignored.

    Raises:
        ValueError: if the cleaned text is not a valid float literal.
    """
    # Order matters: drop the thousands separators before turning ',' into '.'.
    normalized = string.replace('.', '')
    normalized = normalized.replace(',', '.')
    normalized = normalized.replace('\n', '').strip()

    is_percentage = normalized.endswith('%')
    if not is_percentage:
        return float(normalized)
    return float(normalized[:-1]) / 100
def convert_pandas_to_json(dataframe, attributes,
                           has_data_index=True, orientation="records"):
    """Serialize a pandas DataFrame to plain JSON-compatible Python objects.

    Args:
        dataframe: source frame; it is never modified by this function.
        attributes: optional column selection (a list of names or a single
            name); falsy means keep every column.
        has_data_index: if True, the first index level is expected to be
            named 'Date' and is converted to strings after being moved into
            a regular column.
        orientation: forwarded to ``DataFrame.to_json(orient=...)``.

    Returns:
        The ``json.loads`` result of ``to_json`` (a list of dicts for the
        default "records" orientation).
    """
    if attributes:
        dataframe = dataframe[attributes]
    # Non-inplace reset_index returns a new object, so the caller's frame is
    # never mutated.  It also turns a single-column Series selection into a
    # DataFrame (Series.reset_index cannot do that inplace).
    dataframe = dataframe.reset_index(level=0)
    if has_data_index:
        # assign() builds a new frame instead of writing into a possible view.
        dataframe = dataframe.assign(Date=dataframe['Date'].astype(str))
    return json.loads(dataframe.to_json(orient=orientation))
def get_url_opener_config():
    """Build a urllib opener that keeps cookies and sends browser-like headers.

    Returns:
        An ``urllib.request.OpenerDirector`` whose cookie jar lives for the
        lifetime of the opener, with a fixed User-agent and Accept header.
    """
    cookie_handler = urllib.request.HTTPCookieProcessor(http.cookiejar.CookieJar())
    opener = urllib.request.build_opener(cookie_handler)
    browser_headers = [
        ('User-agent', 'Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201'),
        ('Accept', 'text/html, text/plain, text/css, text/sgml, */*;q=0.01'),
    ]
    opener.addheaders = browser_headers
    return opener
def add_error_message_to_pipeline(stock, error_msg, step):
    """Record an error on ``stock`` under its 'error' key.

    The 'error' list is created on first use; each call appends one
    ``{'message': error_msg, 'step': step}`` entry.  Mutates ``stock`` in
    place and returns None.
    """
    entry = {'message': error_msg, 'step': step}
    stock.setdefault('error', []).append(entry)
class Pipeline:
    """Run named callables ("steps") in order against one shared state object.

    Every step receives the same ``memoized`` object passed to
    :meth:`execute`, so steps share data by mutating it.  A failing step can
    be retried, skipped, or escalated as ``RuntimeError`` depending on the
    configuration.
    """

    def __init__(self, skip_step_when_fail=False, retry=False, retry_count=1,
                 create_report=False, report_to=None, async_pipeline=False,
                 wait_for_next_iteration=False, wait_seconds=0.25):
        """Configure the pipeline.

        Args:
            skip_step_when_fail: if True, a step that still fails after any
                retries is logged and skipped instead of aborting the run.
            retry: if True, a failing step is re-run up to ``retry_count``
                extra times.
            retry_count: maximum number of extra attempts when ``retry`` is set.
            create_report: stored flag; not read by any method of this class.
            report_to: recipient email checked by :meth:`create_and_send_report`.
            async_pipeline: stored flag; :meth:`teardown` only succeeds when set.
            wait_for_next_iteration: if True, sleep ``wait_seconds`` after each
                successful step.
            wait_seconds: pause length in seconds.
        """
        self.steps = []
        self.skip_step_when_fail = skip_step_when_fail
        self.retry = retry
        self.retry_count = retry_count
        self.create_report = create_report
        self.report_to = report_to
        # Not written or read by any method of this class yet.
        self.execution_report = {}
        self.async_pipeline = async_pipeline
        self.wait_for_next_iteration = wait_for_next_iteration
        self.wait_seconds = wait_seconds

    def add_step(self, name, function):
        """Append ``function`` under ``name``; returns ``self`` for chaining."""
        self.steps.append({'name': name, 'callable': function})
        return self

    def create_and_send_report(self, execution_report):
        """Validate that a report recipient is configured.

        Raises:
            RuntimeError: if ``report_to`` is not set.

        NOTE(review): building and sending the report is not implemented
        here; ``execution_report`` is currently unused.
        """
        if not self.report_to:
            raise RuntimeError('A report has been requested but no recipient email was provided.')

    def evaluate(self):
        """Placeholder; currently does nothing."""
        pass

    def teardown(self):
        """Stop an async pipeline.

        Raises:
            RuntimeError: for non-async pipelines.

        NOTE(review): for async pipelines this is currently a no-op.
        """
        if not self.async_pipeline:
            raise RuntimeError("Non async pipelines can't be turned off.")

    def get_steps(self):
        """Return the internal list of ``{'name', 'callable'}`` step records."""
        return self.steps

    def _retry_step(self, step, memoized, error):
        """Re-run a failed step up to ``retry_count`` times.

        Returns ``None`` as soon as one attempt succeeds.  Otherwise returns
        the exception raised by the last attempt (``error`` itself when
        ``retry_count`` is 0 or less).
        """
        for attempt in range(self.retry_count):
            try:
                print(f"Running retry number {attempt + 1}")
                step['callable'](memoized)
            except Exception as retry_error:
                error = retry_error
            else:
                # Stop on the first success instead of re-running the step.
                return None
        return error

    def execute(self, memoized):
        """Run every step in order, passing ``memoized`` to each.

        A step that raises is retried when ``retry`` is enabled.  A step that
        still fails is skipped when ``skip_step_when_fail`` is set; otherwise
        execution stops.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            RuntimeError: chained to the last error of a step that failed and
                could not be skipped.
        """
        for step in self.steps:
            name = step['name']
            print(f"Running {name} step")
            try:
                step['callable'](memoized)
                print(f"Step {name} runned well")
            except Exception as first_error:
                error = self._retry_step(step, memoized, first_error) if self.retry else first_error
                if error is not None:
                    if self.skip_step_when_fail:
                        print(
                            f"Failed at {name} step, skipping this step since the option skip_step_when_fail is enabled.")
                        print(f'Error: {str(error)}')
                        # Skipped steps bypass the inter-step pause below.
                        continue
                    print(f"Failed at {name} step")
                    raise RuntimeError(f'A step has failed with error: {str(error)}') from error
            if self.wait_for_next_iteration:
                time.sleep(self.wait_seconds)
        return self

    @staticmethod
    def make_pipeline(steps, skip_step_when_fail=False, retry=False, create_report=True,
                      report_to=None, async_pipeline=False, wait_for_next_iteration=False,
                      wait_seconds=0.25, retry_count=1):
        """Build a Pipeline from a list of callables, naming each step after the callable.

        ``retry_count`` is appended last so that existing positional callers
        keep working.

        Raises:
            TypeError: if ``steps`` is not a list, or contains a non-callable.
        """
        if not isinstance(steps, list):
            raise TypeError("The steps input must be list like object.")
        # Keyword arguments are required here: the constructor has
        # ``retry_count`` between ``retry`` and ``create_report``, so passing
        # positionally shifted every later option into the wrong attribute.
        pipeline = Pipeline(
            skip_step_when_fail=skip_step_when_fail,
            retry=retry,
            retry_count=retry_count,
            create_report=create_report,
            report_to=report_to,
            async_pipeline=async_pipeline,
            wait_for_next_iteration=wait_for_next_iteration,
            wait_seconds=wait_seconds,
        )
        for function in steps:
            if not callable(function):
                raise TypeError("All the pipeline steps must be callables.")
            # Callables such as functools.partial have no __name__; fall back to repr.
            pipeline.add_step(getattr(function, '__name__', repr(function)), function)
        return pipeline
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment