Skip to content

Instantly share code, notes, and snippets.

@Shinichi-Nakagawa
Last active October 14, 2022 15:31
Show Gist options
  • Save Shinichi-Nakagawa/67052d107249a86e9c6ad6ef107f6ee4 to your computer and use it in GitHub Desktop.
Save Shinichi-Nakagawa/67052d107249a86e9c6ad6ef107f6ee4 to your computer and use it in GitHub Desktop.
PyCon JP 2022資料用のスニペット
"""
1. Sessionを作る
"""
from pyspark.sql import SparkSession
# Build the Spark session: load the BigQuery connector jar and raise the
# field limit Spark uses when rendering debug strings.
spark: SparkSession = (
    SparkSession.builder
    # Plain ASCII quotes are required here; a typographic quote is a SyntaxError.
    .appName('app_yakiu')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar')
    .config('spark.sql.debug.maxToStringFields', 2000)
    .getOrCreate()
)
# Dataproc-specific setting: the temporary area used at run time.
spark.conf.set('temporaryGcsBucket', 'GCSのバケット名')
"""
2. Schemaを作る
"""
from pyspark.sql.types import StructType, StructField, DoubleType, DateType, StringType, LongType
# Schema definition (a little long). Every column is declared non-nullable.
STATCAST_SCHEMA: StructType = (
    StructType()
    .add("pitch_type", StringType(), False)
    .add("game_date", DateType(), False)
    .add("release_speed", DoubleType(), False)
    .add("release_pos_x", DoubleType(), False)
    .add("release_pos_z", DoubleType(), False)
    # Remaining columns omitted for brevity (the full schema has 91 fields).
    .add("spin_axis", DoubleType(), False)
    .add("delta_home_win_exp", DoubleType(), False)
    .add("delta_run_exp", DoubleType(), False)
)
"""
3. CSVを読み込む
"""
from pyspark.sql import DataFrame as SparkDataFrame
# Use the Spark session created earlier and run its reader in CSV format.
def read_csv(date: str, filename: str, schema: StructType = None) -> SparkDataFrame:
    """Load one CSV file from GCS into a Spark DataFrame.

    The file is read from ``gs://your-bucket-name/path/{date}/{filename}``
    with ``header`` and ``inferSchema`` both set to ``"true"``.

    Args:
        date: Directory name under ``path/`` (the caller passes a date string).
        filename: Name of the CSV file inside that directory.
        schema: Optional explicit schema handed to the reader's ``load`` call.

    Returns:
        The loaded DataFrame, or ``None`` when Spark raises
        ``AnalysisException`` while loading. Callers must check for ``None``
        before using the result.
    """
    # Local import: AnalysisException is not imported at module level, and
    # without it the ``except`` clause below would itself raise NameError.
    from pyspark.sql.utils import AnalysisException

    try:
        return spark.read \
            .format('csv') \
            .options(header="true", inferSchema="true") \
            .load(f'gs://your-bucket-name/path/{date}/{filename}', schema=schema)
    except AnalysisException:
        return None  # type: ignore


sdf: SparkDataFrame = read_csv('2022-10-15', 'batter.csv', schema=STATCAST_SCHEMA)
"""
4. BigQuery保存
"""
from pyspark.sql import DataFrame as SparkDataFrame
# Helper that writes a Spark DataFrame to BigQuery.
def save_bigquery(sdf: SparkDataFrame, table_name: str) -> None:
    """Append ``sdf`` to the BigQuery table ``dataset.{table_name}``.

    The write uses the ``bigquery`` format in ``append`` mode, with the
    project, temporary GCS bucket and ``createDisposition`` set to the
    fixed values below.

    Args:
        sdf: DataFrame to write.
        table_name: Table name, prefixed with ``dataset.`` to form the target.
    """
    writer_options = {
        'project': 'your project name',
        'table': f'dataset.{table_name}',
        'temporaryGcsBucket': 'GCSのバケット名',
        # NOTE(review): presumably requires the target table to already exist; confirm.
        'createDisposition': 'CREATE_NEVER',
    }
    writer = sdf.write.mode('append').format('bigquery')
    writer.options(**writer_options).save()


# Pass the Spark DataFrame and the BigQuery table name.
save_bigquery(sdf, 'batting_data')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment