@subhash-sreenivasachar
Last active July 12, 2021 09:49
PySpark-LocalStack
import os

import boto3


def add_to_bucket(bucket_name: str, file_name: str):
    """Create a bucket on LocalStack and upload a local file to it."""
    try:
        # host.docker.internal resolves to the host machine from inside the Glue container
        s3 = boto3.client('s3',
                          endpoint_url="http://host.docker.internal:4566",
                          use_ssl=False,
                          aws_access_key_id='mock',
                          aws_secret_access_key='mock',
                          region_name='us-east-1')
        s3.create_bucket(Bucket=bucket_name)
        file_key = f'{os.getcwd()}/{file_name}'
        with open(file_key, 'rb') as f:
            s3.put_object(Body=f, Bucket=bucket_name, Key=file_name)
        print(file_name)
        return s3
    except Exception as e:
        print(e)
        return None
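A quick way to sanity-check the upload is to list the bucket with the same client. This helper is not part of the original gist, just a minimal sketch using the standard boto3 list_objects_v2 call:

def list_bucket_contents(s3_client, bucket_name: str):
    """Return the object keys in a LocalStack bucket (sketch for verifying uploads)."""
    response = s3_client.list_objects_v2(Bucket=bucket_name)
    return [obj['Key'] for obj in response.get('Contents', [])]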
from pyspark.sql import SparkSession


def create_testing_pyspark_session():
    """Build a local SparkSession whose S3A filesystem points at LocalStack."""
    print('creating pyspark session')
    sparksession = (SparkSession.builder
                    .master('local[2]')
                    .appName('pyspark-demo')
                    .enableHiveSupport()
                    .getOrCreate())
    # Point the S3A connector at the LocalStack endpoint with mock credentials
    hadoop_conf = sparksession.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.path.style.access", "true")
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
    hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
    hadoop_conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.access.key", "mock")
    hadoop_conf.set("fs.s3a.secret.key", "mock")
    hadoop_conf.set("fs.s3a.session.token", "mock")
    hadoop_conf.set("fs.s3a.endpoint", "http://host.docker.internal:4566")
    return sparksession
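The same S3A settings can also be supplied up front via Spark's spark.hadoop.* configuration prefix, which avoids reaching into the private _jsc handle. This alternative is a sketch, not part of the original gist, and the function name is made up for illustration:

def create_testing_pyspark_session_alt():
    """Same LocalStack-backed session, configured through spark.hadoop.* builder options."""
    return (SparkSession.builder
            .master('local[2]')
            .appName('pyspark-demo')
            .config('spark.hadoop.fs.s3a.endpoint', 'http://host.docker.internal:4566')
            .config('spark.hadoop.fs.s3a.path.style.access', 'true')
            .config('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
            .config('spark.hadoop.fs.s3a.aws.credentials.provider',
                    'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
            .config('spark.hadoop.fs.s3a.access.key', 'mock')
            .config('spark.hadoop.fs.s3a.secret.key', 'mock')
            .config('spark.hadoop.fs.s3a.session.token', 'mock')
            .enableHiveSupport()
            .getOrCreate())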
# docker-compose.yml
version: '2'
services:
  glue-service:
    image: amazon/aws-glue-libs:glue_libs_1.0.0_image_01
    container_name: "glue_container_demo"
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    volumes:
      - .:/opt
    links:
      - localstack-s3
    environment:
      S3_ENDPOINT: http://localstack:4566
  localstack-s3:
    image: localstack/localstack
    container_name: "localstack_container_demo"
    volumes:
      - ./stubs/s3:/tmp/localstack
    environment:
      - SERVICES=s3
      - DEFAULT_REGION=us-east-1
      - HOSTNAME=localstack
      - DATA_DIR=/tmp/localstack/data
      - HOSTNAME_EXTERNAL=localstack
    ports:
      - "4566:4566"
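Once the stack is running, a small check like the one below can confirm the LocalStack S3 endpoint answers before the Spark job is launched. This is a sketch rather than part of the gist; the helper name is made up, and the localhost endpoint assumes you are calling from the host (inside the Glue container it would be the host.docker.internal address used above):

import boto3


def localstack_is_up(endpoint_url: str = "http://localhost:4566") -> bool:
    """Return True if the LocalStack S3 endpoint answers a simple list_buckets call."""
    try:
        s3 = boto3.client('s3',
                          endpoint_url=endpoint_url,
                          aws_access_key_id='mock',
                          aws_secret_access_key='mock',
                          region_name='us-east-1')
        s3.list_buckets()
        return True
    except Exception as exc:
        print(f'LocalStack not reachable: {exc}')
        return False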
# Dockerfile
FROM python:3.6.10

WORKDIR /opt

# By copying requirements.txt first, Docker caches the installed
# dependencies instead of reinstalling them on every build
COPY requirements.txt /opt/requirements.txt
RUN pip install -r requirements.txt

# Now copy in the rest of the code
COPY . /opt
import boto3
import os
from pyspark.sql import SparkSession

test_bucket = 'dummypysparkbucket'

# Write a local CSV to the LocalStack S3 bucket
add_to_bucket(bucket_name=test_bucket, file_name='dummy.csv')

# Read it back through the S3A connector
spark_session = create_testing_pyspark_session()
file_path = f's3a://{test_bucket}/dummy.csv'
data_df = (spark_session.read
           .option('delimiter', ',')
           .option('header', 'true')
           .option('inferSchema', 'false')
           .format('csv')
           .load(file_path))
data_df.show()
# Write the DataFrame back to S3 as Parquet
write_path = f's3a://{test_bucket}/testparquet/'
data_df.write.parquet(write_path, mode='overwrite')

# requirements.txt
moto[all]==2.0.5
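To close the loop, the Parquet output can be read back from LocalStack and displayed. A minimal sketch, assuming the spark_session and write_path defined above:

# Read the Parquet files back from LocalStack to verify the round trip
parquet_df = spark_session.read.parquet(write_path)
parquet_df.show()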