Skip to content

Instantly share code, notes, and snippets.

@michael-erasmus
Last active July 19, 2017 01:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michael-erasmus/618ab13a3493d4471b00c63337a7e2f6 to your computer and use it in GitHub Desktop.
Save michael-erasmus/618ab13a3493d4471b00c63337a7e2f6 to your computer and use it in GitHub Desktop.
import os
import grpc
import json
from rsdf import redshift
from buda.entities.funnel_pb2 import Funnel
from buda.entities.funnel_event_pb2 import FunnelEvent
from buda.entities.link_pb2 import Link
from buda.entities.uuid_pb2 import Uuid
import buda.services.events_collector_pb2_grpc as collector_grpc
def cursor_iter(cursor, chunk_size=500):
    """Lazily yield every row from *cursor*, fetching ``chunk_size`` at a time.

    Rows are pulled with ``cursor.fetchmany(chunk_size)`` so the full result
    set is never held in memory at once; iteration stops at the first empty
    (falsy) batch.
    """
    while True:
        batch = cursor.fetchmany(chunk_size)
        if not batch:
            return
        yield from batch
def make_funnel(record):
    """Build a ``Funnel`` protobuf message from one funnels-query row.

    Args:
        record: a row indexable by column name; reads ``id``, ``user_id``,
            ``name``, ``created_at`` and ``tags``.

    Returns:
        The populated ``Funnel`` message.
    """
    funnel = Funnel(
        id=Uuid(id=record['id']),
        user_id=Uuid(id=record['user_id']),
        # a falsy name (e.g. NULL) is stored as the empty string
        name=record['name'] or '',
    )
    funnel.created_at.FromDatetime(record['created_at'])

    raw_tags = record['tags']
    if raw_tags:
        # tags arrive as JSON-encoded text; falsy values (e.g. JSON null) become ''
        for tag_name, tag_value in json.loads(raw_tags).items():
            funnel.tags[tag_name] = tag_value or ''
    return funnel
def make_funnel_event(record):
    """Build a ``FunnelEvent`` protobuf message from one funnel-events row.

    Args:
        record: a row indexable by column name; reads ``id``, ``funnel_id``,
            ``funnel_step_id``, ``user_id``, ``funnel_end``, ``created_at``
            and ``tags``.

    Returns:
        The populated ``FunnelEvent`` message.
    """
    def as_uuid(column):
        # wrap a raw id column in the Uuid message type
        return Uuid(id=record[column])

    funnel_event = FunnelEvent(
        id=as_uuid('id'),
        funnel_id=as_uuid('funnel_id'),
        funnel_step_id=as_uuid('funnel_step_id'),
        user_id=as_uuid('user_id'),
        funnel_end=record['funnel_end'],
    )
    funnel_event.created_at.FromDatetime(record['created_at'])

    encoded_tags = record['tags']
    if not encoded_tags:
        return funnel_event
    # tags arrive as JSON-encoded text; falsy values (e.g. JSON null) become ''
    for key, value in json.loads(encoded_tags).items():
        funnel_event.tags[key] = value or ''
    return funnel_event
# Newest 2000 `funnel_created` rows, shaped for make_funnel().
FUNNELS_QUERY = """
select
  json_extract_path_text(value, 'id') as id
  , date as created_at
  , user_visitor_id as user_id
  , json_extract_path_text(value, 'name') as name
  , json_extract_path_text(value, 'tags') as tags
from event_data
where type = 'funnel_created'
order by date desc
limit 2000
"""

# Newest 2000 well-formed `funnel_event` rows, shaped for make_funnel_event().
# The type filter uses '=' (like the funnels query) rather than LIKE, because
# '_' in a LIKE pattern is a single-character wildcard and would also match
# other type names.
FUNNEL_EVENTS_QUERY = """
select
  date as created_at
  , user_visitor_id as user_id
  , md5(json_extract_path_text(value, 'funnel_id') || date_text) id
  , json_extract_path_text(value, 'funnel_id') as funnel_id
  , json_extract_path_text(value, 'funnel_step_id') as funnel_step_id
  , ( case nullif(json_extract_path_text(value, 'funnel_end'), '')
        when 'true' then 1
        else 0
      end
    )::bool funnel_end
  , nullif(json_extract_path_text(value, 'stripe_event_id'),'') as subscription_event_id
  , json_extract_path_text(value, 'tags') as tags
from event_data
where type = 'funnel_event'
  -- check for malformed data
  and nullif(json_extract_path_text(value, 'state'), '') in ('0', '1', '2', '3', '4', '5')
  and len(json_extract_path_text(value, 'funnel_id')) < 41
  and len(json_extract_path_text(value, 'funnel_step_id')) < 41
order by date desc
limit 2000
"""


def _iter_query(engine, query):
    """Run *query* on a new connection from *engine* and yield its rows.

    The connection is closed once the rows are exhausted (or the generator is
    closed early), instead of being left open for the life of the process.
    """
    connection = engine.connect()
    try:
        cursor = connection.execute(query)
        for record in cursor_iter(cursor):
            yield record
    finally:
        connection.close()


def main():
    """Send funnels, then funnel events, to the events collector over gRPC.

    The collector host is read from ``EVENTS_COLLECTOR_HOSTNAME`` (default
    ``events-collector``) and contacted on port 50051. Each entity type is
    backfilled exactly once.
    """
    host = os.environ.get('EVENTS_COLLECTOR_HOSTNAME', 'events-collector')
    channel = grpc.insecure_channel(host + ':50051')
    stub = collector_grpc.EventsCollectorStub(channel)
    # A single engine serves both queries.
    engine = redshift.get_engine()

    for record in _iter_query(engine, FUNNELS_QUERY):
        stub.CollectFunnel(make_funnel(record))

    for record in _iter_query(engine, FUNNEL_EVENTS_QUERY):
        stub.CollectFunnelEvent(make_funnel_event(record))


if __name__ == '__main__':
    main()
FROM python:3.6

ENV GRPC_PYTHON_VERSION 1.4.0

# Every pip install skips the download cache (matching the requirements
# install below) so cached wheels are not baked into the image layers.
RUN python -m pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION}

# Dependencies are installed before the client is copied, so editing
# client.py does not invalidate the cached dependency layers.
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

WORKDIR /usr/src/app
COPY client.py /usr/src/app/client.py
CMD ["python", "client.py"]
# Docker image tag for the funnels backfill client.
IMAGE_NAME := bufferapp/events-collector:backfill-funnels

.PHONY: all run build

# Default target: run the backfill (does not rebuild the image first).
all: run

# Run on the host network with the collector hostname set to localhost;
# remaining settings are read from .env.
run:
	docker run -e EVENTS_COLLECTOR_HOSTNAME=localhost --env-file .env --net=host $(IMAGE_NAME)

# Build the image from the Dockerfile in this directory.
build:
	docker build . -t $(IMAGE_NAME)
# Pinned release tarball; presumably provides the buda.entities / buda.services
# protobuf and gRPC modules imported by client.py.
https://github.com/bufferapp/buda-protobufs/releases/download/0.1.1/buda-python-0.1.1.tar.gz
# Provides `rsdf.redshift`. Installed from the repository's default branch,
# so this dependency is not pinned to a specific version.
git+https://github.com/bufferapp/rsdf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment