Paul Singman (peacing)

validation_hooks.yaml
---
name: Validate Loans and Payments
description: Run a series of data integrity checks on the loans and loan_payments tables
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: execute_loans_validation_job
lakefs_hooks_server.py
import os
import time
import requests
from flask import Flask, request

app = Flask(__name__)

def run_job(domain, token, job_id):
    # Trigger a run of an existing Databricks job via the Jobs 2.0 run-now endpoint
    api_response = requests.post(
        f'{domain}/api/2.0/jobs/run-now',
        headers={'Authorization': f'Bearer {token}'},
        json={'job_id': job_id},
    )
    return api_response
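The preview ends inside run_job. Below is a minimal sketch of how a pre-merge webhook route could sit on top of it, continuing the file above (it reuses app, request, os, and run_job). The route path, environment variable names, and the synchronous response behaviour are assumptions, not the gist's actual code:

# Hypothetical route, continuing lakefs_hooks_server.py. lakeFS POSTs the hook
# event here when the pre-merge action fires; a non-2xx response blocks the merge.
@app.route('/webhooks/validate', methods=['POST'])
def validate():
    event = request.get_json()                    # lakeFS hook event payload
    domain = os.environ['DATABRICKS_DOMAIN']      # assumed env var names
    token = os.environ['DATABRICKS_TOKEN']
    job_id = os.environ['VALIDATION_JOB_ID']

    resp = run_job(domain, token, job_id)
    if resp.status_code != 200:
        return resp.text, 500                     # fail the hook, block the merge
    return 'validation job triggered', 200

if __name__ == '__main__':
    app.run(port=8080)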
airflow-dag-simple.py
from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag()
def workflow():
    spark_submit = SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=['s3://example-bucket/path/to/input',
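The preview cuts off mid argument list. A minimal sketch of how the rest of this simple DAG plausibly looks, assuming the second application argument is an output path (a placeholder; the real value is truncated above) and that the DAG is instantiated at module level:

# Hypothetical completion of airflow-dag-simple.py; paths are placeholders.
from airflow.decorators import dag
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag()
def workflow():
    # Submit the Spark job with input and output locations as CLI arguments
    SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=[
            's3://example-bucket/path/to/input',
            's3://example-bucket/path/to/output',  # assumed second argument
        ],
    )

workflow()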
airflow-lakefs-final.py
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.merge_operator import MergeOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
default_args = {
    'owner': 'lakeFS', 'branch': 'example-branch', 'repo': 'example-repo',
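The final DAG preview stops at default_args. Below is a rough sketch of how the imported operators typically chain together in the branch, run, commit, merge pattern. The operator parameter names (repo, branch, source_branch, source_ref, msg) and the s3a path are assumptions made for illustration; check the lakefs_provider docs for the real signatures.

# Hypothetical sketch of the branch-per-run pattern these imports suggest; the
# lakeFS operator parameter names below are assumptions, not verified API.
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from lakefs_provider.operators.merge_operator import MergeOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'lakeFS', 'branch': 'example-branch', 'repo': 'example-repo',
    'start_date': days_ago(1),
}

@dag(default_args=default_args)
def lakefs_workflow():
    # Isolate the run on its own branch, process, commit, then merge back to main
    create_branch = CreateBranchOperator(
        task_id='create_branch',
        repo='example-repo', branch='example-branch', source_branch='main')
    spark_submit = SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=['s3a://example-repo/example-branch/path/to/input'])
    commit = CommitOperator(
        task_id='commit',
        repo='example-repo', branch='example-branch', msg='add job output')
    merge = MergeOperator(
        task_id='merge',
        repo='example-repo', source_ref='example-branch',
        destination_branch='main', msg='merge validated results')

    create_branch >> spark_submit >> commit >> merge

lakefs_workflow()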
data_lake_processing.py (last active Mar 26, 2021)
Example of bad data lake processing code
# ingest_filepath = s3://bucket/prod/users_dim/dt=2021-03-21/uuid.csv
filepath_tokens = ingest_filepath.split('/')
file_suffix = ingest_filepath.split('.')[-1]
if file_suffix == 'csv':
    copy_to_data_lake(ingest_filepath)
else:  # don't move to lake unless len 5
    if len(filepath_tokens) >= 5:
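Since this gist is explicitly an example of what not to do, here is a hedged sketch of the same check written more directly. It assumes the truncated branch also copies when the key is at least five segments deep, which is inferred from the comment rather than shown in the preview; copy_to_data_lake and ingest_filepath come from the snippet itself:

# Hypothetical cleanup of the logic above; copy_to_data_lake is the snippet's own helper.
def should_copy_to_lake(ingest_filepath, min_depth=5):
    """CSV files always go to the lake; other formats only if the key is deep enough."""
    suffix = ingest_filepath.rsplit('.', 1)[-1].lower()
    depth = len(ingest_filepath.split('/'))
    return suffix == 'csv' or depth >= min_depth

if should_copy_to_lake(ingest_filepath):
    copy_to_data_lake(ingest_filepath)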
transcribe.py
import boto3
import time
import json, datetime
import pandas as pd
import argparse
import os
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
transcribe = boto3.client('transcribe', region_name='us-east-1')
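The preview ends after the client setup. A minimal sketch of how these clients are typically used, continuing the file above (it reuses the transcribe client and the time import); the job name, media URI, format, and polling interval are illustrative assumptions:

# Hypothetical continuation of transcribe.py; arguments are placeholders.
def transcribe_file(job_name, media_uri, media_format='mp3', language='en-US'):
    # Kick off an asynchronous transcription job
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': media_uri},
        MediaFormat=media_format,
        LanguageCode=language,
    )
    # Poll until Amazon Transcribe reports a terminal status
    while True:
        job = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        status = job['TranscriptionJob']['TranscriptionJobStatus']
        if status in ('COMPLETED', 'FAILED'):
            return job
        time.sleep(10)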
recursive_dynamo_write.py
import awswrangler as wr
import boto3

dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')
sqs = boto3.client('sqs')

def lambda_handler(event, context):
    for record in event['Records']:
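The handler is cut off at the loop. Given the filename and the sqs client above, one plausible (and entirely assumed) shape for the rest: write a chunk of items to DynamoDB, then re-enqueue the remainder so the next invocation picks it up. A rough sketch under those assumptions, continuing the file above; the chunk size, queue URL, and message format are placeholders:

# Hypothetical continuation of recursive_dynamo_write.py; CHUNK_SIZE, QUEUE_URL,
# and the message format (a JSON list of items) are assumptions for illustration.
import json

CHUNK_SIZE = 500
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/example-queue'

def process_record(record):
    items = json.loads(record['body'])            # items still waiting to be written
    chunk, remainder = items[:CHUNK_SIZE], items[CHUNK_SIZE:]

    with table.batch_writer() as bw:              # table from the snippet above
        for item in chunk:
            bw.put_item(Item=item)

    if remainder:
        # Recurse by sending the leftovers back through SQS for the next invocation
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(remainder))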
simple_dynamo_write.py
import pandas as pd
import boto3

df = pd.read_csv(file_path)
dynamo = boto3.resource('dynamodb')
table = dynamo.Table('data-services-dev-testing')

# batch_writer buffers puts and flushes them to DynamoDB in batches automatically
with table.batch_writer() as bw:
    for i, record in enumerate(df.to_dict("records")):
        bw.put_item(Item=record)
promo_v4.py
import pandas as pd
import requests

error_list = []
df = pd.read_csv('promos.csv')
for i, row in df.iterrows():
    if i % 100 == 0:
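promo_v4 is cut off at the checkpoint condition. A self-contained sketch of the pattern the visible lines suggest: post each row to an API, collect failures in error_list, and report progress every 100 rows. The endpoint URL, payload column, and progress print are assumptions:

# Hypothetical sketch of the pattern promo_v4.py suggests; the URL and column name are placeholders.
import pandas as pd
import requests

error_list = []
df = pd.read_csv('promos.csv')

for i, row in df.iterrows():
    if i % 100 == 0:
        print(f'processed {i} promos, {len(error_list)} errors so far')
    resp = requests.post(
        'https://api.example.com/promos',           # placeholder endpoint
        json={'promo_code': row['promo_code']},     # assumed column name
    )
    if resp.status_code != 200:
        error_list.append((i, resp.status_code))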
promo_v3.py
import pandas as pd
import requests
import time

df = pd.read_csv('promos.csv')
for i, row in df.iterrows():
    time.sleep(1)  # crude rate limit: at most one request per second