Set up environment variables for Hadoop.
# Hadoop environment: pin the release, derive install/config paths from it,
# and put the Hadoop CLI tools on PATH.
export HADOOP_VERSION="2.8.5"
export HADOOP_HOME="${HOME}/hadoop-${HADOOP_VERSION}"
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
export PATH="${HADOOP_HOME}/bin:${PATH}"
Download Hadoop files.
Set up environment variables for Hadoop.
# Configure Hadoop: version constant first, then paths derived from it,
# finally prepend the Hadoop binaries directory to PATH.
export HADOOP_VERSION="2.8.5"
export HADOOP_HOME="${HOME}/hadoop-${HADOOP_VERSION}"
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
export PATH="${HADOOP_HOME}/bin:${PATH}"
Download Hadoop files.
DROP TABLE IF EXISTS target_tables; | |
CREATE TEMP TABLE target_tables AS ( | |
SELECT | |
DISTINCT tbl AS target_table_id, | |
sti.schema AS target_schema, | |
sti.table AS target_table, | |
sti.database AS cluster, | |
query | |
FROM stl_insert | |
JOIN SVV_TABLE_INFO sti ON sti.table_id = tbl |
SELECT | |
c.table_catalog as cluster, | |
c.table_schema as schema_name, | |
c.table_name as table_name, | |
c.column_name as col_name, | |
c.data_type as col_type , | |
pgcd.description as col_description, | |
ordinal_position as col_sort_order | |
FROM | |
INFORMATION_SCHEMA.COLUMNS c |
SELECT | |
t.table_catalog as cluster, | |
t.table_schema as schema_name, | |
t.table_name as table_name, | |
t.table_type as table_type, | |
pt.tableowner as table_owner, | |
pgtd.description as description | |
FROM | |
information_schema.tables as t | |
INNER JOIN pg_catalog.pg_statio_all_tables as st on |
/* | |
* Copyright (c) 2018 Schibsted Media Group. All rights reserved | |
*/ | |
package com.saas.presto.access.authentication | |
import java.security.Principal | |
import java.util.concurrent.TimeUnit.MILLISECONDS | |
import com.facebook.presto.spi.security.{AccessDeniedException, BasicPrincipal, PasswordAuthenticator} | |
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} |
import argparse | |
import errno | |
import os | |
import sys | |
from jinja2 import Environment, FileSystemLoader, StrictUndefined | |
# Encoding name passed when decoding/encoding file contents.
UTF8 = 'utf8'
# NOTE(review): presumably the key/dirname for Jinja2 template lookup — confirm against callers.
TEMPLATE = 'template'
def personality_udf(personality_mapping: dict):
    """Build a Spark UDF that maps a category to its integer personality id.

    Categories missing from ``personality_mapping`` map to -1.

    :param personality_mapping: category -> int lookup table captured by the UDF.
    :return: a PySpark UDF returning IntegerType.
    """
    @udf(returnType=IntegerType())
    def lookup(category):
        # Explicit membership test; -1 is the sentinel for unknown categories.
        if category in personality_mapping:
            return personality_mapping[category]
        return -1
    return lookup
def compute_gvector_udf(personality_mapping: dict): | |
@udf(returnType=ArrayType(DoubleType())) | |
def inner(ngvector, cgvector, suma): |
# Install quinn>=0.3.1 | |
from quinn.extensions.dataframe_ext import DataFrame | |
def with_idx(id_col: str, output_col: str) -> Callable[[DataFrame], DataFrame]: | |
def inner(df: DataFrame) -> DataFrame: | |
window = Window.orderBy(id_col) | |
unique_activity_ids = df \ | |
.select(id_col).distinct() \ | |
.withColumn(output_col, F.row_number().over(window)) |
/** Abstraction over a Spark DataFrame sink. */
trait SparkWriter {
  /** Persists `df`; `mode` controls behavior when the target exists (defaults to Overwrite). */
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit
}
class SparkJsonWriter(path: String, partitions: Int = 1) extends SparkWriter { | |
def write(df: DataFrame, mode: WriteMode = Overwrite): Unit = { | |
df | |
.coalesce(partitions) | |
.write | |
.mode(mode) |
object Filters {
  /** Applies each predicate column to `df` in order, returning the filtered frame.
    * An empty predicate sequence yields `df` unchanged.
    */
  def filter(filters: Seq[Column])(df: DataFrame): DataFrame = {
    var filtered = df
    for (predicate <- filters) {
      filtered = filtered.filter(predicate)
    }
    filtered
  }
}
trait SparkReader { | |
protected def execute(reader: DataFrameReader): DataFrame | |
def read(schema: Option[StructType] = None, filters: Seq[Column] = Seq.empty)(implicit sparkSession: SparkSession): DataFrame = { |