Created
April 6, 2024 23:36
-
-
Save tbbooher/d8d3a7ed49087b19d1ac129fa428a250 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
from datetime import datetime

import psycopg2
from dotenv import load_dotenv
from tqdm import tqdm

# Load environment variables from .env file
load_dotenv()

# PostgreSQL database connection details, read from the environment.
DB_HOST = os.getenv('DB_HOST')
# Fall back to PostgreSQL's standard port when DB_PORT is unset, instead of
# crashing with TypeError on int(None).
DB_PORT = int(os.getenv('DB_PORT', '5432'))
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Function to get the last processed row | |
def get_last_processed_row(cur):
    """Return how many CSV rows have already been committed to cell_towers.

    Uses COUNT(*) rather than MAX(id): a SERIAL sequence is not rolled back
    when a batch aborts, so after a failed run MAX(id) can exceed the number
    of rows actually committed, making the caller's resume check
    (row_num <= last_row_processed) skip unprocessed CSV rows. COUNT(*) is
    exactly the number of committed INSERTs, which — since rows are inserted
    in CSV order and committed in order — is the correct resume point.

    Args:
        cur: an open database cursor.

    Returns:
        int: number of committed rows (0 for an empty table).
    """
    cur.execute('SELECT COUNT(*) FROM cell_towers')
    result = cur.fetchone()
    # COUNT(*) never yields NULL, but keep the guard for defensive parity
    # with the original MAX(id) handling.
    return result[0] if result[0] is not None else 0
# Connect to the PostgreSQL database | |
# Open the database connection and cursor used by the rest of the script.
_conn_params = {
    'host': DB_HOST,
    'port': DB_PORT,
    'dbname': DB_NAME,
    'user': DB_USER,
    'password': DB_PASSWORD,
}
conn = psycopg2.connect(**_conn_params)
cur = conn.cursor()
# Assuming there's an 'id' column in your table to keep track of the insertion order | |
# Create the destination table on first run; IF NOT EXISTS makes this
# idempotent across resumed runs. Column names are double-quoted because
# some (e.g. "LAC/TAC/NID") contain characters PostgreSQL would reject in
# unquoted identifiers. The SERIAL id records insertion order, which the
# resume logic relies on.
cur.execute('''
CREATE TABLE IF NOT EXISTS cell_towers (
id SERIAL PRIMARY KEY,
"Radio" VARCHAR(255),
"MCC" INTEGER,
"MNC" INTEGER,
"LAC/TAC/NID" INTEGER,
"CID" INTEGER,
"Unknown" INTEGER,
"Longitude" DECIMAL(11, 8),
"Latitude" DECIMAL(10, 8),
"Range" INTEGER,
"Samples" INTEGER,
"Changeable" BOOLEAN,
"Created" TIMESTAMP,
"Updated" TIMESTAMP,
"AverageSignal" DECIMAL(5, 2)
)
''')
# Commit the DDL so the table exists even if the load below fails early.
conn.commit()
# Resume point: number of CSV data rows already committed in earlier runs.
# Relies on rows being inserted in CSV order and committed in batches.
last_row_processed = get_last_processed_row(cur)
print(f"Resuming from row {last_row_processed}")


def _epoch_to_naive_utc(ts):
    """Convert an epoch-seconds string/int to a naive UTC datetime.

    Equivalent to datetime.utcfromtimestamp(), which is deprecated since
    Python 3.12 in favour of the timezone-aware form used here; tzinfo is
    stripped so the value still maps onto the TIMESTAMP (without time zone)
    columns.
    """
    from datetime import timezone
    return datetime.fromtimestamp(int(ts), timezone.utc).replace(tzinfo=None)


def _parse_row(row):
    """Convert one raw CSV row (list of strings) into the typed INSERT tuple.

    Raises ValueError if a numeric field does not parse.
    """
    return (
        row[0],                        # Radio
        int(row[1]), int(row[2]),      # MCC, MNC
        int(row[3]), int(row[4]),      # LAC/TAC/NID, CID
        int(row[5]),                   # Unknown
        float(row[6]), float(row[7]),  # Longitude, Latitude
        int(row[8]), int(row[9]),      # Range, Samples
        row[10] == '1',                # Changeable
        _epoch_to_naive_utc(row[11]),  # Created
        _epoch_to_naive_utc(row[12]),  # Updated
        float(row[13]),                # AverageSignal
    )


_INSERT_SQL = '''
    INSERT INTO cell_towers (
        "Radio", "MCC", "MNC", "LAC/TAC/NID", "CID", "Unknown",
        "Longitude", "Latitude", "Range", "Samples", "Changeable",
        "Created", "Updated", "AverageSignal"
    )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

# newline='' is what the csv module requires for correct newline handling.
with open('all_files_combined.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    next(reader)  # Skip header
    for row_num, row in enumerate(tqdm(reader, desc="Inserting records"), start=1):
        if row_num <= last_row_processed:
            continue  # Skip already processed rows
        try:
            cur.execute(_INSERT_SQL, _parse_row(row))
        # ValueError covers malformed numeric fields, which previously
        # crashed the script uncaught with no row context;
        # NumericValueOutOfRange covers values the column types cannot hold.
        except (ValueError, psycopg2.errors.NumericValueOutOfRange) as e:
            print(f"Error processing row {row_num}: {e}")
            # A failed INSERT leaves the transaction aborted; roll back so
            # the connection is in a clean state before exiting.
            conn.rollback()
            break  # Break on error
        if row_num % 1000 == 0:  # Commit every 1000 rows
            conn.commit()

conn.commit()  # Final commit in case there were fewer than 1000 rows left
cur.close()
conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment