"""Updated general fundamental data."""
import asyncio
from asyncio import Future
from typing import List, Optional, Tuple, Coroutine, Callable
from import ndb
from src.external_data.eod import DataSource
from src.external_data.eod.data_types import MDict
from src.main.loader import eod_data_source
from src.models.exchanges import Exchange
from src.models.fundamental import (FundamentalGeneral, GeneralAddress, GeneralContact, GeneralListings,GeneralOfficers)
from src.models.stock import Stock
from src.models.temp.fundamental import TempFundamentalJSON
from src.parsers import FundamentalGeneralParser
from src.utils import create_id
from src.utils.my_logger import init_logger
# from numba import jit, vectorize
class DataSourceGeneral:
    """
    **Class DataSourceGeneral**
        Asynchronously fetches
            - Exchange Data
            - Ticker Data
            - General Fundamental Data
        and stores it in the GCP datastore.

        Estimated EOD api count for this Class is
            Exchanges + Tickers = 144
            Fundamental Data = ?
    """

    def __init__(self, source: DataSource = eod_data_source) -> None:
        """
        :param source: data source object; this class reads its ``base_url``, ``api_token``,
                       ``async_get_request``, ``context``, ``semaphore``, ``request_limit``
                       and the fundamentals helpers from it.
        """
        self._src: DataSource = source
        self._ndb_min_batch_size: int = 500
        self._logger = init_logger("data_source_general")

    @staticmethod
    async def exchange_exist(exchange_list: List[Exchange], code: str) -> bool:
        """
        Returns True if an exchange with the specified code is present in the list.
        The comparison is case-insensitive. Nothing is cached.

        :param exchange_list: List of Exchange objects to search.
        :param code: Code of the exchange to search for.
        :return: True if the exchange is present, False otherwise (also when code is None).
        """
        if code is None:
            return False
        wanted = code.lower()
        return any(exchange.code.lower() == wanted for exchange in exchange_list)

    @staticmethod
    async def stock_exist(stock_list: List[Stock], code: str) -> bool:
        """
        Returns True if a stock with the specified code is present in the list.
        The comparison is case-insensitive. Nothing is cached.

        :param stock_list: List of Stock objects to search.
        :param code: Code of the stock to search for.
        :return: True if the stock is present, False otherwise (also when code is None).
        """
        if code is None:
            return False
        wanted = code.lower()
        return any(stock.code.lower() == wanted for stock in stock_list)

    @staticmethod
    def _exchange_code_by_id(exchange_id: str, exchange_lists: List[Exchange]) -> Optional[str]:
        """
        Returns the code of the first exchange whose exchange_id matches (case-insensitive).

        :param exchange_id: Exchange ID of the exchange to search for.
        :param exchange_lists: List of Exchange objects to search.
        :return: Code of the exchange if it is present, None otherwise.
        """
        wanted = exchange_id.lower()
        return next((exchange.code for exchange in exchange_lists
                     if exchange.exchange_id.lower() == wanted), None)

    @staticmethod
    async def parse_exchange_data(exchange: MDict) -> MDict:
        """
        Parses exchange data into our local format.
        NOTE: some exchanges do not have an OperatingMIC.

        :param exchange: raw exchange record as returned by the data source.
        :return: dict with the keys ``name``, ``code`` and ``operating_mic``.
        """
        # NOTE if Code or OperatingMIC is missing (or empty) we create our own value;
        # `or` keeps create_id from running when the upstream value is usable.
        # NOTE(review): only these three fields are visible in the reviewed source -
        # confirm against the Exchange model whether further fields must be mapped here.
        return dict(name=exchange.get('Name'),
                    code=exchange.get('Code') or create_id(size=4),
                    operating_mic=exchange.get('OperatingMIC') or create_id(size=4))

    @staticmethod
    async def parse_stock_data(stock: MDict) -> MDict:
        """
        Converts the stock data in the specified MDict object into a format that can be
        stored in the database.

        :param stock: MDict object containing stock data.
        :return: Dictionary containing the stock data; the ticker symbol is stored under ``code``.
        """
        # NOTE(review): only the `code` field is visible in the reviewed source -
        # confirm against the Stock model whether further fields must be mapped here.
        return dict(code=stock.get('Code'))

    async def update_exchanges_from_eod(self):
        """
        Fetches the list of exchanges from the data source and stores the ones whose
        code is not yet in the database.

        :return: dict with ``exchanges`` (number of exchanges stored) and ``message``.
        """
        # strip any trailing slash so the path is joined the same way as in get_store_tickers_routine
        eod_url: str = f"{str(self._src.base_url).rstrip('/')}/exchanges-list/"
        args: dict[str, str] = dict(api_token=self._src.api_token, fmt='json')
        json_data: List[dict[str, str]] = await self._src.async_get_request(_url=eod_url, args=args)

        with self._src.context:
            # noinspection PyArgumentList
            # a single query fetches all of the existing exchanges
            ex_exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            existing_codes = {exchange.code.lower() for exchange in ex_exchange_list}

            processed_exchanges: List[MDict] = [await self.parse_exchange_data(item)
                                                for item in json_data or []]
            # keep only exchanges whose code is not already stored
            list_exchanges: List[Exchange] = [Exchange(**{**exchange, "exchange_id": create_id()})
                                              for exchange in processed_exchanges
                                              if exchange['code'].lower() not in existing_codes]
            await asyncio.gather(*map(self.put_exchange, list_exchanges))

        _total_exchanges_put = len(list_exchanges)
        _message: str = f"added a total number of {_total_exchanges_put} , Exchanges in the database"
        return dict(exchanges=_total_exchanges_put, message=_message)

    @staticmethod
    async def put_exchange(exchange: Exchange):
        """
        Stores one exchange.

        :param exchange: Exchange object to store.
        :return: whatever ``put_async().get_result()`` yields for the entity.
        """
        # NOTE: get_result() waits for the put to finish before this coroutine returns
        return exchange.put_async().get_result()

    async def update_tickers(self) -> None:
        """
        Reads all the exchanges from the database and, for each exchange,
        obtains the list of ticker symbols and stores them in the database.

        :return: None
        """
        with self._src.context:
            exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            _requester = self._src.async_get_request
            # one routine per exchange; each loads and stores that exchange's tickers
            await asyncio.gather(*(self.get_store_tickers_routine(exchange=exchange, requester=_requester)
                                   for exchange in exchange_list))

    async def get_store_tickers_routine(self, exchange: Exchange, requester: Callable) -> None:
        """
        Fetches the list of ticker symbols for the specified exchange from the data source
        and stores the ones that are not yet in the database.

        :param exchange: Exchange object for which to fetch ticker symbols.
        :param requester: Callable that makes async HTTP requests; called with ``_url`` and ``args``.
        :return: None
        """
        _args = dict(api_token=self._src.api_token, fmt="json")
        _url = f"{str(self._src.base_url).rstrip('/')}/exchange-symbol-list/{exchange.code}"
        json_data: List[MDict] = await requester(_url=_url, args=_args)
        # fetching stock symbols which already exist
        stock_list: List[Stock] = await Stock.get_stock_by_exchange_id(exchange_id=exchange.exchange_id)

        # nothing returned, or the counts match, so we assume we already have all the ticker symbols.
        # POTENTIAL BUG: if one stock is removed and another added before any processing
        # is done, the counts still match and the new stock will not be added.
        # NOTE(review): the early return is reconstructed from the comment above - confirm.
        if not json_data or len(json_data) == len(stock_list):
            return None

        # parsed records carry the symbol under the lower-case key `code`
        existing_codes = {stock.code.lower() for stock in stock_list}
        parsed_stocks: List[MDict] = [await self.parse_stock_data(item) for item in json_data]

        batch_size: int = self._src.request_limit
        for start in range(0, len(parsed_stocks), batch_size):
            async with self._src.semaphore:
                batch = parsed_stocks[start:start + batch_size]
                # processing tickers which are not stored yet
                _processed_tickers = await asyncio.gather(
                    *(self.process_tickers(exchange=exchange, stock=stock)
                      for stock in batch
                      if (stock.get('code') or '').lower() not in existing_codes))
                # storing this part of the stocks for this exchange
                await asyncio.gather(*map(self.put_ticker, _processed_tickers))
        return None

    @staticmethod
    async def put_ticker(ticker: Stock):
        """
        Adds a stock to the database.

        :param ticker: Stock object to add to the database.
        :return: whatever ``put_async().get_result()`` yields for the entity.
        """
        return ticker.put_async().get_result()

    @staticmethod
    async def process_tickers(exchange: Exchange, stock: MDict) -> Stock:
        """
        Creates a Stock object from the specified stock data and exchange.

        :param exchange: Exchange object whose exchange_id is attached to the stock.
        :param stock: MDict object containing stock data.
        :return: Stock object; a new stock_id is generated when the data has none.
        """
        return Stock(**{**stock,
                        "stock_id": stock.get("stock_id") or create_id(),
                        "exchange_id": exchange.exchange_id})

    async def save_general_data_to_datastore(self) -> None:
        """
        Walks through the fundamentals provided by the data source in chunks of
        ``request_limit`` and saves the general data of every item.

        :return: None
        """
        with self._src.context:
            await self._src.update_status_fundamentals()
            _generator = self._src.fundamentals_generator
            for fundamentals_list in _generator(count=self._src.total_fundamentals,
                                                size=self._src.request_limit):
                await asyncio.gather(*map(self._save_general_data_routine, fundamentals_list))

    async def _save_general_data_routine(self, fundamental_data: TempFundamentalJSON) -> Optional[List[Coroutine]]:
        """
        Parses one fundamentals record and stores its general data, address,
        contact, listings and officers.

        :param fundamental_data: temporary record holding the raw ``json_data``.
        :return: None
        """
        parser: FundamentalGeneralParser = FundamentalGeneralParser(json_data=fundamental_data.json_data)
        # NOTE(review): ETFs are skipped here; this early return is reconstructed from
        # the damaged source and the Optional return type - confirm.
        if parser.is_etf:
            return None

        # noinspection PyArgumentList
        async with self._src.semaphore:
            # NOTE this is the time when the fundamental_id gets created
            fundamental_id, general_data = await self.async_parse_general(parser)
            await asyncio.gather(self.async_store_general_instance(general_data),
                                 self._store_address_routine(fundamental_id, parser),
                                 self._store_contact_routine(fundamental_id, parser),
                                 self._store_listings_routine(fundamental_id, parser),
                                 self._store_officers_routine(fundamental_id, parser))
        return None

    async def _store_officers_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> list:
        """
        Stores the officers of a company in one batched put.

        :param fundamental_id: id attached to every officer record.
        :param parser: parser holding the fundamentals record.
        :return: list with the result of every put.
        """
        # general officers listing
        _gol: List[MDict] = await self.async_parse_officers(fundamental_id, parser)
        # put_multi_async returns a list of futures, a list cannot be awaited
        return [_future.get_result()
                for _future in ndb.put_multi_async([GeneralOfficers(**_go) for _go in _gol])]

    async def _store_listings_routine(self, fundamental_id: str, parser: FundamentalGeneralParser):
        """
        Stores the listings of a company in one batched put.

        :param fundamental_id: id attached to every listing record.
        :param parser: parser holding the fundamentals record.
        :return: list with the result of every put.
        """
        general_listings_list: List[MDict] = await self.async_parse_listings(fundamental_id, parser)
        # put_multi_async returns a list of futures, a list cannot be awaited
        return [_future.get_result()
                for _future in ndb.put_multi_async([GeneralListings(**_sl) for _sl in general_listings_list])]

    async def _store_contact_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> Future:
        """
        Stores the contact details of a company.

        :param fundamental_id: id attached to the contact record.
        :param parser: parser holding the fundamentals record.
        :return: whatever ``put_async().get_result()`` yields for the entity.
        """
        general_contact: MDict = await self.async_parse_contact(fundamental_id, parser)
        return GeneralContact(**general_contact).put_async(use_memcache=True).get_result()

    async def _store_address_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> Future:
        """
        Stores the address of a company.

        :param fundamental_id: id attached to the address record.
        :param parser: parser holding the fundamentals record.
        :return: whatever ``put_async().get_result()`` yields for the entity.
        """
        general_address: MDict = await self.async_parse_address(fundamental_id, parser)
        return GeneralAddress(**general_address).put_async(use_memcache=True).get_result()

    async def async_store_general_instance(self, general_data: MDict) -> Future:
        """
        Stores the general fundamental data record.

        :param general_data: keyword arguments for FundamentalGeneral.
        :return: whatever ``put_async().get_result()`` yields for the entity.
        """
        with self._src.context:
            general_data_instance: FundamentalGeneral = FundamentalGeneral(**general_data)
            return general_data_instance.put_async(use_memcache=True).get_result()

    @staticmethod
    async def async_parse_officers(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """
        :param fundamental_id: id to attach to every officer record.
        :param parser: parser providing ``parse_officers_data``.
        :return: officer records, each extended with ``fundamental_id``.
        """
        general_officers_list: List[MDict] = await parser.parse_officers_data()
        return [{**_officer, 'fundamental_id': fundamental_id} for _officer in general_officers_list]

    @staticmethod
    async def async_parse_listings(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """
        :param fundamental_id: id to attach to every listing record.
        :param parser: parser providing ``parse_listings_data``.
        :return: listing records, each extended with ``fundamental_id``.
        """
        general_listings_list: List[MDict] = await parser.parse_listings_data()
        return [{**_general, 'fundamental_id': fundamental_id} for _general in general_listings_list]

    @staticmethod
    async def async_parse_contact(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """
        :param fundamental_id: id to attach to the contact record.
        :param parser: parser providing ``parse_contact_data``.
        :return: contact record extended with ``fundamental_id``.
        """
        general_contact: MDict = await parser.parse_contact_data()
        return {**general_contact, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_address(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """
        :param fundamental_id: id to attach to the address record.
        :param parser: parser providing ``parse_address_data``.
        :return: address record extended with ``fundamental_id``.
        """
        general_address: MDict = await parser.parse_address_data()
        return {**general_address, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_general(parser: FundamentalGeneralParser) -> Tuple[str, MDict]:
        """
        Parses the general data and creates the fundamental_id shared by all related records.

        :param parser: parser providing ``parse``.
        :return: tuple of (fundamental_id, general data extended with ``fundamental_id``).
        """
        general_data: MDict = await parser.parse()
        fundamental_id: str = create_id()
        return fundamental_id, {**general_data, 'fundamental_id': fundamental_id}
