@freelancing-solutions
Created January 9, 2023 14:21
updated general fundamental data
import asyncio
from asyncio import Future
from typing import List, Optional, Tuple, Coroutine, Callable
from google.cloud import ndb
from src.external_data.eod import DataSource
from src.external_data.eod.data_types import MDict
from src.main.loader import eod_data_source
from src.models.exchanges import Exchange
from src.models.fundamental import (FundamentalGeneral, GeneralAddress, GeneralContact, GeneralListings, GeneralOfficers)
from src.models.stock import Stock
from src.models.temp.fundamental import TempFundamentalJSON
from src.parsers import FundamentalGeneralParser
from src.utils import create_id
from src.utils.my_logger import init_logger
# from numba import jit, vectorize
# THIS IS THE ACTUAL EOD SOURCE
class DataSourceGeneral:
    """
    **Class DataSourceGeneral**

    Asynchronously fetches:
        - Exchange Data
        - Ticker Data
        - General Fundamental Data
    and stores it asynchronously to the GCP Datastore.

    Estimated EOD API call count for this class:
        Exchanges + Tickers = 144
        Fundamental Data = ?
    """

    def __init__(self, source: DataSource = eod_data_source) -> None:
        self._src: DataSource = source
        self._ndb_min_batch_size: int = 500
        self._logger = init_logger("data_source_general")

    @staticmethod
    async def exchange_exist(exchange_list: List[Exchange], code: str) -> bool:
        """
        **exchange_exist**

        Returns True if an exchange with the specified code is already present in
        the list of Exchange objects, False otherwise.

        :param exchange_list: List of Exchange objects to search.
        :param code: Code of the exchange to search for.
        :return: True if the exchange is present, False otherwise.
        """
        return any(exchange.code.lower() == code.lower() for exchange in exchange_list)

    @staticmethod
    async def stock_exist(stock_list: List[Stock], code: str) -> bool:
        """
        **stock_exist**

        Returns True if a stock with the specified code is already present in the
        list of Stock objects, False otherwise.

        :param stock_list: List of Stock objects to search.
        :param code: Code of the stock to search for.
        :return: True if the stock is present, False otherwise.
        """
        return any(stock.code.lower() == code.lower() for stock in stock_list)

    @staticmethod
    def _exchange_code_by_id(exchange_id: str, exchange_lists: List[Exchange]) -> Optional[str]:
        """
        **_exchange_code_by_id**

        Returns the code of the exchange with the specified exchange ID from the
        list of Exchange objects, if it is present.

        :param exchange_id: Exchange ID of the exchange to search for.
        :param exchange_lists: List of Exchange objects to search.
        :return: Code of the exchange if it is present, None otherwise.
        """
        for exchange in exchange_lists:
            if exchange.exchange_id.lower() == exchange_id.lower():
                return exchange.code
        return None

    @staticmethod
    async def parse_exchange_data(exchange: MDict) -> MDict:
        """
        Parses exchange data into our local format.

        NOTE: some exchanges do not have a Code or an OperatingMIC; if either is
        missing we create our own identifier.
        """
        return dict(name=exchange.get('Name'),
                    code=exchange.get('Code', create_id(size=4)),
                    operating_mic=exchange.get('OperatingMIC', create_id(size=4)),
                    country=exchange.get('Country'),
                    currency_symbol=exchange.get('Currency'))
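
    # Illustrative example of the mapping above (hypothetical payload values,
    # not taken from the EOD API documentation):
    #   {'Name': 'Example Exchange', 'Code': 'EXM', 'OperatingMIC': 'XEXM',
    #    'Country': 'UK', 'Currency': 'GBP'}
    #   -> {'name': 'Example Exchange', 'code': 'EXM', 'operating_mic': 'XEXM',
    #       'country': 'UK', 'currency_symbol': 'GBP'}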

    @staticmethod
    async def parse_stock_data(stock: MDict) -> MDict:
        """
        Converts the stock data in the specified MDict object into a format that
        can be stored in the database.

        :param stock: MDict object containing stock data.
        :return: Dictionary containing the stock data in a format suitable for
            storage in the database.
        """
        return dict(code=stock.get('Code'),
                    name=stock.get('Name'),
                    country=stock.get('Country'),
                    currency=stock.get('Currency'),
                    stock_type=stock.get('Type'),
                    exchange=stock.get('Exchange'))

    async def update_exchanges_from_eod(self):
        """
        **update_exchanges_from_eod**

        Fetches a list of exchanges from the data source and stores the ones not
        already present in the database using the NDB client.
        https://eodhistoricaldata.com/api/exchanges-list/?api_token=api_token&fmt=json

        :return: Dictionary with the number of exchanges saved and a status message.
        """
        # create a request to EOD to fetch the list of exchanges
        eod_url: str = f"{self._src.base_url}/exchanges-list/"
        args: dict[str, str] = dict(api_token=self._src.api_token, fmt='json')
        json_data: List[dict[str, str]] = await self._src.async_get_request(_url=eod_url, args=args)
        # a Semaphore is not needed as there are only about 60 exchanges in the world
        _total_exchanges_put = 0
        with self._src.context:
            # noinspection PyArgumentList
            # use a single query to fetch all of the existing exchanges
            ex_exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            # create a set of existing exchange codes
            existing_codes = {exchange.code.lower() for exchange in ex_exchange_list}
            # parse each raw exchange record into the local format
            processed_exchanges = [await _exchange for _exchange in map(self.parse_exchange_data, json_data)]
            # keep only the exchanges whose code is not already in ex_exchange_list,
            # using the exchange code to check for duplicates
            _unique_exchanges: List[MDict] = [exchange for exchange in processed_exchanges
                                              if exchange.get('code').lower() not in existing_codes]
            list_exchanges: List[Exchange] = [Exchange(**{**exchange, "exchange_id": create_id()})
                                              for exchange in _unique_exchanges]
            exchange_puts_list = map(self.put_exchange, list_exchanges)
            for _put_task in asyncio.as_completed(exchange_puts_list):
                await _put_task
                _total_exchanges_put += 1
        _message: str = f"added a total of {_total_exchanges_put} Exchanges to the database"
        self._logger.info(_message)
        return dict(exchanges=_total_exchanges_put, message=_message)

    @staticmethod
    async def put_exchange(exchange: Exchange):
        """
        **put_exchange**

        Stores a single Exchange entity in the database.

        :param exchange: Exchange object to add to the database.
        :return: Key of the stored exchange.
        """
        return exchange.put_async().get_result()

    async def update_tickers(self) -> None:
        """
        **update_tickers**

        Reads all the exchanges from the database and, for each exchange, obtains
        a list of ticker symbols and stores them in the database.
        https://eodhistoricaldata.com/api/exchange-symbol-list/LSE?api_token=api

        :return: None
        """
        with self._src.context:
            exchange_list: List[Exchange] = await Exchange.get_all_exchanges()
            self._logger.info(exchange_list)
            _requester = self._src.async_get_request
            tasks = [self.get_store_tickers_routine(exchange=exchange, requester=_requester)
                     for exchange in exchange_list]
            # this loads all the stock tickers for each exchange
            # wait for all tasks to complete and filter out any None values
            # NOTE: this compiles the list of valid tasks in one go
            _ = [await task for task in asyncio.as_completed([task for task in tasks if task])]
        return

    async def get_store_tickers_routine(self, exchange: Exchange, requester: Callable) -> None:
        """
        **get_store_tickers_routine**

        A coroutine that asynchronously fetches a list of ticker symbols for the
        specified exchange from the data source and stores them in the database
        using the NDB client.

        :param exchange: Exchange object for which to fetch ticker symbols.
        :param requester: Callable that makes async HTTP requests.
        :return: None
        """
        _args = dict(api_token=self._src.api_token, fmt="json")
        _url = f"{self._src.base_url}exchange-symbol-list/{exchange.code}"
        json_data: List[MDict] = await requester(_url=_url, args=_args)
        # fetch the stock symbols which already exist
        stock_list: List[Stock] = await Stock.get_stock_by_exchange_id(exchange_id=exchange.exchange_id)
        # if the length of the returned json data equals the length of stock_list
        # then we already have all the ticker symbols
        if (len(json_data) == len(stock_list)) or not len(json_data):
            # POTENTIAL BUG: if one stock is removed and another is added before any
            # processing is done, then no stock will be added
            return
        # parse the raw records into the local format (keys become lowercase, e.g. 'code')
        _json_data = [await _data for _data in map(self.parse_stock_data, json_data)]
        for i in range(0, len(_json_data), self._src.request_limit):
            async with self._src.semaphore:
                # NOTE: parse_stock_data returns lowercase keys, so look up 'code' here
                _tasks = [self.process_tickers(exchange=exchange, stock=stock)
                          for stock in _json_data[i:i + self._src.request_limit]
                          if not await self.stock_exist(stock_list=stock_list, code=stock.get('code'))]
                # processing tickers
                _processed_tickers = [await _task for _task in asyncio.as_completed(_tasks)]
                # storing this batch of stocks for the exchange
                ticker_puts_map = map(self.put_ticker, _processed_tickers)
                for _task in asyncio.as_completed(ticker_puts_map):
                    await _task
        return

    @staticmethod
    async def put_ticker(ticker: Stock):
        """
        **put_ticker**

        put_ticker adds a new stock to the database.

        :param ticker: Stock object to add to the database.
        :return: Key of the stored stock.
        """
        return ticker.put_async().get_result()

    @staticmethod
    async def process_tickers(exchange: Exchange, stock: MDict) -> Stock:
        """
        **process_tickers**

        process_tickers is a static coroutine that asynchronously creates a Stock
        object from the specified stock data and exchange, and returns the
        resulting object.

        :param exchange: Exchange object to use for creating the Stock object.
        :param stock: MDict object containing stock data.
        :return: Stock object created from the specified stock data and exchange.
        """
        return Stock(**{**stock,
                        "stock_id": stock.get("stock_id", create_id()),
                        "exchange_id": exchange.exchange_id})

    async def save_general_data_to_datastore(self) -> None:
        """
        **save_general_data_to_datastore**

        If fundamental JSON data is present in the data source, go through it and
        save the parsed general data.

        :return: None
        """
        with self._src.context:
            await self._src.update_status_fundamentals()
            _generator = self._src.fundamentals_generator
            for fundamentals_list in _generator(count=self._src.total_fundamentals, size=self._src.request_limit):
                tasks = [self._save_general_data_routine(fundamental_data) for fundamental_data in fundamentals_list]
                for task in asyncio.as_completed(tasks):
                    await task
        return

    async def _save_general_data_routine(self, fundamental_data: TempFundamentalJSON) -> None:
        """
        **_save_general_data_routine**

        Enables async execution of each chunk of fundamental json_data.

        :param fundamental_data: TempFundamentalJSON record to parse and store.
        :return: None
        """
        parser: FundamentalGeneralParser = FundamentalGeneralParser(json_data=fundamental_data.json_data)
        parser.set_data(json_data=fundamental_data.json_data)
        if parser.is_etf:
            return
        # a transaction would ensure that all of this either gets committed to the
        # database or none of it does
        # noinspection PyArgumentList
        async with self._src.semaphore:
            # NOTE: this is where the fundamental_id gets created
            fundamental_id, general_data = await self.async_parse_general(parser)
            _tasks: List[Coroutine] = [self.async_store_general_instance(general_data),
                                       self._store_address_routine(fundamental_id, parser),
                                       self._store_contact_routine(fundamental_id, parser),
                                       self._store_listings_routine(fundamental_id, parser),
                                       self._store_officers_routine(fundamental_id, parser)]
            for _task in asyncio.as_completed(_tasks):
                await _task
        # return [await _task for _task in asyncio.as_completed(_tasks)]

    async def _store_officers_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> List[ndb.Key]:
        """
        **_store_officers_routine**

        Creates and runs the routines which store the officers data in the database.

        :param fundamental_id: (str) ID of the parent fundamental record.
        :param parser: (FundamentalGeneralParser) parser holding the raw data.
        :return: List of keys for the stored officers.
        """
        # general officers listing
        _gol: List[MDict] = await self.async_parse_officers(fundamental_id, parser)
        # return await asyncio.gather(*list(map(lambda _go: GeneralOfficers(**_go).put_async().get_result(), _gol)))
        # ndb.put_multi_async returns a list of NDB futures (not awaitables); resolve them here
        return [_future.get_result() for _future in ndb.put_multi_async([GeneralOfficers(**_go) for _go in _gol])]

    async def _store_listings_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> List[ndb.Key]:
        """Parses and stores the listings data for the given fundamental_id."""
        # NOTE: this may not be asynchronous
        general_listings_list: List[MDict] = await self.async_parse_listings(fundamental_id, parser)
        # ndb.put_multi_async returns a list of NDB futures (not awaitables); resolve them here
        return [_future.get_result() for _future in ndb.put_multi_async([GeneralListings(**_sl) for _sl in general_listings_list])]

    async def _store_contact_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> ndb.Key:
        """Parses and stores the contact data for the given fundamental_id."""
        general_contact: MDict = await self.async_parse_contact(fundamental_id, parser)
        return GeneralContact(**general_contact).put_async(use_memcache=True).get_result()

    async def _store_address_routine(self, fundamental_id: str, parser: FundamentalGeneralParser) -> ndb.Key:
        """Parses and stores the address data for the given fundamental_id."""
        general_address: MDict = await self.async_parse_address(fundamental_id, parser)
        return GeneralAddress(**general_address).put_async(use_memcache=True).get_result()

    async def async_store_general_instance(self, general_data: MDict) -> ndb.Key:
        """Stores the parsed FundamentalGeneral record."""
        with self._src.context:
            general_data_instance: FundamentalGeneral = FundamentalGeneral(**general_data)
            return general_data_instance.put_async(use_memcache=True).get_result()

    @staticmethod
    async def async_parse_officers(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """Parses the officers data and attaches the fundamental_id to each officer."""
        general_officers_list: List[MDict] = await parser.parse_officers_data()
        return [{**_officer, 'fundamental_id': fundamental_id} for _officer in general_officers_list]

    @staticmethod
    async def async_parse_listings(fundamental_id: str, parser: FundamentalGeneralParser) -> List[MDict]:
        """Parses the listings data and attaches the fundamental_id to each listing."""
        general_listings_list: List[MDict] = await parser.parse_listings_data()
        # update the fundamental_id for each item in the list, then return the processed list
        return [{**_general, 'fundamental_id': fundamental_id} for _general in general_listings_list]

    @staticmethod
    async def async_parse_contact(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """Parses the contact data and attaches the fundamental_id."""
        general_contact: MDict = await parser.parse_contact_data()
        return {**general_contact, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_address(fundamental_id: str, parser: FundamentalGeneralParser) -> MDict:
        """Parses the address data and attaches the fundamental_id."""
        general_address: MDict = await parser.parse_address_data()
        return {**general_address, 'fundamental_id': fundamental_id}

    @staticmethod
    async def async_parse_general(parser: FundamentalGeneralParser) -> Tuple[str, MDict]:
        """Parses the general data, creates the fundamental_id and attaches it to the result."""
        general_data: MDict = await parser.parse()
        fundamental_id: str = create_id()
        general_data = {**general_data, 'fundamental_id': fundamental_id}
        return fundamental_id, general_data
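

# Minimal usage sketch: assumes eod_data_source is already configured with a valid
# API token and a usable NDB context, and simply chains the public methods defined
# on the class above.
async def _run_general_pipeline() -> None:
    data_source = DataSourceGeneral(source=eod_data_source)
    await data_source.update_exchanges_from_eod()
    await data_source.update_tickers()
    await data_source.save_general_data_to_datastore()


if __name__ == '__main__':
    asyncio.run(_run_general_pipeline())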