Last active
December 24, 2015 07:49
-
-
Save amitu/6766418 to your computer and use it in GitHub Desktop.
Upload some test data to keen.io
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Find the list of backup files.
# For each file, look up in "stack" the position up to which it has been read.
# For each file, check whether there is more data in it.
# If there is more, start reading and writing to keen.
# Find the total size of all files.
# Go through the files in `ls -t` order so we always traverse them in the same order.
from progressbar import ProgressBar | |
import keen, pdb, json, time | |
from path import path | |
from datetime import datetime | |
from datautils.stacker import STACK | |
import logging | |
# Send every record at DEBUG and above to keen.log; each record is tagged with
# its time, level, source file and line number.
logger = logging.getLogger('keen')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    '%(asctime)s %(levelname)s: %(filename)s:%(lineno)s %(message)s'
)
hdlr = logging.FileHandler('keen.log')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)

# Separator between the timestamp prefix of a log line and its JSON payload.
MARKER = ':Event::'

# keen.io project credentials (values elided here).
keen.project_id = "..."
keen.write_key = "..."
keen.read_key = "..."
def find_backup_files():
    """Return the backup log files, most recently modified first.

    The listing is sorted by modification time (newest first) with the path
    as a tie-breaker, so every call traverses the files in the same
    ``ls -t``-like order rather than in whatever order the directory
    listing happens to produce.
    """
    # Local alternative for development: path("old").files()
    backups = path("/mnt/logs/all.old/").files()
    return sorted(backups, key=lambda backup: (-backup.mtime, backup))
def find_total_bytes():
    """Return the combined ``size`` of all files from find_backup_files()."""
    return sum(backup.size for backup in find_backup_files())
def incr(progress, amnt):
    """Move ``progress`` forward by ``amnt`` from its current value."""
    advanced = progress.currval + amnt
    progress.update(advanced)
# Events waiting to be uploaded in the next batch.
EVENT_BUFFER = []


def buffer_event(evt):
    """Queue ``evt`` and flush once more than 4000 events are buffered.

    Returns the result of flush_events() when a flush was triggered,
    otherwise None.
    """
    EVENT_BUFFER.append(evt)
    if len(EVENT_BUFFER) <= 4000:
        return None
    return flush_events()
def flush_events():
    """Upload every buffered event to keen in one batch, then empty the buffer.

    Events are grouped by their "name" into the ``{name: [event, ...]}``
    mapping handed to ``keen.add_events``; the "name" key itself is left out
    of the uploaded event and dots in the remaining keys are replaced with
    underscores.

    The buffered dicts are not modified while grouping, so if the upload
    raises, the buffer still holds complete events (including "name") and a
    later flush can retry them.

    Returns:
        True, always (also when the buffer was empty).
    """
    logger.info("flushing %s events", len(EVENT_BUFFER))
    if EVENT_BUFFER:
        grouped = {}
        for evt in EVENT_BUFFER:
            data = dict(
                (key.replace(".", "_"), value)
                for key, value in evt.items()
                if key != "name"
            )
            grouped.setdefault(evt["name"], []).append(data)
        start = time.time()
        keen.add_events(grouped)
        logger.info("flushed in %s", time.time() - start)
        # Clear only after add_events has returned without raising.
        del EVENT_BUFFER[:]
    return True
# Event-name prefix (the text before the first ".") -> name the event is
# uploaded under.
_EVENT_COLLECTIONS = {
    "Email": "EmailSent",
    "EmailClick": "EmailClick",
    "EmailOpened": "EmailOpened",
    "TrackingResource": "TrackingResource",
}

# Month abbreviation found in the line's timestamp prefix -> month number.
_MONTH_NUMBERS = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def handle_line(line, year=2013):
    """Parse one log line and buffer the event it carries, if any.

    A line is only considered when it contains MARKER.  The text after the
    marker must be a JSON object whose "name" is ``<prefix>.<kind>`` with a
    prefix listed in ``_EVENT_COLLECTIONS``; everything else is dropped
    silently.  For a kept event, "kind" is set to the text after the first
    dot, "name" is replaced by the mapped name, and
    ``evt["keen"]["timestamp"]`` is set to the ISO form of the timestamp
    built from the first three whitespace-separated tokens before the marker
    (month abbreviation, day, ``HH:MM:SS``).

    Args:
        line: One raw line of a log file.
        year: Year used for the timestamp, since it is not read from the
            line.  Defaults to 2013.

    Returns:
        Whatever buffer_event() returns for a kept event (truthy when the
        buffer was flushed), or None when the line was dropped.
    """
    if MARKER not in line:
        return None
    prefix, payload = line.split(MARKER, 1)

    try:
        evt = json.loads(payload)
    except ValueError:
        # Not valid JSON: skip the line.
        return None

    try:
        family, kind = evt["name"].split(".", 1)
    except (KeyError, TypeError, AttributeError, ValueError):
        # Payload is not an object, has no usable "name", or the name
        # contains no ".".
        return None

    collection = _EVENT_COLLECTIONS.get(family)
    if collection is None:
        return None
    evt["kind"] = kind
    evt["name"] = collection

    try:
        month, day, clock = prefix.split()[:3]
        hh, mm, ss = clock.split(":")
        stamp = datetime(
            year, _MONTH_NUMBERS[month], int(day), int(hh), int(mm), int(ss)
        )
    except (KeyError, ValueError):
        # Skip the event rather than upload it with a wrong or missing time.
        logger.warning("dropping event with unparseable timestamp %r", prefix)
        return None

    evt["keen"] = {"timestamp": stamp.isoformat()}
    return buffer_event(evt)
def upload_file(f, progress):
    """Feed the unread part of log file ``f`` to handle_line().

    Reading resumes at the offset stored in STACK under the file's absolute
    path (0 when there is none).  The stored offset is updated whenever
    handle_line() reports a flush, and once more after the final flush at
    the end of the file.

    Args:
        f: Path object of the log file; must provide ``abspath()`` and
            ``open()``.
        progress: Progress bar advanced (via incr) by the resumed offset and
            by the length of every line read.
    """
    abspath = f.abspath()
    # Find how far this file has already been processed.
    position = STACK.get(abspath, 0)
    handle = f.open()
    try:
        if position:
            incr(progress, position)
            handle.seek(position, 0)
        count = 0
        logger.info("processing %s", f)
        for line in handle:
            count += 1
            if count % 10000 == 0:
                logger.info("%s records in %s", count, f)
            written = handle_line(line)
            # Advance the offset by what was actually consumed.  tell() is
            # not used because line iteration reads ahead, so it can point
            # past lines that have not been handled yet.  len(line) equals
            # the byte length when lines are byte strings (Python 2 default).
            position += len(line)
            incr(progress, len(line))
            if written:
                STACK[abspath] = position
    finally:
        handle.close()
    flush_events()
    STACK[abspath] = position
def main():
    """Upload every backup file, showing overall progress in bytes.

    The file list is fetched once and used both for the progress total and
    for the upload loop, so the two always agree.
    """
    files = find_backup_files()
    progress = ProgressBar(maxval=sum(f.size for f in files))
    progress.start()
    for f in files:
        upload_file(f, progress)
    # Mark the bar as complete once all files are processed.
    progress.finish()
def test():
    """Manual smoke test: send one single event and one batch to keen."""
    signup = {
        "username": "lloyd2",
        "referred_by": "harry",
    }
    keen.add_event("sign_ups", signup)

    batch = {
        "sign_ups2": [
            {"username": "nameuser1"},
            {"username": "nameuser2"},
        ],
        "purchases2": [
            {"price": 5},
            {"price": 6},
        ],
    }
    keen.add_events(batch)
# Script entry point: run the uploader.
if __name__ == "__main__":
    main()
    # test()  # alternative: manual smoke test against keen
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment