Skip to content

Instantly share code, notes, and snippets.

@lsferreira42
Created December 18, 2021 03:17
Show Gist options
  • Save lsferreira42/249e818d8a8e41592fd2ef14a9d8bce7 to your computer and use it in GitHub Desktop.
Save lsferreira42/249e818d8a8e41592fd2ef14a9d8bce7 to your computer and use it in GitHub Desktop.
Threaded socket testing tool
from collections import deque
from queue import Queue
from threading import Thread
import threading
from time import sleep
from collections import Counter
import os
import socket
import sys
import argparse
import resource
#import requests
# --- Shared state, mutated by the worker threads -------------------------

# Number of tasks that have been started. The code in this file only ever
# increments it (while holding ``sem``); it is never decremented.
NUM_RUNNING = 0
# Number of connections that completed without raising.
NUM_SUCCESS = 0
# ``str(error)`` of every failed connection, one entry per failure.
LIST_FAIL = []
# Semaphore with the default initial value of 1, i.e. used as a mutex
# around the progress counter update and the progress line print.
sem = threading.Semaphore()

# NOTE(review): NUM_THREADS and SOCKET_TIMEOUT are not referenced by the
# code visible in this file (the CLI supplies its own defaults) -- confirm
# whether anything else still reads them before removing.
NUM_THREADS = 2000
SOCKET_TIMEOUT = 30

# Debug output is enabled through the DEBUG environment variable. The raw
# string is parsed explicitly: a plain truthiness test would treat
# ``DEBUG=False`` or ``DEBUG=0`` as "enabled", since any non-empty string
# is truthy.
DEBUG = os.environ.get('DEBUG', '').strip().lower() in ('1', 'true', 't', 'yes', 'y', 'on')
class Worker(Thread):
    """Daemon thread that consumes ``(func, args, kwargs)`` jobs from a queue.

    The thread starts itself on construction and loops forever; because it
    is a daemon it does not keep the interpreter alive at exit.
    """

    def __init__(self, tasks):
        """Attach the worker to the ``tasks`` queue and start it right away."""
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        """Run queued jobs one by one, printing any exception they raise.

        ``task_done()`` is called for every job that was attempted, whether
        it succeeded or failed, so ``Queue.join()`` can return.
        """
        while True:
            job = self.tasks.get()
            # Unpacking happens outside the try block on purpose: a malformed
            # job is not reported through the "Error:" path.
            func, args, kargs = job
            try:
                func(*args, **kargs)
            except Exception as error:
                print("Error: ", error)
            finally:
                self.tasks.task_done()
class ThreadPool:
    """Fixed-size pool of ``Worker`` threads fed through one shared queue."""

    def __init__(self, num_threads):
        """Create a queue bounded to ``num_threads`` and start that many workers."""
        self.tasks = Queue(maxsize=num_threads)
        remaining = num_threads
        while remaining > 0:
            # Each worker starts itself in its constructor.
            Worker(self.tasks)
            remaining -= 1

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)`` for execution by a worker.

        Uses a blocking ``put``, so the caller waits while the bounded
        queue is full.
        """
        job = (func, args, kargs)
        self.tasks.put(job)

    def wait_completion(self):
        """Block until every queued job has been marked done."""
        self.tasks.join()
def print_debug(msg):
    """Print ``msg`` with a ``DEBUG:`` prefix, but only when ``DEBUG`` is truthy."""
    if not DEBUG:
        return
    # The leading "\r" returns to column 0 so the text does not get appended
    # to the in-place progress line.
    text = "\r\n\nDEBUG: {}\n".format(msg)
    print(text)
def connect_socket(host, port, timeout, path, message="\0", sleep_before=0, sleep_after=0):
    """Open one TCP connection, send a message, read a reply and record the result.

    Args:
        host: Host name or IPv4 address to connect to.
        port: TCP port number.
        timeout: Timeout in seconds applied to connect/send/recv on the socket.
        path: Value substituted for the first ``{}`` placeholder of ``message``.
        message: ``str.format`` template for the payload; it is formatted with
            ``(path, host)`` and encoded as UTF-8.
        sleep_before: Seconds to wait before opening the connection.
        sleep_after: Seconds to keep the connection open after the reply.

    Returns:
        None. On success ``NUM_SUCCESS`` is incremented; on any exception
        ``str(error)`` is appended to ``LIST_FAIL``. Nothing is raised.
    """
    global NUM_RUNNING
    global NUM_SUCCESS

    # ``with`` guarantees the semaphore is released even if print() fails.
    with sem:
        NUM_RUNNING += 1
        print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(host, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS), end='')

    sleep(sleep_before)
    try:
        # A single socket, closed by the context manager. The timeout is set
        # on the socket that is actually used, rather than relying on the
        # process-wide default timeout.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((host, port))
            payload = bytes(message.format(path, host), 'utf-8')
            s.sendall(payload)
            ret = s.recv(128)
            print_debug(ret)
            # Hold the connection open for the requested time before closing.
            sleep(sleep_after)
    except Exception as error:
        # Deliberately broad: every failure is tallied and summarised by the
        # caller instead of aborting the run.
        with sem:
            LIST_FAIL.append(str(error))
    else:
        # ``x = x + 1`` on a global is not atomic across threads, so the
        # update is done under the same lock as the other counters.
        with sem:
            NUM_SUCCESS += 1
    return
def connect_http(url, timeout):
    """Issue one HTTP GET to ``url`` and report whether the server answered.

    The ``requests`` package is not imported in this file (its import line
    is commented out), so calling ``requests.get`` here raised ``NameError``,
    which the broad ``except`` turned into an unconditional ``False``. The
    standard library's ``urllib.request`` is used instead.

    Args:
        url: Absolute URL to fetch (including the scheme).
        timeout: Timeout in seconds for the request.

    Returns:
        True if an HTTP response was received (any status code), False if
        the request failed for any other reason.
    """
    global NUM_RUNNING
    # Local imports keep this block self-contained; both are standard library.
    import urllib.error
    import urllib.request

    # ``with`` guarantees the semaphore is released even if print() fails.
    with sem:
        NUM_RUNNING += 1
        print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(url, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS), end='')

    sleep(5)
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            pass
        return True
    except urllib.error.HTTPError as error:
        # urlopen raises on 4xx/5xx, whereas ``requests.get`` returns the
        # response without raising; the server did answer, so count it as
        # a successful connection.
        error.close()
        return True
    except Exception:
        return False
def get_args():
    """Parse the command line (``sys.argv``) and return the argparse namespace.

    Options:
        -n/--num-threads (int, default 50): number of worker threads.
        -t/--timeout (float, default 30.0): socket timeout.
        -r/--requests (int, default 50): number of requests to issue.
        -p/--port (int, default 80): TCP port.
        -u/--url (str, required): target host.
        -P/--path (str, default "/"): path substituted into the message.
        -m/--message (str, default "\\0"): message template sent on the socket.
        -s/--sleep-before (float, default 0): sleep before each request.
        -S/--sleep-after (float, default 0): sleep after each request.

    The sleep options accept fractional values; whole numbers are still
    accepted exactly as before.

    Returns:
        argparse.Namespace with one attribute per option.
    """
    parser = argparse.ArgumentParser(description='Para o modo de debug, sete a variavel de ambiente DEBUG=True')
    parser.add_argument("-n", "--num-threads",
                        required=False,
                        default=50,
                        type=int,
                        help="Numero de threads")
    parser.add_argument("-t", "--timeout",
                        required=False,
                        default=30.0,
                        type=float,
                        help="Timeout do socket")
    parser.add_argument("-r", "--requests",
                        required=False,
                        default=50,
                        type=int,
                        help="Numero de requests")
    parser.add_argument("-p", "--port",
                        required=False,
                        default=80,
                        type=int,
                        help="Porta do socket")
    parser.add_argument("-u", "--url",
                        required=True,
                        type=str,
                        help="Url do socket")
    parser.add_argument("-P", "--path",
                        required=False,
                        type=str,
                        default="/",
                        help="Path da requisicao http")
    parser.add_argument("-m", "--message",
                        required=False,
                        type=str,
                        default="\0",
                        help="Mensagem para ser enviada no socket")
    # The sleep values end up in time.sleep(), which accepts floats, so
    # sub-second delays are allowed.
    parser.add_argument("-s", "--sleep-before",
                        required=False,
                        type=float,
                        default=0,
                        help="Tempo de sleep antes de enviar a requisicao")
    parser.add_argument("-S", "--sleep-after",
                        required=False,
                        type=float,
                        default=0,
                        help="Tempo de sleep depois de enviar a requisicao")
    return parser.parse_args()
def _raise_rlimit(kind, wanted):
    """Best-effort: make the soft limit of ``kind`` at least ``wanted``; never lower it."""
    unlimited = resource.RLIM_INFINITY
    soft, hard = resource.getrlimit(kind)
    if soft == unlimited or soft >= wanted:
        return
    if hard == unlimited or hard >= wanted:
        attempts = [(wanted, hard)]
    else:
        # Raising the hard limit needs privileges; fall back to the highest
        # value an unprivileged process may use.
        attempts = [(wanted, wanted), (hard, hard)]
    for limits in attempts:
        try:
            resource.setrlimit(kind, limits)
            return
        except (ValueError, OSError) as error:
            print_debug("setrlimit{} failed: {}".format(limits, error))


def main():
    """Run the load test described by the command line and print a summary.

    Validates the request/thread counts (exiting with status 1 when either
    is below 1), caps the thread count at the number of requests, raises
    the resource limits where possible, queues one ``connect_socket`` call
    per request on a thread pool and waits for all of them to finish.

    Returns:
        None (so the script's exit status is 0 once the run completes).
    """
    args = get_args()
    if args.requests < 1:
        print("O numero de requests deve ser maior que 0")
        sys.exit(1)
    if args.num_threads < 1:
        print("O numero de threads deve ser maior que 0")
        sys.exit(1)
    # There is no point in having more workers than requests.
    args.num_threads = min(args.num_threads, args.requests)

    # Limits are only ever raised. Forcing RLIMIT_NPROC down to the thread
    # count can stop the pool's own threads from starting, and demanding a
    # fixed RLIMIT_NOFILE raises for unprivileged users whose hard limit is
    # lower.
    _raise_rlimit(resource.RLIMIT_NOFILE, 65536)
    _raise_rlimit(resource.RLIMIT_NPROC, args.num_threads)

    pool = ThreadPool(args.num_threads)
    for _ in range(args.requests):
        pool.add_task(connect_socket, args.url, args.port, args.timeout, args.path, args.message, args.sleep_before, args.sleep_after)
    pool.wait_completion()

    if DEBUG:
        print("DEBUG:\n")
        print("Erros: {}".format(LIST_FAIL))
        print("Ok: {}".format(NUM_SUCCESS))
        print("Total: {}\n\n".format(NUM_SUCCESS + len(LIST_FAIL)))
        print("Args: {}\n\n".format(args))
    # The leading "\r" overwrites the in-place progress line with the final tally.
    print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(args.url, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS))
    if LIST_FAIL:
        # Group identical error messages so the summary stays short.
        print("\nErrors: {}".format(Counter(LIST_FAIL)))
if __name__ == "__main__":
    # Script entry point: run the tool and hand main()'s return value to the
    # interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment