import os | |
import socket | |
import sys | |
from threading import Thread | |
from Queue import PriorityQueue | |
from datetime import datetime | |
import pwd | |
from decimal import Decimal | |
import time | |
from ssh2.session import Session | |
from ssh2.sftp import LIBSSH2_FXF_WRITE | |
from paramiko import SSHClient, SFTPClient, RSAKey, MissingHostKeyPolicy | |
CMD = 'cat ssh2-python/LICENSE' | |
FILE = os.path.expanduser('~/Downloads/archive.tar.gz') | |
graph_host = 'localhost' | |
graph_port = 2003 | |
ssh_host = 'localhost' | |
ssh_port = 22 | |
USER = pwd.getpwuid(os.geteuid()).pw_name | |
_QUEUE = PriorityQueue() | |
def send_to_graphite(queue): | |
graph_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
graph_sock.connect((graph_host, graph_port)) | |
while True: | |
message = queue.get() | |
graph_sock.sendall(message) | |
queue.task_done() | |
def _make_data(app, auth, channel_open, execute, chan_read, | |
close_and_exit_status, sftp_read, total, | |
client_num=1, clients=1): | |
data = { | |
'.'.join([app, 'auth']): format(Decimal( | |
str(auth.total_seconds() * 1000)), '.3f'), | |
'.'.join([app, 'channel_open']): format(Decimal( | |
str(channel_open.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'execute']): format(Decimal( | |
(execute.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'channel_read']): format(Decimal( | |
str(chan_read.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'close_and_exit_status']): format(Decimal( | |
str(close_and_exit_status.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'sftp_read']): format(Decimal( | |
str(sftp_read.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'total']): format(Decimal( | |
str(total.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'client_num']): str(client_num), | |
'.'.join([app,'clients']): str(clients), | |
} | |
_client_num = '.'.join([app,'client_num']) | |
# print("Sending graphite data for client %s" % data[_client_num]) | |
return data | |
def queue_data(queue, data, priority): | |
message = make_message(data) | |
queue.put(message, priority) | |
def paramiko_execute(_queue, client_num=1, clients=1): | |
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
_sock.connect((ssh_host, ssh_port)) | |
client = SSHClient() | |
start = datetime.now() | |
now = datetime.now() | |
client.set_missing_host_key_policy(MissingHostKeyPolicy()) | |
try: | |
client.connect(ssh_host, sock=_sock, look_for_keys=False) | |
except Exception: | |
data = {'paramiko.failure': '1'} | |
queue_data(_queue, data, clients) | |
return | |
auth = datetime.now() - now | |
now = datetime.now() | |
try: | |
transport = client.get_transport() | |
channel = transport.open_session() | |
while channel is None: | |
channel = transport.open_session() | |
except Exception: | |
data = {'paramiko.failure': '1'} | |
queue_data(_queue, data, clients) | |
return | |
channel_open = datetime.now() - now | |
now = datetime.now() | |
stdout = channel.makefile('rb') | |
channel.exec_command(CMD) | |
execute = datetime.now() - now | |
now = datetime.now() | |
for line in stdout: | |
# print(line) | |
pass | |
chan_read = datetime.now() - now | |
now = datetime.now() | |
channel.close() | |
while not channel.closed: | |
pass | |
channel.recv_exit_status() | |
close_and_exit_status = datetime.now() - now | |
now = datetime.now() | |
sftp_transport = client.get_transport() | |
sftp_transport.open_session() | |
sftp = SFTPClient.from_transport(sftp_transport) | |
with sftp.open(FILE, 'rb') as remote_fh: | |
for data in remote_fh: | |
pass | |
sftp_write = datetime.now() - now | |
sftp.close() | |
client.close() | |
_sock.close() | |
total = datetime.now() - start | |
data = _make_data('paramiko', auth, channel_open, execute, chan_read, | |
close_and_exit_status, sftp_write, total, | |
client_num=client_num, clients=clients) | |
queue_data(_queue, data, clients) | |
def ssh2_execute(_queue, client_num=1, clients=1): | |
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
_sock.connect((ssh_host, ssh_port)) | |
s = Session() | |
start = datetime.now() | |
now = datetime.now() | |
try: | |
s.handshake(_sock) | |
s.agent_auth(USER) | |
except Exception: | |
data = {'ssh2.failure': '1'} | |
queue_data(_queue, data, clients) | |
return | |
auth = datetime.now() - now | |
now = datetime.now() | |
channel = s.open_session() | |
channel_open = datetime.now() - now | |
now = datetime.now() | |
try: | |
channel.execute(CMD) | |
except Exception: | |
data = {'ssh2.failure': '1'} | |
queue_data(_queue, data, clients) | |
return | |
execute = datetime.now() - now | |
now = datetime.now() | |
size, data = channel.read() | |
while size > 0: | |
# print(data) | |
size, data = channel.read() | |
chan_read = datetime.now() - now | |
now = datetime.now() | |
channel.close() | |
# Get exit status | |
channel.get_exit_status() | |
close_and_exit_status = datetime.now() - now | |
now = datetime.now() | |
sftp = s.sftp_init() | |
with sftp.open(FILE, 0, 0) as remote_fh: | |
for data in remote_fh: | |
pass | |
sftp_write = datetime.now() - now | |
del s | |
_sock.close() | |
total = datetime.now() - start | |
data = _make_data('ssh2', auth, channel_open, execute, chan_read, | |
close_and_exit_status, sftp_write, total, | |
client_num=client_num, clients=clients) | |
queue_data(_queue, data, clients) | |
def make_message(data): | |
"""Make and return metrics message(s)""" | |
dt = datetime.now() | |
test_data = [b"%s %s %s\n" % (serie.encode('utf-8'), | |
data[serie].encode('utf-8'), | |
dt.strftime("%s").encode('utf-8')) | |
for serie in data] | |
test_data = b"".join(test_data) | |
return test_data | |
def start_workers(target_func, app, num_workers=10): | |
graph_thread = Thread(target=send_to_graphite, args=(_QUEUE,)) | |
graph_thread.daemon = True | |
graph_thread.start() | |
threads = [Thread(target=target_func, args=(_QUEUE, i, num_workers,)) | |
for i in range(1, num_workers+1)] | |
for thread in threads: | |
thread.start() | |
for thread in threads: | |
thread.join() | |
_QUEUE.join() | |
def run_paramiko_workers(num_workers=10): | |
return start_workers(paramiko_execute, 'paramiko', | |
num_workers=num_workers) | |
def run_ssh2_workers(num_workers=10): | |
return start_workers(ssh2_execute, 'ssh2', | |
num_workers=num_workers) | |
if __name__ == "__main__": | |
# run_ssh2_workers(num_workers=2) | |
# run_paramiko_workers(num_workers=2) | |
max_workers = 50 | |
for _max in range(max_workers): | |
run_paramiko_workers(num_workers=_max) | |
run_ssh2_workers(num_workers=_max) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment