Skip to content

Instantly share code, notes, and snippets.

@alanhamlett
Last active February 26, 2024 15:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alanhamlett/57ecef296fe582408d387f6fd3319bdd to your computer and use it in GitHub Desktop.
Save alanhamlett/57ecef296fe582408d387f6fd3319bdd to your computer and use it in GitHub Desktop.
"""
wakatime.geocoding_utils
~~~~~~~~~~~~~~~~~~~~~~~~
Utils to get location from ip address.
"""
import traceback
import flag # https://pypi.org/project/emoji-country-flag
import requests
from sqlalchemy.sql.expression import or_
from wakatime import app, json, r
from wakatime.constants import (
CACHE_TIMEOUT_ONE_MONTH,
GEOCODE_IP_PROVIDER_IPAPI,
GEOCODE_IP_PROVIDER_IPINFO,
GEOCODE_IP_PROVIDER_IPSTACK,
IP_INFO_API_URL, # "https://ipinfo.io/{ip}?token={api_key}"
IPAPI_API_URL, # "https://api.ipapi.is/?q={ip}&key={api_key}"
IPSTACK_API_URL, # "https://api.ipstack.com/{ip}?access_key={api_key}"
)
from wakatime.models import City, Country
# Timeout, in seconds, passed to every provider HTTP request via
# ``requests.get(..., timeout=TIMEOUT)``.
TIMEOUT = 2
# Retry threshold that geocode_ip compares its ``_retries`` counter against
# for the ipapi.is and ipstack.com providers.
MAX_RETRIES = 3
def geocode_ip(ip=None, provider=GEOCODE_IP_PROVIDER_IPAPI, _retries=0):
    """Return a dict json response of location for given IP address.

    Providers are tried in a fixed order starting at ``provider``:
    ipapi.is, then ipstack.com, then ipinfo.io. ipapi.is and ipstack.com are
    each attempted once plus up to ``MAX_RETRIES`` retries; ipinfo.io is
    attempted a single time. An attempt fails when the provider function
    raises or returns an empty result.

    ``ip`` is the address to look up; when empty it is resolved through
    ``discover_ip``. ``_retries`` is the number of attempts already spent on
    ``provider`` and is kept only for backwards compatibility with callers
    that pass it.

    If there is no IP to look up, ``provider`` is not a known provider, or all
    APIs are offline or have no results, returns None.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    # Ordered fallback chain: (provider constant, lookup function, retries).
    chain = (
        (GEOCODE_IP_PROVIDER_IPAPI, geocode_ip_using_ipapi, MAX_RETRIES),
        (GEOCODE_IP_PROVIDER_IPSTACK, geocode_ip_using_ipstack, MAX_RETRIES),
        (GEOCODE_IP_PROVIDER_IPINFO, geocode_ip_using_ipinfo, 0),
    )
    start = next((i for i, step in enumerate(chain) if step[0] == provider), None)
    if start is None:
        return None

    for name, lookup, max_retries in chain[start:]:
        # Only the provider the caller asked for inherits the caller's
        # retry counter; every later provider starts fresh.
        attempt = _retries if name == provider else 0
        while True:
            resp = None
            try:
                resp = lookup(ip=ip)
            except Exception:
                # Log only the final failure for a provider so transient
                # errors that a retry fixes do not spam the error log.
                if attempt >= max_retries:
                    app.logger.error(f"geocode_ip {name} error:\n{traceback.format_exc()}")
            if resp:
                return resp
            if attempt >= max_retries:
                # Retries exhausted: fall through to the next provider
                # instead of giving up on the whole lookup.
                break
            attempt += 1
    return None
def geocode_ip_using_ipapi(ip=None):
    """Convert ipapi.is response into ipstack.com response.

    Looks ``ip`` up in the ``r`` cache first (key ``geocode-ipapi-<ip>``) and
    refreshes the entry's expiry on a hit; otherwise queries ipapi.is, adds
    the ipstack.com-style top-level keys, caches the result and returns it.

    Returns None when no IP is given and none can be discovered.
    Raises Exception when the response is not a JSON object or lacks an
    ``ip`` or a ``location.country`` value; a KeyError propagates if the
    ``location`` object lacks ``country_code``, ``city`` or ``timezone``.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipapi-{}".format(ip)

    # The cache is best-effort: any failure here falls through to a live lookup.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IPAPI_API_URL.format(api_key=app.config["IPAPI_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    # A non-dict payload (e.g. a JSON list or string) is as unusable as no payload.
    if not isinstance(data, dict) or not data.get("ip") or not (data.get("location") or {}).get("country"):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    # transform response to have ipstack.com structure for backwards compatibility
    location = data["location"]
    data["ip"] = ip
    # IPv6 text always contains a colon and IPv4 never does; a length check
    # mislabels short IPv6 addresses such as "::1" or "2001:db8::1".
    data["type"] = "ipv6" if ":" in ip else "ipv4"
    data["country_code"] = location["country_code"]
    data["country_name"] = location["country"]
    data["city"] = location["city"]
    location["country_flag_emoji"] = flag.flag(location["country_code"].upper())
    data["time_zone"] = {"id": location["timezone"]}
    data["provider"] = "ipapi.is"

    # Caching the fresh result is best-effort as well.
    try:
        r.set(key, json.dumps(data))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return data
def geocode_ip_using_ipstack(ip=None):
    """Return the ipstack.com response for the given IP address.

    Looks ``ip`` up in the ``r`` cache first (key ``geocode-ipstack-<ip>``)
    and refreshes the entry's expiry on a hit; otherwise queries ipstack.com,
    tags the result with ``provider``, caches it and returns it.

    Returns None when no IP is given and none can be discovered.
    Raises Exception when the response is not a JSON object, reports
    ``success: false``, or lacks an ``ip`` or ``country_name`` value.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipstack-{}".format(ip)

    # The cache is best-effort: any failure here falls through to a live lookup.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IPSTACK_API_URL.format(api_key=app.config["IPSTACK_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    # TLS certificate verification stays at the requests default (enabled):
    # the URL carries the API key, so it must not be sent to an unverified host.
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    # A non-dict payload (e.g. a JSON list or string) is as unusable as no payload.
    if (
        not isinstance(data, dict)
        or data.get("success") is False
        or not data.get("ip")
        or not data.get("country_name")
    ):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    data["provider"] = "ipstack.com"

    # Caching the fresh result is best-effort as well.
    try:
        r.set(key, json.dumps(data))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return data
def geocode_ip_using_ipinfo(ip=None):
    """Convert ipinfo.io response into ipstack.com response.

    Looks ``ip`` up in the ``r`` cache first (key ``geocode-ipinfo-<ip>``) and
    refreshes the entry's expiry on a hit; otherwise queries ipinfo.io and
    builds an ipstack.com-shaped dict, filling country and city details from
    the ``Country`` and ``City`` models. When the reported city is not found,
    the country's capital is used for the ``geoname_id`` instead.

    Returns None when no IP is given and none can be discovered.
    Raises Exception when the response is not a JSON object, lacks ``ip``,
    ``country``, ``city`` or ``timezone``, has an unparsable ``loc``, or when
    no matching country or city row exists.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipinfo-{}".format(ip)

    # The cache is best-effort: any failure here falls through to a live lookup.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IP_INFO_API_URL.format(api_key=app.config["IPINFO_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    # A non-dict payload (e.g. a JSON list or string) is as unusable as no payload.
    if (
        not isinstance(data, dict)
        or not data.get("ip")
        or not data.get("country")
        or not data.get("city")
        or not data.get("timezone")
    ):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    # "loc" is a "<latitude>,<longitude>" string; parse it once, up front, so
    # a missing or malformed value fails with a descriptive error before any
    # database query is issued.
    try:
        latitude, longitude = (float(part) for part in data["loc"].split(",")[:2])
    except (KeyError, AttributeError, ValueError) as exc:
        raise Exception(f"geocode_ip ipinfo error invalid loc:\n{response.text}") from exc

    country_code = data["country"].upper()

    country = Country.query.filter_by(country_code=country_code).first()
    if not country:
        raise Exception(f"geocode_ip ipinfo error missing country {data['country']}:\n{response.text}")

    cities = City.query.filter_by(country_code=country_code)
    city = cities.filter(or_(City.ascii_name == data["city"], City.name == data["city"])).first()
    if not city:
        # Fall back to the country's capital so a geoname_id is still available.
        city = cities.filter(or_(City.ascii_name == country.capital, City.name == country.capital)).first()
    if not city:
        raise Exception(f"geocode_ip ipinfo error missing city {data['city']}:\n{response.text}")

    transformed = {
        "ip": ip,
        # IPv6 text always contains a colon and IPv4 never does; a length
        # check mislabels short IPv6 addresses such as "::1".
        "type": "ipv6" if ":" in ip else "ipv4",
        "country_code": country_code,
        "country_name": country.name,
        "continent_code": country.continent,
        "city": data["city"],
        "zip": data.get("postal"),
        "latitude": latitude,
        "longitude": longitude,
        "location": {
            "geoname_id": city.geoname_id,
            "capital": country.capital,
            "country_flag_emoji": flag.flag(country_code),
        },
        "time_zone": {"id": data["timezone"]},
        "currency": {"code": country.currency_code},
        # "org" is not validated above, so treat it as optional rather than
        # failing the whole lookup when it is absent.
        "connection": {"isp": data.get("org")},
        "provider": "ipinfo.io",
    }

    # Caching the fresh result is best-effort as well.
    try:
        r.set(key, json.dumps(transformed))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return transformed
def discover_ip(ip):
    """Return ``ip`` when given, otherwise the IP address of the current request.

    When ``ip`` is empty and ``app.config["DEV"]`` is truthy, no address is
    discovered and None is returned. Otherwise the result of ``get_ip()`` is
    returned, which may itself be None.
    """
    if ip:
        return ip
    if app.config["DEV"]:
        return None
    # get_ip is defined in this module; no ``utils`` name is imported here,
    # so ``utils.get_ip()`` would raise NameError.
    return get_ip()
def get_ip(internal=False):
    """IP Address of current request.

    Returns None when there is no request context. With ``internal`` False,
    returns ``request.remote_addr`` as a string truncated to 120 characters,
    or None when it is empty. With ``internal`` True, returns the non-empty
    entries of ``request.access_route`` joined by commas, with
    ``request.remote_addr`` prepended when it is not already listed, or None
    when no non-empty entry remains.
    """
    # NOTE(review): has_request_context and request are not imported in the
    # visible part of this file; presumably they are Flask's - confirm and
    # add the import.
    if not has_request_context():
        return None

    real_ip = request.remote_addr
    if not internal:
        return str(real_ip)[:120] if real_ip else None

    ips = list(request.access_route)
    if real_ip not in ips:
        ips.insert(0, real_ip)

    # Build a real list: a filter() iterator is always truthy in Python 3,
    # which would make the emptiness check below dead code.
    ips = [addr for addr in ips if addr]
    if not ips:
        return None
    return ",".join(ips)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment