IPInfo script for GeoIP detection
# Process a CSV export with the user agent script
python useragent.py process <PATH TO CSV>

# Load the resulting user-agent.csv into a BigQuery table
bq load --source_format=CSV --autodetect --replace <PROJECT ID>:<DATASET ID>.<TABLE ID> user-agent.csv
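The IPInfo script reads its API token from a local .env file via python-dotenv. A minimal sketch of that file, assuming it sits next to the scripts (the IPINFO_KEY name comes from the code below; the value is a placeholder):

# .env
IPINFO_KEY=<YOUR IPINFO ACCESS TOKEN>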
import requests
from dotenv import load_dotenv
from netaddr import IPAddress
import os
import pickledb
import codecs
import csv
import argparse
from tqdm import tqdm
from pprint import pprint
# Prepare environment
load_dotenv()
db = pickledb.load('ipinfo.db', False)  # per-IP lookup cache; auto_dump is off, dumped explicitly in main()
# Get args for CSV path
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
processPath = subparsers.add_parser('process')
processPath.add_argument('path')
args = parser.parse_args()
IN_FILENAME = args.path
# Setup IPInfo
access_token = os.getenv("IPINFO_KEY")
ipinfoHeaders = {"Authorization": "Bearer " + str(access_token)}
# Output CSV path
OUT_FILENAME = "ipinfo.csv"
def get_ip_info(ip):
    endpoint = "https://ipinfo.io/" + str(ip) + "/json"
    data = requests.get(endpoint, headers=ipinfoHeaders).json()
    # data = requests.get(endpoint).json()  # unauthenticated alternative
    resp = {
        "city": data.get('city', ""),
        "region": data.get('region', ""),
        "country": data.get('country', ""),
        "loc": data.get('loc', ""),
        "org": data.get('org', ""),
    }
    return resp
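# Example (hypothetical) usage of get_ip_info, assuming a valid IPINFO_KEY is set;
# the call returns a dict with the 'city', 'region', 'country', 'loc' and 'org'
# keys, with any field missing from the API response left as an empty string:
#   info = get_ip_info("8.8.8.8")
#   pprint(info)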
# Validate an IP address string; returns the string if parseable, otherwise None
def get_ip(ip: str):
    try:
        IPAddress(ip)  # raises if the value is not a valid address
        txt = ip
    except Exception:
        txt = None
    return txt
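# Example behaviour of the validator above: get_ip("8.8.8.8") returns the string
# unchanged, while get_ip("not-an-ip") returns None so the row can be skipped.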
# Open CSVs for reading / writing
def main():
    with codecs.open(IN_FILENAME, "r") as rp:
        reader = csv.DictReader(rp)
        with codecs.open(OUT_FILENAME, "w") as wp:
            # Read the first data row and enrich it so its keys become the output fieldnames
            first_row = next(reader)
            data = get_ip_info(first_row['ip'])
            first_row.update(data)
            # Set up writer
            writer = csv.DictWriter(wp, fieldnames=first_row)
            # Write headers and initial record to CSV
            writer.writeheader()
            writer.writerow(first_row)
            for row in tqdm(reader):
                ip = row['ip']
                if get_ip(ip) is not None:
                    # Cache lookups per IP so repeated addresses only hit the API once
                    data = db.get(ip)
                    if data is False:
                        data = get_ip_info(ip)
                        db.set(ip, data)
                    row.update(data)
                    writer.writerow(row)
            # Persist the lookup cache to disk (auto_dump is disabled)
            db.dump()

if __name__ == '__main__':
    main()
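Assuming the input CSV carries at least an ip column (the only field the script requires), the generated ipinfo.csv keeps the original columns and appends the five lookup fields, e.g. a header such as:

ip,city,region,country,loc,org

The pinned dependencies for both scripts follow.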
astroid==2.4.1
cachetools==4.1.0
certifi==2020.4.5.1
chardet==3.0.4
device-detector==0.10
idna==2.9
ipinfo==3.0.0
isort==4.3.21
lazy-object-proxy==1.4.3
mccabe==0.6.1
netaddr==0.7.19
pickleDB==0.9.2
pprint==0.1
pylint==2.5.2
python-dotenv==0.13.0
PyYAML==5.3.1
regex==2020.5.14
requests==2.23.0
six==1.14.0
toml==0.10.1
tqdm==4.46.0
typed-ast==1.4.1
urllib3==1.25.9
wrapt==1.12.1
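These pins can be installed in one step, assuming they are saved as a requirements.txt alongside the scripts:

# Install dependencies
pip install -r requirements.txt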
from dotenv import load_dotenv
import os
import pickledb
import codecs
import csv
import argparse
from tqdm import tqdm
from pprint import pprint
from device_detector import DeviceDetector
# Get user agent data
def getDeviceData(ua):
    device = DeviceDetector(ua).parse()
    data = {
        "device_brand": device.device_brand(),
        "device_model": device.device_model(),
        "device_type": device.device_type(),
        "os_name": device.os_name(),
        "os_version": device.os_version(),
        "client_name": device.client_name(),
        "client_type": device.client_type(),
        "client_version": device.client_version()
    }
    return data
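# Example (hypothetical) usage with a desktop Chrome user agent string; the exact
# values returned depend on device_detector's regex database:
#   ua = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
#         "(KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36")
#   pprint(getDeviceData(ua))
#   # -> keys: device_brand, device_model, device_type, os_name, os_version,
#   #          client_name, client_type, client_version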
# Prepare environment
load_dotenv()
db = pickledb.load('device_detector.db', False)  # per-user-agent parse cache; auto_dump is off, dumped explicitly in main()
# Get args for CSV path
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
processPath = subparsers.add_parser('process')
processPath.add_argument('path')
args = parser.parse_args()
IN_FILENAME = args.path
# Output CSV path
OUT_FILENAME = "user-agent.csv"
# Open CSVs for reading / writing
def main():
    with codecs.open(IN_FILENAME, "r") as rp:
        reader = csv.DictReader(rp)
        with codecs.open(OUT_FILENAME, "w") as wp:
            # Read the first data row and enrich it so its keys become the output fieldnames
            first_row = next(reader)
            data = getDeviceData(first_row['user_agent'])
            first_row.update(data)
            # Set up writer
            writer = csv.DictWriter(wp, fieldnames=first_row)
            # Write headers and initial record to CSV
            writer.writeheader()
            writer.writerow(first_row)
            for row in tqdm(reader):
                ua = row['user_agent']
                # Cache parsed results per user agent so duplicate strings are parsed once
                data = db.get(ua)
                if data is False:
                    data = getDeviceData(ua)
                    db.set(ua, data)
                row.update(data)
                writer.writerow(row)
            # Persist the parse cache to disk (auto_dump is disabled)
            db.dump()

if __name__ == '__main__':
    main()