Skip to content

Instantly share code, notes, and snippets.

@vsouza
Created October 5, 2016 02:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vsouza/03d092905dee189dc7df9ab76fe88123 to your computer and use it in GitHub Desktop.
Save vsouza/03d092905dee189dc7df9ab76fe88123 to your computer and use it in GitHub Desktop.
def toRedshift(time, rdd):
    """Append the records of one RDD to the Redshift ``activity_log`` table.

    The RDD is converted to a DataFrame with a fixed seven-column schema,
    registered as the temp table ``activity_log`` on the SQL context, and
    written in ``append`` mode through the ``com.databricks.spark.redshift``
    data source, which stages data under the configured S3 ``tempdir``.

    The ``(time, rdd)`` signature is presumably meant for use as a
    ``DStream.foreachRDD`` callback -- confirm against the caller.

    Args:
        time: Batch time supplied by the caller. Unused here; kept so the
            function keeps its two-argument callback shape.
        rdd: RDD whose rows line up with the schema below, in order:
            user_id, device_id, steps, battery_level, calories_spent,
            distance, current_time.

    Returns:
        None.

    Raises:
        Any exception raised while building or writing the DataFrame
        propagates to the caller unchanged.
    """
    # Streaming batches are frequently empty; skip them so no Redshift
    # write (and S3 staging) is started for zero rows.
    if rdd.isEmpty():
        return

    sqlContext = getSqlContextInstance(rdd.context)
    schema = StructType([
        StructField('user_id', StringType(), True),
        StructField('device_id', StringType(), True),
        StructField('steps', IntegerType(), True),
        StructField('battery_level', IntegerType(), True),
        StructField('calories_spent', IntegerType(), True),
        StructField('distance', FloatType(), True),
        StructField('current_time', IntegerType(), True),
    ])
    df = sqlContext.createDataFrame(rdd, schema)
    # Nothing in this function queries the temp table; it is kept in case
    # other code sharing this SQL context reads "activity_log".
    df.registerTempTable("activity_log")

    # NOTE(review): the JDBC URL embeds user/password placeholders in source.
    # Real credentials should come from configuration, not a literal.
    # No try/except here on purpose: catching only to re-raise added nothing,
    # so failures surface with their original traceback.
    (
        df.write
        .format("com.databricks.spark.redshift")
        .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD")
        .option("dbtable", "activity_log")
        .option("tempdir", "s3n://spark-temp-data/")
        .mode("append")
        .save()
    )
# Register `process` as the callback for `py_rdd` (presumably a DStream).
# NOTE(review): neither `process` nor `py_rdd` is defined in the visible
# source. `toRedshift(time, rdd)` has a compatible two-argument signature and
# may be the intended callback -- confirm before relying on this line.
py_rdd.foreachRDD(process)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment