Last active
March 24, 2022 14:56
-
-
Save geekmoss/eb9316721c208a801d51dab17c239f6a to your computer and use it in GitHub Desktop.
PySpark Geoip2 udf for get country & least specific region
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkContext, SparkFiles | |
from pyspark.sql import SparkSession | |
from pyspark.sql.types import * | |
from pyspark.sql.functions import DataFrame, udf, col | |
from geoip2 import database | |
from geoip2.errors import AddressNotFoundError | |
from geoip2.models import City | |
sc = SparkContext() | |
spark = SparkSession(sc) | |
sc.addFile("hdfs://path/to/GeoLite2-City.mmdb") # or another geo db & can by used http(s)://, etc instead of hdfs:// | |
df = spark.read.json("/path/to/data") # or csv, avro, etc. | |
schema = StructType([ | |
StructField("country", StringType(), True), | |
StructField("region", StringType(), True), | |
]) | |
@udf(returnType=schema) | |
def geoip(ip): | |
geo = database.Reader(SparkFiles.get("GeoLite2-City.mmdb")) | |
try: | |
result = geo.city(ip) | |
pass | |
except AddressNotFoundError: | |
return {"country": None, "region": None} | |
specific = result.subdivisions | |
# [0] is for "Kraj" in Czech Republic, because python geoip2 does not have `subdivisions.least_specific` as in Java lib | |
# http://maxmind.github.io/GeoIP2-java/doc/v2.6.0/com/maxmind/geoip2/model/AbstractCityResponse.html#getLeastSpecificSubdivision-- | |
return {"country": result.country.name, "region": specific[0].name if specific else None} | |
df.withColumn("geoip", geoip("ip")).select("ip", "geoip.*").show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment