Script to migrate PostgreSQL or MySQL records to InfluxDB, in Python.
### MySQL DB info ###
# import MySQLdb
# conn = MySQLdb.connect(host="localhost",    # your host, usually localhost
#                        user="john",         # your username
#                        passwd="megajonhy",  # your password
#                        db="jonhydb")        # name of the database

### PostgreSQL DB info ###
import psycopg2
import psycopg2.extras

postgresql_table_name = ""
# BUG FIX: the original concatenation produced "...user=<USER_NAME>password=..."
# with no separating space, which psycopg2 cannot parse as a DSN.
conn = psycopg2.connect("dbname=<DB_NAME> user=<USER_NAME> " +
                        "password=<USER_PASSWORD> host=<POSTGRESQL_HOST>")

### InfluxDB info ####
from influxdb import InfluxDBClient

influx_db_name = ""
influxClient = InfluxDBClient("<INFLUX_HOST>", "<INFLUX_PORT>")
# BUG FIX: influxdb-python's InfluxDBClient exposes drop_database(), not
# delete_database() — the original raised AttributeError. Starting each run
# from a clean database.  (NOTE(review): confirm against the pinned influxdb
# client version if it is very old.)
influxClient.drop_database(influx_db_name)
influxClient.create_database(influx_db_name)

# Dictates how SQL columns will be mapped to keys/fields in InfluxDB.
schema = {
    "time_column": "",  # the column used as the timestamp in InfluxDB
    "columns_to_fields": ["", ...],  # columns that will map to fields
    "columns_to_tags": ["", ...],  # columns that will map to tags
    "table_name_to_measurement": "",  # table name mapped to the measurement
}
def generate_influx_points(records, point_schema=None):
    """Generate a collection of InfluxDB points from the given SQL records.

    Args:
        records: iterable of mapping-like rows (e.g. RealDictCursor rows),
            indexable by column name.
        point_schema: optional mapping with the same shape as the module-level
            ``schema`` dict; defaults to the global ``schema`` when omitted
            (backward-compatible with the original single-argument call).

    Returns:
        A list of point dicts suitable for ``InfluxDBClient.write_points()``.
    """
    if point_schema is None:
        point_schema = schema
    influx_points = []
    for record in records:
        # BUG FIX: the original `tags = {}, fields = {}` is a SyntaxError in
        # Python — the two dicts must be created in separate statements.
        tags = {}
        fields = {}
        for tag_label in point_schema['columns_to_tags']:
            tags[tag_label] = record[tag_label]
        for field_label in point_schema['columns_to_fields']:
            fields[field_label] = record[field_label]
        influx_points.append({
            "measurement": point_schema['table_name_to_measurement'],
            "tags": tags,
            "time": record[point_schema['time_column']],
            "fields": fields,
        })
    return influx_points
# Query the relational DB for all records, via a named (server-side) cursor so
# the whole table is streamed instead of loaded into memory at once.
curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
# curr = conn.cursor(dictionary=True)  # MySQL equivalent

# NOTE(review): table/column identifiers cannot be bound as SQL parameters;
# they come from the local, trusted `schema` dict — never populate that dict
# from untrusted input.
# BUG FIXES vs. original:
#   * a space was missing before "ORDER BY", producing invalid SQL
#     ("...<table>ORDER BY...")
#   * the schema key is "time_column"; the original read the nonexistent
#     "column_to_time" key, which raises KeyError
curr.execute("SELECT * FROM " + schema['table_name_to_measurement'] +
             " ORDER BY " + schema['time_column'] + " DESC;")

row_count = 0
# Process 1000 records at a time.
while True:
    # BUG FIX: the original concatenated str + int, raising TypeError.
    print("Processing row #" + str(row_count + 1))
    selected_rows = curr.fetchmany(1000)
    influxClient.write_points(generate_influx_points(selected_rows))
    row_count += 1000
    # fetchmany() returns fewer than 1000 rows only once the cursor is
    # exhausted, so this is the loop's termination condition.
    if len(selected_rows) < 1000:
        break

conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment