Skip to content

Instantly share code, notes, and snippets.

@rjurney
Created November 4, 2019 06:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjurney/ce1c607419e9b9b88674fe68211e8269 to your computer and use it in GitHub Desktop.
Save rjurney/ce1c607419e9b9b88674fe68211e8269 to your computer and use it in GitHub Desktop.
Writing Predictions to MongoDB using Kafka and Structured Streaming
# Score the vectorized feature set with the trained classifier `rfc`
predictions = rfc.transform(final_vectorized_features)

# Strip the assembled feature vector first, then remove the model-output
# metadata columns in a single multi-column drop. The "prediction" column is
# not in the drop list, so it survives alongside the original fields.
predictions = predictions.drop("Features_vec")
final_predictions = predictions.drop(
    "indices",
    "values",
    "rawPrediction",
    "probability",
)
# Store the results to MongoDB
class MongoWriter:
    """Row writer for ``DataStreamWriter.foreach`` that inserts rows into MongoDB.

    Under Spark's foreach-writer protocol, ``open`` is called for a
    partition/epoch, ``process`` for each of its rows, and ``close`` at the end.
    A single ``MongoClient`` is created in ``open``, shared by every ``process``
    call, and released in ``close``.
    """

    def open(self, partition_id, epoch_id):
        """Create the MongoDB client used for this partition/epoch.

        Args:
            partition_id: Partition identifier supplied by Spark.
            epoch_id: Epoch identifier supplied by Spark.

        Returns:
            True, telling Spark to go on and call ``process`` for the rows.
        """
        print(f"Opened partition id: {partition_id}, epoch: {epoch_id}")
        # No arguments: pymongo's default connection settings are used.
        self.mongo_client = pymongo.MongoClient()
        print(f"Opened MongoClient: {self.mongo_client}")
        return True

    def process(self, row):
        """Insert one row into
        ``agile_data_science.flight_delay_classification_response``.

        Args:
            row: A Spark row; it is converted with ``asDict()`` before insert.

        Returns:
            True.
        """
        print(f"Processing row: {row}")
        as_dict = row.asDict()
        print(f"Inserting row.asDict(): {as_dict}")
        collection = (
            self.mongo_client.agile_data_science.flight_delay_classification_response
        )
        result = collection.insert_one(as_dict)
        print(f"Inserted row, got ID: {result.inserted_id}")
        # The client is deliberately NOT closed here: it is shared by every row
        # of the partition. Closing it per row forced a reconnect on the next
        # insert (PyMongo 3) or made the next insert fail (PyMongo 4).
        return True

    def close(self, error):
        """Release the MongoDB client when the partition/epoch finishes.

        Args:
            error: The error that stopped processing, or None if there was none.

        Returns:
            True.
        """
        print("Closed with error: %s" % str(error))
        # getattr guards against open() never having created the client.
        mongo_client = getattr(self, "mongo_client", None)
        if mongo_client is not None:
            mongo_client.close()
            self.mongo_client = None
        return True
# Start the streaming query, routing every output row through a MongoWriter,
# then block this driver until the query terminates.
query = (
    final_predictions
    .writeStream
    .foreach(MongoWriter())
    .start()
)
query.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment