Skip to content

Instantly share code, notes, and snippets.

@vijayanandrp
Last active December 3, 2018 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vijayanandrp/5bf32b34a71da4a0e19f880a7ae1ce69 to your computer and use it in GitHub Desktop.
Save vijayanandrp/5bf32b34a71da4a0e19f880a7ae1ce69 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import requests
try:
from pymongo import MongoClient
except ImportError:
raise ImportError('PyMongo is not installed')
try:
from bs4 import BeautifulSoup
except Exception as err:
raise ImportError('Bs4 is not imported correctly. - {}'.format(err))
class MongoDB(object):
    """Small convenience wrapper around a PyMongo client.

    An instance holds one client connection and, optionally, one selected
    database and one selected collection. The insert helpers operate on the
    selected collection and raise ``ValueError`` when none is selected.
    """

    def __init__(self, host='localhost', port=27017, database_name=None,
                 collection_name=None, drop_n_create=False):
        """Create the client and optionally select a database/collection.

        Args:
            host: MongoDB host name passed to ``MongoClient``.
            port: MongoDB port passed to ``MongoClient``.
            database_name: Database to select, or ``None`` to select nothing.
            collection_name: Collection to select inside ``database_name``.
            drop_n_create: When true, drop ``database_name`` before selecting
                it so the caller starts from an empty database.

        Raises:
            ValueError: If ``drop_n_create`` or ``collection_name`` is given
                without a ``database_name``.
        """
        # Let driver errors propagate unchanged: re-wrapping them in a bare
        # ``Exception`` hides the concrete error type from callers.
        self._connection = MongoClient(host=host, port=port, maxPoolSize=200)
        self._database = None
        self._collection = None

        if drop_n_create:
            if not database_name:
                raise ValueError('drop_n_create requires a database_name')
            self.drop_db(database_name)

        if database_name:
            self._database = self._connection[database_name]
        if collection_name:
            # Fail with a clear message instead of subscripting ``None``.
            self.check_db()
            self._collection = self._database[collection_name]

    def drop_db(self, database_name):
        """Drop the database called ``database_name`` on the connected server."""
        self._connection.drop_database(database_name)

    @staticmethod
    def check_state(obj):
        """Return ``True`` when ``obj`` is usable, ``False`` when it is empty.

        ``None`` and ordinary falsy values yield ``False``. Objects that refuse
        truth-value testing by raising ``NotImplementedError`` (PyMongo
        documents this for ``Database`` and ``Collection``) count as usable
        as long as they are not ``None``.
        """
        if obj is None:
            return False
        try:
            return bool(obj)
        except NotImplementedError:
            return True

    @staticmethod
    def _names(obj, modern, legacy):
        """Return ``obj.<modern>()`` as a list, or ``obj.<legacy>()`` if absent.

        The lookup is done on the class, not the instance: PyMongo clients and
        databases answer unknown instance attributes with a Database or
        Collection object, so an instance-level ``hasattr`` is always true.
        """
        method = modern if hasattr(type(obj), modern) else legacy
        return list(getattr(obj, method)())

    def check_db(self):
        """Raise ``ValueError`` unless a database has been selected."""
        if not self.check_state(self._database):
            raise ValueError('Database is empty/not created')

    def check_collection(self):
        """Raise ``ValueError`` unless a collection has been selected."""
        if not self.check_state(self._collection):
            raise ValueError('Collection is empty/not created')

    def get_overall_details(self):
        """Return ``{database name: [collection names]}`` for the server.

        Uses ``list_database_names`` / ``list_collection_names`` when the
        driver provides them and falls back to the older ``database_names`` /
        ``collection_names`` spellings otherwise.
        """
        client = self._connection
        return {
            db: self._names(client[db], 'list_collection_names',
                            'collection_names')
            for db in self._names(client, 'list_database_names',
                                  'database_names')
        }

    def insert(self, post):
        """Insert one document into the selected collection; return its id.

        Raises:
            ValueError: If no collection is selected.
        """
        self.check_collection()
        return self._collection.insert_one(post).inserted_id

    def insert_many(self, posts):
        """Insert several documents; return the list of inserted ids.

        An empty ``posts`` returns ``[]`` without contacting the server,
        because the driver rejects empty batches.

        Raises:
            ValueError: If no collection is selected.
        """
        self.check_collection()
        posts = list(posts)
        if not posts:
            return []
        return self._collection.insert_many(posts).inserted_ids
def parse_climate_csv(text):
    """Turn a ``year,data`` CSV payload into a list of documents.

    Blank lines and the ``year,data`` header line are skipped. Every other
    line must hold an integer year followed by a float value.

    Args:
        text: The raw CSV text.

    Returns:
        A list of ``{'year': int, 'data': float}`` dicts in input order.

    Raises:
        ValueError: If a data line lacks two fields or they are not numeric.
    """
    records = []
    # splitlines() copes with both "\n" and "\r\n" line endings.
    for line_no, raw_line in enumerate(text.splitlines(), 1):
        line = raw_line.strip()
        if not line or 'year,data' in line:
            continue
        fields = line.split(',')
        try:
            record = {'year': int(fields[0]), 'data': float(fields[1])}
        except (IndexError, ValueError):
            raise ValueError('Malformed climate row at line {}: {!r}'.format(
                line_no, raw_line))
        records.append(record)
    return records


def main():
    """Download the CSV, parse it and store the rows in MongoDB."""
    url = 'http://climatedataapi.worldbank.org/climateweb/rest/v1/country/cru/tas/year/CAN.csv'
    # Without a timeout an unresponsive server would block the script forever.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print('Failed to get data:', response.status_code)
        # Never try to parse an error body as climate data.
        return

    data = response.text
    print('First 100 characters of data are')
    print(data[:100])

    print('[*] Parsing response text')
    data_list = parse_climate_csv(data)
    print(data_list)
    if not data_list:
        print('[!] No records parsed, nothing to insert')
        return

    print('[*] Pushing data to MongoDB ')
    mongo_db = MongoDB(database_name='Climate_DB', collection_name='climate_data')
    # One batched call instead of one round trip per record.
    inserted_ids = mongo_db.insert_many(data_list)
    print('[!] Inserted {} records'.format(len(inserted_ids)))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment