@etheleon
Last active May 18, 2021 08:03
accompanying gist for datalake article
-- ALTER TABLE schema.table DROP IF EXISTS PARTITION (year='2021', month='01', day='11', hour='01')
ALTER TABLE pricing.demand_tbl ADD
PARTITION (year='2021', month='01', day='11', hour='01')
LOCATION 's3://datascience-bucket/wesley.goi/data/pricing/demand_tbl/year=2021/month=01/day=11/hour=01'
MSCK REPAIR TABLE schema.table
CACHE TABLE <tablename>
SELECT /*+ REPARTITION(100), COALESCE(500), REPARTITION_BY_RANGE(3, c) */ * FROM t
spark._jsparkSession.catalog().tableExists(schema, table)
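A minimal sketch of how this existence check can guard table creation, assuming an active SparkSession named spark; create_table_ddl is a hypothetical string holding the CREATE EXTERNAL TABLE statement shown next.
schema, table = "pricing", "demand_tbl"
if not spark._jsparkSession.catalog().tableExists(schema, table):
    spark.sql(create_table_ddl)  # create_table_ddl: hypothetical variable containing the DDL below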
-- DROP TABLE IF EXISTS schema.table
CREATE EXTERNAL TABLE IF NOT EXISTS pricing.demand_tbl
(
country_id bigint,
city_id bigint,
utcDate string,
user_id bigint
)
PARTITIONED BY (year string, month string, day string, hour string)
STORED AS PARQUET
LOCATION 's3a://datascience-bucket/wesley.goi/data/pricing/demand_tbl/' -- base path
import pyspark
from pyspark.sql import SparkSession
num_executors = 8  # example value; set this to the number of executors you need

conf = (
    pyspark.SparkConf()
    .set("spark.executor.instances", str(num_executors))
)

spark = (
    SparkSession.builder
    .appName("my_app_name")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
library(sparklyr)

num_executors <- 8  # example value; set this to the number of executors you need

conf <- spark_config()
conf$spark.executor.memory <- "16G"
conf$spark.executor.instances <- num_executors

spark <- spark_connect(
  master = "local",
  version = "2.3",
  app_name = "my_app_name",
  config = conf
)
CREATE OR REPLACE TEMPORARY VIEW my_table_name AS
<QUERY>
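The same temporary view can be registered from the Python side, assuming df already holds the result of the query:
df.createOrReplaceTempView("my_table_name")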
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

def some_func(a: int, b: int) -> float:
    return float(a + b)

# pyspark
udf_some_func = F.udf(some_func, FloatType())

# in SQL
# SELECT *, UDF_SOME_FUNC(col_a, col_b) FROM table
spark.udf.register("udf_some_func", some_func, FloatType())
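A short usage sketch of both registrations above, using a small example DataFrame and a hypothetical temp view name for the SQL form:
df = spark.createDataFrame([(1, 2), (3, 4)], ["col_a", "col_b"])
df.withColumn("a_plus_b", udf_some_func(F.col("col_a"), F.col("col_b"))).show()
df.createOrReplaceTempView("some_table")  # hypothetical view name
spark.sql("SELECT *, udf_some_func(col_a, col_b) AS a_plus_b FROM some_table").show()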
[
  {
    "description": "[DESCRIPTION]",
    "name": "[NAME]",
    "type": "[TYPE]",
    "mode": "[MODE]"
  },
  ...
]
-- spark2 (partition count only)
-- SELECT /*+ REPARTITION(3000) */ *
SELECT /*+ REPARTITION(2000, col1, col2, col3) */ *
FROM table
# Infer column data types from a dataframe registered as the temp view "temp"

## partition columns
partition_cols_dict = {
    "year": "string",
    "month": "string",
    "day": "string",
    "hour": "string",
}
partitions = ", ".join([f"{key} {value}" for key, value in partition_cols_dict.items()])
# print(partitions)
# year string, month string, day string, hour string

## non-partition columns
partition_cols = list(partition_cols_dict)
columns = ", \n".join(
    spark.sql("DESCRIBE temp")
    .rdd.filter(lambda row: row.col_name not in partition_cols)
    .map(lambda row: row[0] + " " + row[1])
    .collect()
)
# print(columns)
# country_id bigint, city_id bigint, utcDate string, user_id bigint
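A sketch of feeding the generated column and partition strings into the same DDL used earlier (table name and location taken from the CREATE EXTERNAL TABLE statement above):
ddl = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS pricing.demand_tbl
({columns})
PARTITIONED BY ({partitions})
STORED AS PARQUET
LOCATION 's3a://datascience-bucket/wesley.goi/data/pricing/demand_tbl/'
"""
spark.sql(ddl)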
SELECT *
FROM table
WHERE year||month||day BETWEEN '20210301' AND '20210415'
df = spark.read.parquet("s3://<bucket>/<suffix>/year=*/month=*/day=*/hour=*")
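When the partition directories are globbed like this, Spark may not surface year/month/day/hour as columns; supplying the basePath option tells partition discovery where the table root is. A sketch, keeping the bucket/suffix placeholders as-is:
df = (
    spark.read
    .option("basePath", "s3://<bucket>/<suffix>/")
    .parquet("s3://<bucket>/<suffix>/year=*/month=*/day=*/hour=*")
)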
REFRESH TABLE pricing.demand_tbl
PARTITION (year='2021', month='01', day='11', hour='01')
from pyspark.sql import DataFrame

def save_to_tfrecord(df: DataFrame, path: str):
    """
    Saves a Spark dataframe as gzipped TFRecord part files in S3.

    Parameters
    ----------
    df: DataFrame
        spark dataframe
    path: string
        output folder, e.g. s3://<bucket>/some/path/tfrecord;
        the gzipped part files are written under this folder
    """
    (
        df.write
        .format("tfrecords")
        .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
        .mode("overwrite")
        .save(path)
    )
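The "tfrecords" source is not bundled with Spark; it comes from a TFRecord connector such as spark-tensorflow-connector, which must be on the classpath. An example call, with an assumed output path under the bucket used elsewhere in this gist:
save_to_tfrecord(df, "s3://datascience-bucket/wesley.goi/data/pricing/demand_tbl_tfrecord")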
SELECT *
FROM table
WHERE date BETWEEN '20210301' AND '20210415'
# enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
# enable shuffle partitions optimisation
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")