Created
December 16, 2020 18:23
-
-
Save saltukalakus/0c02ede981e66f1b0f4f411dee42395e to your computer and use it in GitHub Desktop.
Test script to prevent timeouts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import requests | |
import socket | |
from datetime import datetime | |
from functools import lru_cache | |
from urllib3.connection import HTTPConnection | |
class KeepaliveAdapter(requests.adapters.HTTPAdapter):
    """HTTP adapter whose pooled connections get SO_KEEPALIVE switched on."""

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager with keepalive added to the socket options.

        The ``socket_options`` keyword is always replaced with urllib3's
        default options followed by ``SO_KEEPALIVE``; all other arguments
        are forwarded unchanged to the parent implementation.
        """
        keepalive_option = (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        kwargs['socket_options'] = [
            *HTTPConnection.default_socket_options,
            keepalive_option,
        ]
        super().init_poolmanager(*args, **kwargs)
class ApiClient:
    """Small client that reuses one ``requests.Session`` across calls."""

    def __init__(self, useKeepalive=False):
        """Create the session, optionally mounting the keepalive adapter.

        Args:
            useKeepalive: When true, a ``KeepaliveAdapter`` is mounted for
                both ``http://`` and ``https://`` URLs and two notices are
                printed.
        """
        self.s = requests.Session()
        if useKeepalive:
            print('using keepalive adapter')
            print('check value of sysctl net.ipv4.tcp_keepalive_time as well?')
            # One adapter instance serves both URL schemes.
            adapter = KeepaliveAdapter()
            self.s.mount('http://', adapter)
            self.s.mount('https://', adapter)

    def get_info(self, domain, timeout=None):
        """Fetch ``https://<domain>/_debug`` and return the decoded JSON body.

        Args:
            domain: Host name to query.
            timeout: Passed straight to ``Session.get``. The default of
                ``None`` keeps the original behaviour of waiting without a
                limit; pass a number of seconds (or a ``(connect, read)``
                tuple) so that a dropped connection cannot block forever.

        Returns:
            Whatever ``Response.json()`` produces for the response body.

        Raises:
            Any exception raised by ``Session.get`` or ``Response.json``.
        """
        response = self.s.get(f"https://{domain}/_debug", timeout=timeout)
        return response.json()
@lru_cache(maxsize=2)
def get_client():
    """Return the cached ``ApiClient``, built without the keepalive adapter.

    The function takes no arguments, so the cache holds a single client
    that every caller shares.
    """
    return ApiClient(useKeepalive=False)
def test():
    """Run one timestamped ``get_info`` request and print the outcome.

    Prints a start line, the decoded response, and an end line. Any
    exception raised along the way is caught and printed instead of
    propagating, so a failed request does not stop the caller.
    """
    api = get_client()
    try:
        print(f'{datetime.now().isoformat()} - start get info')
        info = api.get_info('tmp-a2drg523.dse.ninja')
        print(info)
        print(f'{datetime.now().isoformat()} - end get info')
    except Exception as err:
        print(err)
# Number of requests to issue in one run of the script.
ATTEMPTS = 10
# Idle gap between consecutive requests, in seconds.
# NOTE(review): 350 + 10 presumably means "just past a 350 s idle timeout" —
# confirm against the system under test.
PAUSE_SECONDS = 350 + 10

for attempt in range(ATTEMPTS):
    test()
    # Sleep only between requests; a pause after the last one would just
    # delay script exit by six minutes with nothing left to run.
    if attempt < ATTEMPTS - 1:
        time.sleep(PAUSE_SECONDS)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment