Created
November 4, 2019 06:58
-
-
Save rjurney/ce1c607419e9b9b88674fe68211e8269 to your computer and use it in GitHub Desktop.
Writing Predictions to MongoDB using Kafka and Structured Streaming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Make the prediction by applying rfc to the vectorized features
predictions = rfc.transform(final_vectorized_features)

# Remove the features vector and the prediction metadata columns so that
# only the original fields are left
predictions = predictions.drop("Features_vec")
final_predictions = predictions.drop(
    "indices",
    "values",
    "rawPrediction",
    "probability",
)
# Store the results to MongoDB
class MongoWriter:
    """Row-by-row sink that inserts streaming rows into MongoDB.

    Meant to be handed to ``writeStream.foreach(...)``, which drives the
    ``open`` -> ``process`` (once per row) -> ``close`` lifecycle.  A
    ``MongoClient`` is created in ``open`` and released in ``close``; it is
    deliberately NOT closed after each row, because a closed client either
    has to reconnect for every row or (PyMongo 4+) cannot be reused at all.
    """

    # Set by open(); None whenever no connection is held.
    mongo_client = None

    def open(self, partition_id, epoch_id):
        """Create the MongoDB client used for the rows that follow.

        Args:
            partition_id: Identifier of the partition being written.
            epoch_id: Identifier of the streaming epoch being written.

        Returns:
            Always True, so the caller goes on to deliver rows to
            ``process``.
        """
        print(f"Opened partition id: {partition_id}, epoch: {epoch_id}")
        self.mongo_client = pymongo.MongoClient()
        print(f"Opened MongoClient: {self.mongo_client}")
        return True

    def process(self, row):
        """Insert one row into ``flight_delay_classification_response``.

        Args:
            row: Object exposing ``asDict()``; the resulting dict is the
                document that gets inserted.

        Returns:
            Always True.
        """
        print(f"Processing row: {row}")
        as_dict = row.asDict()
        print(f"Inserting row.asDict(): {as_dict}")
        collection = (
            self.mongo_client
            .agile_data_science
            .flight_delay_classification_response
        )
        # Named `result` rather than `id` to avoid shadowing the builtin.
        result = collection.insert_one(as_dict)
        print(f"Inserted row, got ID: {result.inserted_id}")
        # The client stays open here; close() owns the shutdown so the
        # connection is reused for the rest of the rows.
        return True

    def close(self, error):
        """Report the outcome and release the MongoDB client, if any.

        Args:
            error: The error that ended processing, or None on success.

        Returns:
            Always True.
        """
        print(f"Closed with error: {error!s}")
        # Detach before closing so a repeated close() call is a no-op.
        client, self.mongo_client = self.mongo_client, None
        if client is not None:
            client.close()
        return True
# Route every row of the prediction stream through a MongoWriter, then block
# until the streaming query terminates
query = (
    final_predictions.writeStream
    .foreach(MongoWriter())
    .start()
)
query.awaitTermination()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment