|
#!/usr/local/bin/python |
|
# encoding: utf-8 |
|
""" |
|
*Testing how quickly we can import ZTF avro packet contents into a MySQL database table* |
|
|
|
:Author: |
|
David Young |
|
|
|
:Date Created: |
|
September 13, 2019 |
|
|
|
Usage: |
|
avro_to_mysql_import_speed_tests <directory> |
|
|
|
Options: |
|
directory directory containing avro packets |
|
|
|
-h, --help show this help message |
|
-v, --version show version |
|
-s, --settings the settings file |
|
""" |
|
################# GLOBAL IMPORTS #################### |
|
import sys |
|
import os |
|
from fundamentals import tools |
|
from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables |
|
|
|
|
|
# DDL for the ingest table. Both the alert ``candidate`` dicts and their
# ``prv_candidates`` history dicts are inserted into this one table.
# ``IF NOT EXISTS`` lets the script be re-run against an existing database;
# drop the table by hand first if you want timings against an empty table.
_CANDIDATES_TABLE_SQL = """CREATE TABLE IF NOT EXISTS `candidates` (
    `primaryId` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'An internal counter',
    `dateCreated` datetime DEFAULT CURRENT_TIMESTAMP,
    `dateLastModified` datetime DEFAULT CURRENT_TIMESTAMP,
    `updated` tinyint(4) DEFAULT '0',
    `aimage` double DEFAULT NULL,
    `aimagerat` double DEFAULT NULL,
    `bimage` double DEFAULT NULL,
    `bimagerat` double DEFAULT NULL,
    `candid` bigint(20) DEFAULT NULL,
    `chinr` double DEFAULT NULL,
    `chipsf` double DEFAULT NULL,
    `classtar` double DEFAULT NULL,
    `clrcoeff` double DEFAULT NULL,
    `clrcounc` double DEFAULT NULL,
    `clrmed` double DEFAULT NULL,
    `clrrms` double DEFAULT NULL,
    `decl` double DEFAULT NULL COMMENT 'original keyword: dec',
    `decnr` double DEFAULT NULL,
    `diffmaglim` double DEFAULT NULL,
    `distnr` double DEFAULT NULL,
    `distpsnr1` double DEFAULT NULL,
    `distpsnr2` double DEFAULT NULL,
    `distpsnr3` double DEFAULT NULL,
    `drb` double DEFAULT NULL,
    `dsdiff` double DEFAULT NULL,
    `dsnrms` double DEFAULT NULL,
    `elong` double DEFAULT NULL,
    `exptime` double DEFAULT NULL,
    `fid` tinyint(4) DEFAULT NULL,
    `field` int(11) DEFAULT NULL,
    `fwhm` double DEFAULT NULL,
    `jd` double DEFAULT NULL,
    `jdendhist` double DEFAULT NULL,
    `jdendref` double DEFAULT NULL,
    `jdstarthist` double DEFAULT NULL,
    `jdstartref` double DEFAULT NULL,
    `magap` double DEFAULT NULL,
    `magapbig` double DEFAULT NULL,
    `magdiff` double DEFAULT NULL,
    `magfromlim` double DEFAULT NULL,
    `maggaia` double DEFAULT NULL,
    `maggaiabright` double DEFAULT NULL,
    `magnr` double DEFAULT NULL,
    `magpsf` double DEFAULT NULL,
    `magzpsci` double DEFAULT NULL,
    `magzpscirms` double DEFAULT NULL,
    `magzpsciunc` double DEFAULT NULL,
    `mindtoedge` double DEFAULT NULL,
    `nbad` int(11) DEFAULT NULL,
    `ncovhist` int(11) DEFAULT NULL,
    `ndethist` int(11) DEFAULT NULL,
    `neargaia` double DEFAULT NULL,
    `neargaiabright` double DEFAULT NULL,
    `nframesref` int(11) DEFAULT NULL,
    `nid` int(11) DEFAULT NULL,
    `nmatches` int(11) DEFAULT NULL,
    `nmtchps` int(11) DEFAULT NULL,
    `nneg` int(11) DEFAULT NULL,
    `objectidps1` bigint(20) DEFAULT NULL,
    `objectidps2` bigint(20) DEFAULT NULL,
    `objectidps3` bigint(20) DEFAULT NULL,
    `pid` bigint(20) DEFAULT NULL,
    `programid` tinyint(4) DEFAULT NULL,
    `ra` double DEFAULT NULL,
    `ranr` double DEFAULT NULL,
    `rb` double DEFAULT NULL,
    `rcid` tinyint(4) DEFAULT NULL,
    `rfid` int(11) DEFAULT NULL,
    `scorr` double DEFAULT NULL,
    `seeratio` double DEFAULT NULL,
    `sgmag1` double DEFAULT NULL,
    `sgmag2` double DEFAULT NULL,
    `sgmag3` double DEFAULT NULL,
    `sgscore1` double DEFAULT NULL,
    `sgscore2` double DEFAULT NULL,
    `sgscore3` double DEFAULT NULL,
    `sharpnr` double DEFAULT NULL,
    `sigmagap` double DEFAULT NULL,
    `sigmagapbig` double DEFAULT NULL,
    `sigmagnr` double DEFAULT NULL,
    `sigmapsf` double DEFAULT NULL,
    `simag1` double DEFAULT NULL,
    `simag2` double DEFAULT NULL,
    `simag3` double DEFAULT NULL,
    `sky` double DEFAULT NULL,
    `srmag1` double DEFAULT NULL,
    `srmag2` double DEFAULT NULL,
    `srmag3` double DEFAULT NULL,
    `ssdistnr` double DEFAULT NULL,
    `ssmagnr` double DEFAULT NULL,
    `ssnrms` double DEFAULT NULL,
    `sumrat` double DEFAULT NULL,
    `szmag1` double DEFAULT NULL,
    `szmag2` double DEFAULT NULL,
    `szmag3` double DEFAULT NULL,
    `tblid` int(11) DEFAULT NULL,
    `tooflag` int(11) DEFAULT NULL,
    `xpos` double DEFAULT NULL,
    `ypos` double DEFAULT NULL,
    `zpclrcov` double DEFAULT NULL,
    `zpmed` double DEFAULT NULL,
    `objectId` varchar(45) DEFAULT NULL,
    `drbversion` varchar(100) DEFAULT NULL,
    `isdiffpos` varchar(100) DEFAULT NULL,
    `pdiffimfilename` varchar(100) DEFAULT NULL,
    `programpi` varchar(100) DEFAULT NULL,
    `rbversion` varchar(100) DEFAULT NULL,
    `ssnamenr` varchar(100) DEFAULT NULL,
    `dec` double DEFAULT NULL,
    PRIMARY KEY (`primaryId`),
    UNIQUE KEY `objectid_jd` (`objectId`,`jd`)
) ENGINE=Innodb AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE utf8_general_ci;
"""


def _list_avro_packets(directory):
    """Return the paths of the regular files in *directory* whose names end in ``.avro``."""
    return [
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith(".avro") and os.path.isfile(os.path.join(directory, name))
    ]


def _split_into_batches(items, batchSize):
    """Split *items* into consecutive lists of at most *batchSize* elements.

    An empty *items* yields an empty list (no batches at all).
    """
    return [items[i:i + batchSize] for i in range(0, len(items), batchSize)]


def _read_packet_batch(batch):
    """Read every record of every avro file in *batch*.

    **Key Arguments:**
        - ``batch`` -- list of paths to avro packet files

    **Return:**
        - ``(candidates, histories)`` -- two lists of dicts. Each record's
          ``candidate`` dict and each entry of its ``prv_candidates`` list
          are tagged with the record's ``objectId`` before being collected.
    """
    from fastavro import reader

    candidates = []
    histories = []
    for packet in batch:
        with open(packet, 'rb') as fo:
            for record in reader(fo):
                objectId = record["objectId"]
                candidate = record["candidate"]
                history = record["prv_candidates"]

                # HISTORY IS MULTIPLE ENTRIES (may be None/empty)
                if history:
                    for h in history:
                        h["objectId"] = objectId
                    histories.extend(history)

                # THE CURRENT DETECTION IS A SINGLE ENTRY
                candidate["objectId"] = objectId
                candidates.append(candidate)
    return candidates, histories


def _create_candidates_table(log, dbConn):
    """Create the ``candidates`` table if it does not already exist."""
    from fundamentals.mysql import writequery

    writequery(
        log=log,
        sqlQuery=_CANDIDATES_TABLE_SQL,
        dbConn=dbConn
    )


def _insert_candidate_rows(log, dbConn, dbSettings, rows):
    """Insert *rows* (a list of dicts) into the ``candidates`` table.

    Passing ``dbSettings`` lets the fundamentals helper use multiprocessing.
    An empty *rows* list is skipped.
    """
    if not rows:
        return
    insert_list_of_dictionaries_into_database_tables(
        dbConn=dbConn,
        log=log,
        dictList=rows,
        dbTableName="candidates",
        uniqueKeyList=["objectId", "jd"],
        dateModified=False,
        dateCreated=False,
        batchSize=2500,
        replace=False,
        dbSettings=dbSettings
    )


def main(arguments=None):
    """
    *The main function used when ``avro_to_mysql_import_speed_tests.py`` is run as a single script from the command line*

    Reads every ``.avro`` file in ``<directory>`` in batches of 10000 files and
    inserts the alert candidates and their histories into the ``candidates``
    table of the ``avro_speed_test`` MySQL database, printing progress as it goes.
    """

    # SETUP THE COMMAND-LINE UTIL SETTINGS
    su = tools(
        arguments=arguments,
        docString=__doc__,
        logLevel="WARNING",
        options_first=False,
        projectName=False
    )
    arguments, settings, log, dbConn = su.setup()

    # LOG THE PARSED CL ARGUMENTS. Values are read explicitly below rather than
    # injected into locals with `exec` (which breaks on quotes in paths and
    # executes argument text as code).
    for arg, val in arguments.items():
        if arg.startswith("-"):
            varname = arg.replace("-", "") + "Flag"
        else:
            varname = arg.replace("<", "").replace(">", "")
        log.debug('%s = %s' % (varname, val,))
    directory = arguments["<directory>"]

    # GET ALL AVRO FILES FROM FOLDER
    packets = _list_avro_packets(directory)
    allPackets = len(packets)
    if not allPackets:
        log.warning('no .avro files found in %s' % (directory,))

    # mysql -u root --password=root
    # CREATE DATABASE avro_speed_test;
    # CREATE USER 'avroUser'@'%' IDENTIFIED by 'avroPass';
    # GRANT ALL PRIVILEGES ON *.* TO 'avroUser'@'%' WITH GRANT OPTION;
    # GRANT SELECT ON *.* TO 'avroUser'@'%';
    # mysql_config_editor set --login-path=avroSpeedTest --host=127.0.0.1
    # --user=avroUser --password

    dbSettings = {
        'host': '127.0.0.1',
        'loginPath': 'avroSpeedTest',
        'user': 'avroUser',
        'password': 'avroPass',
        'db': 'avro_speed_test'
    }

    # SETUP ALL DATABASE CONNECTIONS
    from fundamentals.mysql import database
    dbConn = database(
        log=log,
        dbSettings=dbSettings
    ).connect()

    _create_candidates_table(log, dbConn)

    # READ THE AVRO PACKETS AND INGEST THEM BATCH BY BATCH
    batchSize = 10000
    theseBatches = _split_into_batches(packets, batchSize)
    nBatches = len(theseBatches)

    ingested = 0
    for index, batch in enumerate(theseBatches, 1):
        candidates, histories = _read_packet_batch(batch)

        _insert_candidate_rows(log, dbConn, dbSettings, candidates)
        _insert_candidate_rows(log, dbConn, dbSettings, histories)

        # Count the files actually processed so the last (partial) batch
        # does not overshoot the total.
        ingested += len(batch)

        if index > 1:
            # Cursor up one line and clear line
            sys.stdout.write("\x1b[1A\x1b[2K")
        # nBatches >= index >= 1 here, so no division by zero and <= 100%.
        percent = 100. * index / nBatches
        print('%s/%s avro files ingested to sql database (%1.1f%% done)' % (
            ingested, allPackets, percent))

    return
|
|
|
if __name__ == "__main__":
    # Run the speed test only when executed directly, never on import.
    main()