Last active
October 14, 2022 15:31
-
-
Save Shinichi-Nakagawa/67052d107249a86e9c6ad6ef107f6ee4 to your computer and use it in GitHub Desktop.
PyCon JP 2022資料用のスニペット
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
1. Sessionを作る
"""
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session.
# - 'spark.jars' puts the BigQuery connector jar on the classpath.
# - 'spark.sql.debug.maxToStringFields' is raised to 2000 to widen how many
#   fields Spark will render as strings.
spark: SparkSession = (
    SparkSession.builder
    .appName('app_yakiu')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar')
    .config('spark.sql.debug.maxToStringFields', 2000)
    .getOrCreate()
)

# Dataproc-specific setting: the GCS bucket used as the temporary area at run time.
# NOTE: the value below is a placeholder and must be replaced with a real bucket name.
spark.conf.set('temporaryGcsBucket', 'GCSのバケット名')
"""
2. Schemaを作る
"""
from pyspark.sql.types import StructType, StructField, DoubleType, DateType, StringType, LongType

# Schema definition (a bit long).
# Each entry is (column name, Spark type class); every column is declared
# with nullable=False.
STATCAST_SCHEMA: StructType = StructType(
    [
        StructField(column_name, spark_type(), False)
        for column_name, spark_type in (
            ("pitch_type", StringType),
            ("game_date", DateType),
            ("release_speed", DoubleType),
            ("release_pos_x", DoubleType),
            ("release_pos_z", DoubleType),
            # Too long to list here, so the rest is omitted (91 columns in total).
            ("spin_axis", DoubleType),
            ("delta_home_win_exp", DoubleType),
            ("delta_run_exp", DoubleType),
        )
    ]
)
"""
3. CSVを読み込む
"""
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.utils import AnalysisException


def read_csv(date: str, filename: str, schema: StructType = None) -> SparkDataFrame:
    """Read one CSV file from GCS through the module-level ``spark`` session.

    The file is loaded from ``gs://your-bucket-name/path/{date}/{filename}``
    with the first row treated as a header.

    Args:
        date: Path segment placed before the file name (e.g. ``'2022-10-15'``).
        filename: Name of the CSV file inside that segment.
        schema: Optional explicit schema passed to the reader. ``inferSchema``
            is also enabled, which presumably only matters when no schema is
            given -- confirm against the Spark CSV reader documentation.

    Returns:
        The loaded DataFrame, or ``None`` when Spark raises
        ``AnalysisException`` (presumably e.g. when the path does not exist).
        Callers must check for ``None`` before using the result.
    """
    path = f'gs://your-bucket-name/path/{date}/{filename}'
    try:
        return (
            spark.read
            .format('csv')
            .options(header="true", inferSchema="true")
            .load(path, schema=schema)
        )
    except AnalysisException:
        # Best-effort read: signal failure to the caller with None instead of raising.
        return None  # type: ignore
# Load the batter CSV for 2022-10-15 using the explicit Statcast schema.
sdf: SparkDataFrame = read_csv(
    date='2022-10-15',
    filename='batter.csv',
    schema=STATCAST_SCHEMA,
)
"""
4. BigQuery保存
"""
from pyspark.sql import DataFrame as SparkDataFrame


def save_bigquery(sdf: SparkDataFrame, table_name: str) -> None:
    """Append a Spark DataFrame to the BigQuery table ``dataset.{table_name}``.

    Args:
        sdf: DataFrame whose rows are written.
        table_name: Table name; it is prefixed with ``dataset.`` to form the
            ``table`` option.

    The write uses ``append`` mode with ``createDisposition`` set to
    ``CREATE_NEVER``, so the destination table is presumably expected to
    exist already. The project and bucket values are placeholders that must
    be replaced with real names.
    """
    connector_options = (
        ('project', 'your project name'),
        ('table', f'dataset.{table_name}'),
        ('temporaryGcsBucket', 'GCSのバケット名'),
        ('createDisposition', 'CREATE_NEVER'),
    )

    writer = sdf.write.mode('append').format('bigquery')
    # Apply the options in the same order as a hand-written .option() chain.
    for option_key, option_value in connector_options:
        writer = writer.option(option_key, option_value)
    writer.save()
# Pass the Spark DataFrame and the destination BigQuery table name.
# read_csv() returns None when the CSV could not be loaded; writing None would
# fail with an AttributeError, so skip the save and report it instead.
if sdf is not None:
    save_bigquery(sdf, 'batting_data')
else:
    print('Skipping BigQuery save: the CSV could not be loaded.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment