Created
January 29, 2019 16:09
-
-
Save Zir0-93/b7d2cf47ae54e53100357e0cebae4a57 to your computer and use it in GitHub Desktop.
Script to migrate PostgreSQL or MySQL records to InfluxDB in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### MySQL DB info ###
# To migrate from MySQL instead of PostgreSQL, uncomment this block and
# comment out the PostgreSQL block below.
# import MySQLdb
# conn = MySQLdb.connect(host="localhost",    # your host, usually localhost
#                        user="john",         # your username
#                        passwd="megajonhy",  # your password
#                        db="jonhydb")        # name of the database

### PostgreSQL DB info ###
import psycopg2
import psycopg2.extras

# Name of the source table in PostgreSQL (placeholder -- fill in).
postgresql_table_name = ""
# libpq keyword/value connection string: pairs MUST be whitespace-separated,
# hence the trailing space after <USER_NAME> (without it the password would
# be parsed as part of the user name).
conn = psycopg2.connect("dbname=<DB_NAME> user=<USER_NAME> "
                        "password=<USER_PASSWORD> host=<POSTGRESQL_HOST>")

### InfluxDB info ###
from influxdb import InfluxDBClient

influx_db_name = ""          # target database (placeholder -- fill in)
influx_host = "<INFLUX_HOST>"
influx_port = 8086           # integer port; 8086 is the client's default
influxClient = InfluxDBClient(influx_host, influx_port)

# WARNING: when True, any existing InfluxDB database with this name is
# dropped before being recreated, so every run starts from an empty target.
drop_existing_influx_db = True
if drop_existing_influx_db:
    influxClient.delete_database(influx_db_name)
influxClient.create_database(influx_db_name)
# Make the new database the client's default so write_points() has a target;
# the client was constructed without a database.
influxClient.switch_database(influx_db_name)

# Dictates how SQL columns are mapped onto InfluxDB measurement/tags/fields.
# Fill in the placeholders before running.
schema = {
    "time_column": "",                 # column used as the point timestamp in InfluxDB
    "columns_to_fields": [],           # columns that map to fields, e.g. ["value"]
    "columns_to_tags": [],             # columns that map to tags, e.g. ["host"]
    "table_name_to_measurement": "",   # InfluxDB measurement name for this table
}
def generate_influx_points(records, mapping=None):
    """Build a list of InfluxDB points from SQL records.

    Each record must be subscriptable by column name (for example the dict
    rows produced by ``psycopg2.extras.RealDictCursor``). One point is built
    per record: tag and field columns are copied into fresh ``tags`` /
    ``fields`` dicts, and the time column becomes the point's ``time``.

    Args:
        records: iterable of dict-like rows keyed by column name.
        mapping: dict with the keys ``time_column``, ``columns_to_fields``,
            ``columns_to_tags`` and ``table_name_to_measurement``. Defaults
            to the module-level ``schema``.

    Returns:
        A list of point dicts (``measurement``, ``tags``, ``time``,
        ``fields``) suitable for ``InfluxDBClient.write_points``.

    Raises:
        KeyError: if a record lacks a column named in *mapping*.
    """
    if mapping is None:
        mapping = schema
    measurement = mapping["table_name_to_measurement"]
    time_column = mapping["time_column"]
    tag_columns = mapping["columns_to_tags"]
    field_columns = mapping["columns_to_fields"]
    return [
        {
            "measurement": measurement,
            "tags": {column: record[column] for column in tag_columns},
            "time": record[time_column],
            "fields": {column: record[column] for column in field_columns},
        }
        for record in records
    ]
# Number of rows fetched from SQL (and written to InfluxDB) per round trip.
BATCH_SIZE = 1000

# Source table: the explicit PostgreSQL table name when set, otherwise the
# measurement name from the schema (which is what this script used before).
source_table = postgresql_table_name or schema["table_name_to_measurement"]
# Identifiers come from the script's own configuration, not user input.
query = (
    f"SELECT * FROM {source_table} "
    f"ORDER BY {schema['time_column']} DESC;"
)

# Passing a name makes this a server-side (named) psycopg2 cursor, so rows are
# streamed in batches instead of the whole table being loaded at once.
# RealDictCursor yields dict rows keyed by column name, which
# generate_influx_points() indexes into.
curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
# MySQL alternative (mysql-connector style). NOTE(review): MySQLdb itself
# uses conn.cursor(MySQLdb.cursors.DictCursor) -- confirm for your driver.
# curr = conn.cursor(dictionary=True)
try:
    curr.execute(query)
    row_count = 0
    while True:
        selected_rows = curr.fetchmany(BATCH_SIZE)
        if not selected_rows:
            # Exhausted: avoid writing an empty batch.
            break
        print(f"Processing row #{row_count + 1}")
        # Pass the database explicitly; the client was created without one.
        influxClient.write_points(generate_influx_points(selected_rows),
                                  database=influx_db_name)
        row_count += len(selected_rows)
    print(f"Migrated {row_count} rows")
finally:
    # Release the server-side cursor and connection even if a write fails.
    curr.close()
    conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, is there an updated script available for this task?