Skip to content

Instantly share code, notes, and snippets.

@nfarah86
Created May 18, 2020 05:36
Show Gist options
  • Save nfarah86/85caee5b14639e238e34715094cc5436 to your computer and use it in GitHub Desktop.
Save nfarah86/85caee5b14639e238e34715094cc5436 to your computer and use it in GitHub Desktop.
from settings import *
from mongo_config import weather_data_collection, pollution_data_collection
from timeloop import Timeloop
from datetime import timedelta
import requests
import json
tl = Timeloop()
def get_weather_data(timeout=10):
    """Fetch the current realtime weather observation from ClimaCell.

    Sends a GET request to the ClimaCell v3 realtime endpoint for the fixed
    coordinates lat 39.9042 / lon 116.4074, in US units, authenticating with
    the module-level ``CLIMACELL_API_KEY``.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the caller indefinitely.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.RequestException: If the request fails or times out.
        ValueError: If the response body is not valid JSON.
    """
    url = "https://api.climacell.co/v3/weather/realtime"
    fields = (
        "precipitation",
        "wind_gust",
        "humidity",
        "wind_direction",
        "precipitation_type",
        "visibility",
        "cloud_cover",
        "cloud_base",
        "cloud_ceiling",
        "weather_code",
        "feels_like",
        "temp",
    )
    querystring = {
        "lat": "39.9042",
        "lon": "116.4074",
        "unit_system": "us",
        # The API takes the requested fields as one comma-separated string.
        "fields": ",".join(fields),
        "apikey": CLIMACELL_API_KEY,
    }
    weather_response = requests.get(url, params=querystring, timeout=timeout)
    return weather_response.json()
def get_air_pollution_data(timeout=10):
    """Fetch the current realtime air-quality observation from ClimaCell.

    Sends a GET request to the ClimaCell v3 realtime endpoint for the fixed
    coordinates lat 39.9042 / lon 116.4074, in US units, requesting the
    pollutant fields o3, so2, co, no2, pm10 and pm25, and authenticating with
    the module-level ``CLIMACELL_API_KEY``.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the caller indefinitely.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.RequestException: If the request fails or times out.
        ValueError: If the response body is not valid JSON.
    """
    url = "https://api.climacell.co/v3/weather/realtime"
    fields = ("o3", "so2", "co", "no2", "pm10", "pm25")
    querystring = {
        "lat": "39.9042",
        "lon": "116.4074",
        "unit_system": "us",
        # The API takes the requested fields as one comma-separated string.
        "fields": ",".join(fields),
        "apikey": CLIMACELL_API_KEY,
    }
    air_pollution_response = requests.get(
        url, params=querystring, timeout=timeout
    )
    return air_pollution_response.json()
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_120s():
    """Fetch one weather and one air-quality sample and store both.

    Registered as a periodic job on the Timeloop instance ``tl``.

    NOTE(review): the function name says 120s but the registered interval is
    5 seconds -- confirm which of the two is intended.

    If either fetch fails (network error, timeout, or a body that is not
    JSON), the failure is logged and this cycle is skipped, so nothing is
    inserted for it. Letting the exception escape would presumably end the
    scheduler's worker thread for this job and stop all further sampling --
    verify against the Timeloop version in use.
    """
    # Function-scope import keeps this block self-contained without touching
    # the file's import section.
    import logging

    try:
        weather_response = get_weather_data()
        air_pollution_data = get_air_pollution_data()
    except (requests.RequestException, ValueError):
        logging.getLogger(__name__).exception(
            "Could not fetch ClimaCell data; skipping this sample"
        )
        return
    insert_to_mongo(weather_response, air_pollution_data)
def insert_to_mongo(weather_data, air_pollution_data):
    """Store one weather sample and one air-quality sample.

    ``weather_data`` is inserted into ``weather_data_collection`` first, then
    ``air_pollution_data`` into ``pollution_data_collection``, using one
    ``insert_one`` call for each.
    """
    targets = (
        (weather_data_collection, weather_data),
        (pollution_data_collection, air_pollution_data),
    )
    for collection, document in targets:
        collection.insert_one(document)
def main():
    """Run the sampling job once, directly, outside the scheduler."""
    sample_job_every_120s()
if __name__ == "__main__":
    # Take one sample right away instead of waiting for the scheduler's first
    # tick.
    main()
    # block=True keeps the main thread inside start() until the process is
    # interrupted, so this must come last: anything placed after it would only
    # run once the scheduler has already shut down.
    tl.start(block=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment