@schocco
Created January 4, 2019 08:45
Import JSON Lines in batches into MongoDB
#!/usr/bin/env python3
import argparse
import json
import os
from concurrent.futures import ProcessPoolExecutor

from pymongo import MongoClient

from gpx import Track
def to_document(base_dir, item):
    """Convert one JSON Lines record into a MongoDB document, parsing the referenced GPX file."""
    try:
        file_item = item["files"][0]
        file_path = os.path.join(base_dir, file_item['path'])
        with open(file_path) as gpx_file:
            track = Track(gpx_file)
            doc = {
                "gpx": track.to_xml(),
                "max_elevation": track.max_elevation,
                "min_elevation": track.min_elevation,
                "downhill": track.downhill,
                "max_speed": track.max_speed,
                "length_2d": track.length_2d,
                "length_3d": track.length_3d,
                "moving_time": track.moving_time,
                "uphill": track.uphill,
                "name": item["name"],
                "url": item["url"],
                "user": item["user"],
                "start_time": track.start_time,
                "end_time": track.end_time,
                "difficulty": item["difficulty"],
                # bounding box corners stored as GeoJSON points
                "bounds": {
                    "min": {"type": "Point", "coordinates": [track.min_longitude, track.min_latitude]},
                    "max": {"type": "Point", "coordinates": [track.max_longitude, track.max_latitude]}
                }
            }
        return doc
    except Exception as e:
        print("Could not read {}".format(item["files"][0]), e)
        return None
class JsonLinesImporter:

    def __init__(self, file, batch_size=30, mongo_uri='mongodb://localhost:27017', db='tracks', collection='tracks'):
        self.file = file
        self.base_dir = os.path.dirname(file)  # GPX paths in the records are resolved relative to the input file
        self.batch_size = batch_size
        self.client = MongoClient(mongo_uri)
        self.db = db
        self.collection = collection

    def read_lines(self):
        """Yield the parsed JSON records in batches of at most batch_size."""
        with open(self.file) as f:
            batch = []
            for line in f:
                batch.append(json.loads(line))
                if len(batch) == self.batch_size:
                    yield batch
                    batch.clear()
            if batch:  # remaining records of an incomplete final batch
                yield batch

    def save_to_mongodb(self):
        db = self.client[self.db]
        collection = db[self.collection]
        for idx, batch in enumerate(self.read_lines()):
            print("inserting batch", idx)
            documents = self.prepare_documents(batch)
            if documents:  # insert_many() rejects an empty list
                collection.insert_many(documents)

    def prepare_documents(self, batch):
        """Convert a batch of records to documents in parallel, skipping unreadable tracks."""
        documents = []
        with ProcessPoolExecutor() as executor:
            for document in executor.map(to_document, [self.base_dir] * len(batch), batch):
                if document is not None:
                    documents.append(document)
        return documents
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Import JSON Lines in batches into MongoDB")
    parser.add_argument('-i', '--input', required=True, help="input file in JSON Lines format")
    parser.add_argument('-c', '--collection', required=True,
                        help="name of the MongoDB collection where the tracks should be stored")
    args = parser.parse_args()
    importer = JsonLinesImporter(args.input, collection=args.collection)
    importer.save_to_mongodb()
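
For reference, a minimal sketch of how the importer might be fed. The field names below are exactly the ones to_document reads from each record; the concrete values, the GPX path, and the script/file names are made up for illustration. GPX paths are resolved relative to the directory containing the .jsonl file.

# sketch: write a one-record JSON Lines file and then run the importer (names are hypothetical)
import json

record = {
    "name": "Morning ride",                       # copied into the MongoDB document as-is
    "url": "https://example.com/tracks/1",
    "user": "alice",
    "difficulty": "moderate",
    "files": [{"path": "gpx/morning-ride.gpx"}],  # must point at an existing GPX file
}

with open("/data/tracks.jsonl", "w") as f:
    f.write(json.dumps(record) + "\n")

# then, assuming the script above was saved as import_tracks.py:
#   python3 import_tracks.py -i /data/tracks.jsonl -c tracks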