-
-
Save freelancing-solutions/78932c70c737dfc9950fb9808675ebff to your computer and use it in GitHub Desktop.
async def download_eod_and_store(self, _stock, period, session):
    """
    **download_eod_and_store**

    Download end-of-day (EOD) price data for one stock via yfinance, map
    each row to an EODData instance, and persist the instances in chunks
    of ``self._limit``.

    :param _stock: stock record; ``.code`` is the ticker passed to
        yfinance and ``.name`` is used in log messages.
    :param period: two-item sequence ``(start, end)`` forwarded to
        ``yf.download``.
    :param session: storage session handed to ``EODData.save_all``.
    :return: None. Failures are logged and swallowed so one bad ticker
        does not abort a batch run.
    """
    _data = None
    # NOTE(review): yf.download is a blocking call made while holding
    # self.lock inside a coroutine — it can stall the event loop for up
    # to the 15s timeout. Consider awaiting asyncio.to_thread(...) here.
    with self.lock:
        try:
            _data = yf.download(_stock.code, start=period[0], end=period[1],
                                interval=self._interval.interval,
                                ignore_tz=True, repair=True, timeout=15)
        except (KeyError, RuntimeError) as e:
            message: str = f"""
            Runtime or KeyError
            Unable to download eod data for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return
        except (ReadTimeout, JSONDecodeError, ConnectionError) as e:
            message: str = f"""
            Remote or Response Error
            Unable to connect or getting bad response for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return
        # BUG FIX: yfinance reports many failures by returning an empty
        # frame instead of raising — bail out early rather than logging a
        # misleading "Added a total of ..." summary below.
        if _data is None or _data.empty:
            self._logger.error(f"No EOD data returned for {_stock.code} over {period}")
            return
        # The index holds the timestamps; expose the calendar date as a column.
        _data['date'] = pd.to_datetime(_data.index).date
        map_partial = functools.partial(self.map_eod_data, stock=_stock)
        eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
        _eod_instances = await asyncio.gather(*eod_instances_list)
        _put_tasks = []
        eod_instances = list(_eod_instances)
        # Persist in chunks of self._limit so each save_all call stays bounded.
        for i in range(0, len(eod_instances), self._limit):
            short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
            _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
            self._count += len(short_eod_instance_list)
        await asyncio.gather(*_put_tasks)
        mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
        config_instance().DEBUG and self._logger.info(mess)
import aiohttp
(1) Use a connection pool for aiohttp:
connector = aiohttp.TCPConnector(limit=10)
async def download_eod_data(self, stock_code, period, session):
    """Fetch EOD price data for *stock_code* as CSV from Yahoo Finance.

    :param stock_code: ticker symbol.
    :param period: two-item sequence ``(start, end)`` interpolated into
        the download URL.
    :param session: shared ``aiohttp.ClientSession``.
    :return: ``(dataframe, None)`` on success or ``(None, exception)`` on
        failure — always a 2-tuple so callers can unpack safely.
    """
    url = (f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}"
           f"?start={period[0]}&end={period[1]}&interval={self._interval.interval}")
    try:
        async with session.get(url, timeout=15) as resp:
            resp.raise_for_status()
            csv_data = await resp.text()
        # BUG FIX: the success path previously returned a bare DataFrame
        # while both error paths returned (None, error), so callers that
        # unpack two values crashed on every successful download.
        return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
    except aiohttp.ClientError as e:
        return None, e
    except Exception as e:
        # Broad catch kept deliberately: pd.read_csv parse errors etc.
        return None, e
async def process_and_store_data(self, _stock, _data, session, period=None):
    """Map a downloaded EOD DataFrame to EODData instances and persist them.

    :param _stock: stock record forwarded to ``self.map_eod_data``.
    :param _data: DataFrame indexed by timestamp, or None when the
        download failed (in which case this is a no-op).
    :param session: storage session handed to ``EODData.save_all``.
    :param period: optional ``(start, end)`` pair, used only in the
        summary log line. BUG FIX: the original referenced an undefined
        ``period`` name in the log message, raising NameError on every
        call; it is now an optional, backward-compatible parameter.
    :return: None.
    """
    if _data is None:
        return
    # The index holds the timestamps; expose the calendar date as a column.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)
    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)
    # (2) Persist in fixed-size chunks so each save_all call stays bounded.
    chunk_size = 50  # Adjust this value according to your needs
    for i in range(0, len(eod_instances), chunk_size):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + chunk_size]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)
    # (3) Cap the number of concurrent save tasks to avoid flooding the backend.
    concurrency_limit = 5  # Adjust this value according to your needs
    for i in range(0, len(_put_tasks), concurrency_limit):
        await asyncio.gather(*_put_tasks[i: i + concurrency_limit])
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    config_instance().DEBUG and self._logger.info(mess)
async def download_eod_and_store(self, _stock, period, session):
    """Download EOD data for *_stock*, then process and store it.

    :param _stock: stock record (``.code`` for the ticker, ``.name`` for logs).
    :param period: ``(start, end)`` pair forwarded to the downloader.
    :param session: shared HTTP/storage session.
    :return: None; download failures are logged and swallowed.
    """
    result = await self.download_eod_data(_stock.code, period, session)
    # BUG FIX: download_eod_data returns a bare DataFrame on success but a
    # (None, error) tuple on failure, so a blind 2-tuple unpack raised on
    # the success path. Normalise both shapes here.
    if isinstance(result, tuple):
        _data, error = result
    else:
        _data, error = result, None
    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return
    # NOTE(review): self.lock appears to be a threading lock held across an
    # await — confirm it is an asyncio.Lock, or narrow its scope.
    with self.lock:
        await self.process_and_store_data(_stock, _data, session)
Run it by calling the main function with an aiohttp session:
async def main():
    """Drive one download using a session backed by the pooled connector."""
    async with aiohttp.ClientSession(connector=connector) as session:
        # NOTE(review): download_eod_and_store is defined as a method — in
        # real code call it on a downloader instance, not as a free function.
        await download_eod_and_store(stock, period, session)

# BUG FIX: a bare top-level ``await main()`` is a SyntaxError outside an
# async context; asyncio.run is the supported script entry point.
asyncio.run(main())
This part does the downloading
data = yf.download(_stock.code, start=period[0], end=period[1], interval=self._interval.interval,
ignore_tz=True, repair=True, timeout=15)
So what ChatGPT could not figure out is that the download is already separate from this method —
all of this is just the processing:
`
_data['date'] = pd.to_datetime(_data.index).date
map_partial = functools.partial(self.map_eod_data, stock=_stock)
eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
_eod_instances = await asyncio.gather(*eod_instances_list)
_put_tasks = []
eod_instances = list(_eod_instances)
for i in range(0, len(eod_instances), self._limit):
short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
_put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
self._count += len(short_eod_instance_list)
await asyncio.gather(*_put_tasks)
mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
config_instance().DEBUG and self._logger.info(mess)
`
It made new functions for each operation. Did that speed it up at all? And how do you run the function?
import aiohttp
async def download_eod_data(self, stock_code, period):
    """Fetch EOD price data for *stock_code* as CSV from Yahoo Finance.

    :param stock_code: ticker symbol.
    :param period: two-item sequence ``(start, end)`` interpolated into
        the download URL.
    :return: ``(dataframe, None)`` on success or ``(None, exception)`` on
        failure — always a 2-tuple so callers can unpack safely.
    """
    url = (f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}"
           f"?start={period[0]}&end={period[1]}&interval={self._interval.interval}")
    # NOTE(review): opening a fresh ClientSession per call defeats the
    # TCPConnector pooling suggested earlier — prefer a shared session.
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=15) as resp:
                resp.raise_for_status()
                csv_data = await resp.text()
            # BUG FIX: the success path previously returned a bare DataFrame
            # while both error paths returned (None, error); unify on 2-tuples
            # so ``_data, error = await ...`` works in every case.
            return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
        except aiohttp.ClientError as e:
            return None, e
        except Exception as e:
            # Broad catch kept deliberately: pd.read_csv parse errors etc.
            return None, e
async def process_and_store_data(self, _stock, _data, session):
    """Persist downloaded EOD data; ``None`` (a failed download) is a no-op."""
    if _data is None:
        return None
async def download_eod_and_store(self, _stock, period, session):
_data, error = await self.download_eod_data(_stock.code, period)