Skip to content

Instantly share code, notes, and snippets.

@akiasprin
Created October 27, 2017 19:06
Show Gist options
  • Save akiasprin/c827c5b501f11cd67cb1daf46d2643a0 to your computer and use it in GitHub Desktop.
Save akiasprin/c827c5b501f11cd67cb1daf46d2643a0 to your computer and use it in GitHub Desktop.
Shadowsocks接入檢測腳本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys, json, os, socket, time, logging, datetime, random, pymysql
from bs4 import BeautifulSoup
from tornado import httpclient, curl_httpclient ,gen, ioloop, queues
# API keys sent in the ``apikey`` header of the IP-type lookup; one is chosen
# at random for each request.
APP_KEY = [
    'cd2306261560b164317de7a2c5c259a9',
    'f8f22c028b3ad53163da5a7a0ca854b3',
]

# Lookup endpoints. The IP address is appended directly to each URL.
RTBAsia_URL = 'http://apis.baidu.com/rtbasia/ip_type/ip_type?ip='
IPLOCAT_URL = 'http://freeapi.ipip.net/?ip='

# Shell pipeline: netstat lines matching ":4433 ", 5th column, text before the
# first ":", de-duplicated.  (Presumably the remote address of each
# connection on port 4433 -- confirm against the local netstat layout.)
NETS_CMD = 'netstat -np | egrep \':(4433) \' | awk \'{print $5}\' | cut -d: -f1 | sort | uniq'

# iptables rule template; ``%s`` is the source address to drop.
BANIP_CMD = 'iptables -I INPUT -s %s -j DROP'


def _resolve_local_address():
    """Return this host's own IPv4 address, or None if it cannot be resolved."""
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except OSError:
        # socket.gaierror / socket.herror are OSError subclasses.  A host
        # without a resolvable name must not stop the script from starting.
        return None


# Addresses that are never queued for inspection: loopback and, when it can
# be resolved, the machine's own address.
excludes = tuple(
    address
    for address in ('127.0.0.1', _resolve_local_address())
    if address is not None
)

# Human-readable labels for the IP-type codes:
# '1' data center, '2' dedicated egress, '3' ordinary broadband, '4' cellular data.
ip_type_alias = {'1': '数据中心', '2': '专用出口', '3': '普通宽带', '4': '蜂窝数据'}
def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
class DBHelper:
    """Thin wrapper around one PyMySQL connection and its cursor.

    Every instance opens its own connection, so callers should release it
    with ``close()`` or by using the instance as a context manager::

        with DBHelper() as helper:
            row = helper.get("SELECT 1")
    """

    def __init__(self):
        """Connect to the local ``ip`` database and verify the link with a ping."""
        self.db = pymysql.connect(host="localhost", user="root", password="admin", database="ip", charset='utf8', autocommit=True)
        self.cursor = self.db.cursor()
        self.db.ping()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def close(self):
        """Close the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            self.db.close()

    def _execute(self, sql, params):
        """Run *sql*, binding *params* through the driver when they are given."""
        if params is None:
            # Keep the original one-argument call for pre-formatted statements.
            self.cursor.execute(sql)
        else:
            self.cursor.execute(sql, params)

    def get(self, sql, params=None):
        """Execute a query and return its first row, or None when there is none.

        :param sql: statement text, optionally with driver placeholders.
        :param params: optional sequence/mapping of values bound by the driver.
        """
        self._execute(sql, params)
        return self.cursor.fetchone()

    def add(self, sql, params=None):
        """Execute a statement for its side effect (the connection autocommits).

        :param sql: statement text, optionally with driver placeholders.
        :param params: optional sequence/mapping of values bound by the driver.
        """
        self._execute(sql, params)
class DBAcess:
    """Static helpers that read and write the ``ip_type`` table.

    Each call opens a fresh ``DBHelper`` connection and releases it before
    returning.  Values are bound by the database driver rather than being
    interpolated into the SQL text, because most of them originate from
    external HTTP APIs.
    """

    @staticmethod
    def _release(helper):
        """Close the helper's cursor and connection."""
        try:
            helper.cursor.close()
        finally:
            helper.db.close()

    @staticmethod
    def insert(ip, country, province, city, isp, type):
        """Store one lookup result.

        :param ip: address that was inspected.
        :param country: first field of the location lookup.
        :param province: second field of the location lookup.
        :param city: third field of the location lookup.
        :param isp: carrier field of the location lookup.
        :param type: IP-type code returned by the type lookup.
        """
        sql = ("INSERT INTO ip_type(ip, country, province, city, isp, type) "
               "VALUES (%s, %s, %s, %s, %s, %s) ;")
        params = (ip, country, province, city, isp, type)
        logging.debug('%s %r', sql, params)
        helper = DBHelper()
        try:
            helper.cursor.execute(sql, params)
        finally:
            DBAcess._release(helper)

    @staticmethod
    def query(ip):
        """Return the stored row ``(type,)`` for *ip*, or None if it is unknown."""
        helper = DBHelper()
        try:
            helper.cursor.execute("SELECT (type) FROM ip_type WHERE ip = %s ;", (ip,))
            return helper.cursor.fetchone()
        finally:
            DBAcess._release(helper)
class Observer:
    """Inspect the remote addresses listed by ``NETS_CMD``.

    Addresses are pushed onto a queue and processed by a pool of worker
    coroutines.  For an address with no database record the worker queries
    the IP-type and IP-location services, records the result, and calls
    ``ban`` for data-center addresses (type ``'1'``) or addresses whose
    country is not ``'中国'``.

    NOTE(review): the ``DBAcess`` calls are made directly from coroutines;
    if the driver is synchronous they block the IOLoop while they run.
    """

    MAX_ATTEMPTS = 3    # lookup attempts per service before giving up on an IP
    RETRY_DELAY = 1     # seconds to wait between failed lookup attempts
    POLL_INTERVAL = 1   # seconds between netstat polls while nothing is queued
    WORKER_COUNT = 10   # number of concurrent worker coroutines

    def __init__(self):
        self.q = queues.Queue()
        # Addresses already queued (or excluded) so each is inspected once.
        self.vis = set(excludes)
        self.map = {}

    @gen.coroutine
    def ban(self, ip):
        """Ban *ip*.  Currently a no-op: the iptables call is disabled."""
        #os.system(BANIP_CMD % ip)
        pass

    @gen.coroutine
    def _fetch_json(self, url, headers=None):
        """GET *url* and return the decoded JSON body."""
        request = httpclient.HTTPRequest(url, method='GET', headers=headers)
        response = yield httpclient.AsyncHTTPClient().fetch(request)
        return json.loads(response.body.decode('utf-8'))

    @gen.coroutine
    def _check_source(self, url, ip):
        """Return the IP-type code for *ip*, or None after MAX_ATTEMPTS failures."""
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                payload = yield self._fetch_json(
                    url + ip, headers=dict(apikey=random.choice(APP_KEY)))
                ip_type = payload['data']['type']
            except Exception as e:
                logging.error(e)
                if attempt < self.MAX_ATTEMPTS:
                    yield gen.sleep(self.RETRY_DELAY)
                continue
            # An unknown code is logged verbatim instead of raising KeyError.
            logging.info('[TIME]:%s [IP]:%s [TPE]:%s ', get_current_time(), ip,
                         ip_type_alias.get(ip_type, ip_type))
            if ip_type == '1':
                yield self.ban(ip)
            return ip_type
        return None

    @gen.coroutine
    def _check_location(self, url, ip):
        """Return ``(country, province, city, isp)`` for *ip*, or None on failure."""
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                payload = yield self._fetch_json(url + ip)
                # The response is indexed 0..4; element 3 is only logged.
                country, province, city, other, isp = payload[:5]
            except Exception as e:
                logging.error(e)
                if attempt < self.MAX_ATTEMPTS:
                    yield gen.sleep(self.RETRY_DELAY)
                continue
            logging.info('[TIME]:%s [IP]:%s [FRM]:%s%s%s%s-%s', get_current_time(), ip,
                         country, province, city if province != city else '', other, isp)
            if country != '中国':
                yield self.ban(ip)
            return country, province, city, isp
        return None

    @gen.coroutine
    def fetch(self, ip):
        """Look up type and location of *ip* and store them in the database.

        Nothing is stored when either lookup fails after all retries.
        """
        ip_type = yield self._check_source(RTBAsia_URL, ip)
        location = yield self._check_location(IPLOCAT_URL, ip)
        if ip_type is None or location is None:
            logging.error('[TIME]:%s [IP]:%s lookup failed, nothing recorded',
                          get_current_time(), ip)
            return
        country, province, city, isp = location
        DBAcess.insert(ip, country, province, city, isp, ip_type)

    @gen.coroutine
    def worker(self):
        """Consume queued addresses forever; one failure never stops the worker."""
        while True:
            ip = yield self.q.get()
            try:
                row = DBAcess.query(ip)
                if row is None:
                    yield self.fetch(ip)
                elif str(row[0]) == '1':
                    # ``query`` returns a row tuple, so compare its first column.
                    yield self.ban(ip)
            except Exception:
                logging.exception('failed to process %s', ip)
            finally:
                self.q.task_done()

    @gen.coroutine
    def addlines(self):
        """Run ``NETS_CMD`` and queue every address not seen before."""
        try:
            with os.popen(NETS_CMD) as pipe:
                lines = pipe.readlines()
        except OSError as e:
            logging.error(e)
            return
        for line in lines:
            ip = line.strip()
            # ``cut -d: -f1`` yields an empty field for lines that start with ":".
            if not ip or ip in self.vis:
                continue
            yield self.q.put(ip)
            self.vis.add(ip)

    @gen.coroutine
    def run(self):
        """Wait for at least one address, process the queue, then pause 60 s."""
        yield self.addlines()
        while self.q.empty():
            # Pause between polls instead of spinning on netstat.
            yield gen.sleep(self.POLL_INTERVAL)
            yield self.addlines()
        for _ in range(self.WORKER_COUNT):
            # Deliberately not yielded: workers run concurrently and never return.
            self.worker()
        yield self.q.join()
        yield gen.sleep(60)
@gen.coroutine
def main():
    """Create an Observer and wait until its run has finished."""
    yield Observer().run()
if __name__ == '__main__':
    # Every outgoing request presents a desktop Chrome user agent; at most
    # 100 requests are in flight at once.
    client_defaults = dict(user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36")
    httpclient.AsyncHTTPClient.configure(None, defaults=client_defaults, max_clients=100)
    # All log records, DEBUG and above, go to a fixed file.
    logging.basicConfig(level=logging.DEBUG, filename='/root/work/ips.log')
    # Drive ``main`` to completion on the current IOLoop, then exit.
    ioloop.IOLoop.current().run_sync(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment