@xescuder
Created June 13, 2024 16:45
Spark Load CSV into DB
import os
from urllib.request import urlopen
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from config import AppConfig
import findspark
from pipeline_utils import rename_cols, snake_case
from trading_data_pipeline.load_daily_bars_etl import JDBC_PROPERTIES
"""
LOAD ALL AVAILABLE EXCHANGES FROM EOD
NOT USED, exchanges are constrained to only defined in resources/indexes.csv
"""
findspark.init()

config = AppConfig(os.environ)

# The PostgreSQL JDBC driver jar is put on the Spark classpath so that
# DataFrame.write.jdbc() can reach the database later on.
spark = SparkSession.builder \
    .appName("Loading exchanges") \
    .config("spark.jars", "./lib/postgresql-42.2.6.jar") \
    .getOrCreate()
sc = spark.sparkContext

def extract():
    schema = StructType([
        StructField('Name', StringType(), True),
        StructField('Code', StringType(), True),
        StructField('CountryISO3', StringType(), True),
        StructField('Currency', StringType(), True),
    ])

    # Download the full exchange list from the EOD HTTP API and let Spark
    # parse the JSON payload from a single-element RDD.
    url = f'https://eodhd.com/api/exchanges-list/?api_token={config.EOD_API_TOKEN}'
    data = urlopen(url).read().decode('utf-8')
    rdd = sc.parallelize([data])
    df = spark.read.json(rdd, schema=schema)
    return df
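
# The EOD endpoint returns a JSON array of exchange objects whose keys match
# the schema fields above, roughly of this shape (illustrative, not real API
# output):
#
#     [{"Name": "...", "Code": "...", "CountryISO3": "...", "Currency": "..."}, ...]
#
# spark.read.json() accepts an RDD of JSON strings, which is why the whole
# payload is wrapped in a single-element RDD before being parsed.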

def transform(ex_comp_df):
    # After the snake_case pass, the EOD 'code' column is renamed to 'symbol'
    # to match the exchanges table.
    exchange_dict = {
        "code": "symbol"
    }

    df = ex_comp_df.toDF(*[snake_case(c) for c in ex_comp_df.columns])
    df = rename_cols(df, exchange_dict)
    return df
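
# pipeline_utils is not included in this gist. A minimal sketch of what the two
# imported helpers might look like (an assumption, not the actual module):
#
#     import re
#
#     def snake_case(name):
#         # e.g. 'CountryISO3' -> 'country_iso3'
#         s = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
#         return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s).lower()
#
#     def rename_cols(df, mapping):
#         # Rename only the columns listed in `mapping`, leave the rest as-is.
#         for old, new in mapping.items():
#             df = df.withColumnRenamed(old, new)
#         return df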

def load(df):
    # Overwrite the exchanges table with the freshly transformed data.
    df.write.jdbc(config.get_jdbc_url(), 'exchanges', mode='overwrite', properties=JDBC_PROPERTIES)
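
# JDBC_PROPERTIES is imported from another module of the pipeline. For
# df.write.jdbc() against the bundled PostgreSQL driver it would typically look
# something like this (a guess at the shape, not the actual values):
#
#     JDBC_PROPERTIES = {
#         'user': config.DB_USER,          # hypothetical config fields
#         'password': config.DB_PASSWORD,
#         'driver': 'org.postgresql.Driver',
#     }
#
# and config.get_jdbc_url() would return something like
# 'jdbc:postgresql://<host>:5432/<database>'.
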
load(transform(extract()))
spark.stop()