@freelancing-solutions
Created December 25, 2022 08:01
Python - Fast Method of Processing Fundamental Highlights Data using asyncio and ndb.put_multi_async
import asyncio
from itertools import chain
from typing import Coroutine, List, Optional

from google.cloud import ndb

from src.external_data.eod import DataSource, MDict
from src.main.loader import eod_data_source
from src.models.fundamental import FundamentalHighlights
from src.models.temp.fundamental import TempFundamentalJSON
from src.parsers import FundamentalHighlightsParser


class DataSourceHighlights:
    """
    **DataSourceHighlights**
        Fetches raw fundamental data from the datastore, processes it, and stores the resulting
        fundamental highlights data back to the database.

        This class serves as the link between this API and the EOD API with regard to highlights data.
    """

    def __init__(self, source: DataSource = eod_data_source):
        self._src: DataSource = source

    async def set_fundamental_highlights(self) -> List[ndb.Key]:
        """
        **set_fundamental_highlights**
            High-throughput fundamental highlights processing using asyncio.as_completed and NDB
            put_multi_async to increase performance.

        :return: a flat list of ndb.Key instances for the stored highlights entities
        """
        _storage_tasks = []
        for f_list in self._src.fundamentals_generator(count=self._src.total_fundamentals,
                                                       size=self._src.request_limit):
            # create the fundamental processing tasks for this batch
            _tasks: List[Coroutine] = [self.async_process_highlights(_fundamental) for _fundamental in f_list]
            results: List[Optional[FundamentalHighlights]] = [await _task for _task in asyncio.as_completed(_tasks)]
            # remove None results
            highlights_list: List[FundamentalHighlights] = [_result for _result in results if _result]
            # batch-store the fundamental highlights data
            with self._src.context:
                # Note: because of the limit in fundamentals_generator this stores up to 500 results at a time
                _storage_tasks.append(ndb.put_multi_async(highlights_list, use_global_cache=True))

        # NDB futures are not awaitable by asyncio, so flatten the batches of futures
        # and resolve each one to collect the stored keys before returning
        with self._src.context:
            return [_future.get_result() for _future in chain(*_storage_tasks)]

    async def async_process_highlights(self, _fundamental: TempFundamentalJSON) -> Optional[FundamentalHighlights]:
        """
        **async_process_highlights**
            Asynchronously parse the highlights data for a single fundamental record.
        """
        parser: FundamentalHighlightsParser = FundamentalHighlightsParser(json_data=_fundamental.json_data)
        highlights_data: MDict = await parser.parse_highlights_data()
        if not highlights_data:
            return None
        # Note: for each fundamental record the parser holds the exchange and stock properties,
        # which are used here to look up the fundamental_id
        fundamental_id: str = await self._src.get_fundamental_id(exchange=parser.exchange, stock_symbol=parser.stock)
        # update highlights_data with fundamental_id, then build FundamentalHighlights from the merged kwargs
        return FundamentalHighlights(**{**highlights_data, 'fundamental_id': fundamental_id})
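For context, a minimal usage sketch of the class above. It assumes eod_data_source is already configured by src.main.loader and that the surrounding application handles any NDB client setup; the main function here is purely illustrative.

# minimal usage sketch: run the full highlights pipeline once
# (assumes eod_data_source is already configured by src.main.loader)
import asyncio

async def main() -> None:
    data_source_highlights = DataSourceHighlights()
    keys = await data_source_highlights.set_fundamental_highlights()
    print(f"stored {len(keys)} fundamental highlights records")

if __name__ == '__main__':
    asyncio.run(main())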
@freelancing-solutions (Author):
Is there a way to improve the speed of execution?