Skip to content

Instantly share code, notes, and snippets.

@freelancing-solutions
Created March 24, 2023 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save freelancing-solutions/78932c70c737dfc9950fb9808675ebff to your computer and use it in GitHub Desktop.
Save freelancing-solutions/78932c70c737dfc9950fb9808675ebff to your computer and use it in GitHub Desktop.
code snippet
async def download_eod_and_store(self, _stock, period, session):
    """
    **download_eod_and_store**

    Download end-of-day (EOD) price data for one stock via ``yf.download``,
    map each row to an ``EODData`` instance, and persist the instances in
    batches of ``self._limit``.

    :param _stock: stock record; ``.code`` is the ticker symbol passed to
        yfinance and ``.name`` is used in log messages.
    :param period: two-item sequence ``(start, end)`` forwarded to yfinance.
    :param session: database session handed to ``EODData.save_all``.
    :return: None. Download errors are logged and swallowed.
    """
    _data = None
    # NOTE(review): the whole download + store cycle is serialized under
    # self.lock, matching the original's apparent intent — confirm scope.
    with self.lock:
        try:
            _data = yf.download(_stock.code, start=period[0], end=period[1],
                                interval=self._interval.interval,
                                ignore_tz=True, repair=True, timeout=15)
        except (KeyError, RuntimeError) as e:
            message: str = f"""
            Runtime or KeyError
            Unable to download eod data for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return
        except (ReadTimeout, JSONDecodeError, ConnectionError) as e:
            message: str = f"""
            Remote or Response Error
            Unable to connect or getting bad response for {_stock.name},
            method : download_eod_and_store
            debug: {str(e)}
            stock_code: {_stock.code}
            """
            self._logger.error(message)
            return

        # Guard: yf.download can return an empty frame without raising.
        if _data is None or _data.empty:
            self._logger.error(f"No EOD data returned for {_stock.name} ({_stock.code})")
            return

        # Normalize the DatetimeIndex into a plain 'date' column for mapping.
        _data['date'] = pd.to_datetime(_data.index).date
        map_partial = functools.partial(self.map_eod_data, stock=_stock)
        eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
        _eod_instances = await asyncio.gather(*eod_instances_list)

        _put_tasks = []
        eod_instances = list(_eod_instances)
        # Persist in chunks of self._limit so one save_all call never
        # receives an unbounded instance list.
        for i in range(0, len(eod_instances), self._limit):
            short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
            _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
            self._count += len(short_eod_instance_list)
        await asyncio.gather(*_put_tasks)

        mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
        if config_instance().DEBUG:
            self._logger.info(mess)
@bonosa
Copy link

bonosa commented Mar 24, 2023

import aiohttp

async def download_eod_data(self, stock_code, period):
    """
    Download EOD CSV data for *stock_code* from the Yahoo Finance download
    endpoint and parse it into a DataFrame.

    :param stock_code: ticker symbol interpolated into the download URL.
    :param period: two-item sequence ``(start, end)`` interpolated into the URL.
    :return: ``(DataFrame, None)`` on success, ``(None, exception)`` on
        failure — a uniform two-tuple so callers can always unpack both values.
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=15) as resp:
                resp.raise_for_status()
                csv_data = await resp.text()
                # BUG FIX: the success path previously returned a bare
                # DataFrame while both error paths returned (None, e);
                # callers that unpack two values crashed on *success*.
                return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
        except aiohttp.ClientError as e:
            return None, e
        except Exception as e:
            # Broad catch kept deliberately: this is a best-effort downloader
            # whose contract is "never raise, always return (data, error)".
            return None, e

async def process_and_store_data(self, _stock, _data, session, period=None):
    """
    Map downloaded EOD rows to ``EODData`` instances and save them in
    batches of ``self._limit``.

    :param _stock: stock record passed through to ``self.map_eod_data``.
    :param _data: DataFrame of EOD rows indexed by date; ``None`` is a no-op.
    :param session: database session handed to ``EODData.save_all``.
    :param period: optional ``(start, end)`` pair, used only in the final
        log message. BUG FIX: the original referenced an undefined ``period``
        name here, raising NameError when DEBUG logging was enabled; the new
        keyword parameter defaults to None so existing callers still work.
    :return: None
    """
    if _data is None:
        return

    # Normalize the DatetimeIndex into a plain 'date' column for mapping.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)

    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)

    # Persist in chunks of self._limit so one save_all call never receives
    # an unbounded instance list.
    for i in range(0, len(eod_instances), self._limit):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)

    await asyncio.gather(*_put_tasks)
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    if config_instance().DEBUG:
        self._logger.info(mess)

async def download_eod_and_store(self, _stock, period, session):
    """
    Download EOD data for one stock and store it, logging any failure.

    :param _stock: stock record (``.code`` for the download, ``.name`` for logs).
    :param period: ``(start, end)`` pair forwarded to the downloader.
    :param session: database session for persistence.
    :return: None
    """
    result = await self.download_eod_data(_stock.code, period)
    # BUG FIX: download_eod_data returns a bare DataFrame on success but a
    # (None, error) tuple on failure; unpacking two values unconditionally
    # crashed on success. Accept either shape here.
    _data, error = result if isinstance(result, tuple) else (result, None)

    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return

    with self.lock:
        await self.process_and_store_data(_stock, _data, session)

@bonosa
Copy link

bonosa commented Mar 24, 2023

To speed up the execution of the download_eod_and_store function, you can consider the following optimizations:

Move the downloading and processing of data into separate functions so that they can be run concurrently.
Use asynchronous libraries for downloading data, such as aiohttp.
Parallelize the data processing by using asyncio.gather more effectively.
Here's the optimized version of your code:

see code above

@bonosa
Copy link

bonosa commented Mar 24, 2023

This optimized version separates downloading and processing tasks, uses the aiohttp library for async downloading, and ensures that data processing is done concurrently using asyncio.gather.

@bonosa
Copy link

bonosa commented Mar 24, 2023

import aiohttp

async def download_eod_data(self, stock_code, period):
    """
    Download EOD CSV data for *stock_code* from the Yahoo Finance download
    endpoint and parse it into a DataFrame.

    :param stock_code: ticker symbol interpolated into the download URL.
    :param period: two-item sequence ``(start, end)`` interpolated into the URL.
    :return: ``(DataFrame, None)`` on success, ``(None, exception)`` on
        failure — a uniform two-tuple so callers can always unpack both values.
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=15) as resp:
                resp.raise_for_status()
                csv_data = await resp.text()
                # BUG FIX: the success path previously returned a bare
                # DataFrame while both error paths returned (None, e);
                # callers that unpack two values crashed on *success*.
                return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
        except aiohttp.ClientError as e:
            return None, e
        except Exception as e:
            # Broad catch kept deliberately: this is a best-effort downloader
            # whose contract is "never raise, always return (data, error)".
            return None, e

async def process_and_store_data(self, _stock, _data, session, period=None):
    """
    Map downloaded EOD rows to ``EODData`` instances and save them in
    batches of ``self._limit``.

    :param _stock: stock record passed through to ``self.map_eod_data``.
    :param _data: DataFrame of EOD rows indexed by date; ``None`` is a no-op.
    :param session: database session handed to ``EODData.save_all``.
    :param period: optional ``(start, end)`` pair, used only in the final
        log message. BUG FIX: the original referenced an undefined ``period``
        name here, raising NameError when DEBUG logging was enabled; the new
        keyword parameter defaults to None so existing callers still work.
    :return: None
    """
    if _data is None:
        return

    # Normalize the DatetimeIndex into a plain 'date' column for mapping.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)

    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)

    # Persist in chunks of self._limit so one save_all call never receives
    # an unbounded instance list.
    for i in range(0, len(eod_instances), self._limit):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)

    await asyncio.gather(*_put_tasks)
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    if config_instance().DEBUG:
        self._logger.info(mess)

async def download_eod_and_store(self, _stock, period, session):
    """
    Download EOD data for one stock and store it, logging any failure.

    :param _stock: stock record (``.code`` for the download, ``.name`` for logs).
    :param period: ``(start, end)`` pair forwarded to the downloader.
    :param session: database session for persistence.
    :return: None
    """
    result = await self.download_eod_data(_stock.code, period)
    # BUG FIX: download_eod_data returns a bare DataFrame on success but a
    # (None, error) tuple on failure; unpacking two values unconditionally
    # crashed on success. Accept either shape here.
    _data, error = result if isinstance(result, tuple) else (result, None)

    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return

    with self.lock:
        await self.process_and_store_data(_stock, _data, session)

@bonosa
Copy link

bonosa commented Mar 24, 2023

import aiohttp

(1) Use a connection pool for aiohttp:

connector = aiohttp.TCPConnector(limit=10)

async def download_eod_data(self, stock_code, period, session):
    """
    Download EOD CSV data for *stock_code* using a shared, pooled aiohttp
    session and parse it into a DataFrame.

    :param stock_code: ticker symbol interpolated into the download URL.
    :param period: two-item sequence ``(start, end)`` interpolated into the URL.
    :param session: an ``aiohttp.ClientSession`` (pooled via TCPConnector)
        owned by the caller; not closed here.
    :return: ``(DataFrame, None)`` on success, ``(None, exception)`` on
        failure — a uniform two-tuple so callers can always unpack both values.
    """
    url = f"https://query1.finance.yahoo.com/v7/finance/download/{stock_code}?start={period[0]}&end={period[1]}&interval={self._interval.interval}"
    try:
        async with session.get(url, timeout=15) as resp:
            resp.raise_for_status()
            csv_data = await resp.text()
            # BUG FIX: the success path previously returned a bare DataFrame
            # while both error paths returned (None, e); callers that unpack
            # two values crashed on *success*.
            return pd.read_csv(io.StringIO(csv_data), index_col=0, parse_dates=True), None
    except aiohttp.ClientError as e:
        return None, e
    except Exception as e:
        # Broad catch kept deliberately: best-effort contract of
        # "never raise, always return (data, error)".
        return None, e

async def process_and_store_data(self, _stock, _data, session, period=None):
    """
    Map downloaded EOD rows to ``EODData`` instances and save them in
    fixed-size chunks with a bounded number of concurrent save tasks.

    :param _stock: stock record passed through to ``self.map_eod_data``.
    :param _data: DataFrame of EOD rows indexed by date; ``None`` is a no-op.
    :param session: database session handed to ``EODData.save_all``.
    :param period: optional ``(start, end)`` pair, used only in the final
        log message. BUG FIX: the original referenced an undefined ``period``
        name here, raising NameError when DEBUG logging was enabled; the new
        keyword parameter defaults to None so existing callers still work.
    :return: None
    """
    if _data is None:
        return

    # Normalize the DatetimeIndex into a plain 'date' column for mapping.
    _data['date'] = pd.to_datetime(_data.index).date
    map_partial = functools.partial(self.map_eod_data, stock=_stock)

    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)

    # (2) Adjust the chunk size for processing:
    chunk_size = 50  # Adjust this value according to your needs

    for i in range(0, len(eod_instances), chunk_size):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + chunk_size]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)

    # (3) Bound how many save tasks run concurrently:
    concurrency_limit = 5  # Adjust this value according to your needs
    for i in range(0, len(_put_tasks), concurrency_limit):
        await asyncio.gather(*_put_tasks[i: i + concurrency_limit])

    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    if config_instance().DEBUG:
        self._logger.info(mess)

async def download_eod_and_store(self, _stock, period, session):
    """
    Download EOD data for one stock through the shared aiohttp session and
    store it, logging any failure.

    :param _stock: stock record (``.code`` for the download, ``.name`` for logs).
    :param period: ``(start, end)`` pair forwarded to the downloader.
    :param session: shared aiohttp session, also forwarded to storage.
    :return: None
    """
    result = await self.download_eod_data(_stock.code, period, session)
    # BUG FIX: download_eod_data returns a bare DataFrame on success but a
    # (None, error) tuple on failure; unpacking two values unconditionally
    # crashed on success. Accept either shape here.
    _data, error = result if isinstance(result, tuple) else (result, None)

    if _data is None:
        self._logger.error(f"Unable to download or process EOD data for {_stock.name}: {error}")
        return

    with self.lock:
        await self.process_and_store_data(_stock, _data, session)

Call the main function with the aiohttp session

async def main():
    """Open a pooled aiohttp session and run one download/store cycle."""
    async with aiohttp.ClientSession(connector=connector) as session:
        await download_eod_and_store(stock, period, session)

# BUG FIX: a bare top-level ``await main()`` is a SyntaxError outside an
# async context; asyncio.run drives the coroutine from synchronous code.
asyncio.run(main())

@freelancing-solutions
Copy link
Author

This part does the downloading

data = yf.download(_stock.code, start=period[0], end=period[1], interval=self._interval.interval,
                                    ignore_tz=True, repair=True, timeout=15)

So what ChatGPT could not figure out is that the download is already separate from the rest of this method —

all this is just processing

`
_data['date'] = pd.to_datetime(_data.index).date
map_partial = functools.partial(self.map_eod_data, stock=_stock)

    eod_instances_list = [map_partial(line=_line) for _line in _data.itertuples()]
    _eod_instances = await asyncio.gather(*eod_instances_list)
    _put_tasks = []
    eod_instances = list(_eod_instances)

    for i in range(0, len(eod_instances), self._limit):
        short_eod_instance_list: list[EODData] = eod_instances[i: i + self._limit]
        _put_tasks.append(EODData.save_all(instance_list=short_eod_instance_list, session=session))
        self._count += len(short_eod_instance_list)

    await asyncio.gather(*_put_tasks)
    mess: str = f"Added a total of : {self._count} EOD Data for the period of {period}"
    config_instance().DEBUG and self._logger.info(mess)

`

@bonosa
Copy link

bonosa commented Mar 24, 2023

It made new functions for each operation. Did it actually speed things up? How do you run the function?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment