@rvilla87
Created August 6, 2017 16:28
Inserting documents into MongoDB with the Spark Connector (Dataframe vs. Spark Structured Streaming)
// Dataframe (supported) - read 1 file, no streaming
// Step 1, create the Dataframe source
val fileDF = spark
  .read // No streaming
  .option("header", "true") // the first line holds the column names (key, country, ...)
  .csv("file/file1.csv")
  .selectExpr("CAST(key as String)", // more code with other casting...
  )
// Out [1]: fileDF: org.apache.spark.sql.package.DataFrame = [key: string, country: string ... 6 more fields]
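// Step 2, insert the Dataframe into MongoDB
// (the connection URI and database are read from spark.mongodb.output.uri in the Spark config)
import com.mongodb.spark.MongoSpark
MongoSpark.save(fileDF.write.option("collection", "trends").mode("append"))
// inserts all the documents in the Dataframe without problems, with the correct structure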
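MongoSpark.save does not take a connection string itself: in the 2.x connector used here it reads the target from the `spark.mongodb.output.uri` setting, and the `collection` option on the writer overrides the collection part of that URI. A minimal sketch of that setup, assuming a local mongod, a database named `test`, a local Spark master and a header row in the CSV (all hypothetical, not taken from the gist):

```scala
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark

object BatchToMongo {
  // Hypothetical target: local mongod, database "test", default collection "trends"
  val outputUri = "mongodb://127.0.0.1/test.trends"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-to-mongo")
      .master("local[*]") // assumption: local run for testing
      .config("spark.mongodb.output.uri", outputUri)
      .getOrCreate()

    val fileDF = spark.read
      .option("header", "true") // assumption: first CSV line holds the column names
      .csv("file/file1.csv")

    // Same call as in the gist; "collection" overrides the collection in outputUri
    MongoSpark.save(fileDF.write.option("collection", "trends").mode("append"))
    spark.stop()
  }
}
```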
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
// Spark Structured Streaming (not supported) - read multiple files, streaming
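// Step 1, create the streaming Dataframe source
// (file-based streaming sources need an explicit .schema(...) unless spark.sql.streaming.schemaInference is enabled)
val filesDS = spark
  .readStream // Streaming
  .csv("trendFiles/*.csv")
  .selectExpr("CAST(key as String)", // more code with other casting...
  )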
// Out [2]: filesDS: org.apache.spark.sql.package.DataFrame = [key: string, country: string ... 6 more fields]
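// Step 2, insert the streaming Dataframe into MongoDB
// I cannot find a MongoSpark.save method for Structured Streaming, and MongoSpark.save needs a
// DataFrameWriter: filesDS.write fails with "'write' can not be called on streaming Dataset/DataFrame".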
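A workaround that did not exist when this gist was written: Spark 2.4 added `foreachBatch`, which hands each micro-batch of a streaming query to a function as an ordinary (non-streaming) Dataframe, so the same `MongoSpark.save` call used for `fileDF` can be reused. This is a swapped-in technique, not the one discussed in the Jira ticket below. A minimal sketch, assuming Spark 2.4+, the 2.x/3.x connector (`com.mongodb.spark`), a hypothetical two-column schema (only `key` and `country` are known from the output above), a hypothetical URI and a hypothetical checkpoint path:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
import com.mongodb.spark.MongoSpark

object StreamToMongo {
  // Hypothetical schema: only "key" and "country" are visible in the gist output
  val trendSchema: StructType = new StructType()
    .add("key", StringType)
    .add("country", StringType)

  // Writes one micro-batch (a plain, non-streaming Dataframe) with the batch API.
  // Declared as a Scala function value so the Scala overload of foreachBatch is picked.
  val writeBatch: (DataFrame, Long) => Unit = (batchDF, _) =>
    MongoSpark.save(batchDF.write.option("collection", "trends").mode("append"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-stream-to-mongo")
      .master("local[*]") // assumption: local run
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.trends") // hypothetical URI
      .getOrCreate()

    val filesDS = spark.readStream
      .schema(trendSchema)      // streaming file sources need an explicit schema
      .option("header", "true") // assumption: each CSV file starts with a header row
      .csv("trendFiles/*.csv")

    val query = filesDS.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/trends-checkpoint") // hypothetical path
      .start()

    query.awaitTermination()
  }
}
```

`foreachBatch` gives at-least-once delivery: if a batch is retried after a failure, `mode("append")` may insert the same documents twice, so an idempotent write (for example, rows carrying an `_id` field, which the connector upserts) may be worth considering.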
@rhamzei

rhamzei commented Feb 28, 2021

Do you have any solution for writing a stream from Kafka to MongoDB using PySpark Structured Streaming?

@rvilla87
Author

I opened a ticket in MongoDB's Jira some years ago:
https://jira.mongodb.org/browse/SPARK-134

You can see the details in the comments. I haven't tested it, but it seems the solution is to use MongoConnector directly (Ross Lawley updated master and Christiaan Ras used it without problems). Sorry I couldn't be more helpful.
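One way to use `MongoConnector` directly in Structured Streaming is a `ForeachWriter` that buffers the rows of a partition and inserts them with `insertMany` when the partition closes. This is an untested sketch of that idea, not the code from the ticket. It assumes the 2.x connector (`MongoConnector`, `WriteConfig`), flat rows of simple types such as the String columns cast above (nested structs would need their own conversion), and a hypothetical class name `MongoForeachWriter`:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.mongodb.client.MongoCollection
import com.mongodb.spark.MongoConnector
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{ForeachWriter, Row}
import org.bson.Document

// Hypothetical sink: collects the rows of one partition and writes them in one insertMany
class MongoForeachWriter(writeConfig: WriteConfig) extends ForeachWriter[Row] {
  @transient private var connector: MongoConnector = _
  @transient private var docs: ArrayBuffer[Document] = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connector = MongoConnector(writeConfig.asOptions) // created on the executor
    docs = ArrayBuffer.empty[Document]
    true // process every partition
  }

  override def process(row: Row): Unit =
    docs += MongoForeachWriter.toDocument(row)

  override def close(errorOrNull: Throwable): Unit = {
    // Skip the write if the partition failed or produced no rows
    if (errorOrNull == null && docs != null && docs.nonEmpty) {
      connector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] =>
        collection.insertMany(docs.asJava)
      })
    }
  }
}

object MongoForeachWriter {
  // Copies every top-level field of a flat Row into a BSON Document
  def toDocument(row: Row): Document = {
    val doc = new Document()
    row.schema.fieldNames.foreach(name => doc.append(name, row.getAs[Any](name)))
    doc
  }
}

// Usage (hypothetical URI and checkpoint path):
// filesDS.writeStream
//   .foreach(new MongoForeachWriter(WriteConfig(Map("uri" -> "mongodb://127.0.0.1/test.trends"))))
//   .option("checkpointLocation", "/tmp/trends-foreach-checkpoint")
//   .start()
```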

@rhamzei

rhamzei commented Mar 4, 2021

Many thanks! Any example of Structured Streaming using Python (PySpark)?

@rvilla87
Author

rvilla87 commented Mar 9, 2021

No, sorry, I haven't written any Structured Streaming code in Python.
