Skip to content

Instantly share code, notes, and snippets.

@recalde
Created March 13, 2024 01:28
Show Gist options
  • Save recalde/46c8027a7e568fc5d8eaaef2695b5ad6 to your computer and use it in GitHub Desktop.
Save recalde/46c8027a7e568fc5d8eaaef2695b5ad6 to your computer and use it in GitHub Desktop.
Export Data Script
{
"sql_server": {
"driver": "{SQL Server}",
"server": "your_server",
"database": "your_database",
"username": "your_username",
"password": "your_password"
},
"oracle": {
"dsn": "your_dsn",
"user": "your_user",
"password": "your_password"
},
"postgres": {
"host": "your_host",
"database": "your_database",
"user": "your_user",
"password": "your_password"
}
}
import json
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine
def load_db_config(db_type):
"""Load database configuration for the specified database type."""
with open('db_config.json') as f:
config = json.load(f)
return config.get(db_type)
def create_db_engine(db_type, config):
"""Create a database engine connection string based on the db type."""
if db_type == 'sql_server':
connection_string = f"mssql+pyodbc://{config['username']}:{config['password']}@{config['server']}/{config['database']}?driver={config['driver']}"
elif db_type == 'oracle':
connection_string = f"oracle+cx_oracle://{config['user']}:{config['password']}@{config['dsn']}"
elif db_type == 'postgres':
connection_string = f"postgresql://{config['user']}:{config['password']}@{config['host']}/{config['database']}"
else:
raise ValueError(f"Unsupported database type: {db_type}")
return create_engine(connection_string)
def export_table_data(table_name, db_type, start_date, end_date, timestamp_field, frequency_type):
"""Export data from a table within a specific time period into a CSV file."""
config = load_db_config(db_type)
engine = create_db_engine(db_type, config)
# Format SQL query based on the frequency type
if frequency_type in ['daily', 'monthly', 'quarterly']:
query = f"""SELECT * FROM {table_name} WHERE {timestamp_field} >= '{start_date}' AND {timestamp_field} <= '{end_date}'"""
else:
raise ValueError("Frequency type must be 'daily', 'monthly', or 'quarterly'")
# Execute query and load data into a DataFrame
df = pd.read_sql(query, engine)
# Determine file name based on frequency type
date_format = {'daily': '%Y%m%d', 'monthly': '%Y%m', 'quarterly': '%YQQ'}[frequency_type]
file_name = f"{table_name}-{datetime.now().strftime(date_format)}.csv"
# Export DataFrame to CSV
df.to_csv(file_name, index=False)
print(f"Data exported to {file_name}")
if __name__ == "__main__":
# Example usage
export_table_data(
table_name="your_table_name",
db_type="postgres", # or "sql_server", "oracle"
start_date="2023-01-01",
end_date="2023-01-31",
timestamp_field="your_timestamp_column",
frequency_type="monthly" # or "daily", "quarterly"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment