Skip to content

Instantly share code, notes, and snippets.

@winiar93
Created March 23, 2023 09:23
Show Gist options
  • Save winiar93/8058bb948d0241f7de8158ae6644cdba to your computer and use it in GitHub Desktop.
Save winiar93/8058bb948d0241f7de8158ae6644cdba to your computer and use it in GitHub Desktop.
An example script that uses the POST HTTP method to send payloads in asynchronous way.
import asyncio
import json
import aiohttp
from aiohttp import ClientConnectorError, ClientSession
payload_contact = {
"email": "example0@mail.pl",
"hubspot_id": 11111,
"first_name": "test",
"last_name": "test",
"phone_mobile": 333445555,
}
host = "https://...."
url = f"{host}/api/v1/...."
async def fetch_html(url: str, session: ClientSession, data):
credentials = {
"username": "test",
"password": "test",
"api_key": "test",
}
try:
headers = {
"accept": "application/json",
"Content-Type": "application/json",
"apikey": credentials["api_key"],
}
auth = aiohttp.BasicAuth(credentials["username"], credentials["password"])
resp = await session.request(
method="POST",
url=url,
json=data,
headers=headers,
auth=auth,
verify_ssl=False,
)
except Exception as e:
return str(e)
return [url, resp.status, await resp.text()]
async def make_requests(url, payload_list, **kwargs):
async with ClientSession() as session:
tasks = []
for payload in payload_list:
tasks.append(fetch_html(url=url, session=session, data=payload))
results = await asyncio.gather(*tasks)
for result in results:
print(result)
if __name__ == "__main__":
# Genereate testing payloads
payload_list = [payload_contact for x in range(20)]
asyncio.run(make_requests(url=url, payload_list=payload_list))
@eSlider
Copy link

eSlider commented Mar 31, 2023

@winiar93, thank you for sharing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment