Skip to content

Instantly share code, notes, and snippets.

@zznathans
Created May 25, 2025 23:31
Show Gist options
  • Save zznathans/831538eb7933d4f09bee5acdd0faaf50 to your computer and use it in GitHub Desktop.
Save zznathans/831538eb7933d4f09bee5acdd0faaf50 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import csv
import io
import json
import logging
import os
import re
import sys
import time

import pandas as pd
import requests
from botocore.exceptions import ClientError
from pymongo import MongoClient
# Choose the MongoDB database name from the ENVIRONMENT variable. Only the
# exact value "production" selects "simcountry"; anything else (including an
# unset variable, which str() turns into the text "None") selects
# "simcountry_dev".
my_env = str(os.getenv("ENVIRONMENT"))
db_name = "simcountry" if my_env == "production" else "simcountry_dev"
class JSONFormatter(logging.Formatter):
    """Log formatter that renders each record as a single JSON object.

    The object always carries ``timestamp`` (``record.created``), ``level``,
    ``message``, ``module``, ``line_number`` and ``thread_name``. When the
    record was logged with exception or stack information, the formatted
    traceback is added under ``exception`` / ``stack_info`` so it is not
    silently lost.
    """

    def format(self, record):
        """Return *record* serialized as a JSON string."""
        log_dict = {
            'timestamp': record.created,
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'line_number': record.lineno,
            'thread_name': record.threadName,
        }
        # Optional keys: only present when the caller supplied the information
        # (e.g. via log.exception(...) or stack_info=True).
        if record.exc_info:
            log_dict['exception'] = self.formatException(record.exc_info)
        if record.stack_info:
            log_dict['stack_info'] = self.formatStack(record.stack_info)
        return json.dumps(log_dict)
# Module-wide logger: INFO and above, written through a stream handler
# (StreamHandler's default stream) as one JSON object per record.
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(handler)
# Start year for each world, keyed by the same two-letter world codes used in
# world_id_map. (Presumably the in-game year in which each world's data
# begins -- confirm against the upstream site.)
start_year_map = dict(
    gr=5239,
    fb=5346,
    wg=5161,
    kb=5479,
    lu=5252,
    ta=2477,
)
def get_start_year(world):
    """Look up the start year for a world code.

    Returns the integer from ``start_year_map``, or ``None`` when *world* is
    not a known code.
    """
    try:
        return start_year_map[world]
    except KeyError:
        return None
# Sheets to scrape for every world. Each name is used both as the
# "<name>.csv" endpoint in the download URL and as the "data_type" value
# stored on every inserted record.
data_types = [
    # country sheets
    "countrydata", "countryindexes", "countryscores",
    # enterprise sheets
    "enterprisedata", "enterpriseindexes", "enterprisescores",
    # price sheets
    "marketprices", "shareprices",
]
# Two-letter world code -> two-digit server id. The id is substituted into
# the host name "sim<id>.simcountry.com" when building request URLs.
# Insertion order is the order in which main() processes the worlds.
world_id_map = dict(
    gr="04",
    fb="02",
    wg="03",
    kb="01",
    lu="05",
    ta="06",
)
def get_world_id(world_name):
    """Translate a two-letter world code into its server id string.

    Returns the value from ``world_id_map``, or ``None`` when *world_name* is
    not a known code.
    """
    try:
        return world_id_map[world_name]
    except KeyError:
        return None
def mongo_connect():
    """Create a MongoDB client for the host named by the ``mongo_host`` env var.

    The connection always targets port 27017. A new ``MongoClient`` is
    returned on every call; the caller is responsible for closing it.
    Note: if ``mongo_host`` is unset, the URI is built with the text "None"
    as the host name.
    """
    # Read the host into a local first: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError on Python versions before 3.12.
    mongo_host = os.getenv("mongo_host")
    return MongoClient(f"mongodb://{mongo_host}:27017")
def get_if_data_exists(world, data_type, year, month):
    """Report whether records for this world/sheet/period are already stored.

    Args:
        world: Two-letter world code, as stored in each record's ``world``.
        data_type: Sheet name, as stored in each record's ``data_type``.
        year: Year; converted with ``int()`` before querying.
        month: Month; converted with ``int()`` before querying.

    Returns:
        ``True`` if the ``data`` collection holds at least one matching
        document, otherwise ``False``.
    """
    log.info(f"Checking if data exists for {world} {data_type} {year} {month}")
    client = mongo_connect()
    try:
        # The filter must include the world: without it, data scraped for one
        # world makes every other world look already scraped for that period.
        how_many = client[db_name]['data'].count_documents({
            "world": world,
            "year": int(year),
            "month": int(month),
            "data_type": data_type,
        })
    finally:
        # Always release the connection, even if the query raises.
        client.close()
    log.info(f"Found {how_many} records for {world} {data_type} {year} {month}")
    return how_many > 0
def scrape_month(world, data_type, year, month):
    """Download one monthly CSV sheet and insert its rows into MongoDB.

    Every row becomes a document in the ``data`` collection, tagged with
    ``data_type``, ``year`` (int), ``month`` (int), ``world``, a Unix
    ``timestamp`` and ``active=True``.

    Args:
        world: Two-letter world code (a key of ``world_id_map``).
        data_type: Sheet name; used in the URL and stored on every record.
        year: Year of the sheet; concatenated with *month* into the URL slug.
        month: Month of the sheet (callers pass a zero-padded string).

    Returns:
        ``True`` when the rows were inserted, ``False`` on any failure
        (download error, HTTP status >= 400, unparsable or empty CSV,
        database error). Failures are logged, not raised.
    """
    date_slug = f"{year}{month}"
    url = (
        f"https://sim{get_world_id(world)}.simcountry.com/cgi-bin/{data_type}.csv"
        f"?SN_ADDRESS=wwwCountry&SN_METHOD=downloadSheet"
        f"&miDocument={data_type}.{date_slug}&e=.csv"
    )
    log.info(f"Getting url: {url}")

    # Fetch once, with a timeout so a stalled server cannot hang the run.
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException as exception:
        log.error({"message": "Failed to download data", "url": url, "exception": exception})
        return False
    if response.status_code >= 400:
        log.error({"message": "Failed to download data", "url": url, "status_code": response.status_code})
        return False

    # Parse straight from memory. Going through a shared scratch file meant a
    # failed download could silently re-import the previous sheet's contents.
    try:
        df = pd.read_csv(io.StringIO(response.text))
    except Exception as exception:  # We got bad data
        log.error({"message": "Failed to scrape data", "date_slug": date_slug, "data_type": data_type, "exception": exception})
        return False

    # Round-trip through JSON so the records contain only plain JSON values.
    records = json.loads(df.to_json(orient='records'))
    if not records:  # empty set of data was provided
        message = f"Failed to scrape data for {world} {data_type} {year} {month}"
        log.error({"message": message, "reason": "no rows in downloaded sheet"})
        return False

    scraped_at = int(time.time())
    for record in records:
        record['data_type'] = data_type
        record['year'] = int(year)
        record['month'] = int(month)
        record['world'] = world
        record['timestamp'] = scraped_at
        record['active'] = True

    mongo = mongo_connect()
    try:
        mongo[db_name]['data'].insert_many(records)
    except Exception as exception:
        log.error(exception)
        return False
    finally:
        # Close the connection on success and on failure alike.
        mongo.close()
    log.info(f"Inserted {len(records)} records into {world} {data_type} {year} {month}")
    return True
def convert_month_to_number(month):
    """Convert a three-letter English month abbreviation to "01".."12".

    The lookup is exact and case-sensitive ("Jan", "Feb", ...). Any other
    value yields ``None``.
    """
    abbreviations = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )
    # Number the abbreviations from 1 and zero-pad to two digits.
    numbers = {
        abbreviation: f"{position:02d}"
        for position, abbreviation in enumerate(abbreviations, start=1)
    }
    return numbers.get(month)
# Date as shown on the world page, e.g. "Jan 5 5239": month abbreviation, day,
# four-digit year. Restricting the first token to real month abbreviations
# stops unrelated "Xyz 12 3456" text from being taken for the date.
_UPSTREAM_DATE_RE = re.compile(
    r'\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{1,2}) (\d{4})\b'
)


def get_current_date(world):
    """Fetch a world's main page and extract the current upstream date.

    Args:
        world: Value substituted into the host name ``sim<world>.simcountry.com``.
            ``main()`` passes the server id from ``get_world_id()`` (e.g. "04"),
            not the two-letter world code.

    Returns:
        A dict with ``world`` (the argument as given), ``date`` (matched
        text), ``year`` (str), ``month`` (zero-padded str from
        ``convert_month_to_number``) and ``day`` (str); or ``None`` when the
        request fails, the status is not 200, or no date is found. Every
        failure is logged.
    """
    url = f"https://sim{world}.simcountry.com/cgi-bin/cgiw?worldmain"
    try:
        # Timeout so a stalled server cannot hang the whole run.
        response = requests.get(url, timeout=30)
    except requests.RequestException as exception:
        log.error({"message": "Failed to get current date", "url": url, "exception": exception})
        return None
    if response.status_code != 200:
        log.error({"message": "Failed to get current date", "url": url, "status_code": response.status_code})
        return None

    match = _UPSTREAM_DATE_RE.search(response.text)
    if match is None:
        log.error({"message": "Failed to get current date", "url": url, "reason": "no date found in page"})
        return None

    date_text = match.group(0)
    month_name, day, year = match.groups()
    log.info(f"Current upstream date: {date_text}")
    return {
        "world": world,
        "date": date_text,
        "year": year,
        "month": convert_month_to_number(month_name),
        "day": day,
    }
def update_latest_date(world: str, year: int, month):
    """Record the latest processed year/month for a world.

    Upserts the document ``{"world": world}`` in the ``current_date``
    collection, setting its ``year`` and ``month`` fields to the given values
    as-is (``main()`` passes *month* as a zero-padded string).

    Errors are logged and swallowed; nothing is returned.
    """
    log.info(f"Updating latest date for {world} to {year} {month}")
    try:
        mongo = mongo_connect()
        try:
            mongo[db_name]['current_date'].update_one(
                {"world": world},
                {"$set": {"year": year, "month": month}},
                upsert=True,
            )
        finally:
            # Close the connection even when the update raises.
            mongo.close()
    except Exception as exception:
        log.error({"message": "Failed to update latest date", "exception": exception})
def main():
    """Scrape every missing sheet for the current upstream month of each world.

    For each world in ``world_id_map``: read the current upstream date, scrape
    each entry of ``data_types`` that is not yet stored for that year/month,
    then record the year/month as the world's latest date. A world whose
    current date cannot be determined is logged and skipped so the remaining
    worlds are still processed.
    """
    for world in world_id_map:
        date_info = get_current_date(get_world_id(world))
        # get_current_date returns None on failure, and "month" can be None
        # if the month name was not recognized; neither may reach int()/zfill().
        if not date_info or not date_info.get("month"):
            log.error({"message": "Could not determine current date; skipping world", "world": world})
            continue
        my_year = int(date_info["year"])
        my_month = date_info["month"].zfill(2)
        # Check if we need to scrape
        for data_type in data_types:
            if get_if_data_exists(world, data_type, my_year, my_month):
                log.info(f"Data exists for {world} {data_type} {my_year} {my_month}")
                continue
            log.info(f"Data does not exist for {world} {data_type} {my_year} {my_month}")
            log.info(f"Scraping {world} {data_type} {my_year} {my_month}")
            scrape_month(world, data_type, my_year, my_month)
        update_latest_date(world, my_year, my_month)


# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment