Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Jerakin/1839dc7e2be01bc813893c0dc0aa870d to your computer and use it in GitHub Desktop.
Save Jerakin/1839dc7e2be01bc813893c0dc0aa870d to your computer and use it in GitHub Desktop.
events_to_csv_by_category.py
#!/usr/bin/python
from __future__ import print_function
import os.path
import csv
import json
import sys
import gzip
import ntpath
# Cached JSON decoder and the name of the module it came from; both are
# filled in by the first call to get_json_loads().
_json_loads = None
_json_module_name = None


def get_json_loads():
    """Return the JSON decoding function, choosing it on first use.

    ``ujson`` is tried first, then ``cjson``; if neither can be imported the
    standard library ``json.loads`` is used. The selected callable and the
    name of its module are stored in the module-level ``_json_loads`` and
    ``_json_module_name`` so later calls return the cached decoder.
    """
    global _json_module_name, _json_loads
    if _json_loads is not None:
        return _json_loads

    # Optional third-party decoders, in order of preference.
    for candidate in ("ujson", "cjson"):
        try:
            module = __import__(candidate)
        except ImportError:
            continue
        _json_loads = module.decode
        _json_module_name = candidate
        return _json_loads

    # Neither optional module is installed: use the standard library.
    _json_loads = json.loads
    _json_module_name = "json"
    return _json_loads
def write_event(prefix, event_type, headers, event):
    """Append one event as a row of ``<prefix>_<event_type>.csv``.

    Args:
        prefix: Leading part of the output file name.
        event_type: Event category; becomes the second part of the file name.
        headers: Column names; written as the first row when the file is
            still empty, and used to pick the values out of ``event``.
        event: Decoded event dictionary.
    """
    data = prepare_data(headers, event)
    path = "%s_%s.csv" % (prefix, event_type)
    # The csv module needs a binary file on Python 2 but a text file opened
    # with newline="" on Python 3 (writing to a binary file there raises
    # TypeError), so the open mode is chosen by interpreter version.
    if sys.version_info[0] >= 3:
        fp = open(path, "a", newline="", encoding="utf-8")
    else:
        fp = open(path, "ab")
    with fp:
        cw = csv.writer(fp, lineterminator="\n", quoting=csv.QUOTE_NONNUMERIC)
        # write headers only to empty file
        if os.path.getsize(path) == 0:
            cw.writerow(headers)
        cw.writerow(data)
def prepare_data(headers, event):
    """Collect the values named by ``headers`` from ``event`` as one row.

    Args:
        headers: Field names. A plain name is looked up directly in
            ``event``; a dotted name such as ``"data.user_id"`` is looked up
            one level down (``event["data"]["user_id"]``). Only the first
            two dot-separated components are used.
        event: Decoded event dictionary.

    Returns:
        A tuple with one entry per header, in header order. A field that is
        missing, or whose parent is not a dictionary, yields the string
        ``"null"``.
    """
    result = []
    for header in headers:
        container, key = event, header
        if "." in header:
            # handle 2nd level fields
            keys = header.split(".")
            parent = event.get(keys[0]) if isinstance(event, dict) else None
            container, key = parent, keys[1]
        # Guard against a parent that is present but not a mapping (for
        # example a JSON null or a string): treat the field as missing
        # instead of raising TypeError.
        if isinstance(container, dict) and key in container:
            result.append(container[key])
        else:
            result.append("null")
    return tuple(result)
def get_csv_header(event_type):
    """Return the ordered list of CSV column names for an event category.

    For every category except ``"sdk_error"`` the header is the default
    annotation fields, the user meta fields and the server annotation
    fields, followed by the category-specific fields (none for an
    unrecognised category). ``"sdk_error"`` events do not carry the default
    annotation or user meta fields, so their header is the server annotation
    fields followed by the sdk_error-specific fields.

    Each column name appears once, at its first position.

    Args:
        event_type: Event category string, e.g. ``"design"``.

    Returns:
        List of (possibly dotted) field names.
    """
    default_annotations_fields = [
        "data.category", "data.sdk_version", "data.manufacturer",
        "data.session_id", "data.user_id", "data.connection_type",
        "data.client_ts", "data.build", "data.device",
        "data.engine_version", "data.os_version", "data.platform",
        "data.ios_idfa", "data.google_aid", "data.v",
    ]
    user_meta_fields = [
        "user_meta.install_ts", "user_meta.first_build",
        "user_meta.is_converting", "user_meta.origin", "user_meta.is_paying",
        "user_meta.pay_ft",
    ]
    other_server_annotation = [
        "country_code", "arrival_ts", "game_id", "ip",  # "first_in_batch" not needed
    ]
    category_fields = {
        "design": ["data.event_id", "data.value"],
        "error": ["data.severity", "data.message"],
        "user": [
            "user_meta.install_ts", "user_meta.install_hour",
            "user_meta.cohort_week", "user_meta.cohort_month",
        ],
        "resource": ["data.event_id", "data.amount"],
        "business": [
            "data.event_id", "data.amount", "data.currency",
            "data.transaction_num", "data.cart_type", "data.receipt_info",
            "user_meta.receipt_status", "user_meta.revenue",
        ],
        "progression": ["data.event_id", "data.attempt_num", "data.score"],
        "session_end": ["data.length"],
        "sdk_error": [
            "data.sdk_version", "data.v", "data.category", "data.type",
            "data.error_category", "data.error_area", "data.error_action",
        ],
    }
    specific = category_fields.get(event_type, [])

    if event_type == "sdk_error":
        # Build this header explicitly. Filtering the combined list by
        # membership in the default lists would also strip the sdk_error
        # fields that share a name with a default field (data.sdk_version,
        # data.v, data.category).
        combined = other_server_annotation + specific
    else:
        combined = (default_annotations_fields + user_meta_fields
                    + other_server_annotation + specific)

    # Keep the first occurrence of each name so no column is emitted twice.
    seen = set()
    header = []
    for name in combined:
        if name not in seen:
            seen.add(name)
            header.append(name)
    return header
def main():
    """Split a gzipped file of JSON events into one CSV file per category.

    The source path is taken from ``sys.argv[1]``. Each non-blank line of
    the gzip file is decoded as one JSON event and appended to
    ``<prefix>_<category>.csv``, where the prefix is the source file name
    without its last extension and without ``.json``, and the category is
    ``event["data"]["category"]``. Progress is reported on stdout.
    """
    if len(sys.argv) < 2:
        print("Usage: python events_to_csv_by_category.py <source_file>")
        return

    source_name = sys.argv[1]
    if not os.path.isfile(source_name):
        print("Supplied source file does not exists!")
        return

    loads = get_json_loads()
    file_prefix = os.path.splitext(
        ntpath.basename(source_name))[0].replace(".json", "")

    print("Uncompressing file...")
    with gzip.open(source_name) as f:
        print("Reading events from file...")
        written = 0
        for raw_event in f:
            # gzip yields byte strings; turn them into ASCII-only text.
            # Calling .encode() on them fails on Python 3 (bytes has no
            # encode) and on Python 2 for any non-ASCII byte, so decode
            # and drop the non-ASCII bytes instead.
            if isinstance(raw_event, bytes):
                raw_event = raw_event.decode("ascii", "ignore")
            if not raw_event.strip():
                # Blank lines carry no event and would break the decoder.
                continue
            event = loads(raw_event)
            category = event["data"]["category"]
            write_event(file_prefix, category, get_csv_header(category), event)
            written += 1
            # report progress after the row has actually been written
            sys.stdout.write("\r%d rows written..." % written)
            sys.stdout.flush()
    print("\nDone")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment