Created
January 9, 2023 14:21
-
-
Save freelancing-solutions/7159878c935e35db3d2610a6d2327a8e to your computer and use it in GitHub Desktop.
updated general fundamental data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio import Future | |
from typing import List, Optional, Tuple, Coroutine, Callable | |
from google.cloud import ndb | |
from src.external_data.eod import DataSource | |
from src.external_data.eod.data_types import MDict | |
from src.main.loader import eod_data_source | |
from src.models.exchanges import Exchange | |
from src.models.fundamental import (FundamentalGeneral, GeneralAddress, GeneralContact, GeneralListings,GeneralOfficers) | |
from src.models.stock import Stock | |
from src.models.temp.fundamental import TempFundamentalJSON | |
from src.parsers import FundamentalGeneralParser | |
from src.utils import create_id | |
from src.utils.my_logger import init_logger | |
# from numba import jit, vectorize | |
# THIS IS THE ACTUAL EOD SOURCE | |
class DataSourceGeneral:
    """
        **Class DataSourceGeneral**
            Fetches from the EOD data source
                Exchange Data
                Ticker Data
                General Fundamental Data
            and stores it in GCP datastore through NDB.

            Estimated EOD api count for this Class is
                Exchanges + Tickers = 144
                Fundamental Data = ?

            NOTE: the datastore writes use ``put_async().get_result()`` / ``ndb.put_multi``,
            which block until the write completes, so the coroutines in this class
            overlap on HTTP requests but not on datastore writes.
    """

    def __init__(self, source: DataSource = eod_data_source) -> None:
        """
        :param source: data source providing base_url, api_token, the async request helper,
                       the NDB context, the semaphore and the request limit used below.
        """
        self._src: DataSource = source
        self._ndb_min_batch_size: int = 500
        self._logger = init_logger("data_source_general")

    def _endpoint_url(self, path: str) -> str:
        """
        Joins the data source base URL and an endpoint path with exactly one slash,
        regardless of whether base_url ends with a slash or path starts with one.

        :param path: endpoint path relative to the API root, e.g. "exchanges-list/".
        :return: absolute endpoint URL.
        """
        return f"{self._src.base_url.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    async def exchange_exist(exchange_list: List[Exchange], code: str) -> bool:
        """
        Returns True if an exchange with the specified code is present in the list of
        Exchange objects, False otherwise. The comparison is case-insensitive.

        :param exchange_list: List of Exchange objects to search.
        :param code: Code of the exchange to search for.
        :return: True if the exchange is present, False otherwise.
        """
        # compare inside the generator so any() tests the condition itself
        # (not the truthiness of the model) and stops at the first match
        return any(exchange.code.lower() == code.lower() for exchange in exchange_list)

    @staticmethod
    async def stock_exist(stock_list: List[Stock], code: str) -> bool:
        """
        **stock_exist**
            Returns True if a stock with the specified code is present in the list of
            Stock objects, False otherwise. The comparison is case-insensitive.

        :param stock_list: List of Stock objects to search.
        :param code: Code of the stock to search for.
        :return: True if the stock is present, False otherwise.
        """
        return any(stock.code.lower() == code.lower() for stock in stock_list)

    @staticmethod
    def _exchange_code_by_id(exchange_id: str, exchange_lists: List[Exchange]) -> Optional[str]:
        """
        Returns the code of the exchange with the specified exchange ID from the list of
        Exchange objects, if it is present. The ID comparison is case-insensitive.

        :param exchange_id: Exchange ID of the exchange to search for.
        :param exchange_lists: List of Exchange objects to search.
        :return: Code of the first matching exchange, None when there is no match.
        """
        wanted_id = exchange_id.lower()
        for exchange in exchange_lists:
            if exchange.exchange_id.lower() == wanted_id:
                return exchange.code
        return None

    @staticmethod
    async def parse_exchange_data(exchange: MDict) -> MDict:
        """
        Parses exchange data into our local format.

        NOTE: some exchanges do not carry a Code or an OperatingMIC; when the value is
        missing or empty a 4 character ID is generated in its place, so ``code`` and
        ``operating_mic`` are never None in the result.

        :param exchange: raw exchange record as returned by the EOD API.
        :return: dict with name, code, operating_mic, country and currency_symbol.
        """
        # `or` (rather than a .get default) also covers keys that are present but null,
        # and only generates an ID when one is actually needed
        return dict(name=exchange.get('Name'),
                    code=exchange.get('Code') or create_id(size=4),
                    operating_mic=exchange.get('OperatingMIC') or create_id(size=4),
                    country=exchange.get('Country'),
                    currency_symbol=exchange.get('Currency'))

    @staticmethod
    async def parse_stock_data(stock: MDict) -> MDict:
        """
        Converts the stock data in the specified MDict object into the field names used
        when building Stock entities.

        :param stock: MDict object containing stock data as returned by the EOD API.
        :return: dict with code, name, country, currency, stock_type and exchange.
        """
        return dict(code=stock.get('Code'),
                    name=stock.get('Name'),
                    country=stock.get('Country'),
                    currency=stock.get('Currency'),
                    stock_type=stock.get('Type'),
                    exchange=stock.get('Exchange'))

    async def update_exchanges_from_eod(self):
        """
        **update_exchanges_from_eod**
            Fetches the list of exchanges from the data source and stores the ones whose
            code is not yet in the database.

            https://eodhistoricaldata.com/api/exchanges-list/?api_token=api_token&fmt=json

        :return: dict with ``exchanges`` (number of exchanges stored) and ``message``.
        """
        # creating a request to eod to fetch a list of exchanges
        eod_url: str = self._endpoint_url("exchanges-list/")
        args: dict[str, str] = dict(api_token=self._src.api_token, fmt='json')
        json_data: List[dict[str, str]] = await self._src.async_get_request(_url=eod_url, args=args)

        # Semaphore is not needed as there are 60 approximate exchanges in the world
        _total_exchanges_put = 0
        with self._src.context:
            # noinspection PyArgumentList
            # use a single query to fetch all of the existing exchanges
            ex_exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            # set of existing exchange codes for O(1) membership tests
            existing_codes = {exchange.code.lower() for exchange in ex_exchange_list}

            # an empty / failed response results in nothing being stored
            processed_exchanges: List[MDict] = [await self.parse_exchange_data(raw_exchange)
                                                for raw_exchange in (json_data or [])]

            # keep only the exchanges whose code is not already stored
            _unique_exchanges: List[MDict] = [exchange for exchange in processed_exchanges
                                              if exchange['code'].lower() not in existing_codes]

            list_exchanges: List[Exchange] = [Exchange(**{**exchange, "exchange_id": create_id()})
                                              for exchange in _unique_exchanges]

            # count each exchange only once its put has completed
            for _put_task in asyncio.as_completed([self.put_exchange(exchange) for exchange in list_exchanges]):
                await _put_task
                _total_exchanges_put += 1

        _message: str = f"added a total number of {_total_exchanges_put} , Exchanges in the database"
        self._logger.info(_message)
        return dict(exchanges=_total_exchanges_put, message=_message)

    @staticmethod
    async def put_exchange(exchange: Exchange):
        """
        **put_exchange**
            Stores the exchange in the database and waits for the write to complete.

        :param exchange: Exchange entity to store.
        :return: result of the completed put.
        """
        return exchange.put_async().get_result()

    async def update_tickers(self) -> None:
        """
        **update_tickers**
            Reads all the exchanges from the database and, for each exchange,
            obtains its list of ticker symbols and stores the new ones in the database.

            https://eodhistoricaldata.com/api/exchange-symbol-list/LSE?api_token=api

        :return: None
        """
        with self._src.context:
            exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            self._logger.info(exchange_list)

            _requester = self._src.async_get_request
            tasks = [self.get_store_tickers_routine(exchange=exchange, requester=_requester)
                     for exchange in exchange_list]

            # one routine per exchange; wait until every exchange has been processed
            for task in asyncio.as_completed(tasks):
                await task

    async def get_store_tickers_routine(self, exchange: Exchange, requester: Callable) -> None:
        """
        **get_store_tickers_routine**
            Fetches the list of ticker symbols for the specified exchange from the data
            source and stores the tickers whose code is not yet stored for that exchange.

        :param exchange: Exchange object for which to fetch ticker symbols.
        :param requester: Callable that makes async HTTP requests; called with ``_url`` and ``args``.
        :return: None
        """
        _args = dict(api_token=self._src.api_token, fmt="json")
        _url = self._endpoint_url(f"exchange-symbol-list/{exchange.code}")
        json_data: List[MDict] = await requester(_url=_url, args=_args)
        if not json_data:
            return

        # fetching stock symbols which already exist for this exchange
        stock_list: List[Stock] = await Stock.get_stock_by_exchange_id(exchange_id=exchange.exchange_id)
        existing_codes = {stock.code.lower() for stock in stock_list}

        parsed_tickers: List[MDict] = [await self.parse_stock_data(raw_ticker) for raw_ticker in json_data]

        # Diff by code instead of comparing list lengths: equal lengths do not imply equal
        # contents (one ticker removed and another added). Parsed tickers use the lowercase
        # 'code' key; tickers without a code cannot be matched and are skipped.
        new_tickers: List[MDict] = [ticker for ticker in parsed_tickers
                                    if ticker.get('code') and ticker['code'].lower() not in existing_codes]
        if not new_tickers:
            return

        batch_size: int = self._src.request_limit
        for start in range(0, len(new_tickers), batch_size):
            async with self._src.semaphore:
                batch = new_tickers[start:start + batch_size]
                # build the Stock entities for this batch
                _processed_tickers = await asyncio.gather(
                    *[self.process_tickers(exchange=exchange, stock=ticker) for ticker in batch])
                # storing part of the stock for this exchange
                await asyncio.gather(*[self.put_ticker(ticker) for ticker in _processed_tickers])

    @staticmethod
    async def put_ticker(ticker: Stock):
        """
        **put_ticker**
            Stores the stock in the database and waits for the write to complete.

        :param ticker: Stock object to add to the database.
        :return: result of the completed put.
        """
        return ticker.put_async().get_result()

    @staticmethod
    async def process_tickers(exchange: Exchange, stock: MDict) -> Stock:
        """
        **process_tickers**
            Creates a Stock object from the specified stock data and exchange.
            A ``stock_id`` is generated when the data does not carry one, and the
            stock is linked to the exchange through ``exchange_id``.

        :param exchange: Exchange object to use for creating the Stock object.
        :param stock: MDict object containing parsed stock data.
        :return: Stock object created from the specified stock data and exchange.
        """
        stock_id = stock.get("stock_id") or create_id()
        return Stock(**{**stock, "stock_id": stock_id, "exchange_id": exchange.exchange_id})

    async def save_general_data_to_datastore(self) -> None:
        """
        **save_general_data_to_datastore**
            Walks through the fundamentals provided by the data source generator,
            one chunk of ``request_limit`` items at a time, and saves the general data
            of every item.

        :return: None
        """
        with self._src.context:
            await self._src.update_status_fundamentals()
            _generator = self._src.fundamentals_generator

            for fundamentals_list in _generator(count=self._src.total_fundamentals, size=self._src.request_limit):
                tasks = [self._save_general_data_routine(fundamental_data)
                         for fundamental_data in fundamentals_list]
                # finish the whole chunk before asking the generator for the next one
                await asyncio.gather(*tasks)

    async def _save_general_data_routine(self, fundamental_data: TempFundamentalJSON) -> None:
        """
        **_save_general_data_routine**
            Parses one fundamentals JSON document and stores its general data, address,
            contact, listings and officers. ETFs are skipped.

        :param fundamental_data: temporary entity holding the raw JSON in ``json_data``.
        :return: None
        """
        parser: FundamentalGeneralParser = FundamentalGeneralParser(json_data=fundamental_data.json_data)
        parser.set_data(json_data=fundamental_data.json_data)
        if parser.is_etf:
            return

        # noinspection PyArgumentList
        async with self._src.semaphore:
            # NOTE this is the time when the infamous fundamental_id gets created
            fundamental_id, general_data = await self.async_parse_general(parser)
            # NOTE(review): these writes are independent puts, not a transaction -
            # a failure part way through leaves the earlier writes committed
            await asyncio.gather(self.async_store_general_instance(general_data),
                                 self._store_address_routine(fundamental_id, parser),
                                 self._store_contact_routine(fundamental_id, parser),
                                 self._store_listings_routine(fundamental_id, parser),
                                 self._store_officers_routine(fundamental_id, parser))

    async def _store_officers_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> list:
        """
        **_store_officers_routine**
            Stores every officer of the company, tagged with the fundamental_id.

        :param fundamental_id: ID linking the officers to their general fundamental record.
        :param parser: parser holding the fundamentals document.
        :return: list with one put result per stored officer (empty when there are none).
        """
        # general officers listing
        _gol: List[MDict] = await self.async_parse_officers(fundamental_id, parser)
        # put_multi_async returns a list of NDB futures, which cannot be awaited;
        # put_multi waits for every put and returns the results
        return ndb.put_multi([GeneralOfficers(**_go) for _go in _gol])

    async def _store_listings_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> list:
        """
        **_store_listings_routine**
            Stores every listing of the company, tagged with the fundamental_id.

        :param fundamental_id: ID linking the listings to their general fundamental record.
        :param parser: parser holding the fundamentals document.
        :return: list with one put result per stored listing (empty when there are none).
        """
        general_listings_list: List[MDict] = await self.async_parse_listings(fundamental_id, parser)
        # see _store_officers_routine: the list of futures from put_multi_async is not awaitable
        return ndb.put_multi([GeneralListings(**_sl) for _sl in general_listings_list])

    async def _store_contact_routine(self, fundamental_id: str, parser: FundamentalGeneralParser):
        """
        **_store_contact_routine**
            Stores the contact details of the company, tagged with the fundamental_id.

        :param fundamental_id: ID linking the contact to its general fundamental record.
        :param parser: parser holding the fundamentals document.
        :return: result of the completed put.
        """
        general_contact: MDict = await self.async_parse_contact(fundamental_id, parser)
        return GeneralContact(**general_contact).put_async(use_memcache=True).get_result()

    async def _store_address_routine(self, fundamental_id: str, parser: FundamentalGeneralParser):
        """
        **_store_address_routine**
            Stores the address of the company, tagged with the fundamental_id.

        :param fundamental_id: ID linking the address to its general fundamental record.
        :param parser: parser holding the fundamentals document.
        :return: result of the completed put.
        """
        general_address: MDict = await self.async_parse_address(fundamental_id, parser)
        return GeneralAddress(**general_address).put_async(use_memcache=True).get_result()

    async def async_store_general_instance(self, general_data: MDict):
        """
        **async_store_general_instance**
            Stores the general fundamental record.

        :param general_data: parsed general data, already carrying its fundamental_id.
        :return: result of the completed put.
        """
        # NOTE(review): save_general_data_to_datastore already holds self._src.context when
        # this runs - confirm that entering the context a second time is supported
        with self._src.context:
            general_data_instance: FundamentalGeneral = FundamentalGeneral(**general_data)
            return general_data_instance.put_async(use_memcache=True).get_result()

    @staticmethod
    async def async_parse_officers(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """
        :param fundamental_id: ID to attach to every officer.
        :param parser: parser holding the fundamentals document.
        :return: parsed officers, each with ``fundamental_id`` set.
        """
        general_officers_list: List[MDict] = await parser.parse_officers_data()
        return [{**_officer, 'fundamental_id': fundamental_id} for _officer in general_officers_list]

    @staticmethod
    async def async_parse_listings(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """
        :param fundamental_id: ID to attach to every listing.
        :param parser: parser holding the fundamentals document.
        :return: parsed listings, each with ``fundamental_id`` set.
        """
        general_listings_list: List[MDict] = await parser.parse_listings_data()
        # updating fundamental id for each item in the list then return the processed list
        return [{**_general, 'fundamental_id': fundamental_id} for _general in general_listings_list]

    @staticmethod
    async def async_parse_contact(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """
        :param fundamental_id: ID to attach to the contact.
        :param parser: parser holding the fundamentals document.
        :return: parsed contact data with ``fundamental_id`` set.
        """
        general_contact: MDict = await parser.parse_contact_data()
        return {**general_contact, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_address(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """
        :param fundamental_id: ID to attach to the address.
        :param parser: parser holding the fundamentals document.
        :return: parsed address data with ``fundamental_id`` set.
        """
        general_address: MDict = await parser.parse_address_data()
        return {**general_address, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_general(parser: FundamentalGeneralParser) -> Tuple[str, MDict]:
        """
        Parses the general data and creates the fundamental_id shared by the general
        record and its address, contact, listings and officers.

        :param parser: parser holding the fundamentals document.
        :return: tuple of (fundamental_id, general data with ``fundamental_id`` set).
        """
        general_data: MDict = await parser.parse()
        fundamental_id: str = create_id()
        general_data = {**general_data, 'fundamental_id': fundamental_id}
        return fundamental_id, general_data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment