Skip to content

Instantly share code, notes, and snippets.

@thespacedoctor
Last active May 5, 2021 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thespacedoctor/565cc64291b1737daa1a92df6c43e9ed to your computer and use it in GitHub Desktop.
Save thespacedoctor/565cc64291b1737daa1a92df6c43e9ed to your computer and use it in GitHub Desktop.
[AVRO to MySQL Import Speed Test] #mysql #kafka #avro #ztf #lsst

AVRO to MySQL Import Speed Test

This script will run on a folder of ZTF avro alerts (e.g. an unarchived bundle from the ZTF nightly archives).

Create a conda environment within which to run the script:

conda create -n avro_speed_test python=2.7 pip

Then setup a database to test the imports:

CREATE DATABASE avro_speed_test;
CREATE USER 'avroUser'@'%' IDENTIFIED by 'avroPass';
GRANT ALL PRIVILEGES ON *.* TO 'avroUser'@'%' WITH GRANT OPTION;
GRANT SELECT ON *.* TO 'avroUser'@'%';

and generate a mysql_config login-path for the database:

mysql_config_editor set --login-path=avroSpeedTest --host=127.0.0.1 --user=avroUser --password

Download the script with:

wget https://gist.githubusercontent.com/thespacedoctor/565cc64291b1737daa1a92df6c43e9ed/raw/avro_to_mysql_import_speed_tests.py

Finally time the import with the command:

time python avro_to_mysql_import_speed_tests.py <directory>
#!/usr/local/bin/python
# encoding: utf-8
"""
*Testing how quickly we can import ZTF avro packet contents into a MySQL database table*
:Author:
David Young
:Date Created:
September 13, 2019
Usage:
avro_to_mysql_import_speed_tests <directory>
Options:
directory directory containing avro packets
-h, --help show this help message
-v, --version show version
-s, --settings the settings file
"""
################# GLOBAL IMPORTS ####################
import sys
import os
from fundamentals import tools
from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables
def main(arguments=None):
    """Time the ingest of a folder of ZTF avro alert packets into MySQL.

    :param arguments: pre-parsed command-line argument dict. Defaults to
        None, in which case ``fundamentals.tools`` parses ``sys.argv``
        against the module docstring (docopt style).
    """
    # SETUP THE COMMAND-LINE UTIL SETTINGS
    su = tools(
        arguments=arguments,
        docString=__doc__,
        logLevel="WARNING",
        options_first=False,
        projectName=False
    )
    arguments, settings, log, dbConn = su.setup()

    # EXTRACT THE ONLY CL ARGUMENT THIS SCRIPT USES. (The original
    # auto-created variables from the argument dict via `exec`, which is
    # unsafe and Python-2-only; direct access is clearer and portable.)
    directory = arguments["<directory>"]
    log.debug('directory = %s' % (directory,))

    # GET ALL AVRO FILES FROM FOLDER
    packets = [os.path.join(directory, d) for d in os.listdir(directory)
               if os.path.isfile(os.path.join(directory, d)) and ".avro" in d]
    allPackets = len(packets)

    # DATABASE CREDENTIALS -- must match the CREATE DATABASE / CREATE USER
    # statements and the mysql_config_editor login-path described in the
    # accompanying README
    dbSettings = {
        'host': '127.0.0.1',
        'loginPath': 'avroSpeedTest',
        'user': 'avroUser',
        'password': 'avroPass',
        'db': 'avro_speed_test'
    }

    # SETUP THE DATABASE CONNECTION
    from fundamentals.mysql import database
    dbConn = database(
        log=log,
        dbSettings=dbSettings
    ).connect()

    # CREATE THE DESTINATION TABLE (one row per candidate detection).
    # IF NOT EXISTS makes repeat timing runs possible; the trailing
    # `% locals()` of the original was a no-op (no format specifiers)
    # and has been dropped.
    from fundamentals.mysql import writequery
    sqlQuery = """CREATE TABLE IF NOT EXISTS `candidates` (
      `primaryId` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'An internal counter',
      `dateCreated` datetime DEFAULT CURRENT_TIMESTAMP,
      `dateLastModified` datetime DEFAULT CURRENT_TIMESTAMP,
      `updated` tinyint(4) DEFAULT '0',
      `aimage` double DEFAULT NULL,
      `aimagerat` double DEFAULT NULL,
      `bimage` double DEFAULT NULL,
      `bimagerat` double DEFAULT NULL,
      `candid` bigint(20) DEFAULT NULL,
      `chinr` double DEFAULT NULL,
      `chipsf` double DEFAULT NULL,
      `classtar` double DEFAULT NULL,
      `clrcoeff` double DEFAULT NULL,
      `clrcounc` double DEFAULT NULL,
      `clrmed` double DEFAULT NULL,
      `clrrms` double DEFAULT NULL,
      `decl` double DEFAULT NULL COMMENT 'original keyword: dec',
      `decnr` double DEFAULT NULL,
      `diffmaglim` double DEFAULT NULL,
      `distnr` double DEFAULT NULL,
      `distpsnr1` double DEFAULT NULL,
      `distpsnr2` double DEFAULT NULL,
      `distpsnr3` double DEFAULT NULL,
      `drb` double DEFAULT NULL,
      `dsdiff` double DEFAULT NULL,
      `dsnrms` double DEFAULT NULL,
      `elong` double DEFAULT NULL,
      `exptime` double DEFAULT NULL,
      `fid` tinyint(4) DEFAULT NULL,
      `field` int(11) DEFAULT NULL,
      `fwhm` double DEFAULT NULL,
      `jd` double DEFAULT NULL,
      `jdendhist` double DEFAULT NULL,
      `jdendref` double DEFAULT NULL,
      `jdstarthist` double DEFAULT NULL,
      `jdstartref` double DEFAULT NULL,
      `magap` double DEFAULT NULL,
      `magapbig` double DEFAULT NULL,
      `magdiff` double DEFAULT NULL,
      `magfromlim` double DEFAULT NULL,
      `maggaia` double DEFAULT NULL,
      `maggaiabright` double DEFAULT NULL,
      `magnr` double DEFAULT NULL,
      `magpsf` double DEFAULT NULL,
      `magzpsci` double DEFAULT NULL,
      `magzpscirms` double DEFAULT NULL,
      `magzpsciunc` double DEFAULT NULL,
      `mindtoedge` double DEFAULT NULL,
      `nbad` int(11) DEFAULT NULL,
      `ncovhist` int(11) DEFAULT NULL,
      `ndethist` int(11) DEFAULT NULL,
      `neargaia` double DEFAULT NULL,
      `neargaiabright` double DEFAULT NULL,
      `nframesref` int(11) DEFAULT NULL,
      `nid` int(11) DEFAULT NULL,
      `nmatches` int(11) DEFAULT NULL,
      `nmtchps` int(11) DEFAULT NULL,
      `nneg` int(11) DEFAULT NULL,
      `objectidps1` bigint(20) DEFAULT NULL,
      `objectidps2` bigint(20) DEFAULT NULL,
      `objectidps3` bigint(20) DEFAULT NULL,
      `pid` bigint(20) DEFAULT NULL,
      `programid` tinyint(4) DEFAULT NULL,
      `ra` double DEFAULT NULL,
      `ranr` double DEFAULT NULL,
      `rb` double DEFAULT NULL,
      `rcid` tinyint(4) DEFAULT NULL,
      `rfid` int(11) DEFAULT NULL,
      `scorr` double DEFAULT NULL,
      `seeratio` double DEFAULT NULL,
      `sgmag1` double DEFAULT NULL,
      `sgmag2` double DEFAULT NULL,
      `sgmag3` double DEFAULT NULL,
      `sgscore1` double DEFAULT NULL,
      `sgscore2` double DEFAULT NULL,
      `sgscore3` double DEFAULT NULL,
      `sharpnr` double DEFAULT NULL,
      `sigmagap` double DEFAULT NULL,
      `sigmagapbig` double DEFAULT NULL,
      `sigmagnr` double DEFAULT NULL,
      `sigmapsf` double DEFAULT NULL,
      `simag1` double DEFAULT NULL,
      `simag2` double DEFAULT NULL,
      `simag3` double DEFAULT NULL,
      `sky` double DEFAULT NULL,
      `srmag1` double DEFAULT NULL,
      `srmag2` double DEFAULT NULL,
      `srmag3` double DEFAULT NULL,
      `ssdistnr` double DEFAULT NULL,
      `ssmagnr` double DEFAULT NULL,
      `ssnrms` double DEFAULT NULL,
      `sumrat` double DEFAULT NULL,
      `szmag1` double DEFAULT NULL,
      `szmag2` double DEFAULT NULL,
      `szmag3` double DEFAULT NULL,
      `tblid` int(11) DEFAULT NULL,
      `tooflag` int(11) DEFAULT NULL,
      `xpos` double DEFAULT NULL,
      `ypos` double DEFAULT NULL,
      `zpclrcov` double DEFAULT NULL,
      `zpmed` double DEFAULT NULL,
      `objectId` varchar(45) DEFAULT NULL,
      `drbversion` varchar(100) DEFAULT NULL,
      `isdiffpos` varchar(100) DEFAULT NULL,
      `pdiffimfilename` varchar(100) DEFAULT NULL,
      `programpi` varchar(100) DEFAULT NULL,
      `rbversion` varchar(100) DEFAULT NULL,
      `ssnamenr` varchar(100) DEFAULT NULL,
      `dec` double DEFAULT NULL,
      PRIMARY KEY (`primaryId`),
      UNIQUE KEY `objectid_jd` (`objectId`,`jd`)
    ) ENGINE=Innodb AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE utf8_general_ci;
    """
    writequery(
        log=log,
        sqlQuery=sqlQuery,
        dbConn=dbConn
    )

    # SPLIT THE PACKET LIST INTO BATCHES. Ceil-divide so a final partial
    # batch is still ingested and no empty trailing batch is produced.
    # (The original derived the batch count from `len(packets[1:])`, an
    # off-by-one hack to sidestep an empty last batch when the packet
    # count was an exact multiple of batchSize.)
    from fastavro import reader
    batchSize = 10000
    totalBatches = (allPackets + batchSize - 1) // batchSize
    theseBatches = [packets[i * batchSize:(i + 1) * batchSize]
                    for i in range(totalBatches)]

    index = 1
    for batch in theseBatches:
        histories = []
        candidates = []
        for packet in batch:
            # READ EACH AVRO PACKET AND COLLECT ITS CONTENT INTO
            # DICTIONARY LISTS READY FOR DATABASE INSERT
            with open(packet, 'rb') as fo:
                avro_reader = reader(fo)
                for record in avro_reader:
                    objectid = record["objectId"]
                    history = record["prv_candidates"]
                    candidate = record["candidate"]
                    # HISTORY CAN CONTAIN MULTIPLE PRIOR DETECTIONS;
                    # STAMP EACH WITH THE PARENT OBJECT ID
                    if history:
                        for h in history:
                            h["objectId"] = objectid
                        histories += history
                    # THE LATEST DETECTION IS A SINGLE ENTRY
                    candidate["objectId"] = objectid
                    candidates.append(candidate)

        # USE dbSettings TO ACTIVATE MULTIPROCESSING - INSERT LIST OF
        # DICTIONARIES INTO DATABASE
        insert_list_of_dictionaries_into_database_tables(
            dbConn=dbConn,
            log=log,
            dictList=candidates,
            dbTableName="candidates",
            uniqueKeyList=["objectId", "jd"],
            dateModified=False,
            dateCreated=False,
            batchSize=2500,
            replace=False,
            dbSettings=dbSettings
        )
        insert_list_of_dictionaries_into_database_tables(
            dbConn=dbConn,
            log=log,
            dictList=histories,
            dbTableName="candidates",
            uniqueKeyList=["objectId", "jd"],
            dateModified=False,
            dateCreated=False,
            batchSize=2500,
            replace=False,
            dbSettings=dbSettings
        )

        # REPORT PROGRESS, OVERWRITING THE PREVIOUS PROGRESS LINE
        if index > 1:
            # Cursor up one line and clear line
            sys.stdout.write("\x1b[1A\x1b[2K")
        # Guard against division by zero when the folder holds fewer than
        # one full batch (the original divided by a batch count of 0 here
        # and crashed). Clamp the running count so the final partial batch
        # never reports more files than exist.
        percent = (float(index) / float(totalBatches)) * 100. if totalBatches else 100.
        done = min(batchSize * index, allPackets)
        print('%s/%s avro files ingested to sql database (%1.1f%% done)' % (done, allPackets, percent))
        index += 1

    return
# ENTRY POINT: run the speed test only when executed as a script,
# not when this module is imported
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment