Skip to content

Instantly share code, notes, and snippets.

@Zir0-93
Created January 29, 2019 16:09
Show Gist options
  • Save Zir0-93/b7d2cf47ae54e53100357e0cebae4a57 to your computer and use it in GitHub Desktop.
Save Zir0-93/b7d2cf47ae54e53100357e0cebae4a57 to your computer and use it in GitHub Desktop.
Script to migrate PostgreSQL or MySQL records to InfluxDB in Python
### MySQL DB info ###
#import MySQLdb
#conn = MySQLdb.connect(host="localhost", # your host, usually localhost
# user="john", # your username
# passwd="megajonhy", # your password
# db="jonhydb") # name of the data base

### PostgreSQL DB info ###
import psycopg2
import psycopg2.extras

postgresql_table_name = ""
# NOTE: the two string literals are concatenated, so the first one must end
# with a trailing space — otherwise the DSN becomes
# "user=<USER_NAME>password=..." which libpq rejects as invalid.
conn = psycopg2.connect("dbname=<DB_NAME> user=<USER_NAME> " +
                        "password=<USER_PASSWORD> host=<POSTGRESQL_HOST>")

### InfluxDB info ####
from influxdb import InfluxDBClient

influx_db_name = ""
influxClient = InfluxDBClient("<INFLUX_HOST>", "<INFLUX_PORT>")
# Drop and recreate the target database so every migration run starts from
# a clean slate (any previously-migrated points are discarded).
influxClient.delete_database(influx_db_name)
influxClient.create_database(influx_db_name)

# Dictates how relational columns will be mapped to keys/fields in InfluxDB.
# Fill in the placeholders before running.
schema = {
    "time_column": "",               # column used as the timestamp in influx
    "columns_to_fields": ["", ...],  # columns that will map to fields
    "columns_to_tags": ["", ...],    # columns that will map to tags
    "table_name_to_measurement": "", # table name mapped to the measurement
}
def generate_influx_points(records, mapping=None):
    """Convert SQL result rows into a list of InfluxDB point dictionaries.

    Parameters
    ----------
    records : iterable
        Rows whose columns are accessible by name (e.g. rows produced by a
        ``psycopg2.extras.RealDictCursor``).
    mapping : dict, optional
        Schema mapping with the keys ``"time_column"``,
        ``"columns_to_fields"``, ``"columns_to_tags"`` and
        ``"table_name_to_measurement"``.  Defaults to the module-level
        ``schema`` dict, preserving the original call signature.

    Returns
    -------
    list of dict
        Points in the shape expected by ``InfluxDBClient.write_points``.
    """
    if mapping is None:
        mapping = schema
    influx_points = []
    for record in records:
        # The original `tags = {}, fields = {}` was a SyntaxError in Python;
        # build each dict independently via a comprehension instead.
        tags = {label: record[label] for label in mapping['columns_to_tags']}
        fields = {label: record[label] for label in mapping['columns_to_fields']}
        influx_points.append({
            "measurement": mapping['table_name_to_measurement'],
            "tags": tags,
            "time": record[mapping['time_column']],
            "fields": fields,
        })
    return influx_points
# Query the relational DB for all records via a named (server-side) cursor so
# the whole table is streamed in batches rather than loaded into memory.
curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
# curr = conn.cursor(dictionary=True)
# NOTE(review): table/column identifiers cannot be bound as query parameters;
# the `schema` values are trusted local configuration, not user input.
# Fixes vs. the original: a space was missing before "ORDER BY" (malformed
# SQL), and the time column key is 'time_column' — 'column_to_time' is not
# defined in `schema` and raised KeyError.
curr.execute("SELECT * FROM " + schema['table_name_to_measurement'] +
             " ORDER BY " + schema['time_column'] + " DESC;")

row_count = 0
# Process 1000 records at a time.
while True:
    # str() is required: concatenating an int to a str raises TypeError.
    print("Processing row #" + str(row_count + 1))
    selected_rows = curr.fetchmany(1000)
    if not selected_rows:
        break  # nothing left to fetch; avoid writing an empty batch
    influxClient.write_points(generate_influx_points(selected_rows))
    # Count the rows actually fetched (the last batch may be short).
    row_count += len(selected_rows)
    if len(selected_rows) < 1000:
        break
conn.close()
@sabharee97
Copy link

Hi, Is there any updated script available for this task?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment