Skip to content

Instantly share code, notes, and snippets.

@theikkila
Last active September 11, 2018 15:12
Show Gist options
  • Save theikkila/a9fb9c7b6735e076454c0267d753322e to your computer and use it in GitHub Desktop.
Save theikkila/a9fb9c7b6735e076454c0267d753322e to your computer and use it in GitHub Desktop.
# Per-user bandwidth billing using Spark SQL.
# NOTE(review): assumes `spark` is an existing SparkSession (e.g. provided by
# the pyspark shell) — it is not created in this script.

# Bind the DataFrames first and register the temp views in a separate step:
# createOrReplaceTempView() returns None, so chaining it onto the read (as
# before) left `logs` and `users` bound to None instead of the DataFrames.
logs = spark.read.json("in/logs/*.json.gz")
logs.createOrReplaceTempView('logs')
users = spark.read.json('in/user_hostname_mapping.json')
users.createOrReplaceTempView('users')

# Total bytes served per user: join every log record to the user owning its
# hostname (inner join, so unmapped hostnames are dropped), then sum 'size'.
# `user` is backtick-quoted because newer Spark SQL versions also define a
# user() built-in function — TODO confirm against the Spark version in use.
billing_stats = spark.sql("""
SELECT users.`user` AS `user`,
       sum(logs.size) AS total_bandwidth
FROM logs
INNER JOIN users ON logs.hostname = users.hostname
GROUP BY users.`user`""")

# coalesce(1) collapses the result to one partition, so Spark writes a single
# part file inside the out/billing_stats directory.
billing_stats.coalesce(1).write.json('out/billing_stats')
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/bal", "size": 12838}
{"hostname": "kissa.com", "path": "/bal", "size": 12838}
{"hostname": "karmes.com", "path": "/bal", "size": 4222}
{"hostname": "karmes.com", "path": "/bal", "size": 4222}
{"hostname": "karmes.com", "path": "/bal", "size": 4222}
{"hostname": "karmes.com", "path": "/bal", "size": 4222}
{"hostname": "karmes.com", "path": "/lol", "size": 4222}
{"hostname": "karmes.com", "path": "/lol", "size": 4222}
{"hostname": "karmes.com", "path": "/lol", "size": 4222}
{"hostname": "karmes.com", "path": "/lol", "size": 12838}
{"hostname": "karmes.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "kissa.com", "path": "/lol", "size": 12838}
{"hostname": "katti.com", "path": "/hei", "size": 8732}
{"hostname": "katti.com", "path": "/hei", "size": 8732}
{"user": "jamo", "hostname": "katti.com"}
{"user": "jamo", "hostname": "kissa.com"}
{"user": "karmemies", "hostname": "karmes.com"}
{"user": "joni", "hostname": "vuohi.com"}
# Per-user bandwidth billing using the PySpark DataFrame API.
# NOTE(review): assumes `spark` is an existing SparkSession (e.g. provided by
# the pyspark shell) — it is not created in this script.

# `F` was referenced below but never imported; import it explicitly so the
# script does not fail with NameError outside a pre-configured environment.
from pyspark.sql import functions as F

# Read compressed (gzipped) JSON request logs.
logs = spark.read.json("in/logs/*.json.gz")
# Read the user <-> hostname mapping.
users = spark.read.json('in/user_hostname_mapping.json')

# 1. Join user mappings on 'hostname' (the default inner join drops log
#    records whose hostname has no owner, and users with no logs).
# 2. Group by 'user'.
# 3. Sum the 'size' of each user's requests into 'total_bandwidth'.
billing_stats = (
    logs.join(users, 'hostname')
    .groupBy('user')
    .agg(F.sum('size').alias('total_bandwidth'))
)

# coalesce(1) collapses the result to one partition, so Spark writes a single
# JSON part file inside the out/billing_stats directory.
billing_stats.coalesce(1).write.json('out/billing_stats')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment