Created
December 25, 2022 08:01
-
-
Save freelancing-solutions/f701ce0f7676f6126c1d08e3da90112e to your computer and use it in GitHub Desktop.
Python - Fast Method of Processing Fundamental Highlights Data using asyncio and ndb.put_multi_async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import List, Optional, Coroutine | |
from google.cloud import ndb | |
from src.external_data.eod import DataSource, MDict | |
from src.main.loader import eod_data_source | |
from src.models.fundamental import FundamentalHighlights | |
from src.models.temp.fundamental import TempFundamentalJSON | |
from src.parsers import FundamentalHighlightsParser | |
from itertools import chain | |
class DataSourceHighlights:
    """
    **DataSourceHighlights**

    Fetches raw fundamental data from the datastore, parses out the
    highlights section, and stores the resulting ``FundamentalHighlights``
    entities back to the database in batches.

    Serves as the link between this API and the EOD API with regard to
    highlights data.
    """

    def __init__(self, source: DataSource = eod_data_source):
        # Data source providing raw fundamentals, lookup helpers and the
        # NDB client context used for datastore writes.
        self._src: DataSource = source

    async def set_fundamental_highlights(self) -> List[ndb.Key]:
        """
        **set_fundamental_highlights**

        High-throughput fundamental-highlights processing: parses each batch
        concurrently with asyncio, then writes each batch with
        ``ndb.put_multi_async`` so datastore writes overlap with the
        processing of subsequent batches.

        :return: the ndb keys of every stored FundamentalHighlights entity
        """
        # One entry per batch; each entry is the list of NDB futures
        # returned by put_multi_async (one future per stored entity).
        _storage_batches: List[List[ndb.Future]] = []
        for f_list in self._src.fundamentals_generator(count=self._src.total_fundamentals,
                                                       size=self._src.request_limit):
            # Parse this batch of raw fundamentals concurrently. gather
            # preserves input order, but order is irrelevant here since
            # None results are filtered out next.
            results: List[Optional[FundamentalHighlights]] = await asyncio.gather(
                *[self.async_process_highlights(_fundamental) for _fundamental in f_list])
            # Drop fundamentals whose highlights section was empty or unparseable.
            highlights_list: List[FundamentalHighlights] = [_result for _result in results if _result]
            if not highlights_list:
                # Nothing to store for this batch; skip the empty put_multi call.
                continue
            # NOTE(review): assumes self._src.context is a context-manager
            # *instance*, not a factory method — confirm against DataSource.
            with self._src.context:
                # Because of the generator's size limit this stores at most
                # self._src.request_limit entities per call.
                # NOTE(review): use_memstore is not a documented Cloud NDB
                # option — verify it is honored by this client version.
                _storage_batches.append(ndb.put_multi_async(highlights_list, use_memstore=True))
        # BUG FIX: the original passed lists of NDB futures to
        # asyncio.as_completed and awaited them — NDB futures are not asyncio
        # awaitables, and a list is never awaitable, so that raised TypeError.
        # Resolve each NDB future explicitly (blocks until done) and flatten
        # the per-batch key lists into one list.
        return list(chain.from_iterable(
            [_future.result() for _future in _batch] for _batch in _storage_batches))

    async def async_process_highlights(self, _fundamental: TempFundamentalJSON) -> Optional[FundamentalHighlights]:
        """
        **async_process_highlights**

        Asynchronously parse the highlights section of a single raw
        fundamental record.

        :param _fundamental: raw fundamental JSON record from the datastore
        :return: a FundamentalHighlights entity ready to be stored, or None
            when the record contains no highlights data
        """
        parser: FundamentalHighlightsParser = FundamentalHighlightsParser(json_data=_fundamental.json_data)
        highlights_data: MDict = await parser.parse_highlights_data()
        if not highlights_data:
            return None
        # The parser exposes the exchange and stock symbol of the record it
        # parsed; use them to look up the owning fundamental_id.
        fundamental_id: str = await self._src.get_fundamental_id(exchange=parser.exchange, stock_symbol=parser.stock)
        # Attach fundamental_id, then build the entity by passing the merged
        # mapping as keyword arguments to FundamentalHighlights.
        return FundamentalHighlights(**{**highlights_data, 'fundamental_id': fundamental_id})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there a way to improve the speed of execution?