Skip to content

Instantly share code, notes, and snippets.

@amitu
Last active December 24, 2015 07:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amitu/6766418 to your computer and use it in GitHub Desktop.
Save amitu/6766418 to your computer and use it in GitHub Desktop.
Upload some test data to keen.io
# Find the list of backup files.
# For each file, look up from "stack" the position up to which it has
# already been read, and check whether there is more data in it.
# If there is more, start reading it and writing the events to keen.io.
# Find the total size of all files (for the progress bar), and go through
# them in `ls -t` order so we always traverse them in the same order.
from progressbar import ProgressBar
import keen, pdb, json, time
from path import path
from datetime import datetime
from datautils.stacker import STACK
import logging
# Dedicated 'keen' logger writing DEBUG-and-above records to keen.log.
logger = logging.getLogger('keen')
hdlr = logging.FileHandler('keen.log')
formatter = logging.Formatter(
    '%(asctime)s %(levelname)s: %(filename)s:%(lineno)s %(message)s'
)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)

# Marker separating the syslog timestamp from the JSON event payload in a log line.
MARKER = ':Event::'

# keen.io credentials (redacted in this gist); fill in before running.
keen.project_id = "..."
keen.write_key = "..."
keen.read_key = "..."
def find_backup_files(directory="/mnt/logs/all.old/"):
    """Return the list of backup log files to upload.

    directory: folder scanned for backup files. Defaults to the production
    location that was previously hard-coded (a local "old" folder was used
    during development), so existing callers are unaffected.
    """
    return path(directory).files()
def find_total_bytes():
    """Return the combined size in bytes of all backup files.

    Used as the progress bar's maxval so progress can be tracked by bytes
    consumed rather than by file count.
    """
    # sum() over a generator replaces the manual accumulator loop.
    return sum(f.size for f in find_backup_files())
def incr(progress, amnt):
    """Advance *progress* by *amnt* (ProgressBar only supports absolute updates)."""
    new_value = progress.currval + amnt
    progress.update(new_value)
EVENT_BUFFER = []
def buffer_event(evt):
    """Queue *evt* for upload; flush once more than 4000 events are buffered.

    Returns flush_events()'s result (True) when a flush happened, else None.
    """
    EVENT_BUFFER.append(evt)
    if len(EVENT_BUFFER) <= 4000:
        return None
    return flush_events()
def flush_events():
    """Upload all buffered events to keen.io in one add_events() batch.

    Groups buffered events by their "name" key (the keen collection),
    rewrites "." to "_" in property keys, clears the buffer, and returns
    True. Returns None when the buffer is empty.

    Note: mutates each buffered event (pops its "name" key).
    """
    # Lazy %-style args instead of eager string formatting in log calls.
    logger.info("flushing %s events", len(EVENT_BUFFER))
    if not EVENT_BUFFER:
        return None
    batches = {}
    for evt in EVENT_BUFFER:
        name = evt.pop("name")
        # keen.io property names may not contain ".", so normalize keys.
        data = {k.replace(".", "_"): v for k, v in evt.items()}
        batches.setdefault(name, []).append(data)
    start = time.time()
    keen.add_events(batches)
    logger.info("flushed in %s", time.time() - start)
    del EVENT_BUFFER[:]
    return True
def handle_line(line):
    """Parse one raw log line and buffer its event for upload.

    Lines without MARKER, with unparseable JSON, or whose event name is not
    one we track are silently dropped (returns None). Otherwise the event is
    passed to buffer_event() and its result returned.
    """
    if MARKER not in line:
        return None
    ts, payload = line.split(MARKER, 1)
    try:
        evt = json.loads(payload)
    except Exception:
        return None

    # Tracked name prefixes and the keen collection each one maps to;
    # the suffix after the first "." becomes the event's "kind".
    prefix_to_collection = (
        ("Email.", "EmailSent"),
        ("EmailClick.", "EmailClick"),
        ("EmailOpened.", "EmailOpened"),
        ("TrackingResource.", "TrackingResource"),
    )
    name = evt["name"]
    for prefix, collection in prefix_to_collection:
        if name.startswith(prefix):
            evt["kind"] = name.split(".", 1)[1]
            evt["name"] = collection
            break
    else:
        # Not an event type we upload; drop it.
        return None

    # Syslog-style timestamp "Mon DD HH:MM:SS"; the year is absent from the
    # line, so 2013 is assumed and only Aug/Sep appear in this data set.
    month, day, clock = ts.split()[:3]
    hh, mm, ss = (int(part) for part in clock.split(":"))
    try:
        stamp = datetime(2013, {"Aug": 8}.get(month, 9), int(day), hh, mm, ss)
    except Exception:
        pdb.set_trace()  # one-off script: drop into the debugger on a bad date
    evt["keen"] = {"timestamp": stamp.isoformat()}
    return buffer_event(evt)
def upload_file(f, progress):
    """Stream one backup file's events to keen.io, resuming where we left off.

    The byte offset already processed is persisted in STACK keyed by the
    file's absolute path, so a re-run skips work already done. The offset is
    committed whenever a buffer flush succeeded (written is truthy), and once
    more after the final flush_events() call.
    """
    abspath = f.abspath()
    # Resume position recorded by a previous run (0 = start of file).
    position = STACK.get(abspath, 0)
    handle = f.open()
    try:
        if position:
            incr(progress, position)
            handle.seek(position, 0)
        count = 0
        logger.info("processing %s", f)
        # Iterating the handle directly replaces the deprecated xreadlines().
        for line in handle:
            count += 1
            if count % 10000 == 0:
                logger.info("%s records in %s", count, f)
            written = handle_line(line)
            # NOTE(review): tell() on a file being iterated reports the
            # buffered read-ahead position, not the end of the line just
            # handled -- preserved as-is to keep resume behavior unchanged.
            position = handle.tell()
            incr(progress, len(line))
            if written:
                STACK[abspath] = position
        flush_events()
        STACK[abspath] = position
    finally:
        # Fix: the handle was previously never closed (leaked one fd per file).
        handle.close()
def main():
    """Upload every backup file, showing a byte-based progress bar."""
    progress = ProgressBar(maxval=find_total_bytes())
    progress.start()
    backups = find_backup_files()
    for backup in backups:
        upload_file(backup, progress)
def test():
    """Smoke-test keen.io connectivity: one single event, then one batch."""
    single = {
        "username": "lloyd2",
        "referred_by": "harry",
    }
    keen.add_event("sign_ups", single)

    batch = {
        "sign_ups2": [
            {"username": "nameuser1"},
            {"username": "nameuser2"},
        ],
        "purchases2": [
            {"price": 5},
            {"price": 6},
        ],
    }
    keen.add_events(batch)
# Entry point: run the full upload; switch to test() for a connectivity check.
if __name__ == "__main__":
    main()
    #test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment