-
-
Save Safihre/12f9826631c0aaf47491c118511679c4 to your computer and use it in GitHub Desktop.
Socket test NNTP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select | |
import ssl | |
import subprocess | |
import socket | |
import time | |
# SETTINGS
# Use TLS: the server is started with a certificate/key pair and the client
# socket is wrapped in an SSL context.
WITH_SSL = True
# Number of fresh connections; both approaches are timed once per connection.
nr_wrappers = 5
# Number of article requests sent per connection for each approach.
nr_runs = 500
# Size in bytes of the pre-allocated receive buffer used by the buffer approach.
total_data = 783664
# Maximum number of bytes requested per receive call.
recv_size = 16384  # 16384 / 262144
# -------
# Command line used to start nzbget.exe in "--nserv" mode; the remaining
# options are passed through to it unchanged.
nserv = [
    r"C:\Program Files\NZBGet\nzbget.exe",
    "--nserv",
    "-v",
    "0",
    "-d",
    r"C:\backup\data",
    "-c",
    r"C:\backup\cache",
]

# With TLS enabled the server additionally receives "-s" followed by the
# certificate and key files.
if WITH_SSL:
    nserv += [
        "-s",
        r"C:\Users\me\AppData\Local\sabnzbd\admin\server.cert",
        r"C:\Users\me\AppData\Local\sabnzbd\admin\server.key",
    ]

# Show the command that is about to be launched, then start the server in the
# background; the handle is kept so the process can be stopped afterwards.
print(" ".join(nserv))
nserv_proc = subprocess.Popen(nserv)
# Request sent for every run and the byte sequence that ends the response.
_ARTICLE_REQUEST = b"ARTICLE <test.bin?1=0:760000>\r\n"
_END_OF_ARTICLE = b"\r\n.\r\n"
_SERVER_ADDRESS = ("127.0.0.1", 6791)


def _wait_readable(sock):
    """Block until a receive call on ``sock`` can make progress.

    select() only looks at the OS-level socket, so data that the TLS layer
    has already decrypted and buffered has to be checked for first;
    otherwise select() could wait for bytes that have already arrived.
    """
    if isinstance(sock, ssl.SSLSocket) and sock.pending():
        return
    select.select([sock], [], [])


def open_connection(address, tls_context=None):
    """Connect to ``address`` and return a non-blocking, ready-to-use socket.

    The socket is wrapped with ``tls_context`` when one is given. The first
    message sent by the server (the "200 Welcome" line) is read and
    discarded before the socket is switched to non-blocking mode.

    The returned object is the socket that must be closed by the caller;
    with TLS this is the wrapped socket, not the bare one it was built from.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if tls_context is not None:
            sock = tls_context.wrap_socket(sock)
        sock.connect(address)
        # Skip the "200 Welcome" message
        sock.recv(1024)
        sock.setblocking(False)
    except BaseException:
        sock.close()
        raise
    return sock


def fetch_article_chunks(sock, bufsize):
    """Request the test article and return the response as a list of chunks.

    Data is read with ``recv(bufsize)`` until the response ends with the
    end-of-article marker. The last five bytes received are tracked across
    chunk boundaries, so the marker is found no matter how it is split.

    Raises:
        ConnectionError: the peer closed the connection before the marker.
    """
    chunks = []
    tail = b""
    sock.sendall(_ARTICLE_REQUEST)
    while True:
        # Wait for data to be received
        _wait_readable(sock)
        try:
            chunk = sock.recv(bufsize)
        except (ssl.SSLWantReadError, BlockingIOError):
            # Readable at OS level, but no complete data available yet
            continue
        if not chunk:
            raise ConnectionError("Connection closed before the end of the article")
        chunks.append(chunk)
        # Only short chunks need the bytes carried over from earlier chunks;
        # this avoids joining the whole list on every iteration.
        if len(chunk) >= len(_END_OF_ARTICLE):
            tail = chunk[-len(_END_OF_ARTICLE):]
        else:
            tail = (tail + chunk)[-len(_END_OF_ARTICLE):]
        if tail == _END_OF_ARTICLE:
            return chunks


def fetch_article_buffer(sock, bufsize, capacity):
    """Request the test article and receive it into one pre-allocated buffer.

    A ``bytearray`` of ``capacity`` bytes is filled in place with
    ``recv_into`` (at most ``bufsize`` bytes per call) until the received
    data ends with the end-of-article marker.

    Returns:
        ``(buffer, length)``: the bytearray and the number of bytes received.

    Raises:
        ConnectionError: the peer closed the connection before the marker.
        ValueError: ``capacity`` bytes were received without seeing the marker.
    """
    buffer = bytearray(capacity)
    view = memoryview(buffer)
    position = 0
    sock.sendall(_ARTICLE_REQUEST)
    while True:
        if position >= capacity:
            # A zero-length receive would never make progress
            raise ValueError("Article does not fit in the receive buffer")
        # Wait for data to be received
        _wait_readable(sock)
        try:
            received = sock.recv_into(view[position:], min(bufsize, capacity - position))
        except (ssl.SSLWantReadError, BlockingIOError):
            # Readable at OS level, but no complete data available yet
            continue
        if not received:
            raise ConnectionError("Connection closed before the end of the article")
        position += received
        # Compare in place, limited to the part of the buffer filled so far
        if buffer.endswith(_END_OF_ARTICLE, 0, position):
            return buffer, position


if __name__ == "__main__":
    try:
        # Certificate and hostname verification are switched off for this
        # connection to the local test server.
        sslctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        sslctx.check_hostname = False
        sslctx.verify_mode = ssl.CERT_NONE

        for _ in range(nr_wrappers):
            with open_connection(_SERVER_ADDRESS, sslctx if WITH_SSL else None) as s:
                # Defined up front so the size check also works with nr_runs == 0
                chunk_list = []
                position = 0

                # The actual test - Base approach (list of chunks)
                start = time.process_time_ns()
                for _ in range(nr_runs):
                    chunk_list = fetch_article_chunks(s, recv_size)
                print("Chunks:", (time.process_time_ns() - start) / 1e6)

                # The actual test - Buffer approach (pre-allocated bytearray)
                start = time.process_time_ns()
                for _ in range(nr_runs):
                    chunk_buffer, position = fetch_article_buffer(s, recv_size, total_data)
                print("Buffer:", (time.process_time_ns() - start) / 1e6)
                print("------------------")

                # Just to make sure we got the same size:
                if len(b"".join(chunk_list)) != position:
                    raise ValueError("Sizes should match!")
    finally:
        # Always stop the server, also when the benchmark fails halfway
        nserv_proc.kill()
        nserv_proc.wait(timeout=10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment