Skip to content

Instantly share code, notes, and snippets.

@cankush625
Last active August 2, 2021 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cankush625/0be45100b99e0d2ec85797b0998e14ad to your computer and use it in GitHub Desktop.
Save cankush625/0be45100b99e0d2ec85797b0998e14ad to your computer and use it in GitHub Desktop.
import requests
import concurrent.futures
import json
import logging
class AwsEc2Price:
# Retrieving EC2 instance prices from AWS EC2 Pricing API
def get_ec2_prices(self, reserved_instance_plan_args):
url = f"https://a0.p.awsstatic.com/pricing/1.0/ec2/region/{reserved_instance_plan_args[0]}/reserved-instance/linux/index.json"
prices_list = []
try:
response = requests.get(url)
try:
pricing_data = response.json()
prices = pricing_data["prices"]
for price in prices:
if self.is_valid_input(price, reserved_instance_plan_args):
prices_list.append(price)
except json.decoder.JSONDecodeError:
logging.exception("JSON Decode Error", exc_info=False)
except KeyError:
logging.exception("Key not found in JSON response", exc_info=False)
except requests.exceptions.InvalidSchema:
logging.exception("Invalid URL. No connection adapters found", exc_info=False)
except requests.exceptions.HTTPError as errorCode:
logging.exception(f"HTTP error {errorCode}")
except requests.exceptions.ConnectionError:
logging.exception("Failed to establish a new connection", exc_info=False)
except requests.exceptions.RequestException as e:
logging.exception(e)
return prices_list
@staticmethod
def is_valid_input(price, reserved_instance_plan_args):
ec2_term_offering_class = price["attributes"]["aws:offerTermOfferingClass"]
ec2_term_length = price["attributes"]["aws:offerTermLeaseLength"]
ec2_payment_option = price["attributes"]["aws:offerTermPurchaseOption"]
ec2_operating_system = price["attributes"]["aws:ec2:operatingSystem"]
ec2_tenancy = price["attributes"]["aws:ec2:tenancy"]
is_valid = (ec2_term_offering_class == reserved_instance_plan_args[1] and
ec2_term_length == reserved_instance_plan_args[2] and
ec2_payment_option == reserved_instance_plan_args[3] and
ec2_operating_system == reserved_instance_plan_args[4] and
ec2_tenancy == reserved_instance_plan_args[5])
return is_valid
@staticmethod
def save_prices_in_file(prices_list, file_id):
prices = json.dumps(prices_list)
try:
with open(f'./prices/ec2_price_{file_id}.json', 'w') as prices_file:
prices_file.write(prices)
except FileNotFoundError:
logging.exception("The prices directory is not present", exc_info=False)
# Running get_ec2_prices and save_prices_in_file function using this function
# This will enable us to run both functions in a single thread as both functions are dependent on each other
# This will create new file for every input using file_id
def get_and_save_ec2_prices(self, reserved_instance_plan_args, file_id):
prices_list = self.get_ec2_prices(reserved_instance_plan_args)
# If prices_list is not empty then save the prices_list data into the file
if len(prices_list) != 0:
self.save_prices_in_file(prices_list, file_id)
ec2_price = AwsEc2Price()
# List of inputs
# list_of_input[i] = ["region", "term_offering_class", "term_length", "payment_option", "operating_system", "tenancy"]
list_of_input = [["ap-south-1", "standard", "1yr", "No Upfront", "Linux", "Shared"],
["us-east-1", "convertible", "1yr", "No Upfront", "Linux", "Shared"]]
# Get prices for all inputs in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
for i, list_of_arguments in enumerate(list_of_input):
executor.submit(ec2_price.get_and_save_ec2_prices, list_of_arguments, i)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment