@afranzi
afranzi / redshift_lineage.sql
Created February 28, 2020 09:17
Redshift - Interaction between tables
DROP TABLE IF EXISTS target_tables;
-- Tables each query wrote into: stl_insert records one row per insert
-- execution (query id + target table id); SVV_TABLE_INFO resolves names.
CREATE TEMP TABLE target_tables AS (
    SELECT DISTINCT
        tbl AS target_table_id,
        sti."schema" AS target_schema,
        sti."table" AS target_table,
        sti."database" AS cluster,
        query
    FROM stl_insert
    JOIN SVV_TABLE_INFO sti ON sti.table_id = tbl
);
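The preview ends at the join. A hedged sketch of the natural continuation (source_tables and the final join are assumptions, not from the gist): build the scanned-tables side from stl_scan the same way, then join the two on query to get source-to-target edges.
DROP TABLE IF EXISTS source_tables;
CREATE TEMP TABLE source_tables AS (
    SELECT DISTINCT
        tbl AS source_table_id,
        sti."schema" AS source_schema,
        sti."table" AS source_table,
        query
    FROM stl_scan
    JOIN SVV_TABLE_INFO sti ON sti.table_id = tbl
);

-- One row per (source, target) pair touched by the same query.
SELECT DISTINCT s.source_schema, s.source_table, t.target_schema, t.target_table
FROM source_tables s
JOIN target_tables t ON s.query = t.query AND s.source_table_id <> t.target_table_id;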
@afranzi
afranzi / column_description.sql
Created February 24, 2020 14:03
Redshift - Column description
SELECT
    c.table_catalog as cluster,
    c.table_schema as schema_name,
    c.table_name as table_name,
    c.column_name as col_name,
    c.data_type as col_type,
    pgcd.description as col_description,
    ordinal_position as col_sort_order
FROM
    INFORMATION_SCHEMA.COLUMNS c
-- The preview stops at the FROM clause; the joins below are a reconstruction.
-- pgcd can only come from pg_catalog.pg_description, keyed by the table's
-- relation id and the column's ordinal position.
LEFT JOIN pg_catalog.pg_statio_all_tables st
    on c.table_schema = st.schemaname and c.table_name = st.relname
LEFT JOIN pg_catalog.pg_description pgcd
    on pgcd.objoid = st.relid and pgcd.objsubid = c.ordinal_position;
@afranzi
afranzi / table_description.sql
Created February 24, 2020 14:02
Redshift - Retrieve table descriptions
SELECT
    t.table_catalog as cluster,
    t.table_schema as schema_name,
    t.table_name as table_name,
    t.table_type as table_type,
    pt.tableowner as table_owner,
    pgtd.description as description
FROM
    information_schema.tables as t
INNER JOIN pg_catalog.pg_statio_all_tables as st
    on t.table_schema = st.schemaname and t.table_name = st.relname
-- The join predicates from here on are reconstructed from the aliases
-- the SELECT list references (pgtd, pt):
LEFT JOIN pg_catalog.pg_description as pgtd
    on pgtd.objoid = st.relid and pgtd.objsubid = 0
LEFT JOIN pg_catalog.pg_tables as pt
    on pt.schemaname = t.table_schema and pt.tablename = t.table_name;
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType, IntegerType

def personality_udf(personality_mapping: dict):
    # Look up a personality category's integer id; -1 when the category is unknown.
    @udf(returnType=IntegerType())
    def inner(cat):
        return personality_mapping.get(cat, -1)
    return inner

def compute_gvector_udf(personality_mapping: dict):
    @udf(returnType=ArrayType(DoubleType()))
    def inner(ngvector, cgvector, suma):
        ...  # body not shown in the preview
    return inner
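A hypothetical usage of the first factory (the mapping and column names are illustrative, not from the gist):
mapping = {"openness": 0, "agreeableness": 1, "neuroticism": 2}
df = df.withColumn("personality_id", personality_udf(mapping)(df["personality"]))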
# Install quinn>=0.3.1
from typing import Callable

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from quinn.extensions.dataframe_ext import DataFrame

def with_idx(id_col: str, output_col: str) -> Callable[[DataFrame], DataFrame]:
    def inner(df: DataFrame) -> DataFrame:
        # Number the distinct ids with a global window, then attach the index to every row.
        window = Window.orderBy(id_col)
        unique_activity_ids = df \
            .select(id_col).distinct() \
            .withColumn(output_col, F.row_number().over(window))
        return df.join(unique_activity_ids, on=id_col)  # join back (assumed; preview ends above)
    return inner
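Presumably applied with DataFrame.transform, which is why the quinn extension import is there on Spark 2.x (column names illustrative):
df_indexed = df.transform(with_idx("activity_id", "activity_idx"))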
trait SparkWriter {
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit
}

class SparkJsonWriter(path: String, partitions: Int = 1) extends SparkWriter {
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit = {
    df
      .coalesce(partitions)  // shrink to the requested number of output files
      .write
      .mode(mode)
      .json(path)  // terminal call assumed from the class name; the preview ends at .mode
  }
}
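A hypothetical call site (the path is illustrative):
val writer: SparkWriter = new SparkJsonWriter("s3a://bucket/insights", partitions = 4)
writer.write(df)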
object Filters {
  // Apply each predicate in turn; curried so it can be passed to DataFrame.transform.
  def filter(filters: Seq[Column])(df: DataFrame): DataFrame = {
    filters.foldLeft(df)((df, filter) => df.filter(filter))
  }
}
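For example (predicates hypothetical):
import org.apache.spark.sql.functions.col
val adults = df.transform(Filters.filter(Seq(col("age") >= 18, col("country") === "ES")))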
trait SparkReader {
  protected def execute(reader: DataFrameReader): DataFrame
  def read(schema: Option[StructType] = None, filters: Seq[Column] = Seq.empty)(implicit sparkSession: SparkSession): DataFrame =
    // Elided body, sketched as an assumption: apply the optional schema, delegate, then filter.
    execute(schema.fold(sparkSession.read)(sparkSession.read.schema)).transform(Filters.filter(filters))
}
class ActivityInsightsJob(activityReader: SparkReader,
                          analyticsReader: SparkReader,
                          insightsWriter: SparkWriter
                         )(implicit val sparkSession: SparkSession) extends SparkTask {
  def run(): Unit = {
    val metricsDF = analyticsReader.read(Some(AnalyticsSchema))
      .transform(Events.isActivityImpression)
      .transform(Events.isActivityView)
      .transform(Events.isBookmarked)
    // The preview ends here; presumably metricsDF is combined with
    // activityReader.read() and the result handed to insightsWriter.write.
  }
}
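Wiring it together might look like this; SparkParquetReader is hypothetical (only SparkJsonWriter appears above), and the paths are illustrative:
new ActivityInsightsJob(
  new SparkParquetReader("s3a://bucket/activities"),  // hypothetical SparkReader implementation
  new SparkParquetReader("s3a://bucket/analytics"),
  new SparkJsonWriter("s3a://bucket/insights")
).run()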
@afranzi
afranzi / pyspark.sh
Created May 21, 2019 11:42
PySpark with org.apache.hadoop:hadoop-aws:2.8.5
export SPARK_OPTS='--packages org.apache.hadoop:hadoop-aws:2.8.5 --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider'
pyspark
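A quick smoke test inside the launched shell (bucket and path are illustrative):
df = spark.read.json("s3a://my-bucket/events/")
df.show()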
@afranzi
afranzi / SparkWithHadoop.md
Last active February 18, 2024 18:45
Spark 2.4.0 with Hadoop 2.8.5

Set up environment variables for Hadoop.

export HADOOP_VERSION=2.8.5
export HADOOP_HOME=${HOME}/hadoop-$HADOOP_VERSION
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=${HADOOP_HOME}/bin:$PATH

Download Hadoop files.
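The commands for this step aren't shown; a sketch using the Apache archive (URL assumed), reusing the variables exported above:
wget https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar -xzf hadoop-${HADOOP_VERSION}.tar.gz -C ${HOME}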