-
-
Save freelancing-solutions/78932c70c737dfc9950fb9808675ebff to your computer and use it in GitHub Desktop.
async def download_eod_and_store(self, _stock, period, session):
    """
    **download_eod_and_store**

    Download end-of-day (EOD) data for one stock via yfinance, map each row
    to an EODData instance, and persist the instances in batches of
    ``self._limit``.

    :param _stock: stock record exposing ``code`` and ``name``
    :param period: two-item sequence ``(start, end)`` forwarded to ``yf.download``
    :param session: storage session forwarded to ``EODData.save_all``
    :return: None (logs and returns early on download failure)
    """
    _data = None
    # NOTE(review): self.lock appears to be a synchronous lock held across the
    # blocking yf.download call and the async store step — this serializes
    # work across tasks; confirm that is intended.
    with self.lock:
        try:
            _data = yf.download(_stock.code, start=period[0], end=period[1], interval=self._interval.interval,
                                ignore_tz=True, repair=True, timeout=15)
        except (KeyError, RuntimeError) as e:
            message: str = f"""
            Runtime or KeyError
            Unable to download eod data for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return
        except (ReadTimeout, JSONDecodeError, ConnectionError) as e:
            message: str = f"""
            Remote or Response Error
            Unable to connect or getting bad response for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return
        # The frame's index holds the trading date; expose it as a column.
        _data['date'] = pd.to_datetime(_data.index).date
        map_partial = functools.partial(self.map_eod_data, stock=_stock)
        eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
        _eod_instances = await asyncio.gather(*eod_instances_list)
        _put_tasks = []
        eod_instances = list(_eod_instances)
        # Persist in chunks of self._limit so each save_all call stays bounded.
        for i in range(0, len(eod_instances), self._limit):
            short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
            _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
            self._count += len(short_eod_instance_list)
        await asyncio.gather(*_put_tasks)
        mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
        config_instance().DEBUG and self._logger.info(mess)
To speed up the execution of the download_eod_and_store function, you can consider the following optimizations:
Move the downloading and processing of data into separate functions so that they can be run concurrently.
Use asynchronous libraries for downloading data, such as aiohttp.
Parallelize the data processing by using asyncio.gather more effectively.
Here's the optimized version of your code:
see code above
This optimized version separates downloading and processing tasks, uses the aiohttp library for async downloading, and ensures that data processing is done concurrently using asyncio.gather.
import aiohttp
async def download_eod_data(self, stock_code, period):
    """
    Fetch EOD price history for *stock_code* as CSV from Yahoo Finance.

    :param stock_code: ticker symbol interpolated into the download URL
    :param period: two-item sequence ``(start, end)`` of epoch bounds
    :return: ``(dataframe, None)`` on success, ``(None, exception)`` on failure
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=15) as resp:
                resp.raise_for_status()
                csv_data = await resp.text()
                # BUG FIX: return a (data, error) pair on success too, so the
                # caller's ``_data, error = await ...`` unpack works on both paths
                # (previously a bare DataFrame was returned on success).
                return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
        except aiohttp.ClientError as e:
            return None, e
        except Exception as e:
            # Broad catch kept from the original: parsing/timeout errors are
            # reported to the caller rather than raised.
            return None, e
async def process_and_store_data(self, _stock, _data, session, period=None):
    """
    Map a downloaded EOD frame to EODData instances and persist them in batches.

    :param _stock: stock record passed through to ``self.map_eod_data``
    :param _data: DataFrame of EOD rows, or None when the download failed
    :param session: storage session forwarded to ``EODData.save_all``
    :param period: optional (start, end) pair, used only in the log message
        (BUG FIX: ``period`` was referenced but never defined in this scope,
        raising NameError; it is now an optional, backward-compatible parameter)
    :return: None
    """
    if _data is None:
        return
    # The frame's index holds the trading date; expose it as a column.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)
    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)
    # Persist in chunks of self._limit so each save_all call stays bounded.
    for i in range(0, len(eod_instances), self._limit):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)
    await asyncio.gather(*_put_tasks)
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    config_instance().DEBUG and self._logger.info(mess)
async def download_eod_and_store(self, _stock, period, session):
    """
    Download EOD data for *_stock*, then hand it to the processing helper.

    :param _stock: stock record exposing ``code`` and ``name``
    :param period: two-item sequence ``(start, end)``
    :param session: storage session forwarded to the store step
    :return: None (logs and returns early when no data was downloaded)
    """
    # NOTE(review): this unpack requires download_eod_data to return a
    # (data, error) pair on BOTH paths — verify its success-path return shape.
    _data, error = await self.download_eod_data(_stock.code, period)
    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return
    # NOTE(review): self.lock appears synchronous; holding it across an await
    # can block the event loop / serialize tasks — confirm this is intended.
    with self.lock:
        await self.process_and_store_data(_stock, _data, session)
import aiohttp
(1) Use a connection pool for aiohttp:
# Shared TCP connector capping aiohttp at 10 concurrent connections.
connector = aiohttp.TCPConnector(limit=10)
async def download_eod_data(self, stock_code, period, session):
    """
    Fetch EOD price history for *stock_code* as CSV using a shared session.

    :param stock_code: ticker symbol interpolated into the download URL
    :param period: two-item sequence ``(start, end)`` of epoch bounds
    :param session: shared ``aiohttp.ClientSession`` (pooled connector)
    :return: ``(dataframe, None)`` on success, ``(None, exception)`` on failure
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    try:
        async with session.get(url, timeout=15) as resp:
            resp.raise_for_status()
            csv_data = await resp.text()
            # BUG FIX: return a (data, error) pair on success too, so the
            # caller's ``_data, error = await ...`` unpack works on both paths
            # (previously a bare DataFrame was returned on success).
            return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
    except aiohttp.ClientError as e:
        return None, e
    except Exception as e:
        # Broad catch kept from the original: parsing/timeout errors are
        # reported to the caller rather than raised.
        return None, e
async def process_and_store_data(self, _stock, _data, session, period=None):
    """
    Map a downloaded EOD frame to EODData instances and persist them in
    fixed-size chunks with bounded save concurrency.

    :param _stock: stock record passed through to ``self.map_eod_data``
    :param _data: DataFrame of EOD rows, or None when the download failed
    :param session: storage session forwarded to ``EODData.save_all``
    :param period: optional (start, end) pair, used only in the log message
        (BUG FIX: ``period`` was referenced but never defined in this scope,
        raising NameError; it is now an optional, backward-compatible parameter)
    :return: None
    """
    if _data is None:
        return
    # The frame's index holds the trading date; expose it as a column.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)
    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)
    # (2) Adjust the chunk size for processing:
    chunk_size = 50  # Adjust this value according to your needs
    for i in range(0, len(eod_instances), chunk_size):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + chunk_size]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)
    # (3) Increase the number of concurrent tasks:
    concurrency_limit = 5  # Adjust this value according to your needs
    for i in range(0, len(_put_tasks), concurrency_limit):
        await asyncio.gather(*_put_tasks[i: i + concurrency_limit])
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    config_instance().DEBUG and self._logger.info(mess)
async def download_eod_and_store(self, _stock, period, session):
    """
    Download EOD data for *_stock* using a shared session, then store it.

    :param _stock: stock record exposing ``code`` and ``name``
    :param period: two-item sequence ``(start, end)``
    :param session: shared session used for both download and store steps
    :return: None (logs and returns early when no data was downloaded)
    """
    # NOTE(review): this unpack requires download_eod_data to return a
    # (data, error) pair on BOTH paths — verify its success-path return shape.
    _data, error = await self.download_eod_data(_stock.code, period, session)
    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return
    # NOTE(review): self.lock appears synchronous; holding it across an await
    # can block the event loop / serialize tasks — confirm this is intended.
    with self.lock:
        await self.process_and_store_data(_stock, _data, session)
Call the main function with the aiohttp session
async def main():
    # NOTE(review): download_eod_and_store is defined as a method elsewhere in
    # this gist; calling it as a free function here needs the owning instance,
    # and ``stock``/``period`` must already be in scope — confirm.
    async with aiohttp.ClientSession(connector=connector) as session:
        await download_eod_and_store(stock, period, session)

# NOTE(review): top-level ``await`` only works in an async REPL/notebook;
# a script would use ``asyncio.run(main())`` instead.
await main()
This part does the downloading
# Download step only: yfinance fetches OHLC data for one stock code over
# [period[0], period[1]] at the configured interval, with tz stripping,
# data repair, and a 15-second timeout.
data = yf.download(_stock.code, start=period[0], end=period[1], interval=self._interval.interval,
                   ignore_tz=True, repair=True, timeout=15)
So what ChatGPT could not figure out is that the download is already separate from this method —
everything that follows is just processing:
`
# Processing step only (excerpt of the method body above; assumes _data,
# self, session, and period are in the enclosing scope).
# The frame's index holds the trading date; expose it as a column.
_data['date'] = pd.to_datetime(_data.index).date
map_partial = functools.partial(self.map_eod_data, stock=_stock)
eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
_eod_instances = await asyncio.gather(*eod_instances_list)
_put_tasks = []
eod_instances = list(_eod_instances)
# Persist in chunks of self._limit so each save_all call stays bounded.
for i in range(0, len(eod_instances), self._limit):
    short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
    _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
    self._count += len(short_eod_instance_list)
await asyncio.gather(*_put_tasks)
mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
config_instance().DEBUG and self._logger.info(mess)
`
It made new functions for each operation. But did that actually speed anything up? And how do you run the resulting function?
import aiohttp
async def download_eod_data(self, stock_code, period):
    """
    Fetch EOD price history for *stock_code* as CSV from Yahoo Finance.

    :param stock_code: ticker symbol interpolated into the download URL
    :param period: two-item sequence ``(start, end)`` of epoch bounds
    :return: ``(dataframe, None)`` on success, ``(None, exception)`` on failure
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=15) as resp:
                resp.raise_for_status()
                csv_data = await resp.text()
                # BUG FIX: return a (data, error) pair on success too, so the
                # caller's ``_data, error = await ...`` unpack works on both
                # paths (previously a bare DataFrame was returned on success).
                return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
        except aiohttp.ClientError as e:
            return None, e
        except Exception as e:
            # Broad catch kept from the original: parsing/timeout errors are
            # reported to the caller rather than raised.
            return None, e
async def process_and_store_data(self, _stock, _data, session):
    """Persist previously downloaded EOD data; no-op when download failed."""
    # Guard: nothing to process or store when the download returned no data.
    if _data is None:
        return
async def download_eod_and_store(self, _stock, period, session):
_data, error = await self.download_eod_data(_stock.code, period)