Created
January 4, 2019 08:45
-
-
Save schocco/2368140a14711d97addf06c51e6584df to your computer and use it in GitHub Desktop.
Import JSON Lines in batches into MongoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import os | |
from concurrent.futures import ProcessPoolExecutor | |
from pymongo import MongoClient | |
from gpx import Track | |
def to_document(base_dir, item):
    """Convert one JSON-Lines record into a MongoDB document.

    Parses the GPX file referenced by the first entry of item["files"]
    (path relative to base_dir) and combines the computed track statistics
    with metadata taken from the record itself.

    Returns the document dict, or None if the record cannot be processed
    (missing keys, unreadable file, parse error) — callers filter Nones out.
    """
    try:
        file_item = item["files"][0]
        file_path = os.path.join(base_dir, file_item['path'])
        # Build the document inside the `with` so the file handle is always
        # closed, regardless of whether Track reads it eagerly or lazily.
        with open(file_path) as gpx_file:
            track = Track(gpx_file)
            doc = {
                "gpx": track.to_xml(),
                "max_elevation": track.max_elevation,
                "min_elevation": track.min_elevation,
                "downhill": track.downhill,
                "max_speed": track.max_speed,
                "length_2d": track.length_2d,
                "length_3d": track.length_3d,
                "moving_time": track.moving_time,
                "uphill": track.uphill,
                "name": item["name"],
                "url": item["url"],
                "user": item["user"],
                "start_time": track.start_time,
                "end_time": track.end_time,
                "difficulty": item["difficulty"],
                # NOTE(review): "min" is built from max_* and "max" from min_* —
                # preserved as-is, but looks swapped; confirm against consumers.
                "bounds": {
                    "min": {"type": "Point", "coordinates": [track.max_longitude, track.max_latitude]},
                    "max": {"type": "Point", "coordinates": [track.min_longitude, track.min_latitude]}
                }
            }
        return doc
    except Exception as e:
        # Use .get() here: the original handler indexed item["files"][0],
        # which re-raised whenever "files" was missing or empty.
        print("Could not read {}".format(item.get("files")), e)
        return None
class JsonLinesImporter:
    """Reads a JSON Lines file in batches and inserts the converted
    documents into a MongoDB collection.

    Each input line is a JSON record describing a track; conversion to a
    MongoDB document is delegated to ``to_document`` and parallelized
    across processes.
    """

    def __init__(self, file, batch_size=30, mongo_uri='mongodb://localhost:27017', db='tracks', collection='tracks'):
        self.file = file
        # GPX paths in the records are relative to the JSON Lines file.
        self.base_dir = os.path.dirname(file)
        self.batch_size = batch_size
        self.client = MongoClient(mongo_uri)
        self.db = db
        self.collection = collection

    def read_lines(self):
        """Yield lists of parsed JSON records, at most batch_size each.

        Each yielded batch is a fresh list, so callers may keep references
        to previous batches. An empty trailing batch is never yielded.
        """
        with open(self.file) as f:
            batch = []
            for line in f:
                batch.append(json.loads(line))
                if len(batch) == self.batch_size:
                    yield batch
                    # Start a new list instead of clear(): the old code
                    # cleared the very list it had just yielded, emptying
                    # it under any consumer that held on to it.
                    batch = []
            # Only yield the remainder if there is one; the old code
            # yielded an empty batch when len(lines) % batch_size == 0.
            if batch:
                yield batch

    def save_to_mongodb(self):
        """Convert every batch and insert the resulting documents."""
        db = self.client[self.db]
        collection = db[self.collection]
        for idx, batch in enumerate(self.read_lines()):
            documents = self.prepare_documents(batch)
            # insert_many raises InvalidOperation on an empty list, which
            # happens when every record in the batch failed to convert.
            if not documents:
                print("skipping batch", idx, "- no readable documents")
                continue
            print("inserting batch", idx)
            collection.insert_many(documents)

    def prepare_documents(self, batch):
        """Convert a batch of records in parallel, dropping failures (None)."""
        with ProcessPoolExecutor() as executor:
            results = executor.map(to_document, [self.base_dir] * len(batch), batch)
            return [doc for doc in results if doc is not None]
if __name__ == '__main__':
    # CLI entry point: import a JSON Lines file of tracks into MongoDB.
    parser = argparse.ArgumentParser(description="Import JSON Lines in batches into MongoDB")
    parser.add_argument('-i', '--input', required=True, help="input file in JSON Lines format")
    parser.add_argument('-c', '--collection', required=True,
                        help="name of the mongodb collection where the tracks should be stored")
    # New, backward-compatible: expose the batch size the class already
    # supports instead of hard-wiring its default of 30.
    parser.add_argument('-b', '--batch-size', type=int, default=30,
                        help="number of records to insert per batch (default: 30)")
    args = parser.parse_args()
    importer = JsonLinesImporter(args.input, batch_size=args.batch_size, collection=args.collection)
    importer.save_to_mongodb()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment