# Gist by @mooperd, created November 27, 2016 19:31
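"""Read newline-delimited JSON from S3 in parallel with PySpark.

Lists objects under a prefix in the 'time-waits-for-no-man' bucket,
parses each object line by line on the executors, stamps every record
with a Unix timestamp built from its date fields, and loads the result
into a Spark DataFrame.
"""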
import boto3
import ujson
import arrow
import sys
import os
import getpass
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
sc = SparkContext()
sqlContext = SQLContext(sc)

user = getpass.getuser()
version = sys.version

# Log through the JVM's log4j so messages appear in the Spark driver logs.
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
LOGGER.info("Python Version: " + version)
LOGGER.info("User: " + user)

# Route boto3's HTTPS traffic through the proxy.
os.environ['HTTPS_PROXY'] = 'https://webproxy.aws.db.de:8080'
# Collect the keys of all matching S3 objects so they can be
# distributed across the cluster.
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for obj in my_bucket.objects.filter(Prefix='1971-01-1'):
    s3_list.append(obj.key)
def add_timestamp(record):
    # Derive a Unix timestamp from the record's date/time fields.
    # arrow >= 1.0 exposes .timestamp() as a method returning a float,
    # so cast to int to match the IntegerType column below; on older
    # arrow versions (circa 2016) use the .timestamp property instead.
    record['timestamp'] = int(arrow.get(
        int(record['year']),
        int(record['month']),
        int(record['day']),
        int(record['hour']),
        int(record['minute']),
        int(record['second'])
    ).timestamp())
    return record
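# Each S3 object is assumed to hold newline-delimited JSON records with
# string date parts, e.g. (illustrative values only):
#   {"year": "1971", "month": "01", "day": "12",
#    "hour": "09", "minute": "30", "second": "05"}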
def distributedJsonRead(s3Key):
    # Runs on the executors: fetch one object from S3 and parse each
    # line as a JSON record, attaching a timestamp to every record.
    # Note: the JVM-backed LOGGER defined on the driver cannot be
    # pickled and shipped with this closure, so use print (or Python's
    # logging module) for worker-side diagnostics.
    print("Running distributedJsonRead")
    s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man', key=s3Key)
    contents = s3obj.get()['Body'].read().decode()
    lines = contents.splitlines()
    result_wo_timestamp = map(ujson.loads, lines)
    result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
    return result_wi_timestamp
# Explicit schema for the DataFrame; rows arrive as dicts, so Spark
# matches columns by name. StructField objects are not orderable in
# Python 3, so sort by field name for a deterministic column order.
schema = StructType(sorted([
    StructField("timestamp", IntegerType(), True),
    StructField("year", StringType(), True),
    StructField("month", StringType(), True),
    StructField("day", StringType(), True),
    StructField("hour", StringType(), True),
    StructField("minute", StringType(), True),
    StructField("second", StringType(), True)], key=lambda f: f.name))
# Distribute the key list and read/parse every object in parallel.
job = sc.parallelize(s3_list)
rows = job.flatMap(distributedJsonRead)
df = sqlContext.createDataFrame(rows, schema)
# Alternative: df = rows.toDF()
df.show()
#df.printSchema()
#df.write.parquet('dates_by_seconds')
#df.write.parquet('dates_by_seconds', mode="overwrite", partitionBy=["second"])
sc.stop()
sys.exit()
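# A minimal invocation sketch, assuming boto3, ujson, and arrow are
# installed on the driver and every executor node (the script name is
# hypothetical):
#
#   spark-submit --master yarn read_s3_json.py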