Skip to content

Instantly share code, notes, and snippets.

@orangemio
Created March 11, 2019 08:07
Show Gist options
  • Save orangemio/6e43be425f724bdd1842540bf2e4c5c2 to your computer and use it in GitHub Desktop.
Save orangemio/6e43be425f724bdd1842540bf2e4c5c2 to your computer and use it in GitHub Desktop.
orange.py
#!/usr/bin/python
# coding: utf-8
import pexpect
import sys
import threading
import time
import random
# Connection settings used to build the command "ssh USER@HOST -pPORT".
HOST = "192.168.1.80"
PORT = 2222
USER = "admin"
# Sent whenever the session shows a "password:" prompt.
PASSWORD = "admin"
# Upper bound of the number sent at the system-user prompt (picked from 2..HOST_NUM).
HOST_NUM = 3
# When true, each session's traffic is logged and progress messages are printed.
DEBUG = False
# Number of concurrent sessions
TOTAL = 500
# Text expected in the output once login has succeeded.
PS1 = "root@localhost"
# Commands for a logged-in session; connect() only sends the first entry.
RUN_COMMAND = ["tail /var/log/dmesg", "echo hello world"]
# connect() keeps driving its session until this event is set.
stop_event = threading.Event()
def connect(i=1):
    """Open one SSH session and keep it busy until stopped or disconnected.

    Spawns ``ssh USER@HOST -pPORT`` through pexpect and then reacts to
    whichever pattern shows up next in the session output:

    * ``password:``           -> send ``PASSWORD``
    * ``Jumpserver``          -> send ``p``
    * system-user prompt      -> send a random number in ``2..HOST_NUM``
    * ``PS1``                 -> send ``RUN_COMMAND[0]``
    * ``pexpect.EOF``         -> the session is gone, stop
    * ``pexpect.TIMEOUT``     -> nothing matched yet, wait again

    The loop ends when ``stop_event`` is set or the child reaches EOF; the
    child process is always closed on the way out.

    :param i: label for this session, used only in the start-up message.
    """
    cmd = "ssh %s@%s -p%s" % (USER, HOST, PORT)
    print(u"开始连接 [%s]: %s" % (i, cmd))
    child = pexpect.spawn(cmd)
    if DEBUG:
        # A bytes-mode spawn writes bytes to its logfile; on Python 3 the
        # text-mode sys.stdout rejects them, so use its binary buffer.
        child.logfile = getattr(sys.stdout, "buffer", sys.stdout)

    # Built once instead of on every iteration. The non-ASCII pattern is
    # passed as UTF-8 bytes because a bytes-mode spawn can only coerce
    # ASCII text patterns on Python 3.
    patterns = [
        "password:", "Jumpserver", u"系统用户root".encode("utf-8"), PS1,
        pexpect.EOF, pexpect.TIMEOUT,
    ]
    try:
        while not stop_event.is_set():
            index = child.expect(patterns)
            if index == 0:
                if DEBUG:
                    print(u"开始输入密码:")
                child.sendline(PASSWORD)
            elif index == 1:
                if DEBUG:
                    print(u"输入 p")
                child.sendline("p")
            elif index == 2:
                n = random.randint(2, HOST_NUM)
                if DEBUG:
                    print(u"选择一个登陆: %s" % (n))
                child.sendline(str(n))
            elif index == 3:
                if DEBUG:
                    print(u"登陆成功,执行命令:%s" % RUN_COMMAND)
                child.sendline(RUN_COMMAND[0])
                time.sleep(1)
            elif index == 4:
                # EOF: the ssh process has exited. Without this the loop
                # would keep getting EOF back forever.
                break
            # index == 5 (TIMEOUT) falls through: just wait for more output.
            time.sleep(1)
    finally:
        # Release the pty and terminate ssh even if the loop raised.
        child.close(force=True)
def run():
    """Start ``TOTAL`` worker threads, then wait for the operator to finish.

    One daemon thread running ``connect(i)`` is started every 0.3 seconds.
    After the last one has started the function blocks on a ``Done ?``
    prompt. On the way out (including on an exception or Ctrl-C)
    ``stop_event`` is set so the workers leave their loops, and they are
    given a short, bounded grace period to do so.
    """
    grace_seconds = 5
    workers = []
    try:
        for i in range(TOTAL):
            t = threading.Thread(target=connect, args=(i,))
            # Daemon threads never keep the interpreter alive on exit.
            t.daemon = True
            t.start()
            workers.append(t)
            # Stagger the logins instead of opening every session at once.
            time.sleep(0.3)
        input("Done ?")
    finally:
        # Workers poll this event; without setting it they never stop.
        stop_event.set()
        # One shared deadline, so total shutdown time stays bounded no
        # matter how many workers are still blocked waiting for output.
        deadline = time.time() + grace_seconds
        for t in workers:
            t.join(max(0.0, deadline - time.time()))
    print("Done test")
# Start the load test only when executed as a script, not when imported.
if "__main__" == __name__:
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment