This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pssh.pssh_client import ParallelSSHClient | |
from pssh.pssh2_client import ParallelSSHClient as ParallelSSH2Client | |
from pssh.utils import load_private_key | |
from gevent import monkey, sleep, threadpool, spawn, joinall | |
import os | |
import socket | |
import sys | |
from threading import Thread | |
try: | |
from Queue import PriorityQueue | |
except ImportError: | |
from queue import PriorityQueue | |
from datetime import datetime | |
import pwd | |
from decimal import Decimal | |
import time | |
CMD = 'cat Projects/ssh2-python/LICENSE' | |
FILE = 'dl/PDF%20File%20Reader_1.4_apk-dl.com.apk' | |
FILE_ABS = os.path.expanduser('~/' + FILE) | |
graph_host = 'localhost' | |
graph_port = 2003 | |
ssh_host = 'localhost' | |
_QUEUE = PriorityQueue() | |
def send_to_graphite(queue): | |
graph_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
graph_sock.connect((graph_host, graph_port)) | |
while True: | |
message = queue.get() | |
graph_sock.sendall(message) | |
queue.task_done() | |
def _make_data(app, commands_start, chan_read, | |
commands_finished, commands_restart, total, clients=1): | |
data = { | |
'.'.join([app,'auth_and_execute']): format(Decimal( | |
str(commands_start.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'execute']): format(Decimal( | |
str(commands_restart.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'channel_read']): format(Decimal( | |
str(chan_read.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'close_and_exit_status']): format(Decimal( | |
str(commands_finished.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'total']): format(Decimal( | |
str(total.total_seconds() * 1000)), '.3f'), | |
'.'.join([app,'clients']): str(clients), | |
} | |
return data | |
def queue_data(queue, data, priority): | |
message = make_message(data) | |
queue.put(message, priority) | |
def run_test(_queue, client, name): | |
clients = client.pool_size | |
start = datetime.now() | |
now = datetime.now() | |
try: | |
output = client.run_command(CMD) | |
except Exception: | |
data = {'%s.failure' % name: '1'} | |
queue_data(_queue, data, clients) | |
# raise | |
return | |
commands_start = datetime.now() - now | |
now = datetime.now() | |
client.join(output) | |
commands_finished = datetime.now() - now | |
now = datetime.now() | |
for host, host_output in output.items(): | |
if host_output.stdout is None: | |
continue | |
for line in host_output.stdout: | |
# print(line) | |
pass | |
chan_read = datetime.now() - now | |
now = datetime.now() | |
try: | |
client.run_command(CMD) | |
except Exception: | |
data = {'%s.failure' % name: '1'} | |
queue_data(_queue, data, clients) | |
return | |
commands_restart = datetime.now() - now | |
total = datetime.now() - start | |
data = _make_data(name, commands_start, chan_read, | |
commands_finished, | |
commands_restart, | |
total, clients=clients) | |
queue_data(_queue, data, clients) | |
def make_message(data): | |
"""Make and return metrics message(s)""" | |
dt = datetime.now() | |
test_data = [b"%s %s %s\n" % (serie.encode('utf-8'), | |
data[serie].encode('utf-8'), | |
dt.strftime("%s").encode('utf-8')) | |
for serie in data] | |
test_data = b"".join(test_data) | |
return test_data | |
def start_graph_thread(): | |
graph_thread = Thread(target=send_to_graphite, args=(_QUEUE,)) | |
graph_thread.daemon = True | |
graph_thread.start() | |
return graph_thread | |
def run_paramiko_workers(hosts): | |
client = ParallelSSHClient(hosts, num_retries=1, | |
pool_size=len(hosts), | |
allow_agent=False, | |
pkey=load_private_key(os.path.expanduser('~/.ssh/local'))) | |
return run_test(_QUEUE, client, 'paramiko') | |
def run_ssh2_workers(hosts): | |
client = ParallelSSH2Client(hosts, num_retries=1, | |
pool_size=len(hosts), | |
allow_agent=False, | |
pkey=os.path.expanduser('~/.ssh/local')) | |
return run_test(_QUEUE, client, 'ssh2') | |
def _start(num_workers, step=1): | |
for _max in range(1, num_workers+step+1, step): | |
hosts = [ssh_host for _ in range(_max)] | |
# run_paramiko_workers(hosts) | |
run_ssh2_workers(hosts) | |
def _start_ssh2(num_workers, step=1): | |
for _max in range(1, num_workers+step+1, step): | |
hosts = [ssh_host for _ in range(_max)] | |
now = datetime.now() | |
run_ssh2_workers(hosts) | |
diff = datetime.now() - now | |
if diff.total_seconds() < 2: | |
sleep(2 - diff.total_seconds()) | |
def test(): | |
hosts = [ssh_host for _ in range(5)] | |
run_ssh2_workers(hosts) | |
run_paramiko_workers(hosts) | |
_QUEUE.join() | |
sys.exit(0) | |
if __name__ == "__main__": | |
try: | |
max_workers = int(sys.argv[1]) | |
except IndexError: | |
sys.stderr.write("Run as %s <max concurrency>\n" % sys.argv[0]) | |
sys.exit(1) | |
except TypeError: | |
sys.stderr.write("Max workers must be integer\n") | |
sys.exit(1) | |
graph_thread = start_graph_thread() | |
# test() | |
_start(max_workers) | |
# _start_ssh2(max_workers) | |
_QUEUE.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment