Skip to content

Instantly share code, notes, and snippets.

@escapewindow
Last active March 30, 2016 00:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save escapewindow/e9c6ded64196f40ffa28 to your computer and use it in GitHub Desktop.
Save escapewindow/e9c6ded64196f40ffa28 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, \
unicode_literals
import asyncio
import aiohttp
from contextlib import contextmanager
import taskcluster.utils as utils
import taskcluster.baseclient as baseclient
@contextmanager
def getSession(session=None):
    """Yield an HTTP session for the body of a ``with`` statement.

    A truthy ``session`` passed by the caller is yielded as-is and is not
    closed here.  Otherwise a new ``aiohttp.ClientSession`` is entered with a
    plain ``with`` statement, yielded, and exited when the caller's block ends.
    """
    if not session:
        # NOTE(review): this relies on ClientSession supporting the plain
        # (non-async) ``with`` protocol -- confirm against the installed
        # aiohttp version.
        with aiohttp.ClientSession() as owned_session:
            yield owned_session
        return
    yield session
async def testAuth(name, url, headers, payload, session=None):
    """POST ``payload`` to ``url``, making at most five attempts.

    Args:
        name: Label prefixed to every progress line that is printed.
        url: Target of the POST request.
        headers: Headers sent with each attempt.
        payload: Request body, passed to ``session.post`` as ``data``.
        session: Optional session to reuse; it is handed to ``getSession``,
            which supplies one when this is falsy.

    Returns:
        The response object of the first attempt that answered with status
        200 (its body text is printed first; the object is returned after the
        ``async with`` block has exited), or ``None`` once all five attempts
        are used up.
    """
    max_attempts = 5
    with getSession(session) as session:
        for attempt in range(1, max_attempts + 1):
            print("{} {}".format(name, attempt))
            try:
                async with session.post(url, headers=headers, data=payload) as resp:
                    print("{}: {}".format(name, resp.status))
                    if resp.status == 200:
                        print(await resp.text())
                        return resp
            except aiohttp.ClientError as exc:
                # Only announce and wait when another attempt will follow.
                if attempt < max_attempts:
                    print("%s retrying: %s" % (name, str(exc)))
                    # The sleep has to be awaited: a bare ``asyncio.sleep(1)``
                    # merely creates a coroutine object and never pauses.
                    await asyncio.sleep(1)
    print("{} incomplete!".format(name))
# Endpoint that the headers below are generated for and that main() posts to.
url = "https://auth.taskcluster.net/v1/test-authenticate"
# Request body: utils.dumpJson applied to the clientScopes / requiredScopes lists.
payload = utils.dumpJson({
'clientScopes': ['test:a'],
'requiredScopes': ['test:a'],
})
# Client built from fixed test credentials; in this script it is only used to
# produce the request headers.
bc = baseclient.BaseClient({
'credentials': {'clientId': 'tester', 'accessToken': 'no-secret'}
})
# NOTE(review): presumably these are Hawk authentication headers covering the
# method, url and payload -- confirm against taskcluster.baseclient.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
# Debug output of the generated headers, printed before Content-Type is added.
print(headers)
headers['Content-Type'] = 'application/json'
async def main():
    """Run the "A" and "B" ``testAuth`` probes concurrently.

    "A" targets the module-level ``url``; "B" targets a deliberately invalid
    host name.

    Returns:
        The ``(done, pending)`` pair of sets produced by ``asyncio.wait``.
    """
    # asyncio.wait() wants futures/tasks: handing it bare coroutine objects is
    # deprecated since Python 3.8 and rejected from 3.11, so wrap them first.
    probes = [
        asyncio.ensure_future(testAuth("A", url, headers, payload)),
        asyncio.ensure_future(
            testAuth("B", "https://invalid-domain.invalid-tld!!!", headers, payload)
        ),
    ]
    return await asyncio.wait(probes)
# Drive main() to completion on the default event loop and print its result.
loop = asyncio.get_event_loop()
try:
    result = loop.run_until_complete(main())
finally:
    # Close the loop even when main() raises, so it is never left open.
    loop.close()
print(result)
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, \
unicode_literals
import asyncio
import aiohttp
from contextlib import contextmanager
import taskcluster.asyncutils as asyncutils
import taskcluster.utils as utils
from taskcluster.async import Auth
# Endpoint that the headers below are generated for.
url = "https://auth.taskcluster.net/v1/test-authenticate"
# Request body: utils.dumpJson applied to the clientScopes / requiredScopes lists.
payload = utils.dumpJson({
'clientScopes': ['test:a'],
'requiredScopes': ['test:a'],
})
# Auth client built from fixed test credentials; it produces the headers here
# and performs the request in _a().
bc = Auth({
'credentials': {'clientId': 'tester', 'accessToken': 'no-secret'}
})
# NOTE(review): presumably these are Hawk authentication headers covering the
# method, url and payload -- confirm against the taskcluster client.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
async def _a():
    """POST ``payload`` to ``url`` through ``bc._makeHttpRequest``.

    Returns:
        Whatever ``bc._makeHttpRequest`` resolves to.
    """
    # Use ``url`` instead of repeating the literal: ``headers`` was generated
    # from ``url``, so the request target must stay in step with it.
    return await bc._makeHttpRequest('post', url, payload, headers)
# Run _a() to completion on the default event loop, then show what it returned.
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(_a())
print(resp)
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, \
unicode_literals
import asyncio
import aiohttp
from contextlib import contextmanager
import taskcluster.asyncutils as asyncutils
import taskcluster.utils as utils
from taskcluster.async import Auth
# Endpoint that the headers below are generated for.
url = "https://auth.taskcluster.net/v1/test-authenticate"
# Request body: utils.dumpJson applied to the clientScopes / requiredScopes lists.
payload = utils.dumpJson({
'clientScopes': ['test:a'],
'requiredScopes': ['test:a'],
})
# Auth client built from fixed test credentials; in this script it is only used
# to produce the request headers.
bc = Auth({
'credentials': {'clientId': 'tester', 'accessToken': 'no-secret'}
})
# NOTE(review): presumably these are Hawk authentication headers covering the
# method, url and payload -- confirm against the taskcluster client.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
# Module-level session handed to the request helper in _a(); nothing in this
# script closes it.
session = aiohttp.ClientSession()
async def _a():
    """POST ``payload`` to ``url`` with ``asyncutils.makeHttpRequest``.

    The response body is printed as text before it is decoded.

    Returns:
        The result of the response's ``json()`` call.
    """
    # Use ``url`` instead of repeating the literal: ``headers`` was generated
    # from ``url``, so the request target must stay in step with it.
    response = await asyncutils.makeHttpRequest(
        'post', url, payload, headers, session=session
    )
    print(await response.text())
    return await response.json()
# Execute _a() on the default event loop and print the decoded reply.
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(_a())
print(resp)
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, \
unicode_literals
import asyncio
import aiohttp
from contextlib import contextmanager
import taskcluster.asyncutils as asyncutils
import taskcluster.utils as utils
from taskcluster.async import Auth
# Endpoint that the headers below are generated for.
url = "https://auth.taskcluster.net/v1/test-authenticate"
# Request body: utils.dumpJson applied to the clientScopes / requiredScopes lists.
payload = utils.dumpJson({
'clientScopes': ['test:a'],
'requiredScopes': ['test:a'],
})
# Auth client built from fixed test credentials; in this script it is only used
# to produce the request headers.
bc = Auth({
'credentials': {'clientId': 'tester', 'accessToken': 'no-secret'}
})
# NOTE(review): presumably these are Hawk authentication headers covering the
# method, url and payload -- confirm against the taskcluster client.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
# Module-level session handed to the request helper in _a(); nothing in this
# script closes it.
session = aiohttp.ClientSession()
async def _a():
    """POST ``payload`` to ``url`` with ``asyncutils.makeSingleHttpRequest``.

    The response body is printed as text before it is decoded.

    Returns:
        The result of the response's ``json()`` call.
    """
    # Use ``url`` instead of repeating the literal: ``headers`` was generated
    # from ``url``, so the request target must stay in step with it.
    response = await asyncutils.makeSingleHttpRequest(
        'post', url, payload, headers, session=session
    )
    print(await response.text())
    return await response.json()
# Execute _a() on the default event loop and print the decoded reply.
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(_a())
print(resp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment