Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Writing Predictions to MongoDB using Kafka and Structured Streaming
# Run the trained random-forest classifier over the feature vectors
predictions = rfc.transform(final_vectorized_features).drop("Features_vec")
# Strip the remaining prediction metadata columns so only the original
# fields plus the prediction survive
final_predictions = (
    predictions
    .drop("indices")
    .drop("values")
    .drop("rawPrediction")
    .drop("probability")
)
# Store the results to MongoDB
class MongoWriter:
    """Structured Streaming ForeachWriter that inserts each prediction row
    into the MongoDB collection
    agile_data_science.flight_delay_classification_response.

    Spark calls open() once per partition/epoch, process() once per row,
    and close() once when the partition is done (or fails).
    """

    def open(self, partition_id, epoch_id):
        """Open a MongoDB connection for this partition; return True to
        accept the partition."""
        print(f"Opened partition id: {partition_id}, epoch: {epoch_id}")
        self.mongo_client = pymongo.MongoClient()
        print(f"Opened MongoClient: {self.mongo_client}")
        return True

    def process(self, row):
        """Insert a single Row into MongoDB as a plain dict."""
        print(f"Processing row: {row}")
        as_dict = row.asDict()
        print(f"Inserting row.asDict(): {as_dict}")
        # 'result' (not 'id') — avoid shadowing the builtin
        result = self.mongo_client.agile_data_science.flight_delay_classification_response.insert_one(as_dict)
        print(f"Inserted row, got ID: {result.inserted_id}")
        # BUG FIX: do NOT close the client here — the original closed it
        # after the first row, breaking every later insert in the partition.
        # The client is released once, in close().
        return True

    def close(self, error):
        """Release the per-partition MongoDB client; called once by Spark."""
        # Guard with getattr: close() can be invoked even if open() failed
        # before the client attribute was set.
        client = getattr(self, "mongo_client", None)
        if client is not None:
            client.close()
        print("Closed with error: %s" % str(error))
        return True
# Stream the cleaned predictions into MongoDB, one partition at a time,
# and block until the streaming query terminates
writer = MongoWriter()
query = (
    final_predictions
    .writeStream
    .foreach(writer)
    .start()
)
query.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.