Created
March 13, 2024 01:28
-
-
Save recalde/46c8027a7e568fc5d8eaaef2695b5ad6 to your computer and use it in GitHub Desktop.
Export Data Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"sql_server": { | |
"driver": "{SQL Server}", | |
"server": "your_server", | |
"database": "your_database", | |
"username": "your_username", | |
"password": "your_password" | |
}, | |
"oracle": { | |
"dsn": "your_dsn", | |
"user": "your_user", | |
"password": "your_password" | |
}, | |
"postgres": { | |
"host": "your_host", | |
"database": "your_database", | |
"user": "your_user", | |
"password": "your_password" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import pandas as pd | |
from datetime import datetime | |
from sqlalchemy import create_engine | |
def load_db_config(db_type): | |
"""Load database configuration for the specified database type.""" | |
with open('db_config.json') as f: | |
config = json.load(f) | |
return config.get(db_type) | |
def create_db_engine(db_type, config): | |
"""Create a database engine connection string based on the db type.""" | |
if db_type == 'sql_server': | |
connection_string = f"mssql+pyodbc://{config['username']}:{config['password']}@{config['server']}/{config['database']}?driver={config['driver']}" | |
elif db_type == 'oracle': | |
connection_string = f"oracle+cx_oracle://{config['user']}:{config['password']}@{config['dsn']}" | |
elif db_type == 'postgres': | |
connection_string = f"postgresql://{config['user']}:{config['password']}@{config['host']}/{config['database']}" | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
return create_engine(connection_string) | |
def export_table_data(table_name, db_type, start_date, end_date, timestamp_field, frequency_type): | |
"""Export data from a table within a specific time period into a CSV file.""" | |
config = load_db_config(db_type) | |
engine = create_db_engine(db_type, config) | |
# Format SQL query based on the frequency type | |
if frequency_type in ['daily', 'monthly', 'quarterly']: | |
query = f"""SELECT * FROM {table_name} WHERE {timestamp_field} >= '{start_date}' AND {timestamp_field} <= '{end_date}'""" | |
else: | |
raise ValueError("Frequency type must be 'daily', 'monthly', or 'quarterly'") | |
# Execute query and load data into a DataFrame | |
df = pd.read_sql(query, engine) | |
# Determine file name based on frequency type | |
date_format = {'daily': '%Y%m%d', 'monthly': '%Y%m', 'quarterly': '%YQQ'}[frequency_type] | |
file_name = f"{table_name}-{datetime.now().strftime(date_format)}.csv" | |
# Export DataFrame to CSV | |
df.to_csv(file_name, index=False) | |
print(f"Data exported to {file_name}") | |
if __name__ == "__main__": | |
# Example usage | |
export_table_data( | |
table_name="your_table_name", | |
db_type="postgres", # or "sql_server", "oracle" | |
start_date="2023-01-01", | |
end_date="2023-01-31", | |
timestamp_field="your_timestamp_column", | |
frequency_type="monthly" # or "daily", "quarterly" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment