Paul Singman (peacing)

SELECT sessions,
       sessions_with_purchase,
       sessions_with_like,
       ROUND(sessions_with_purchase / sessions::FLOAT * 100, 2) AS percent_sessions_with_purchase,
       ROUND(sessions_with_like / sessions::FLOAT * 100, 2) AS percent_sessions_with_like
FROM (
    SELECT COUNT(DISTINCT global_session_id) AS sessions,
           COUNT(DISTINCT CASE WHEN event_action = 'purchase' THEN global_session_id ELSE NULL END) AS sessions_with_purchase,
           COUNT(DISTINCT CASE WHEN event_action = 'like' THEN global_session_id ELSE NULL END) AS sessions_with_like
    FROM (
---
name: Validate Loans and Payments
description: Run a series of data integrity checks on the loans and loan_payments tables
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: execute_loans_validation_job
import os
import time
import requests
from flask import Flask, request

app = Flask(__name__)


def run_job(domain, token, job_id):
    api_response = requests.post(
        f'{domain}/api/2.0/jobs/run-now',
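The request above is cut off by the gist preview. Below is a minimal sketch of how run_job might be completed against the Databricks Jobs 2.0 run-now endpoint; the bearer-token header and the job_id payload are standard Jobs API usage assumed here, not taken from the gist.

def run_job(domain, token, job_id):
    # Trigger the Databricks job via the Jobs API 2.0 run-now endpoint
    api_response = requests.post(
        f'{domain}/api/2.0/jobs/run-now',
        headers={'Authorization': f'Bearer {token}'},
        json={'job_id': job_id},
    )
    api_response.raise_for_status()
    # run-now returns a run_id that can later be polled for job status
    return api_response.json()['run_id']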
from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


@dag()
def workflow():
    spark_submit = SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=['s3://example-bucket/path/to/input',
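The preview stops inside application_args. A minimal sketch of the full DAG under the same imports, assuming a second (output) S3 path, a start date via days_ago, and the default Spark connection; those three details are assumptions, not from the gist.

from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago


@dag(start_date=days_ago(1), schedule_interval=None, catchup=False)
def workflow():
    SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=[
            's3://example-bucket/path/to/input',
            's3://example-bucket/path/to/output',  # assumed output path
        ],
        conn_id='spark_default',  # assumed: the provider's default Spark connection
    )


workflow()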
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.merge_operator import MergeOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'lakeFS',
    'branch': 'example-branch',
    'repo': 'example-repo',
peacing / data_lake_processing.py
Last active March 26, 2021 11:57
example of bad data lake processing code
# ingest_filepath = s3://bucket/prod/users_dim/dt=2021-03-21/uuid.csv
filepath_tokens = ingest_filepath.split('/')
file_suffix = ingest_filepath.split('.')[-1]
if file_suffix == 'csv':
    copy_to_data_lake(ingest_filepath)
else:  # don't move to the lake unless the path has at least 5 segments
    if len(filepath_tokens) >= 5:
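For contrast with the "bad" version above, a minimal sketch of the same gate written as a single guard clause. It assumes the truncated else-branch also copies the file, which is a reading of the snippet rather than something it shows, and it reuses copy_to_data_lake as referenced above.

def maybe_copy_to_data_lake(ingest_filepath):
    # Copy CSVs, or any file whose path has at least 5 segments (assumed intent)
    is_csv = ingest_filepath.endswith('.csv')
    is_partitioned = len(ingest_filepath.split('/')) >= 5
    if is_csv or is_partitioned:
        copy_to_data_lake(ingest_filepath)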
import boto3
import time
import json
import datetime
import pandas as pd
import argparse
import os

# boto3 clients for S3 and Amazon Transcribe
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
transcribe = boto3.client('transcribe', region_name='us-east-1')
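The preview ends at client setup. A minimal sketch of how the transcribe client is typically used from here, assuming an audio file already sits in S3; the bucket, key, media format, and job name below are placeholders, not values from the gist.

job_name = 'example-transcription-job'
transcribe.start_transcription_job(
    TranscriptionJobName=job_name,
    Media={'MediaFileUri': 's3://example-bucket/audio/example.mp3'},
    MediaFormat='mp3',
    LanguageCode='en-US',
)

# Poll until Transcribe reports a terminal status
while True:
    job = transcribe.get_transcription_job(TranscriptionJobName=job_name)
    status = job['TranscriptionJob']['TranscriptionJobStatus']
    if status in ('COMPLETED', 'FAILED'):
        break
    time.sleep(10)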
import awswrangler as wr
import boto3

dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')
sqs = boto3.client('sqs')


def lambda_handler(event, context):
    for record in event['Records']:
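The handler body is cut off by the preview. A minimal sketch of one plausible completion, assuming each SQS record body is a JSON document destined for the DynamoDB table set up above; that payload shape is an assumption.

import json


def lambda_handler(event, context):
    for record in event['Records']:
        # SQS delivers the message payload as a JSON string in 'body' (assumed shape)
        item = json.loads(record['body'])
        table.put_item(Item=item)
    return {'statusCode': 200}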
import pandas as pd
import boto3

df = pd.read_csv(file_path)

dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')

with table.batch_writer() as bw:
    for i, record in enumerate(df.to_dict("records")):
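A minimal sketch of the loop body the preview cuts off, assuming each row becomes one DynamoDB item; the Decimal round-trip is included because DynamoDB rejects Python floats, and it is an assumption about how the original handles numerics.

import json
from decimal import Decimal

with table.batch_writer() as bw:
    for i, record in enumerate(df.to_dict("records")):
        # DynamoDB does not accept floats, so coerce numerics to Decimal
        item = json.loads(json.dumps(record), parse_float=Decimal)
        bw.put_item(Item=item)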
import pandas as pd
import requests

error_list = []
df = pd.read_csv('promos.csv')

for i, row in df.iterrows():
    if i % 100 == 0:
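The loop is cut off after the progress check. A minimal sketch of one plausible continuation, assuming each row is POSTed to a promotions endpoint and failures are collected in error_list; the URL is a placeholder, not from the gist.

for i, row in df.iterrows():
    if i % 100 == 0:
        print(f'processed {i} of {len(df)} rows')  # lightweight progress marker
    resp = requests.post('https://api.example.com/promos', json=row.to_dict())
    if resp.status_code != 200:
        error_list.append({'row': i, 'status': resp.status_code})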