Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spark 2.X MongoDB
from pyspark.sql import SparkSession
# Left-join the MongoDB collection `raw.user` with `raw.demography` on the
# `user` column and write the joined rows to `raw.user_demography`.
# Schemas and row counts are printed along the way as a sanity check.

# Data source class name passed to `format()` for both reads and the write.
_MONGO_FORMAT = "com.mongodb.spark.sql.DefaultSource"
# Host part of the connection URI; "/<database>.<collection>" is appended.
_MONGO_BASE_URI = "mongodb://localhost:27017"

spark = SparkSession.builder.appName("demography mapper").getOrCreate()


def _load_collection(namespace):
    """Load one MongoDB collection, print its schema and row count.

    Args:
        namespace: "<database>.<collection>" suffix appended to the base URI.

    Returns:
        The loaded DataFrame, repartitioned into 2 partitions.
    """
    frame = (
        spark.read.format(_MONGO_FORMAT)
        .option("spark.mongodb.input.uri", _MONGO_BASE_URI + "/" + namespace)
        .load()
    )
    # printSchema() writes to stdout itself and returns None; wrapping it in
    # print() would emit an extra "None" line after the schema.
    frame.printSchema()
    frame = frame.repartition(2)
    print(frame.count())
    return frame


df_user = _load_collection("raw.user")
df_demography = _load_collection("raw.demography")

# Cached because the joined frame is consumed twice below: once by count()
# and once by the write.
df_result = df_user.join(df_demography, on="user", how="left_outer").cache()
df_result.printSchema()
print(df_result.count())

(
    df_result.write.format(_MONGO_FORMAT)
    .option("spark.mongodb.output.uri", _MONGO_BASE_URI + "/raw.user_demography")
    .save()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.