Skip to content

Instantly share code, notes, and snippets.

@kuznetsov-m
Last active October 15, 2021 12:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kuznetsov-m/91c2f5ca0a13b7644f803b59cf94a4ef to your computer and use it in GitHub Desktop.
Save kuznetsov-m/91c2f5ca0a13b7644f803b59cf94a4ef to your computer and use it in GitHub Desktop.
import io
import json
import time
import urllib.request
from datetime import datetime

import numpy as np
import pandas as pd
from dateutil import tz

import db_utils
import models
import telegram_client
class SberPriceMonitor:
    """Poll stored SBER candles and post price-band changes to a Telegram channel.

    Each polling cycle reads the most recent candle rows from the database,
    keeps the rows that begin today (UTC calendar date), measures every close
    relative to the first open of the day, buckets that change into bands of
    ``step`` (1% by default, rounded toward zero), and posts a message when the
    price has stayed in a new band for five consecutive rows.
    """

    _channel_url = 'https://t.me/sber_price_monitor'

    # Seconds to wait before restarting the polling loop after a failure, so a
    # persistent error does not turn into a busy loop.
    _restart_delay = 5

    # 'end' timestamp of the last posted signal. Stored on the instance (not as
    # a local of loop()) so that a restart after an exception does not post the
    # same signal a second time.
    _last_send_datetime = None

    @staticmethod
    def _load_candels_from_moex(ticker='SBER', from_=None, till=None, interval=1):
        """Download candles for ``ticker`` from the MOEX ISS candles endpoint.

        The endpoint is queried page by page, advancing the ``start`` offset by
        500 per request, until a page without rows is returned. A failed
        request is retried indefinitely; the request timeout doubles after
        every failure, capped at 20 seconds.

        Args:
            ticker: Security code placed into the URL.
            from_: First date as ``'YYYY-MM-DD'``; defaults to today's local date.
            till: Last date as ``'YYYY-MM-DD'``; defaults to ``from_``.
            interval: Value of the ``interval`` query parameter.

        Returns:
            A DataFrame with all downloaded rows, or an empty DataFrame when
            the first page has no rows.
        """
        if from_ is None:
            from_ = datetime.now().strftime('%Y-%m-%d')
        if till is None:
            till = from_

        page_size = 500
        frames = []
        start = 0
        while True:
            query = f'http://iss.moex.com/iss/engines/stock/markets/shares/securities/{ticker}/candles.csv?from={from_}&till={till}&interval={interval}&start={start}'
            df = None
            try_count = 1
            timeout = 2
            while df is None:
                try:
                    with urllib.request.urlopen(query, timeout=timeout) as response:
                        content = response.read()
                    df = pd.read_csv(
                        io.StringIO(content.decode('utf-8')),
                        sep=';',
                        header=1,
                        parse_dates=['begin', 'end'],
                    )
                except Exception as e:
                    # Deliberately broad: any network or parsing failure is
                    # retried rather than propagated.
                    print(f'{__name__} Error: Try count: {try_count}. Timeout: {timeout}. Error message: {str(e)}')
                    try_count += 1
                    timeout = min(timeout * 2, 20)
                    # Short pause so an immediately failing request does not spin.
                    time.sleep(1)
            if df.empty:
                break
            frames.append(df)
            start += page_size

        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True)

    def _build_message(self, row) -> str:
        """Format one signal row as the text of a channel post.

        Args:
            row: A DataFrame row providing ``'end'`` (a pandas Timestamp),
                ``'close'`` and ``'step'``.

        Returns:
            Two lines: the ``end`` time formatted as ``HH:MM:SS DD.MM.YYYY TZ``,
            then the close price with a direction symbol and the step value.
        """
        end = row['end']
        close = row['close']
        step = row['step']
        if step > 0:
            simvol = '↗️'
        elif step < 0:
            simvol = '↘️'
        else:
            simvol = '⏹'
        return f'{end.to_pydatetime().strftime("%H:%M:%S %d.%m.%Y %Z")}\n{close} ({simvol} {step})'

    def _find_step_signals(self, df, step=0.01):
        """Return the rows at which the price settled into a new band.

        Args:
            df: Non-empty candle rows with the columns ``begin``, ``end``,
                ``open``, ``close``, ``high``, ``low``, ``value`` and
                ``volume``. The caller's frame is not modified.
            step: Band width as a fraction of the first row's open price.

        Returns:
            A DataFrame with the columns ``end``, ``close``, ``frac``, ``step``
            and ``step_prev_1`` (the ``step`` of the previous signal, 0 for the
            first one), holding only signals whose band differs from the
            previous signal's band.
        """
        df = df.drop(columns=['begin', 'high', 'low', 'value', 'volume'])
        # NOTE(review): presumably candle ends are stamped one second before
        # the next candle begins, and this moves them onto the boundary — confirm.
        df['end'] = df['end'] + pd.to_timedelta(1, unit='s')
        # Relative change of each close against the first open in the frame.
        df['frac'] = (df['close'] / df['open'].iloc[0]) - 1

        # Bucket the change into bands of width `step`, rounding toward zero.
        # Changes outside +/- 20 bands stay NaN and therefore never match below.
        df['step'] = np.nan
        for val in range(-20, 20):
            lower = val * step
            upper = (val + 1) * step
            level = lower if val >= 0 else upper
            in_band = (df['frac'] >= lower) & (df['frac'] < upper)
            df.loc[in_band, 'step'] = level

        # A signal is a row whose band equals that of the four preceding rows
        # but differs from the band five rows back; missing history counts as 0.
        previous = [df['step'].shift(lag).fillna(0) for lag in range(1, 6)]
        settled = (
            (df['step'] == previous[0])
            & (df['step'] == previous[1])
            & (df['step'] == previous[2])
            & (df['step'] == previous[3])
            & (df['step'] != previous[4])
        )
        df_final = df.loc[settled, ['end', 'close', 'frac', 'step']].copy()

        # Drop signals that repeat the band of the signal before them.
        df_final['step_prev_1'] = df_final['step'].shift(1).fillna(0)
        return df_final.loc[df_final['step'] != df_final['step_prev_1']]

    def loop(self):
        """Poll the candle table once a minute and post new signals; never returns.

        Any exception raised by the database, pandas or the Telegram client
        propagates to the caller.
        """
        poll_interval = 60
        pd.set_option('display.max_rows', None)
        step = 0.01
        while True:
            print(f'{__name__}: loop')
            df = db_utils.last_rows_from_sql_table(table_name=models.SberCandels.__tablename__, rows=2000)
            print(f'{__name__}: df rows count ={len(df.index)}')

            # Keep only the rows that begin today: midnight of the current UTC
            # date, compared against `begin` with its timezone stripped.
            day_start = pd.Timestamp.now(tz='UTC').normalize().tz_localize(None)
            mask = (df['begin'].dt.tz_localize(None) >= day_start)
            # Copy so later column edits never touch a slice of the loaded frame.
            df = df.loc[mask].copy()
            print(f'{__name__}: df rows count after filter ={len(df.index)}')

            if not df.empty:
                df_final = self._find_step_signals(df, step)
                print(f'{__name__}: df_final rows count ={len(df_final.index)}')
                print(df_final)
                if not df_final.empty and self._last_send_datetime != df_final.iloc[-1]['end']:
                    row = df_final.iloc[-1]
                    text = self._build_message(row)
                    print(f'{__name__}: post publication')
                    telegram_client.post_message_to_channel(
                        message=text,
                        channel_url=self._channel_url
                    )
                    print(f'{__name__}: post successfully published')
                    self._last_send_datetime = row['end']
            time.sleep(poll_interval)

    def run(self):
        """Run loop() forever, restarting it after any exception."""
        while True:
            try:
                self.loop()
            except Exception as e:
                print(f'{__name__}: EXCEPTION: {str(e)}')
                # Pause before restarting so a persistent failure does not spin.
                time.sleep(self._restart_delay)
# Script entry point: create the monitor and start it.
# run() wraps the polling loop in `while True` and catches Exception, so this
# call does not return during normal operation.
monitor = SberPriceMonitor()
monitor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment