Ticker

ticker.py is a Python asyncio implementation within a multiprocessing context: each spawned process fetches its share of stock tickers either sequentially with requests or concurrently with asyncio and aiohttp.

Usage

ticker.py [-h] [-m {sync,async}] [-c COUNT] [-p PARALLEL] -t TICKERCONF [-o OUTDIR]
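
The script needs the third-party packages aiohttp, aiofiles and requests (pip install aiohttp aiofiles requests). The -t option points to a JSON config file from which the script reads the keys tickers, base_url and params. A minimal sketch of such a file, with placeholder values rather than the gist's actual ones:

{
    "base_url": "https://example.com/quotes/",
    "params": {"interval": "1d"},
    "tickers": ["AAPL", "MSFT", "GOOG"]
}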

Example

Synchronous mode with one process

python3 ticker.py -t ticker.json -c 100 -p 1
13:54:40 INFO:Ticker: Processing sync 100 tickers
13:54:40 INFO:Ticker: Spawning 1 gatherers...
13:54:40 INFO:Ticker: Ticker-1 processing sync 100 tickers
13:55:10 INFO:Ticker: Done, success: 100/100, failure: 0/100

Synchronous mode with four processes

python3 ticker.py -t ticker.json -c 300 -p 4
20:48:47 INFO:Ticker: Processing sync 300 tickers
20:48:47 INFO:Ticker: Spawning 4 gatherers...
20:48:47 INFO:Ticker: Ticker-2 processing sync 75 tickers
20:48:47 INFO:Ticker: Ticker-3 processing sync 75 tickers
20:48:47 INFO:Ticker: Ticker-1 processing sync 76 tickers
20:48:47 INFO:Ticker: Ticker-4 processing sync 74 tickers
20:49:01 INFO:Ticker: Done, success: 300/300, failure: 0/300

Asynchronous mode with two processes

python3 ticker.py -t ticker.json -c 300 -p 2 -m async
20:44:22 INFO:Ticker: Processing async 300 tickers
20:44:22 INFO:Ticker: Spawning 2 gatherers...
20:44:22 INFO:Ticker: Ticker-2 processing async 135 tickers
20:44:22 INFO:Ticker: Ticker-1 processing async 165 tickers
20:44:22 INFO:Ticker: Ticker-2 session for 135 tickers
20:44:22 INFO:Ticker: Ticker-1 session for 165 tickers
20:44:24 INFO:Ticker: Done, success: 300/300, failure: 0/300
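
Reading the log timestamps above: synchronous mode took about 30 seconds for 100 tickers on one process and about 14 seconds for 300 tickers on four, while asynchronous mode completed 300 tickers in about 2 seconds on two processes. Actual timings will vary with network conditions.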
"""Evaulate Sync and Async with multiprocessing."""
import sys
import argparse
import json
import multiprocessing
import logging
import asyncio
import aiofiles
from aiohttp import ClientSession
import requests
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
level=logging.INFO,
datefmt="%H:%M:%S",
stream=sys.stderr,
)
logger = logging.getLogger("Ticker")
class Ticker(multiprocessing.Process):
    '''Ticker class fetches stock ticker daily data.'''

    def __init__(self, task_queue, result_queue,
                 burl, uparams, odir, mode):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.base_url = burl
        self.params = uparams
        self.odir = odir
        self.mode = mode

    async def get(self, ticker: str, session: ClientSession) -> str:
        """Make http GET request to fetch ticker data."""
        url = f'{self.base_url}{ticker}'
        logger.debug(f'{self.name} http-get for {url}')
        resp = await session.get(url, params=self.params)
        if resp.status != 200:
            logger.error(f'{self.name}:{ticker} failed, status={resp.status}')
            return ''
        logger.debug(f'Got response [{resp.status}] for URL: {url}')
        tdata = await resp.text()
        return tdata

    async def aioprocess(self, ticker: str, session: ClientSession) -> str:
        """Issue GET for the ticker and write to file."""
        logger.debug(f'{self.name} processing_ticker {ticker}')
        fname = f'{self.odir}/{ticker}.csv'
        res = await self.get(ticker=ticker, session=session)
        if not res:
            return f'{ticker} fetch failed'
        async with aiofiles.open(fname, "a") as f:
            await f.write(res)
        return f'{ticker} fetch succeeded'

    async def asyncio_sessions(self, tickers: list) -> None:
        """Create session to concurrently fetch tickers."""
        logger.info(f'{self.name} session for {len(tickers)} tickers')
        results = []
        async with ClientSession() as session:
            tasks = []
            for t in tickers:
                tasks.append(self.aioprocess(ticker=t, session=session))
            results = await asyncio.gather(*tasks)
        # Send result status for each ticker back to the parent
        for r in results:
            self.result_queue.put(r)

    def syncprocess(self, tickers: list) -> None:
        """Fetch tickers one at a time using blocking requests calls."""
        for ticker in tickers:
            url = f'{self.base_url}{ticker}'
            logger.debug(f'{self.name} http-get for {url}')
            resp = requests.get(url, params=self.params)
            if resp.status_code != requests.codes.ok:
                logger.error(f'{self.name}:{ticker} failed, status={resp.status_code}')
                self.result_queue.put(f'{ticker} fetch failed')
                continue
            data = resp.text
            fname = f'{self.odir}/{ticker}.csv'
            with open(fname, 'w') as file:
                file.write(data)
            self.result_queue.put(f'{ticker} fetch succeeded')

    def run(self):
        pname = self.name
        tickers = []
        # Drain the task queue until the None sentinel arrives
        while True:
            t = self.task_queue.get()
            if t is None:
                logger.debug(f'{pname} Received all allocated tickers')
                break
            tickers.append(t)
            self.task_queue.task_done()
        logger.info(f'{pname} processing {self.mode} {len(tickers)} tickers')
        # Do sync or async processing
        if self.mode == "async":
            asyncio.run(self.asyncio_sessions(tickers))
        else:
            self.syncprocess(tickers)
        # Respond to the None sentinel received in task_queue
        self.task_queue.task_done()

    def __str__(self):
        return f'Ticker {self.name}.'

def parse_clargs():
    '''Command line argument parser.'''
    mparser = argparse.ArgumentParser(
        description='Evaluate sync vs async multiprocessing')
    mparser.add_argument('-m', '--mode',
                         action='store',
                         default='sync',
                         choices=['sync', 'async'],
                         help='evaluation mode')
    mparser.add_argument('-c', '--count',
                         action='store',
                         type=int,
                         default=10,
                         help='count of tickers to fetch')
    mparser.add_argument('-p', '--parallel',
                         action='store',
                         type=int,
                         default=1,
                         help='multiprocessing count')
    mparser.add_argument('-t', '--tickerconf',
                         action='store',
                         type=str,
                         help='ticker config file',
                         required=True)
    mparser.add_argument('-o', '--outdir',
                         action='store',
                         default='tickerdata',
                         help='output directory to store downloaded tickers')
    return mparser.parse_args()

def main():
    '''Main entry function.'''
    # Parse command line arguments.
    args = parse_clargs()
    fc = args.count     # fetch count
    pc = args.parallel  # multiprocess count
    # From the ticker conf file get tickers list, base_url and url params
    tconf = {}
    with open(args.tickerconf, 'r') as f:
        tconf = json.load(f)
    tlist = tconf['tickers']
    burl = tconf['base_url']
    uparams = tconf['params']
    # Clamp the fetch count to the configured tickers, so the result
    # collection below never blocks waiting for results that were
    # never enqueued.
    fc = min(fc, len(tlist))
    logger.info(f'Processing {args.mode} {fc} tickers')
    # Task queue is used to send the tickers to processes
    # Result queue is used to get the result from processes
    tq = multiprocessing.JoinableQueue()  # task queue
    rq = multiprocessing.Queue()          # result queue
    # Spawned processes are limited by the available cores
    if pc > multiprocessing.cpu_count():
        pc = multiprocessing.cpu_count()
    logger.info(f'Spawning {pc} gatherers...')
    tickers = [Ticker(tq, rq, burl, uparams,
                      args.outdir, args.mode) for _ in range(pc)]
    for ticker in tickers:
        ticker.start()
    # Enqueue ticker jobs in task_queue
    for idx, item in enumerate(tlist):
        if idx >= fc:
            break
        tq.put(item)
    # Enqueue one None per process to signal completion
    for _ in range(pc):
        tq.put(None)
    tq.join()
    failc = sum(1 for i in range(fc) if 'failed' in rq.get())
    logger.info(f'Done, success: {fc-failc}/{fc}, failure: {failc}/{fc}')


if __name__ == '__main__':
    main()

A sample ticker.json is available at https://raw.githubusercontent.com/nbasker/tools/master/asyncioeval/ticker.json
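
To try the script end to end, one possible sequence (assuming curl is available and the URL above is still live; note the script does not create the output directory, so it must exist before a run):

curl -o ticker.json https://raw.githubusercontent.com/nbasker/tools/master/asyncioeval/ticker.json
mkdir -p tickerdata
python3 ticker.py -t ticker.json -c 100 -p 2 -m async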