[threatfeeds download] #python #threatfeeds.io
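# Scrape the feed list from https://threatfeeds.io/, download every feed into
# ./tmp in parallel, then extract IPv4 addresses and domains from the files
# into BadIp.csv and BadDomain.csv.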
import re
import os
import sys
import json
import logging
import hashlib
import requests
from functools import partial
from bs4 import BeautifulSoup
from multiprocessing import Pool
def md5sum(filename):
    """Return the MD5 hex digest of a file, read in 128-byte blocks."""
    with open(filename, mode='rb') as f:
        d = hashlib.md5()
        for buf in iter(partial(f.read, 128), b''):
            d.update(buf)
    return d.hexdigest()
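# This helper is not called in the flow below; a possible use, once feeds have
# been downloaded, would be:
#     md5sum('tmp/some_feed.txt')  # hypothetical filename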
def json_from_s(s):
    """Extract and parse the first JSON object or array embedded in a string."""
    match = re.findall(r"{.+[:,].+}|\[.+[,:].+\]", s)
    return json.loads(match[0]) if match else None
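# Example (assumed input shape, matching the 'url'/'name' keys used below):
#     json_from_s('var feeds = [{"name": "Feed A", "url": "https://example.com/a.txt"}];')
#     -> [{'name': 'Feed A', 'url': 'https://example.com/a.txt'}]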
def download_file(info):
    """Download one feed to ./tmp, naming the file after the feed."""
    url, name = info
    print("Updating from: {}".format(url))
    local_filename = name.replace(" ", "_").replace("\'", "")
    local_filename = 'tmp/{}'.format(local_filename)
    # local_filename = url.split('/')[-1]
    r = requests.get(url, stream=True)
    with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
                # f.flush() commented by recommendation from J.F.Sebastian
    if os.path.isfile(local_filename):
        print("Update feed {} successful".format(url))
    else:
        print("Update feed {} failed".format(url))
# Session cookie and browser-like headers captured from a real visit;
# the PHPSESSID/_ga values are session-specific and may need refreshing.
cookies = {
    'PHPSESSID': 'ju2u8ln4rek3ek9i18t7is6hq4',
    '_ga': 'GA1.2.1949120521.1540173253',
    '_gid': 'GA1.2.257606040.1540173253',
}

headers = {
    'Connection': 'keep-alive',
    'Cache-Control': 'max-age=0',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
    'Referer': 'https://www.google.com/',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
}
res = requests.get('https://threatfeeds.io/', headers=headers, cookies=cookies)

if res.status_code == 200:
    soup = BeautifulSoup(res.text, "html.parser")
    # The feed list is embedded as JSON inside the page's last <script> tag.
    feedswithscript = soup.find_all("script")[-1]
    feeds = json_from_s(feedswithscript.text)
    feedsurl = [(feed['url'], feed['name']) for feed in feeds if feed['url']]

    os.makedirs('tmp', exist_ok=True)  # ensure the download directory exists

    try:
        # Note: a module-level Pool assumes a fork start method (Linux);
        # guard with `if __name__ == '__main__':` on macOS/Windows.
        with Pool(processes=4) as pool:
            pool.map(download_file, feedsurl)
    except Exception as e:
        logging.error("Error: {}".format(e))
        sys.exit(1)
else:
    print("Can't get info from threatfeeds.io, status code: {}".format(res.status_code))
    sys.exit(1)
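# An equivalent sketch using concurrent.futures instead of multiprocessing.Pool
# (an alternative, not the original flow):
#     from concurrent.futures import ProcessPoolExecutor
#     with ProcessPoolExecutor(max_workers=4) as ex:
#         list(ex.map(download_file, feedsurl))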
# --- Stage 2: extract IPs and domains from the downloaded feeds ---
import pandas as pd

ips = []
domains = []
for root, dirs, files in os.walk('./tmp'):
    for f in files:
        fpath = os.path.join(root, f)
        try:
            with open(fpath, 'r', encoding="utf-8") as infile:
                for line in infile:
                    if not line.startswith("#"):  # skip comment lines in feeds
                        ip = re.findall(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", line)
                        domain = re.findall(r"(?: |//|^)([A-Za-z0-9]{1,}\.[A-Za-z0-9]{1,10}\.?[A-Za-z]{1,}\.?[A-Za-z]{1,})(?: |/|$)", line)
                        if ip:
                            ips.append(ip[0])
                        if domain:
                            domains.append(domain[0])
        except Exception as e:
            print("Converting {} failed, please check it manually".format(fpath))
oip = pd.DataFrame(ips, columns=["mip"])
oip.to_csv("BadIp.csv", index=False)

odomain = pd.DataFrame(domains, columns=['domain'])
odomain.to_csv("BadDomain.csv", index=False)
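# Running the script end to end (assuming network access and write permission):
#     python threatfeeds_download.py   # hypothetical filename
# produces ./tmp/<feed files>, BadIp.csv, and BadDomain.csv.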