Skip to content

Instantly share code, notes, and snippets.

@itdaniher
Last active December 18, 2017 04:39
Show Gist options
  • Save itdaniher/32967b4a1bdad9ff7472fc93504dc4c7 to your computer and use it in GitHub Desktop.
Save itdaniher/32967b4a1bdad9ff7472fc93504dc4c7 to your computer and use it in GitHub Desktop.
netutils.py
from time import time
import socket
import fcntl
import struct
import urllib.request, urllib.error, urllib.parse
import re
import json
# HTTPS endpoints used as candidates for discovering this machine's public
# IPv4 address; get_ip() draws its default `third_party` URL from this list.
ip_sources = [
    'https://wtfismyip.com/json',
    'https://canhazip.com/',
    'https://icanhazip.com',
    'https://vyncke.org/ip.php',
    'https://www.dan.me.uk/ipinfo',
    'https://ipinfo.me/',
    'https://my-ip.herokuapp.com/',
    'https://httpbin.org/ip',
    'https://onlinebanking.tdbank.com/',
]

# Loose dotted-quad matchers, one for str input and one for bytes input.
# Each group is only checked to be 1-3 digits, so out-of-range values such
# as 999.1.1.1 match as well.  Raw literals keep `\.` a regex escape rather
# than an invalid string escape.
ip_regex = re.compile(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')
ip_regex_b = re.compile(rb'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')
def host_from_verbphrase(verb_phrase):
    """Extract the target ``(host, port)`` from an HTTP request line.

    Args:
        verb_phrase: Request line as bytes, made of exactly three
            space-separated fields, e.g.
            ``b'GET http://example.com/ HTTP/1.1'`` or
            ``b'CONNECT example.com:443 HTTP/1.1'``.

    Returns:
        A ``(host, port)`` tuple; ``host`` is bytes and ``port`` is an int.

    Raises:
        ValueError: If the line does not split into three fields, if a
            CONNECT target has no ``:port`` suffix, or if the port is not
            numeric.
        KeyError: If the URL carries no explicit port and its scheme is
            neither ``http`` nor ``https``.
    """
    try:
        verb, url, _version = verb_phrase.split(b' ')
    except ValueError:
        print('Verb Phrase Malformed', verb_phrase)
        # Fail here with a meaningful error instead of falling through to
        # an UnboundLocalError on `verb` a few lines later.
        raise ValueError(('Verb Phrase Malformed', verb_phrase)) from None

    if verb == b'CONNECT':
        # CONNECT targets are "host:port"; splitting on the last colon also
        # copes with bracketed IPv6 literals such as b'[::1]:443'.
        host, port = url.rsplit(b':', 1)
    else:
        parsed = urllib.parse.urlsplit(url)
        if b':' in parsed.netloc:
            host, port = parsed.netloc.rsplit(b':', 1)
        else:
            host = parsed.netloc
            # No explicit port: fall back to the scheme's well-known port.
            port = {b'http': 80, b'https': 443}[parsed.scheme]
    return host, int(port)
def get_ip_from_host(hostname):
    """Look up *hostname* through Google's DNS-over-HTTPS JSON endpoint.

    Args:
        hostname: Name to resolve, as a str.  Names that contain no dot
            are not queried at all.

    Returns:
        The ``data`` field of the first entry in the response's ``Answer``
        list, or ``''`` when no query was made or no answers came back.

    Raises:
        urllib.error.URLError: If the HTTPS request fails.
    """
    if '.' not in hostname:
        return ''
    # Escape the name so it cannot inject extra query parameters.
    query = urllib.parse.quote(hostname, safe='')
    # The context manager closes the HTTP response even if decoding fails.
    with urllib.request.urlopen('https://dns.google.com/resolve?name=' + query) as resp:
        out = json.loads(str(resp.read(), 'utf-8'))
    answers = out.get('Answer')
    if answers:
        # NOTE(review): the first record is returned as-is; it is presumably
        # an address, but could be another record type - confirm with callers.
        return answers[0]['data']
    return ''
def parse_headers(message):
    """Split a raw HTTP message into start line, headers and remainder.

    Original author's note: 3x faster than http.client.parse_headers.

    Args:
        message: Raw message as bytes.  The header section must end with a
            blank line (CRLF CRLF) and each header must be written as
            ``Name: value`` with a colon followed by one space.

    Returns:
        A ``(start_line, headers, rest)`` tuple where ``start_line`` is the
        first line of the message, ``headers`` maps lower-cased header names
        to their values (both bytes; a repeated name keeps the last value),
        and ``rest`` is everything from the blank-line separator onward,
        with the separator itself still at the front.

    Raises:
        Exception: If the message cannot be parsed.  ``args[0]`` is a tuple
            of the error text and the offending message.
    """
    try:
        header_end = message.index(b'\r\n\r\n')
        start_line, *header_lines = message[:header_end].split(b'\r\n')
        headers = {}
        for line in header_lines:
            name, value = line.split(b': ', 1)
            headers[name.lower()] = value
        return start_line, headers, message[header_end:]
    except Exception as err:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit through; chaining keeps the root cause in the traceback.
        raise Exception(("Couldn't parse HTTP headers in message.", message)) from err
def build_and_connect(addy, port):
    """Open a TCP/IPv4 connection with TCP_NODELAY set.

    Args:
        addy: Host name or IPv4 address to connect to.
        port: TCP port number.

    Returns:
        The connected socket object.

    Raises:
        OSError: If the connection cannot be established; the socket is
            closed before the error propagates.
    """
    # build a socket object for connecting to the upstream proxy
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.connect((addy, port))
    except BaseException:
        # Cleanup only: do not leak the descriptor when connecting fails.
        sock.close()
        raise
    return sock
def get_ip_interface(ifname):
    """Return the IPv4 address of network interface *ifname*.

    Issues the SIOCGIFADDR ioctl (request code 0x8915) on a throwaway
    datagram socket, so this relies on a platform that provides ``fcntl``
    and that ioctl.

    Args:
        ifname: Interface name as a str, e.g. ``'eth0'``.  Only the first
            15 characters are sent to the kernel.

    Returns:
        The address as a dotted-quad string.

    Raises:
        OSError: If the ioctl fails.
    """
    # The request buffer starts with the interface name; '256s' pads the
    # remainder with NUL bytes.
    request = struct.pack('256s', bytes(ifname[:15], 'utf-8'))
    # The socket only serves as an ioctl handle; close it as soon as the
    # call returns instead of leaking one descriptor per lookup.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        reply = fcntl.ioctl(s.fileno(), 0x8915, request)  # SIOCGIFADDR
    # Bytes 20..23 of the reply are passed to inet_ntoa as the packed address.
    return socket.inet_ntoa(reply[20:24])
def build_socket_on_interface(ifname):
    """Create an unconnected TCP/IPv4 socket bound to a network device.

    Sets TCP_NODELAY and SO_BINDTODEVICE on the new socket.

    Args:
        ifname: Interface name as a str, e.g. ``'eth0'``.

    Returns:
        The configured, not yet connected socket.

    Raises:
        OSError: If an option cannot be set; the socket is closed before
            the error propagates.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prefer the named constant; 25 is the value this module previously
        # hard-coded for SO_BINDTODEVICE and remains the fallback when the
        # socket module does not export it.
        bind_to_device = getattr(socket, 'SO_BINDTODEVICE', 25)
        sock.setsockopt(socket.SOL_SOCKET, bind_to_device, bytes(ifname, 'utf-8') + b'\x00')
    except BaseException:
        # Cleanup only: do not leak the descriptor when configuration fails.
        sock.close()
        raise
    return sock
def build_and_connect_on_interface(addy, port, ifname, timeout=60.0):
    """Connect to ``(addy, port)`` over TCP through interface *ifname*.

    Args:
        addy: Host name or IPv4 address to connect to.
        port: TCP port number.
        ifname: Name of the interface the socket is bound to.
        timeout: Socket timeout in seconds.  It is applied before the
            connect attempt and stays set on the returned socket.

    Returns:
        The connected socket, or ``None`` if the attempt timed out.

    Raises:
        OSError: For any connect failure other than a timeout; the socket
            is closed before the error propagates.
    """
    sock = build_socket_on_interface(ifname)
    try:
        sock.settimeout(timeout)
        sock.connect((addy, port))
    except socket.timeout:
        sock.close()
        return None
    except BaseException:
        # Previously only the timeout path closed the socket; refused or
        # unreachable connections leaked the descriptor.
        sock.close()
        raise
    return sock
def build_and_listen_on_interface(ifname, port, backlog=5):
    """Create a TCP listening socket on the IPv4 address of *ifname*.

    Args:
        ifname: Interface name; its address is obtained from
            get_ip_interface().
        port: TCP port to listen on.
        backlog: Value passed to ``listen()``.  Defaults to 5.

    Returns:
        The bound, listening socket.

    Raises:
        OSError: If the interface lookup, bind or listen fails; the socket
            is closed before the error propagates.
    """
    # type TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # allow address re-use
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # set TCP option NODELAY (performance boost)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # bind a socket on the ethernet interface for remote connections
        sock.bind((get_ip_interface(ifname), port))
        # buffer up to `backlog` pending requests
        sock.listen(backlog)
    except BaseException:
        # Cleanup only: do not leak the descriptor when setup fails.
        sock.close()
        raise
    return sock
def get_ip(third_party=None, proxy=None, timeout=10):
    """Ask a third-party web service which public IPv4 address we have.

    This is a best-effort lookup: any failure yields ``None`` rather than
    an exception.

    Args:
        third_party: URL to fetch.  When omitted, an entry of ``ip_sources``
            is selected using the current time, at call time.
        proxy: Optional ``(host, port)`` pair; when given, https requests
            are sent through ``http://host:port``.
        timeout: Timeout in seconds for the request.

    Returns:
        The first dotted-quad string found in the response body, or
        ``None`` if the request failed or the body contained none.
    """
    try:
        if third_party is None:
            # Resolve the default per call; a default written in the
            # signature would be evaluated only once, at definition time.
            third_party = ip_sources[int(time()) % len(ip_sources)]
        request = urllib.request.Request(third_party, headers={'User-Agent': 'Mozilla/5.0'})
        if proxy:
            proxy_url = 'http://' + proxy[0] + ':' + str(proxy[1])
            opener = urllib.request.build_opener(urllib.request.ProxyHandler({'https': proxy_url}))
        else:
            opener = urllib.request.build_opener()
        with opener.open(request, timeout=timeout) as resp:
            # Decode to str so the text pattern `ip_regex` applies; matching
            # raw bytes against a str pattern raises TypeError.
            body = resp.read().decode('utf-8', 'replace')
        results = ip_regex.findall(body)
    except Exception:
        # Deliberate best-effort: network, proxy and decoding problems all
        # mean "address unknown".
        return None
    if results:
        return results[0]
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment