Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Created January 10, 2018 06:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thehesiod/6bb9ec91d1334b019a0a0e024f2351f4 to your computer and use it in GitHub Desktop.
Save thehesiod/6bb9ec91d1334b019a0a0e024f2351f4 to your computer and use it in GitHub Desktop.
test_script.py
#!/usr/local/bin/python3
from aiohttp import (ClientSession, TCPConnector, BasicAuth)
from asyncio import get_event_loop
from async_timeout import timeout as aio_timeout
from ssl import create_default_context
from yarl import URL
# allows us to customize the session with cert files
def setup_session(self, path_to_cert=None, custom_headers=None, login=None, close=False):
if path_to_cert:
try:
# with open | no need for importing `os` to do path.exists
with open(path_to_cert) as cert:
cert.read()
sslcontext = create_default_context(cafile=path_to_cert)
conn = TCPConnector(ssl_context=sslcontext)
except:
return 'Cert file not Found!'
else:
conn = TCPConnector(verify_ssl=False)
if login:
session = ClientSession(connector=conn, headers=custom_headers, auth=BasicAuth(login['login'], login['password']))
else:
session = ClientSession(connector=conn, headers=custom_headers)
return session
class Requests():
"""
@param `proxy`: use proxy in session if passed
"""
def __init__(self, session, proxy=None):
self.session = session
self.proxy = proxy
async def do_get(self, url, data=None):
with aio_timeout(10):
async with self.session.get(url, proxy=self.proxy, params=data) as response:
j_resp = await response.json()
return j_resp
async def main():
async with setup_session() as session:
R = Requests(session)
for _ in range(0, 10):
await R.do_get('https://api.coinbase.com/v2/exchange-rates?currency=BTC'))
if __name__ == "__main__":
get_event_loop().run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment