Last active
May 19, 2020 22:39
Star
You must be signed in to star a gist
IPInfo script for GeoIP detection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python command
python useragent.py process <PATH TO CSV>

# BigQuery deploy command
bq load --source_format=CSV --autodetect --replace <PROJECT ID>:<DATASET ID>.<TABLE ID> user-agent.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from dotenv import load_dotenv | |
from netaddr import IPAddress | |
import os | |
import pickledb | |
import codecs | |
import csv | |
import argparse | |
from tqdm import tqdm | |
from pprint import pprint | |
# Prepare environment: load .env variables and open the IP lookup cache.
load_dotenv()
# auto_dump=False and no explicit dump(): the cache only deduplicates
# repeated IPs within a single run; nothing is persisted to disk.
db = pickledb.load('ipinfo.db', False)

# CLI: `python ipinfo.py process <PATH TO CSV>`
parser = argparse.ArgumentParser()
# required=True turns a missing sub-command into a clean usage error
# instead of an AttributeError when args.path is read below.
subparsers = parser.add_subparsers(dest="command", required=True)
processPath = subparsers.add_parser('process')
processPath.add_argument('path')
args = parser.parse_args()
IN_FILENAME = args.path

# Setup IPInfo: the API token comes from the IPINFO_KEY env variable.
# NOTE(review): if IPINFO_KEY is unset this sends "Bearer None" — presumably
# ipinfo.io then serves the unauthenticated tier; confirm before relying on it.
access_token = os.getenv("IPINFO_KEY")
ipinfoHeaders = {"Authorization": "Bearer " + str(access_token)}

# Enriched rows are written here.
OUT_FILENAME = "ipinfo.csv"
def get_ip_info(ip):
    """Look up GeoIP data for *ip* via the ipinfo.io JSON API.

    Returns a dict with city/region/country/loc/org keys; any field the
    API response omits comes back as an empty string so the CSV columns
    stay aligned across rows.
    """
    endpoint = "https://ipinfo.io/" + str(ip) + "/json"
    # ipinfoHeaders is read from module scope (no `global` needed for
    # reads). The timeout keeps one stalled request from hanging the
    # whole batch run indefinitely.
    data = requests.get(endpoint, headers=ipinfoHeaders, timeout=30).json()
    return {
        "city": data.get('city', ""),
        "region": data.get('region', ""),
        "country": data.get('country', ""),
        "loc": data.get('loc', ""),
        "org": data.get('org', ""),
    }
# Validate an IP address string.
def get_ip(ip: str):
    """Return *ip* unchanged if netaddr can parse it, otherwise None."""
    try:
        IPAddress(ip)  # raises on malformed input; parsed object unused
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate; any parse failure means "not an IP".
        return None
    return ip
# Open CSVs for reading / writing
def _enrich(row):
    """Merge cached-or-fetched GeoIP fields for row['ip'] into *row*, in place."""
    ip = row['ip']
    data = db.get(ip)  # pickledb returns False on a cache miss
    if data is False:
        data = get_ip_info(ip)
        # Bug fix: cache only the geo fields, not the whole row. The old
        # code stored (and later re-emitted) the entire first row seen for
        # an IP, so duplicate-IP rows had all their columns replaced by
        # the first occurrence's values.
        db.set(ip, data)
    row.update(data)
    return row

def main():
    """Read IN_FILENAME, append GeoIP columns per row, write OUT_FILENAME."""
    with codecs.open(IN_FILENAME, "r") as rp:
        reader = csv.DictReader(rp)
        with codecs.open(OUT_FILENAME, "w") as wp:
            # The first data row fixes the output schema (input columns
            # plus the GeoIP fields), so enrich it before building the
            # writer. It now goes through the same cache as every other
            # row instead of always hitting the API.
            first = _enrich(next(reader))
            writer = csv.DictWriter(wp, fieldnames=first)
            writer.writeheader()
            writer.writerow(first)
            for row in tqdm(reader):
                # Rows with an unparseable IP are silently dropped,
                # matching the original script's behavior.
                if get_ip(row['ip']) is not None:
                    writer.writerow(_enrich(row))

if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
astroid==2.4.1 | |
cachetools==4.1.0 | |
certifi==2020.4.5.1 | |
chardet==3.0.4 | |
device-detector==0.10 | |
idna==2.9 | |
ipinfo==3.0.0 | |
isort==4.3.21 | |
lazy-object-proxy==1.4.3 | |
mccabe==0.6.1 | |
netaddr==0.7.19 | |
pickleDB==0.9.2 | |
pprint==0.1 | |
pylint==2.5.2 | |
python-dotenv==0.13.0 | |
PyYAML==5.3.1 | |
regex==2020.5.14 | |
requests==2.23.0 | |
six==1.14.0 | |
toml==0.10.1 | |
tqdm==4.46.0 | |
typed-ast==1.4.1 | |
urllib3==1.25.9 | |
wrapt==1.12.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dotenv import load_dotenv | |
import os | |
import pickledb | |
import codecs | |
import csv | |
import argparse | |
from tqdm import tqdm | |
from pprint import pprint | |
from device_detector import DeviceDetector | |
# Get user agent data
def getDeviceData(ua):
    """Parse user-agent string *ua* and return its device/OS/client fields."""
    parsed = DeviceDetector(ua).parse()
    # Each output key matches the accessor method of the same name on the
    # parsed result, so build the dict by reflection; insertion order
    # matches the original hand-written literal.
    fields = (
        "device_brand", "device_model", "device_type",
        "os_name", "os_version",
        "client_name", "client_type", "client_version",
    )
    return {name: getattr(parsed, name)() for name in fields}
# Prepare environment: load .env variables and open the user-agent cache.
load_dotenv()
# auto_dump=False and no explicit dump(): the cache only deduplicates
# repeated user agents within a single run; nothing persists to disk.
db = pickledb.load('device_detector.db', False)

# CLI: `python useragent.py process <PATH TO CSV>`
parser = argparse.ArgumentParser()
# required=True turns a missing sub-command into a clean usage error
# instead of an AttributeError when args.path is read below.
subparsers = parser.add_subparsers(dest="command", required=True)
processPath = subparsers.add_parser('process')
processPath.add_argument('path')
args = parser.parse_args()
IN_FILENAME = args.path

# Enriched rows are written here.
OUT_FILENAME = "user-agent.csv"
# Open CSVs for reading / writing
def _enrich_ua(row):
    """Merge cached-or-parsed device fields for row['user_agent'] into *row*."""
    ua = row['user_agent']
    data = db.get(ua)  # pickledb returns False on a cache miss
    if data is False:
        data = getDeviceData(ua)
        # Bug fix: cache the parsed device fields, not the whole row. The
        # old code did `db.set(ua, row)` before updating it, so any later
        # row sharing this user agent had its columns clobbered by this
        # row's values and received empty device fields.
        db.set(ua, data)
    row.update(data)
    return row

def main():
    """Read IN_FILENAME, append device columns per row, write OUT_FILENAME."""
    with codecs.open(IN_FILENAME, "r") as rp:
        reader = csv.DictReader(rp)
        with codecs.open(OUT_FILENAME, "w") as wp:
            # The first data row fixes the output schema (input columns
            # plus the parsed device fields); it now goes through the
            # same cache path as every other row.
            first = _enrich_ua(next(reader))
            writer = csv.DictWriter(wp, fieldnames=first)
            writer.writeheader()
            writer.writerow(first)
            for row in tqdm(reader):
                writer.writerow(_enrich_ua(row))

if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment