Skip to content

Instantly share code, notes, and snippets.

@xmonader
Created October 23, 2017 21:34
Show Gist options
  • Save xmonader/5a1097a0193f7be8691248fc8f768605 to your computer and use it in GitHub Desktop.
Save xmonader/5a1097a0193f7be8691248fc8f768605 to your computer and use it in GitHub Desktop.
asyncsess.py
import requests
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from queue import Queue
import asyncio
import aiohttp
import time
links = ["https://google.com", "https://www.amazon.com/",
"https://microsoft.com", "https://shoudlnt.exist"]
def timeit(f):
def wrapper(*args, **kw):
ts = time.time()
result = f(*args, **kw)
te = time.time()
print('function {f} took {timediff} sec'.format(
f=str(f), timediff=(te - ts)))
return result
return wrapper
@timeit
def seqchecker():
def checklink(link):
try:
res = requests.get(link)
if res.status_code == 200:
return link, True
except:
return link, False
def checkmany(links):
for link in links:
print(checklink(link))
checkmany(links)
@timeit
def threadedchecker():
q = Queue()
def checklink(link):
try:
res = requests.get(link)
if res.status_code == 200:
q.put((link, True))
except Exception as e:
q.put((link, False))
def checkmany(links):
with ThreadPoolExecutor(4) as executor:
for link in links:
executor.submit(checklink, link)
while not q.empty():
print(q.get())
checkmany(links)
@timeit
def asynclinkschecker():
loop = asyncio.get_event_loop()
async def checklink(link):
async with aiohttp.ClientSession() as s:
try:
res = await s.get(link)
if res.status == 200:
return link, True
except:
return link, False
tasks = []
for link in links:
tasks.append(asyncio.ensure_future(checklink(link)))
gatherfuture = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(gatherfuture)
print(gatherfuture.result())
seqchecker()
threadedchecker()
asynclinkschecker()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment