Skip to content

Instantly share code, notes, and snippets.

@ZeN220
Created July 24, 2023 16:01
Show Gist options
  • Save ZeN220/d72f24a6b89055e5b327daa696c622b3 to your computer and use it in GitHub Desktop.
Save ZeN220/d72f24a6b89055e5b327daa696c622b3 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import random
import re
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Final

from aiohttp import ClientSession

from base import BaseProxyGetter, Proxy
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Captures the number of seconds from an API error message that contains
# a "(Before a new request: N seconds)" hint.
_COOLDOWN_PATTERN = r"\(Before a new request: (\d+) seconds\)"
GET_PROXIES_COOLDOWN_REGEXP = re.compile(_COOLDOWN_PATTERN)
class AstroProxyAPIError(Exception):
    """Raised when the proxy service API answers with an error status."""
class AstroProxyGetter(BaseProxyGetter):
    """Pool of AstroProxy SOCKS5 proxies that lends each proxy to one user.

    Proxies are fetched from the AstroProxy HTTP API on demand and handed
    out through the :meth:`get_proxy` async context manager. A proxy that
    is currently lent out is tracked in ``used_proxies`` and is not given
    to anybody else until its context manager exits.
    """

    API_URL: Final[str] = "https://astroproxy.com/api/v1"

    def __init__(
        self,
        api_key: str,
        session: ClientSession,
        get_proxy_per_request: int = 399,
    ):
        """Store the API credentials and create an empty pool.

        Args:
            api_key: Token sent as the ``token`` query parameter.
            session: aiohttp session used for every API call. It is not
                closed by this class.
            get_proxy_per_request: Value of the ``count`` parameter sent
                with each ``ports`` request.
        """
        self.api_key = api_key
        self.session = session
        self.get_proxy_per_request = get_proxy_per_request
        # Every proxy fetched so far, and the subset currently lent out.
        # NOTE(review): both are sets, so Proxy is assumed to be hashable.
        self.proxies: set[Proxy] = set()
        self.used_proxies: set[Proxy] = set()
        # Serializes proxy selection so two callers cannot pick the same one.
        self.lock = asyncio.Lock()
        # Set each time a proxy is released, to wake a waiting selector.
        self.event = asyncio.Event()

    @asynccontextmanager
    async def get_proxy(self) -> AsyncIterator[Proxy]:
        """Lend out a random unused proxy for the duration of the block.

        Waits until at least one proxy is free, marks the chosen proxy as
        used, and yields it. The proxy is returned to the pool when the
        ``async with`` block exits, whether normally or with an exception.

        Yields:
            The proxy reserved for the caller.
        """
        async with self.lock:
            unused_proxies = await self.get_unused_proxies()
            proxy = random.choice(tuple(unused_proxies))
            self.used_proxies.add(proxy)
        try:
            yield proxy
        finally:
            # Without ``finally`` an exception raised inside the caller's
            # block would leave the proxy marked as used forever.
            self.used_proxies.discard(proxy)
            self.event.set()

    async def get_unused_proxies(self) -> set[Proxy]:
        """Return the set of known proxies that are not lent out.

        When every known proxy is in use, this waits for whichever happens
        first: a proxy being released, or a fresh batch arriving from the
        API. A fetched batch is merged into ``proxies``.

        Returns:
            A non-empty set of free proxies.

        Raises:
            AstroProxyAPIError: Propagated from :meth:`get_proxies`.
            ValueError: Propagated from :meth:`get_proxies`.
        """
        while True:
            unused_proxies = self.proxies - self.used_proxies
            if unused_proxies:
                return unused_proxies
            # Drop stale release notifications. Otherwise ``wait()`` would
            # return immediately after the first release ever, and this
            # loop would spin, starting a new API request on every pass.
            # No ``await`` separates the check above from this call, so a
            # release cannot be missed in between.
            self.event.clear()
            tasks = [
                asyncio.create_task(self.event.wait()),
                asyncio.create_task(self.get_proxies()),
            ]
            try:
                done, _ = await asyncio.wait(
                    tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
            finally:
                # Stop the loser (or both, if we were cancelled ourselves)
                # so no task is left running in the background. Cancelling
                # an already finished task is a no-op.
                for task in tasks:
                    task.cancel()
            for task in done:
                result = task.result()
                # ``event.wait()`` yields True; only the fetch yields a list.
                if isinstance(result, list):
                    self.proxies.update(result)

    async def get_proxies(self) -> list[Proxy]:
        """Fetch a batch of proxies from the ``ports`` API method.

        If the API reports an error whose message contains a cooldown hint
        matched by ``GET_PROXIES_COOLDOWN_REGEXP``, sleeps for that many
        seconds (plus one) and retries.

        Returns:
            One ``Proxy`` with scheme ``"socks5"`` per port in the response.

        Raises:
            AstroProxyAPIError: The API reported an error without a
                cooldown hint.
            ValueError: The API returned an empty list of ports.
        """
        while True:
            try:
                response = await self.api_request(
                    "ports",
                    order="random",
                    count=self.get_proxy_per_request,
                )
            except AstroProxyAPIError as exc:
                cooldown = GET_PROXIES_COOLDOWN_REGEXP.search(str(exc))
                if cooldown is None:
                    raise
                # One extra second as a safety margin.
                await asyncio.sleep(int(cooldown.group(1)) + 1)
            else:
                break
        raw_proxies = response["data"]["ports"]
        if not raw_proxies:
            raise ValueError(
                "Proxy service returned empty list of proxies. "
                "Maybe you need to buy more proxies?"
            )
        # Only proxies reported by the API are returned; the hard-coded
        # dummy entry that used to be appended here was debug scaffolding.
        return [
            Proxy(
                scheme="socks5",
                hostname=raw["node"]["ip"],
                port=raw["ports"]["socks"],
                username=raw["access"]["login"],
                password=raw["access"]["password"],
                id=raw["id"],
            )
            for raw in raw_proxies
        ]

    async def api_request(
        self, method: str, request_method: str = "GET", **params
    ) -> dict:
        """Call an API method and return its decoded JSON response.

        Args:
            method: Path appended to ``API_URL``.
            request_method: ``"GET"`` sends a GET request; any other value
                sends a POST request.
            **params: Parameters forwarded to the request.

        Returns:
            The decoded JSON response.

        Raises:
            AstroProxyAPIError: The response ``status`` is ``"error"``; the
                exception carries the response ``message``.
        """
        logger.debug(
            "Send request %s method to proxy service with params %s",
            method,
            params,
        )
        if request_method == "GET":
            response = await self._get_api_request(method, **params)
        else:
            response = await self._post_api_request(method, **params)
        logger.debug(
            "Response from proxy service method %s: %s", method, response
        )
        if response["status"] == "error":
            raise AstroProxyAPIError(response["message"])
        return response

    async def _get_api_request(self, method: str, **params) -> dict:
        """Send a GET request with ``params`` and the token as the query."""
        params["token"] = self.api_key
        async with self.session.get(
            url=f"{self.API_URL}/{method}",
            params=params,
        ) as result:
            return await result.json()

    async def _post_api_request(self, method: str, **params) -> dict:
        """Send a POST request with ``params`` as form data.

        The token is sent in the query string, not in the body.
        """
        async with self.session.post(
            url=f"{self.API_URL}/{method}",
            data=params,
            params={"token": self.api_key},
        ) as result:
            return await result.json()
async def main():
    """Demo: run six concurrent tasks that each borrow a proxy.

    Two tasks hold their proxy for five seconds; four release it at once.
    Each task prints the ``id`` of the proxy it received.
    """
    logging.basicConfig(level=logging.DEBUG)
    # The session is closed on exit instead of being leaked.
    async with ClientSession() as session:
        # NOTE(review): hard-coded API token committed to source; rotate
        # it and load it from configuration or the environment instead.
        getter = AstroProxyGetter("2d2baa9567a013a6", session)

        async def func():
            async with getter.get_proxy() as proxy:
                print(proxy.id)
                await asyncio.sleep(5)
                print("done!")

        async def func2():
            async with getter.get_proxy() as proxy:
                print(proxy.id)

        await asyncio.gather(
            func(), func(), func2(), func2(), func2(), func2()
        )


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment