Skip to content

Instantly share code, notes, and snippets.

@ShanikaNishadhi
Created May 13, 2022 11:40
Show Gist options
  • Save ShanikaNishadhi/52c377492a838c01d850101204a03e23 to your computer and use it in GitHub Desktop.
Save ShanikaNishadhi/52c377492a838c01d850101204a03e23 to your computer and use it in GitHub Desktop.
import redis
import sys
import requests
from datetime import timedelta
import json
# Create connectivity to the Redis server
def redis_connect():
    """Connect to the Redis server on localhost:6379 (db 0) and verify it.

    The connection is checked with a ping. The client is returned only
    when the ping result is exactly ``True``; otherwise the function
    returns ``None``.

    On ``redis.AuthenticationError`` the string "AuthenticationError" is
    printed and the process exits with status 1. Other exceptions are not
    handled here and propagate to the caller.
    """
    try:
        connection = redis.Redis(
            host="localhost",
            port=6379,
            db=0,
            socket_timeout=5,
        )
        # Guard clause: anything other than a literal True ping is a miss.
        if connection.ping() is not True:
            return None
        return connection
    except redis.AuthenticationError:
        print("AuthenticationError")
        sys.exit(1)
# Create the module-level Redis client used by the cache helpers below.
# NOTE: this statement runs at import time, so importing the module calls
# redis_connect() (which pings the server) immediately.
redis_client = redis_connect()
# Request to get data from API
def get_weather_from_api(location: str, timeout: float = 10.0) -> "requests.Response | None":
    """Request the current weather for *location* from weatherapi.com.

    Args:
        location: Value sent as the ``q`` query parameter (for example a
            city name or a "lat,lon" pair).
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the caller indefinitely.

    Returns:
        The ``requests.Response`` object (the caller is responsible for
        checking ``status_code``), or ``None`` if the request could not
        be made; in that case the error is printed.
    """
    import os  # local import: only needed for the optional key override

    # NOTE(security): an API key committed to source should be treated as
    # leaked. Prefer setting WEATHER_API_KEY in the environment; the literal
    # is kept only as a fallback so existing setups keep working.
    api_key = os.environ.get("WEATHER_API_KEY", "3fa366fac8ab4460ab374551221205")
    request_url = "http://api.weatherapi.com/v1/current.json"
    # Let requests build and percent-encode the query string; interpolating
    # the location by hand breaks on characters such as '&', '#' or '='.
    query = {"key": api_key, "q": location, "aqi": "no"}
    try:
        return requests.get(request_url, params=query, timeout=timeout)
    except Exception as error:
        # Best-effort contract: report the failure and signal it with None.
        print(f"API Error : {error}")
        return None
# Check Redis for cached data from the key
def get_weather_data_from_cache(key: str) -> str:
    """Return the value stored in Redis under *key*.

    The result of ``redis_client.get(key)`` is passed through unchanged
    (callers treat ``None`` as a cache miss). If the lookup raises, the
    error is printed and ``None`` is returned.
    """
    try:
        return redis_client.get(key)
    except Exception as error:
        print(f"Error Occured : {error}")
    return None
# Add data to the Redis cache with a 3600-second expiry
def put_weather_data_to_cache(key: str, value: str, ttl_seconds: int = 3600) -> bool:
    """Store *value* in Redis under *key* with an expiry.

    Args:
        key: Cache key.
        value: Payload to store.
        ttl_seconds: Lifetime of the entry in seconds. Defaults to 3600,
            the value that was previously hard-coded.

    Returns:
        The result of ``redis_client.setex`` on success, or ``False`` if
        the write raised (the error is printed). Returning ``False``
        rather than an implicit ``None`` keeps the result consistent with
        the ``bool`` annotation.
    """
    try:
        return redis_client.setex(key, timedelta(seconds=ttl_seconds), value=value)
    except Exception as error:
        print(f"Error Occured : {error}")
        return False
# Obtain weather data
def get_data(coordinates: str) -> "dict | None":
    """Return weather data for *coordinates*, preferring the Redis cache.

    The cache is consulted first. On a miss the weather API is queried
    and, if it answers with an OK status, the raw response body is cached
    and the parsed JSON is returned.

    Args:
        coordinates: Location query; also used as the cache key.

    Returns:
        The decoded JSON document, or ``None`` when the API request
        failed, returned a non-OK status, or any step raised (the error
        is printed in that case).
    """
    try:
        # Serve from the cache when an entry is available.
        cached = get_weather_data_from_cache(key=coordinates)
        if cached is not None:
            print("Data Obtained From Cache")
            return json.loads(cached)

        # Cache miss: fall back to the API.
        response = get_weather_from_api(coordinates)
        # The API helper returns None when the request itself failed; check
        # that explicitly instead of tripping an AttributeError below.
        if response is None or response.status_code != requests.codes.ok:
            return None

        print("Data Obtained From API")
        payload = response.content
        # Parse before caching so a malformed body is never stored, and so
        # both the hit and miss paths hand back the same type (parsed JSON).
        parsed = json.loads(payload)
        if put_weather_data_to_cache(key=coordinates, value=payload) is True:
            print("Data Added to Redis")
        return parsed
    except Exception as error:
        print(f"Error Occured : {error}")
        return None
# Call the get_data function with the given argument
if __name__ == '__main__':
    # The location query is taken from the first command-line argument.
    # Fail with a usage hint instead of an IndexError when it is missing.
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} <location>")
    weather_data = get_data(sys.argv[1])
    print(weather_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment