GitHub gists by Paul Singman (peacing)
SELECT sessions,
       sessions_with_purchase,
       sessions_with_like,
       ROUND(sessions_with_purchase / sessions::FLOAT * 100, 2) AS percent_sessions_with_purchase,
       ROUND(sessions_with_like / sessions::FLOAT * 100, 2) AS percent_sessions_with_like
FROM (
    SELECT COUNT(DISTINCT global_session_id) AS sessions,
           COUNT(DISTINCT CASE WHEN event_action = 'purchase' THEN global_session_id ELSE NULL END) AS sessions_with_purchase,
           COUNT(DISTINCT CASE WHEN event_action = 'like' THEN global_session_id ELSE NULL END) AS sessions_with_like
    FROM (
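The preview cuts off inside the inner subquery. As a minimal sketch only, assuming the sessionized user_sessions table built in sessions.sql below and a Spark session (both assumptions on my part), the complete conversion-rate query could be run like this:

# Hypothetical sketch: run the conversion-rate query against the
# user_sessions table defined in sessions.sql below (assumed source).
# Spark's `/` on two counts already returns a double, so the ::FLOAT
# cast from the Redshift-style snippet above is dropped here.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

conversion_rates = spark.sql("""
    SELECT sessions,
           sessions_with_purchase,
           sessions_with_like,
           ROUND(sessions_with_purchase / sessions * 100, 2) AS percent_sessions_with_purchase,
           ROUND(sessions_with_like / sessions * 100, 2) AS percent_sessions_with_like
    FROM (
        SELECT COUNT(DISTINCT global_session_id) AS sessions,
               COUNT(DISTINCT CASE WHEN event_action = 'purchase' THEN global_session_id END) AS sessions_with_purchase,
               COUNT(DISTINCT CASE WHEN event_action = 'like' THEN global_session_id END) AS sessions_with_like
        FROM user_sessions
    ) AS session_counts
""")
conversion_rates.show()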
peacing / sessions.sql
Last active September 29, 2021 17:52
CREATE TABLE user_sessions
USING DELTA AS (
    SELECT user_id, event_date, event_action,
           SUM(is_new_session) OVER (ORDER BY user_id, event_date) AS global_session_id,
           SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY event_date) AS user_session_id
    FROM (
        SELECT *,
               CASE WHEN unix_timestamp(event_date) - unix_timestamp(last_event) >= (24 * 60 * 60)
                         OR last_event IS NULL THEN 1 ELSE 0 END AS is_new_session
        FROM (
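The innermost subquery, which supplies last_event, is cut off in the preview. A common way to derive it is a LAG window over each user's events; here is a minimal sketch under that assumption, with a raw table name and columns that are mine rather than the gist's:

# Hypothetical sketch: derive last_event with LAG so the sessionization
# CASE expression above has a previous timestamp to compare against.
# 'user_events' and its columns are assumed names, not from the gist.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

events_with_last = spark.sql("""
    SELECT user_id,
           event_date,
           event_action,
           LAG(event_date) OVER (PARTITION BY user_id ORDER BY event_date) AS last_event
    FROM user_events
""")
events_with_last.createOrReplaceTempView("events_with_last_event")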
import os
import time

import requests
from flask import Flask, request

app = Flask(__name__)

def run_job(domain, token, job_id):
    api_response = requests.post(
        f'{domain}/api/2.0/jobs/run-now',
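The preview stops mid-call. The Databricks Jobs 2.0 run-now endpoint takes the job ID in a JSON body with a bearer token; a self-contained sketch of how the call might be finished (the endpoint is from the Databricks REST API, the rest is my assumption about the gist's intent):

import requests

def run_job(domain, token, job_id):
    # Trigger an existing Databricks job via the Jobs 2.0 run-now endpoint.
    # 'domain' is the workspace URL, e.g. https://<workspace>.cloud.databricks.com
    api_response = requests.post(
        f'{domain}/api/2.0/jobs/run-now',
        headers={'Authorization': f'Bearer {token}'},
        json={'job_id': job_id},
    )
    api_response.raise_for_status()
    return api_response.json()  # includes the run_id of the triggered run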
---
name: Validate Loans and Payments
description: Run a series of data integrity checks on the loans and loan_payments tables
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: execute_loans_validation_job
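The hook definition is truncated after its id. One plausible shape, sketched here purely as an assumption, is a webhook hook whose target triggers the validation job; the Flask handler below is hypothetical and every name in it is mine, not the gist's:

# Hypothetical sketch of a webhook the hook id above could point to.
# lakeFS would POST the pre-merge event here, and the handler would trigger
# the Databricks validation job via the run-now endpoint. The wiring between
# this gist and the Flask/run-now gist above is an assumption.
import os

import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/webhooks/validate-loans', methods=['POST'])
def execute_loans_validation_job():
    event = request.get_json(silent=True) or {}
    response = requests.post(
        f"{os.environ['DATABRICKS_DOMAIN']}/api/2.0/jobs/run-now",
        headers={'Authorization': f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
        json={'job_id': int(os.environ['LOANS_VALIDATION_JOB_ID'])},
    )
    # A non-2xx response marks the hook as failed, which blocks the merge.
    if not response.ok:
        return jsonify({'error': 'could not trigger validation job'}), 502
    return jsonify({'run': response.json(), 'branch': event.get('branch_id')}), 200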
import boto3
import pytest
from moto import mock_dynamodb2

from lambda_function import *

@mock_dynamodb2
def test_lambda_handler():
    table_name = 'user_spending'
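The test body is cut off after the table name. A minimal sketch of how such a moto test might continue: create the mocked table, invoke the handler, and assert on what it wrote. The table schema, event shape, and assertions are assumptions, not the gist's actual code:

# Hypothetical continuation of the test above; every detail of the schema
# and the handler's expected behavior is assumed.
import json

import boto3
from moto import mock_dynamodb2

from lambda_function import lambda_handler

@mock_dynamodb2
def test_lambda_handler_writes_spending_record():
    table_name = 'user_spending'
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{'AttributeName': 'user_id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'user_id', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST',
    )

    event = {'Records': [{'body': json.dumps({'user_id': 'u1', 'amount': 42})}]}
    lambda_handler(event, context=None)

    item = table.get_item(Key={'user_id': 'u1'}).get('Item')
    assert item is not None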
from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag()
def workflow():
    spark_submit = SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=['s3://example-bucket/path/to/input',
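The DAG is truncated mid-argument list. For context, a minimal complete sketch of this pattern; the schedule, dates, output path, and conn_id are my assumptions, not values from the gist:

# Hypothetical complete version of the truncated TaskFlow DAG above.
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag(schedule_interval='@daily', start_date=datetime(2021, 1, 1), catchup=False)
def workflow():
    SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=[
            's3://example-bucket/path/to/input',
            's3://example-bucket/path/to/output',  # assumed second argument
        ],
        conn_id='spark_default',
    )

workflow_dag = workflow()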
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.merge_operator import MergeOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'lakeFS', 'branch': 'example-branch', 'repo': 'example-repo',
peacing / data_lake_processing.py
Last active March 26, 2021 11:57
example of bad data lake processing code
# ingest_filepath = s3://bucket/prod/users_dim/dt=2021-03-21/uuid.csv
filepath_tokens = ingest_filepath.split('/')
file_suffix = ingest_filepath.split('.')[-1]

if file_suffix == 'csv':
    copy_to_data_lake(ingest_filepath)
else:  # don't move to lake unless len 5
    if len(filepath_tokens) >= 5:
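For contrast, a minimal sketch of how the same routing could read with early guards. copy_to_data_lake and the path layout come from the snippet above; the structure and the handling of the truncated non-CSV branch are assumptions on my part:

# Hypothetical cleanup of the routing above. copy_to_data_lake is the
# helper from the gist; the early-return structure is an assumption.
EXPECTED_PATH_TOKENS = 5  # threshold taken from the original comment

def route_ingest_file(ingest_filepath: str) -> None:
    file_suffix = ingest_filepath.rsplit('.', 1)[-1]
    if file_suffix == 'csv':
        copy_to_data_lake(ingest_filepath)
        return

    filepath_tokens = ingest_filepath.split('/')
    if len(filepath_tokens) >= EXPECTED_PATH_TOKENS:
        # Non-CSV handling is cut off in the gist preview.
        pass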
import boto3
import time
import json
import datetime
import pandas as pd
import argparse
import os

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
transcribe = boto3.client('transcribe', region_name='us-east-1')
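Only the setup survives in this preview. As a sketch of what such a setup is typically used for: start an Amazon Transcribe job on an audio file in S3 and poll until it finishes. The bucket, key, media format, and job-name handling are assumptions, not from the gist:

# Hypothetical usage of the Transcribe client configured above.
import time

import boto3

transcribe = boto3.client('transcribe', region_name='us-east-1')

def transcribe_s3_audio(bucket: str, key: str, job_name: str) -> dict:
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': f's3://{bucket}/{key}'},
        MediaFormat='mp3',      # assumed format
        LanguageCode='en-US',   # assumed language
    )
    while True:
        job = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        status = job['TranscriptionJob']['TranscriptionJobStatus']
        if status in ('COMPLETED', 'FAILED'):
            return job
        time.sleep(10)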
import awswrangler as wr
import boto3

dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')
sqs = boto3.client('sqs')

def lambda_handler(event, context):
    for record in event['Records']:
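The handler body is cut off at the loop. A minimal sketch of how it might finish: parse each queued record and write it to the DynamoDB table configured above. The message shape and key names are assumptions, not the gist's actual code:

# Hypothetical continuation of the handler above; payload fields are assumed.
import json

import boto3

dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])  # SQS-style record body
        table.put_item(Item={
            'user_id': payload['user_id'],
            'event_ts': payload.get('event_ts'),
            'amount': payload.get('amount'),
        })
    return {'processed': len(event['Records'])}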