Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
import os
import socket
import sys
from threading import Thread
from Queue import PriorityQueue
from datetime import datetime
import pwd
from decimal import Decimal
import time
from ssh2.session import Session
from ssh2.sftp import LIBSSH2_FXF_WRITE
from paramiko import SSHClient, SFTPClient, RSAKey, MissingHostKeyPolicy
# Benchmark workload: the remote command every client executes, and the
# file every client reads back over SFTP.
CMD = 'cat ssh2-python/LICENSE'
FILE = os.path.expanduser('~/Downloads/archive.tar.gz')

# Endpoint that send_to_graphite connects to.
graph_host, graph_port = 'localhost', 2003

# SSH server that both client implementations connect to.
ssh_host, ssh_port = 'localhost', 22

# Login name of the effective user running this script.
USER = pwd.getpwuid(os.geteuid()).pw_name

# Shared queue of encoded metric messages awaiting delivery.
_QUEUE = PriorityQueue()
def send_to_graphite(queue):
    """Forward every message taken from ``queue`` to the metrics endpoint.

    Opens one TCP connection to ``(graph_host, graph_port)`` and then loops
    forever: take a message, send it, mark it done.  It never returns
    normally, so it is meant to be run in a background thread.

    :param queue: queue of ready-to-send byte strings; ``task_done()`` is
        called once per retrieved message so producers can ``join()`` it.
    :raises socket.error: if connecting or sending fails.
    """
    graph_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        graph_sock.connect((graph_host, graph_port))
        while True:
            message = queue.get()
            try:
                graph_sock.sendall(message)
            finally:
                # Account for the message even when the send fails, so the
                # item is not left permanently "unfinished" in the queue.
                queue.task_done()
    finally:
        # Only reached when an error escapes the loop; release the socket.
        graph_sock.close()
def _format_ms(delta):
    """Return timedelta ``delta`` as a millisecond string with 3 decimals."""
    # Going through str() keeps Decimal from receiving a raw float.
    return format(Decimal(str(delta.total_seconds() * 1000)), '.3f')


def _make_data(app, auth, channel_open, execute, chan_read,
               close_and_exit_status, sftp_read, total,
               client_num=1, clients=1):
    """Build the metrics mapping for one finished client run.

    :param app: series prefix, e.g. ``'paramiko'`` or ``'ssh2'``
    :param auth: timedelta spent authenticating
    :param channel_open: timedelta spent opening the session channel
    :param execute: timedelta spent issuing the command
    :param chan_read: timedelta spent reading the channel output
    :param close_and_exit_status: timedelta spent closing the channel and
        fetching the exit status
    :param sftp_read: timedelta spent reading the file over SFTP
    :param total: timedelta for the whole run
    :param client_num: index of this client within the run
    :param clients: number of clients in the run
    :returns: dict mapping ``'<app>.<metric>'`` to a string value; timings
        are milliseconds formatted with three decimals.
    """
    timings = (
        ('auth', auth),
        ('channel_open', channel_open),
        ('execute', execute),
        ('channel_read', chan_read),
        ('close_and_exit_status', close_and_exit_status),
        ('sftp_read', sftp_read),
        ('total', total),
    )
    data = dict(('.'.join([app, name]), _format_ms(delta))
                for name, delta in timings)
    data['.'.join([app, 'client_num'])] = str(client_num)
    data['.'.join([app, 'clients'])] = str(clients)
    return data
def queue_data(queue, data, priority):
    """Encode ``data`` as a metrics message and enqueue it for sending.

    :param queue: queue consumed by the sender thread
    :param data: mapping of series name to value string, as accepted by
        ``make_message``
    :param priority: kept for interface compatibility; currently not
        applied (see the comment below).
    """
    message = make_message(data)
    # ``put``'s second positional parameter is ``block``, not a priority, so
    # ``priority`` must not be passed there.  The consumer sends each queue
    # item as-is, so the value cannot be attached as ``(priority, message)``
    # without changing the consumer too; when ``queue`` is a PriorityQueue
    # the entries are therefore ordered by message content.
    queue.put(message)
def _paramiko_failure(_queue, clients, client, sock):
    """Release the client and socket, then queue a paramiko failure metric."""
    client.close()
    sock.close()
    queue_data(_queue, {'paramiko.failure': '1'}, clients)


def paramiko_execute(_queue, client_num=1, clients=1):
    """Run one timed paramiko client session and queue its metrics.

    Timed steps, in order: ``connect`` (auth), opening a session channel,
    ``exec_command`` of ``CMD``, reading the channel output, closing the
    channel and collecting the exit status, and reading ``FILE`` over SFTP.
    On success the timings are queued via ``_make_data``; if connecting or
    opening the channel raises, ``paramiko.failure`` is queued instead.

    :param _queue: queue the resulting metrics message is put on
    :param client_num: index of this client within the run
    :param clients: number of clients in the run
    """
    _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    _sock.connect((ssh_host, ssh_port))
    client = SSHClient()
    start = datetime.now()
    now = datetime.now()
    client.set_missing_host_key_policy(MissingHostKeyPolicy())
    try:
        client.connect(ssh_host, sock=_sock, look_for_keys=False)
    except Exception:
        _paramiko_failure(_queue, clients, client, _sock)
        return
    auth = datetime.now() - now
    # Restart the timer so channel_open does not include the auth time,
    # matching how ssh2_execute measures the same step.
    now = datetime.now()
    try:
        transport = client.get_transport()
        channel = transport.open_session()
        while channel is None:
            channel = transport.open_session()
    except Exception:
        _paramiko_failure(_queue, clients, client, _sock)
        return
    channel_open = datetime.now() - now
    now = datetime.now()
    stdout = channel.makefile('rb')
    channel.exec_command(CMD)
    execute = datetime.now() - now
    now = datetime.now()
    # Drain the command output; only the time taken matters.
    for _line in stdout:
        pass
    chan_read = datetime.now() - now
    now = datetime.now()
    channel.close()
    while not channel.closed:
        pass
    channel.recv_exit_status()
    close_and_exit_status = datetime.now() - now
    now = datetime.now()
    sftp_transport = client.get_transport()
    # NOTE(review): the channel returned here is never used, yet opening it
    # is counted in the SFTP timing -- confirm whether this call is needed.
    sftp_transport.open_session()
    sftp = SFTPClient.from_transport(sftp_transport)
    with sftp.open(FILE, 'rb') as remote_fh:
        for _chunk in remote_fh:
            pass
    sftp_read = datetime.now() - now
    sftp.close()
    client.close()
    _sock.close()
    total = datetime.now() - start
    data = _make_data('paramiko', auth, channel_open, execute, chan_read,
                      close_and_exit_status, sftp_read, total,
                      client_num=client_num, clients=clients)
    queue_data(_queue, data, clients)
def ssh2_execute(_queue, client_num=1, clients=1):
    """Run one timed ssh2-python client session and queue its metrics.

    Timed steps, in order: handshake plus agent auth, opening a session
    channel, executing ``CMD``, reading the channel output, closing the
    channel and fetching the exit status, and reading ``FILE`` over SFTP.
    On success the timings are queued via ``_make_data``; if the handshake,
    auth, channel open or execute raises, ``ssh2.failure`` is queued
    instead.

    :param _queue: queue the resulting metrics message is put on
    :param client_num: index of this client within the run
    :param clients: number of clients in the run
    """
    _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    _sock.connect((ssh_host, ssh_port))
    s = Session()
    start = datetime.now()
    now = datetime.now()
    try:
        s.handshake(_sock)
        s.agent_auth(USER)
    except Exception:
        # Drop the session before closing its socket, as the success path
        # does, so the failed attempt does not leave the connection open.
        del s
        _sock.close()
        queue_data(_queue, {'ssh2.failure': '1'}, clients)
        return
    auth = datetime.now() - now
    now = datetime.now()
    try:
        channel = s.open_session()
    except Exception:
        # Report channel-open failures the same way paramiko_execute does.
        del s
        _sock.close()
        queue_data(_queue, {'ssh2.failure': '1'}, clients)
        return
    channel_open = datetime.now() - now
    now = datetime.now()
    try:
        channel.execute(CMD)
    except Exception:
        del channel, s
        _sock.close()
        queue_data(_queue, {'ssh2.failure': '1'}, clients)
        return
    execute = datetime.now() - now
    now = datetime.now()
    # Drain the command output; only the time taken matters.
    size, _output = channel.read()
    while size > 0:
        size, _output = channel.read()
    chan_read = datetime.now() - now
    now = datetime.now()
    channel.close()
    # Get exit status
    channel.get_exit_status()
    close_and_exit_status = datetime.now() - now
    now = datetime.now()
    sftp = s.sftp_init()
    # NOTE(review): flags and mode are both passed as 0 here -- confirm this
    # is the intended way to request a read-only open.
    with sftp.open(FILE, 0, 0) as remote_fh:
        for _chunk in remote_fh:
            pass
    sftp_read = datetime.now() - now
    del s
    _sock.close()
    total = datetime.now() - start
    data = _make_data('ssh2', auth, channel_open, execute, chan_read,
                      close_and_exit_status, sftp_read, total,
                      client_num=client_num, clients=clients)
    queue_data(_queue, data, clients)
def make_message(data):
    """Make and return metrics message(s).

    Each entry of ``data`` becomes one newline-terminated line of the form
    ``<series> <value> <timestamp>``; all lines share a single timestamp
    (whole seconds since the epoch) taken when the function is called.

    :param data: mapping of series name to value, both text strings
    :returns: the concatenated lines as one byte string (empty for an
        empty mapping)
    """
    # time.time() is portable, whereas strftime("%s") is a platform-specific
    # extension that is not available everywhere.
    timestamp = str(int(time.time())).encode('utf-8')
    return b"".join(
        b"%s %s %s\n" % (serie.encode('utf-8'),
                         data[serie].encode('utf-8'),
                         timestamp)
        for serie in data)
# Sender thread shared by every start_workers call (None until first use).
_GRAPH_THREAD = None


def start_workers(target_func, app, num_workers=10):
    """Run ``num_workers`` concurrent clients and wait for their metrics.

    Makes sure a sender thread running ``send_to_graphite`` on ``_QUEUE``
    is alive, starts one thread per client, waits for all of them, then
    blocks until every queued message has been marked done.

    :param target_func: callable invoked as
        ``target_func(_QUEUE, client_num, num_workers)`` in each thread,
        with ``client_num`` running from 1 to ``num_workers``
    :param app: name of the implementation under test; not used here
    :param num_workers: number of concurrent client threads to start
    """
    global _GRAPH_THREAD
    # Reuse the running sender instead of starting another never-ending
    # thread (and another connection) on every call.
    if _GRAPH_THREAD is None or not _GRAPH_THREAD.is_alive():
        _GRAPH_THREAD = Thread(target=send_to_graphite, args=(_QUEUE,))
        _GRAPH_THREAD.daemon = True
        _GRAPH_THREAD.start()
    threads = [Thread(target=target_func, args=(_QUEUE, i, num_workers))
               for i in range(1, num_workers + 1)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    _QUEUE.join()
def run_paramiko_workers(num_workers=10):
    """Run the paramiko client with ``num_workers`` concurrent workers.

    :returns: whatever ``start_workers`` returns.
    """
    return start_workers(target_func=paramiko_execute,
                         app='paramiko',
                         num_workers=num_workers)
def run_ssh2_workers(num_workers=10):
    """Run the ssh2-python client with ``num_workers`` concurrent workers.

    :returns: whatever ``start_workers`` returns.
    """
    return start_workers(target_func=ssh2_execute,
                         app='ssh2',
                         num_workers=num_workers)
if __name__ == "__main__":
    # Ramp the number of concurrent clients from 1 up to and including
    # max_workers, running both implementations at every step.  Starting
    # at 1 avoids an empty 0-worker round.
    max_workers = 50
    for _max in range(1, max_workers + 1):
        run_paramiko_workers(num_workers=_max)
        run_ssh2_workers(num_workers=_max)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment