Last active
March 30, 2016 00:16
-
-
Save escapewindow/e9c6ded64196f40ffa28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import absolute_import, division, print_function, \ | |
unicode_literals | |
import asyncio | |
import aiohttp | |
from contextlib import contextmanager | |
import taskcluster.utils as utils | |
import taskcluster.baseclient as baseclient | |
@contextmanager
def getSession(session=None):
    """Yield a client session, creating a temporary one when none is given.

    Args:
        session: An existing session to reuse, or ``None`` to have a new
            ``aiohttp.ClientSession`` created for the duration of the block.

    Yields:
        The caller's session unchanged, or the newly created session.
    """
    if session is not None:
        # Caller-owned session: hand it back as-is; it is not closed here.
        # The identity check (rather than truthiness) keeps a valid session
        # object that happens to evaluate as falsy from being discarded.
        yield session
        return
    # NOTE(review): newer aiohttp releases appear to require ``async with``
    # for ClientSession; confirm the synchronous form against the installed
    # aiohttp version.
    with aiohttp.ClientSession() as owned:
        yield owned
async def testAuth(name, url, headers, payload, session=None):
    """POST ``payload`` to ``url``, retrying up to five times.

    Progress is printed for every attempt. A non-200 response is retried
    immediately; an ``aiohttp.ClientError`` is retried after a one second
    pause (except after the final attempt).

    Args:
        name: Label used as a prefix in the printed progress messages.
        url: Target URL of the POST request.
        headers: Headers sent with the request.
        payload: Request body, passed as ``data``.
        session: Optional session to reuse. When omitted, a session is
            created for this call and closed before returning.

    Returns:
        The response object of the first attempt that answered with HTTP
        200, or ``None`` when all attempts were used up.
    """
    max_attempts = 5
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()
    try:
        for attempt in range(1, max_attempts + 1):
            print("{} {}".format(name, attempt))
            try:
                async with session.post(url, headers=headers, data=payload) as resp:
                    print("{}: {}".format(name, resp.status))
                    if resp.status == 200:
                        print(await resp.text())
                        return resp
            except aiohttp.ClientError as exc:
                if attempt < max_attempts:
                    print("%s retrying: %s" % (name, str(exc)))
                    # Must be awaited: a bare asyncio.sleep() call only
                    # creates a coroutine object and never actually pauses.
                    await asyncio.sleep(1)
        print("{} incomplete!".format(name))
    finally:
        # Only close a session this call created; a caller-supplied session
        # stays open for the caller to manage.
        if owns_session:
            await session.close()
# Endpoint exercised by this script and the request body sent to it.
url = "https://auth.taskcluster.net/v1/test-authenticate"
payload = utils.dumpJson({'clientScopes': ['test:a'], 'requiredScopes': ['test:a']})

# Client configured with the fixed test credentials; it is only used here to
# build the request headers for a POST of `payload` to `url`.
bc = baseclient.BaseClient({
    'credentials': {
        'clientId': 'tester',
        'accessToken': 'no-secret',
    },
})
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
print(headers)

# Added after the headers are printed, so the output above does not show it.
headers['Content-Type'] = 'application/json'
async def main():
    """Run the two test requests concurrently and wait for both.

    Request "A" targets the module-level ``url``; request "B" targets a
    deliberately invalid host.

    Returns:
        The ``(done, pending)`` pair of task sets produced by
        ``asyncio.wait``.
    """
    # asyncio.wait() expects tasks/futures; passing bare coroutine objects is
    # deprecated and rejected by newer Python versions, so wrap them first.
    tasks = [
        asyncio.ensure_future(testAuth("A", url, headers, payload)),
        asyncio.ensure_future(
            testAuth("B", "https://invalid-domain.invalid-tld!!!", headers, payload)
        ),
    ]
    return await asyncio.wait(tasks)
# Create and install the event loop explicitly instead of relying on
# asyncio.get_event_loop() to create one implicitly at module level.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(main())
finally:
    # Close the loop even when main() raises.
    loop.close()
print(result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import absolute_import, division, print_function, \ | |
unicode_literals | |
import asyncio | |
import aiohttp | |
from contextlib import contextmanager | |
import taskcluster.asyncutils as asyncutils | |
import taskcluster.utils as utils | |
from taskcluster.async import Auth | |
# Endpoint exercised by this script and the request body sent to it.
url = "https://auth.taskcluster.net/v1/test-authenticate"
payload = utils.dumpJson({'clientScopes': ['test:a'], 'requiredScopes': ['test:a']})

# Auth client configured with the fixed test credentials.
bc = Auth({
    'credentials': {
        'clientId': 'tester',
        'accessToken': 'no-secret',
    },
})

# Headers for a POST of `payload` to `url`.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
async def _a():
    """Issue the test POST through the client and return whatever it gives back."""
    return await bc._makeHttpRequest(
        'post',
        'https://auth.taskcluster.net/v1/test-authenticate',
        payload,
        headers,
    )


# Drive the coroutine to completion on the default event loop and show the result.
resp = asyncio.get_event_loop().run_until_complete(_a())
print(resp)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import absolute_import, division, print_function, \ | |
unicode_literals | |
import asyncio | |
import aiohttp | |
from contextlib import contextmanager | |
import taskcluster.asyncutils as asyncutils | |
import taskcluster.utils as utils | |
from taskcluster.async import Auth | |
# Endpoint exercised by this script and the request body sent to it.
url = "https://auth.taskcluster.net/v1/test-authenticate"
payload = utils.dumpJson({'clientScopes': ['test:a'], 'requiredScopes': ['test:a']})

# Auth client configured with the fixed test credentials.
bc = Auth({
    'credentials': {
        'clientId': 'tester',
        'accessToken': 'no-secret',
    },
})

# Headers for a POST of `payload` to `url`.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
# Session shared with the request below; it is closed explicitly once the
# request has finished (see the `finally` clause at the end).
session = aiohttp.ClientSession()


async def _a():
    """POST the test payload via asyncutils.makeHttpRequest.

    Prints the raw response text and returns the decoded JSON body.
    """
    r = await asyncutils.makeHttpRequest(
        'post',
        'https://auth.taskcluster.net/v1/test-authenticate',
        payload,
        headers,
        session=session,
    )
    raw = await r.text()
    print(raw)
    return await r.json()


# Keep using the default loop here: the session above was created before any
# loop was running, so it should be driven by that same default loop.
loop = asyncio.get_event_loop()
try:
    resp = loop.run_until_complete(_a())
finally:
    # Release the session's connections even if the request failed.
    # Depending on the aiohttp version, close() may hand back an awaitable;
    # if it does, run it to completion so the close actually takes effect.
    closing = session.close()
    if asyncio.iscoroutine(closing) or hasattr(closing, '__await__'):
        loop.run_until_complete(closing)
print(resp)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import absolute_import, division, print_function, \ | |
unicode_literals | |
import asyncio | |
import aiohttp | |
from contextlib import contextmanager | |
import taskcluster.asyncutils as asyncutils | |
import taskcluster.utils as utils | |
from taskcluster.async import Auth | |
# Endpoint exercised by this script and the request body sent to it.
url = "https://auth.taskcluster.net/v1/test-authenticate"
payload = utils.dumpJson({'clientScopes': ['test:a'], 'requiredScopes': ['test:a']})

# Auth client configured with the fixed test credentials.
bc = Auth({
    'credentials': {
        'clientId': 'tester',
        'accessToken': 'no-secret',
    },
})

# Headers for a POST of `payload` to `url`.
hawkExt = bc.makeHawkExt()
headers = bc.makeHeaders('post', url, payload, hawkExt)
# Session shared with the request below; it is closed explicitly once the
# request has finished (see the `finally` clause at the end).
session = aiohttp.ClientSession()


async def _a():
    """POST the test payload via asyncutils.makeSingleHttpRequest.

    Prints the raw response text and returns the decoded JSON body.
    """
    r = await asyncutils.makeSingleHttpRequest(
        'post',
        'https://auth.taskcluster.net/v1/test-authenticate',
        payload,
        headers,
        session=session,
    )
    raw = await r.text()
    print(raw)
    return await r.json()


# Keep using the default loop here: the session above was created before any
# loop was running, so it should be driven by that same default loop.
loop = asyncio.get_event_loop()
try:
    resp = loop.run_until_complete(_a())
finally:
    # Release the session's connections even if the request failed.
    # Depending on the aiohttp version, close() may hand back an awaitable;
    # if it does, run it to completion so the close actually takes effect.
    closing = session.close()
    if asyncio.iscoroutine(closing) or hasattr(closing, '__await__'):
        loop.run_until_complete(closing)
print(resp)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment