Skip to content

Instantly share code, notes, and snippets.

@gullevek
Last active February 18, 2021 02:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gullevek/c3baa7b777428ba90f62 to your computer and use it in GitHub Desktop.
Save gullevek/c3baa7b777428ba90f62 to your computer and use it in GitHub Desktop.
Writes all data from a given query to a CSV file. Useful for reading out large batches of data while avoiding excessive memory usage. Uses async and CURSOR. The Python version only uses CURSOR, since a named cursor and async do not work at the same time.
#!/usr/bin/perl
# AUTHOR: Clemens Schwaighofer
# DATE: 2015/8/6
# DESCRIPTION:
# Runs a query from a file or command line and outputs the data to a CSV file.
# The query is run async and read via a server side CURSOR so very large
# result sets can be exported without loading everything into memory at once.
use strict;
use warnings;
no strict 'refs'; # dynamic (symbolic) references are used in this script
use utf8;
BEGIN {
use POSIX qw(floor);
use DBD::Pg ':async';
use Text::CSV_XS;
use Getopt::Long;
use Time::HiRes qw(time);
use Number::Format qw(format_number);
# install the ^C (SIGINT) handler as early as possible
$SIG{INT} = \&tsktsk;
}
# METHOD: tsktsk
# PARAMS: none (SIGINT handler)
# RETURN: never returns; dies after cleanup
# DESC : in case a ^C is caught, cancel any running remote query, close the
#        server side cursor and all statement handles, disconnect from the
#        database and close the CSV output file before aborting the script.
#        Works on the main:: package globals ($dbh, %cur, $sth_read, ...).
sub tsktsk
{
# re-install the handler (old style signal semantics)
$SIG{INT} = \&tsktsk;
warn "\nPGSQL async: ".$main::dbh->{'pg_async_status'}."\n";
# cancel a still running async query, if any
$main::dbh->pg_cancel() if ($main::dbh->{'pg_async_status'});
# close the server side cursor; fails harmlessly when not yet declared
$main::dbh->do($main::close_query) || print "Cursor '".$main::cursor_name."' not yet declared\n"; # close any open cursors
# finish all prepared statement handles
foreach my $my_cur (keys %main::cur) {
$main::cur{$my_cur}->finish;
}
# close the FETCH read statement handle, if it exists already
$main::sth_read->finish if ($main::sth_read);
# close all DB connections
$main::dbh->disconnect();
# close the CSV output file handle
close($main::CSV_DATA);
# die, so the script does not continue
die "Exit via ^C\n";
}
# METHOD: check_output_file
# PARAMS: file name
# RETURN: none
# DESC : removes the given file when it holds no data (zero bytes or missing)
sub check_output_file
{
	my ($file_name) = @_;
	# -s is false for a zero-byte (or non-existent) file; keep anything else
	return if (-s $file_name);
	print_out("Delete file $file_name because no data was found", 1);
	unlink($file_name);
}
# METHOD: convert_time
# PARAMS: timestamp (seconds, may be fractional), flag: true to append the
#         fractional part
# RETURN: formatted string "Xd Xh Xm Xs" plus optional " Nms" suffix
# DESC : converts a seconds interval into days/hours/minutes/seconds; a unit
#        is included as soon as any larger unit is non-zero
sub convert_time
{
	my ($timestamp, $show_micro) = @_;
	# round to 4 fractional digits, then split off the fraction for the suffix
	my ($seconds, $fraction) = split(/\./, sprintf("%.4f", $timestamp));
	my @output;
	# walk down the unit sizes, taking the whole multiples out each time
	foreach my $unit_seconds (86400, 3600, 60, 1) {
		push(@output, floor($seconds / $unit_seconds));
		$seconds %= $unit_seconds;
	}
	# output holds days|hours|min|sec
	my $formatted = '';
	$formatted .= $output[0].'d ' if ($output[0]);
	$formatted .= $output[1].'h ' if ($output[0] || $output[1]);
	$formatted .= $output[2].'m ' if ($output[0] || $output[1] || $output[2]);
	$formatted .= $output[3].'s';
	# append the rounded fractional digits (labeled ms) when requested
	$formatted .= ' '.($fraction ? $fraction : 0).'ms' if ($show_micro);
	return $formatted;
}
# METHOD: convert_number
# PARAMS: byte count (numeric; undef/0 treated as 0)
# RETURN: human readable string, eg "500B" or "2.00KB"
# DESC : converts a byte value to a human readable format. The shown value is
#        kept below 1024 per unit.
#        FIX: the original loop used "> 1024" so exactly 1024 stayed "1024B";
#        ">=" now converts it to "1.00KB". The label index is also clamped so
#        absurdly large values cannot run past the known unit labels.
sub convert_number
{
	my ($number) = @_;
	my @labels = qw(B KB MB GB TB PB EB);
	my $pos = 0; # index into the unit labels
	$number = 0 if (!$number);
	# divide by 1024 until the value is below 1024, or we hit the last label
	while ($number >= 1024 && $pos < $#labels) {
		$number = $number / 1024;
		$pos ++;
	}
	# plain integer for bytes, two decimals once the value has been divided
	return sprintf(!$pos ? '%d' : '%.2f', $number).$labels[$pos];
}
# METHOD: print_out
# PARAMS: message, verbose level, optional flag to suppress the line break
# RETURN: n/a
# DESC : prints the message only when the global verbose level ($main::verbose)
#        is at least the requested level
sub print_out
{
	my ($message, $v_level, $no_lb) = @_;
	# below the requested verbosity: stay silent
	return if ($main::verbose < $v_level);
	print $message.($no_lb ? '' : "\n");
}
# unbuffered output so progress dots and percent values appear immediately
$| ++;
# treat all console I/O as UTF-8
binmode STDOUT, ":encoding(utf8)";
binmode STDIN, ":encoding(utf8)";
binmode STDERR, ":encoding(utf8)";
my $error = 0;
my %opt = ();
our $verbose = 0; # package global, read by print_out()
my $query = '';
my $output_file = '';
my $db_connect_string = '';
my $no_async = 0;
my $no_declare = 0;
my $wait_time = 10; # wait time between async finish checks; default 10 seconds
# allow option bundling (eg -vvv)
Getopt::Long::Configure ("bundling");
# command line
my $result = GetOptions(\%opt,
'q|query=s' => \$query,
'o|output=s' => \$output_file,
'd|db=s' => \$db_connect_string,
'w|wait=s' => \$wait_time,
'no-async' => \$no_async, # do not run queries async
#'no-declare' => \$no_declare, # do not collect data server side
'verbose|v+' => \$verbose,
'help' # just help
) || exit 1;
# print usage and exit when --help was given
if ($opt{'help'}) {
print "Possible options\n";
print "--query|--q <file or query>\tCan either be a file that has the query inside or the query itself in a string\n";
print "--output|--o <output file name>\tThe data read from the query is written into this file including headers\n";
print "--db|-d <db connect string>\tConnection data in the following format: user=;pass=;dbname=;host=;port=\n";
print "--wait|-w <time in seconds>\tOverride the default wait time of 10 seconds, Needs to be in range of 1 to 60\n";
print "--no-async\tDon't run the query in async form\n";
#print "--no-declare\tDon't run DECLARE on the server and collect data local\n";
print "--verbose|-v [--verbose|-v ...]\tShow more info, at least one -v has to be given to see standard percent output. not needed for --list\n";
print "--help\t this page\n";
print "\n";
exit 0;
}
# a query must be a readable file, come via STDIN (pipe) or be a string
if (! -f $query && -t STDIN) {
print "Please give a file with the query, use STDIN (pipe) or the query itself with the --query parameter\n";
$error = 1;
}
if (!$output_file) {
print "Please give a target output file with the --output paramter\n";
$error = 1;
}
if (!$db_connect_string) {
print "Please give the db connection string with the --db paramter\n";
$error = 1;
}
# NOTE(review): with && this only complains when BOTH user= and dbname= are
# missing, although the message demands both -- an || was probably intended
if ($db_connect_string !~ /user=([^;.]*);?/ && $db_connect_string !~ /dbname=([^;.]*);?/) {
print "The db connection string needs at least a username and database name\n";
$error = 1;
}
if ($wait_time < 1 || $wait_time > 60) {
print "Wait time needs to be a value between 1 and 60\n";
$error = 1;
}
# exit if any of the above checks failed
exit 1 if ($error);
# input/output encoding for files
my $encoding = 'utf8';
# db stuff
my $dsn;
my $db_user;
my $db_pass;
our $dbh; # database handle (package global so the SIGINT handler can reach it)
# query
my %_query = ();
our %cur = (); # prepared statement handles, keyed by query name
our $sth_read; # statement handle for the FETCH read loop
# cursor name with the current timestamp appended so it is unique per run
our $cursor_name = 'csr_'.join('_', split(/\./, time()));
# for cursor query, this is done for the big reads
my $q_name = 'read_data';
my $do_query = "DECLARE ".$cursor_name." CURSOR WITH HOLD FOR "; # the cursor declaration is always the same
my $move_all_query = "MOVE ALL ".$cursor_name; # for getting the count
my $move_first_query = "MOVE ABSOLUTE 0 ".$cursor_name; # move back before the first row (MOVE FIRST would land ON the first row and a fetch would then start from the 2nd; MOVE BACKWARD ALL is also possible)
my $fetch_query = "FETCH 10000 FROM ".$cursor_name; # 10000 rows per fetch should be ok in size
our $close_query = "CLOSE ".$cursor_name; # close the cursor at the end
my $run_do_query = ''; # combined do_query + read query for declaration run
# various variables
my $result_rows; # total number of rows the query returns
my $rows_read; # rows returned by the current FETCH block
my @csv_header = ();
# count and stats
my $first_run = 1; # true until the CSV header line has been written
my $start_run;
my $start_read;
my $count = 0; # rows written to the CSV file
my %count_detail = ();
my $percent;
my $_percent = -1; # last printed percent value, so each step is printed once
# csv file handle (package global so the SIGINT handler can close it)
our $CSV_DATA;
# load the query or set the query
$_query{$q_name} = '';
# if it is a readable file, assume we read the query data from the file
if (-f $query) {
print_out("Reading query from file $query", 1);
my $FP;
open($FP, '<:encoding('.$encoding.')', $query) || die ("Can't open $query file for reading query data: $!\n");
while (<$FP>) {
chomp $_;
# skip any line that starts with a -- comment
if ($_ !~ /^(\s+)?--/) {
# strip out any data after a -- inside the line
$_ =~ s/--.*//g;
# join lines with a safety space in front
$_query{$q_name} .= ' ' if ($_query{$q_name});
$_query{$q_name} .= $_;
}
}
close($FP);
} elsif (! -t STDIN) {
# no file given and STDIN is a pipe: read the query from STDIN
while (<STDIN>) {
chomp $_;
# skip any line that starts with a -- comment
if ($_ !~ /^\s+?--/) {
# strip out any data after a -- inside the line
$_ =~ s/--.*//g;
# join lines with a safety space in front
$_query{$q_name} .= ' ' if ($_query{$q_name});
$_query{$q_name} .= $_;
}
}
} else {
print_out("Setting query from command line", 1);
$_query{$q_name} = $query;
}
# strip any ; from the query, anywhere; they are not needed
$_query{$q_name} =~ s/;//g;
# if the query does not start with SELECT or WITH it is invalid
if ($_query{$q_name} !~ /^(\s+)?(with|select)/i) {
print "!!! Query needs to start with a SELECT or WITH statement\n";
# NOTE(review): only the leading keyword is checked; data changing
# statements hidden in sub queries would not be caught here
exit;
}
# open database connection
print_out("Logging into Database: ".$db_connect_string, 1);
# pull user and password out of the connect string; the rest goes to the DSN
if ($db_connect_string =~ /user=([^;.]*);?/) {
$db_user = $1;
$db_connect_string =~ s/user=([^;.]*);?//;
}
if ($db_connect_string =~ /pass=([^;.]*);?/) {
$db_pass = $1;
$db_connect_string =~ s/pass=([^;.]*);?//;
}
$dsn = "DBI:Pg:".$db_connect_string;
$dbh = DBI->connect($dsn, $db_user, $db_pass) || die ("Can't connect to db $db_connect_string with user $db_user\n");
# prepare queries
foreach my $key (keys %_query) {
my $query = $_query{$key};
# all csv queries are cursor queries, everything else is normal
# if no declare is flagged then do not prefix it with a DECLARE
$run_do_query = (!$no_declare ? $do_query : '').$query;
# when async is enabled (no override flag) prepare this query async
if (!$no_async) {
$cur{$key} = $dbh->prepare(qq{ $run_do_query }, {pg_async => PG_ASYNC}) || die "[$key] Can't prepare $DBI::errstr\n";
} else {
$cur{$key} = $dbh->prepare(qq{ $run_do_query }) || die "[$key] Can't prepare $DBI::errstr\n";
}
}
# open output file
open($CSV_DATA, '>:encoding('.$encoding.')', $output_file) || die ("Can't open $output_file for writing: $!\n");
# CSV writer for the export data
my $csv = Text::CSV_XS->new ({
'binary' => 1,
'eol' => "\r\n"
});
print_out("Execute query [$q_name] ...", 1);
# note overall read start time before the query is executed
$start_read = time();
$cur{$q_name}->execute() || die ("Canot execute: ".$cur{$q_name}->errstr."\n");
if (!$no_async) {
# poll until the async query has finished, printing progress dots
print_out("Waiting for query to execute {$q_name} [", 1, 1);
my $show_count = 1;
while (!$dbh->pg_ready) {
# print a dot only for every 10th check (% mod)
print_out(".", 1, 1) if (!($show_count % 10));
# show time since read start about every 10 min (runs depend on wait time)
if ($show_count == int(600 / $wait_time)) {
print_out("(".convert_time(time() - $start_read).")", 1, 1);
$show_count = 0;
}
$show_count ++;
sleep $wait_time; # wait n seconds
}
# collect the async result; this must be called to finish the async query
my $end_data = $cur{$q_name}->pg_result || die ("Cannot call pg result: ".$cur{$q_name}->errstr."\n");
print_out("] {Status (".$end_data.") ", 1, 1);
}
# get the row count that will be returned
if (!$no_declare) {
# MOVE ALL reports the number of rows it skipped = total row count
$result_rows = $dbh->do($move_all_query) || die ("Cannot move all: ".$DBI::errstr."\n");
} else {
$result_rows = $cur{$q_name}->rows;
}
# set to 0 if DBI returned its "zero but true" marker 0E0
$result_rows = 0 if ($result_rows eq '0E0');
# move the cursor back before the first row so the FETCH starts at the top
$dbh->do($move_first_query) || die ("Cannot move first ".$DBI::errstr."\n") if (!$no_declare);
print_out("Returned: ".format_number($result_rows).", Run for: ".convert_time(time() - $start_read).((!$no_async) ? '}' : ''), 1);
# if nothing was found, clean up and abort
if (!$result_rows) {
print "Could not find any data, aborting run\n";
close($CSV_DATA);
check_output_file($output_file);
$dbh->do($close_query);
$dbh->disconnect();
exit 0;
}
$start_run = time();
print_out("Reading data [$q_name]...", 1);
## TODO: if no_declare is set, we need to loop with the basic loop and not with a double exit loop
# prepare the FETCH query that pulls blocks of rows from the cursor
$sth_read = $dbh->prepare($fetch_query) || die ("Cannot prepare fetch: ".$DBI::errstr."\n");
while (1) {
# run the next FETCH; the first one is usually the time consuming one
$sth_read->execute() || die ("Cannot execute fetch: ".$sth_read->errstr."\n");
# number of rows in this FETCH block; 0 means the cursor is exhausted
$rows_read = $sth_read->rows;
last if 0 == $rows_read;
print_out("... Reading ".format_number($rows_read)." rows of ".format_number($result_rows), 2);
while (my @data = $sth_read->fetchrow_array) {
# on the first row write the column names as the CSV header line
# ALTERNATIVE: use the sth read + name as the looper
if ($first_run) {
print_out("N in sth p exc: ".join(',', @{$sth_read->{NAME}}), 3);
foreach my $column (@{$sth_read->{NAME}}) {
push(@csv_header, $column);
}
$csv->combine(@csv_header);
print $CSV_DATA $csv->string();
# first run sequence done
$first_run = 0;
}
# count processed data rows
$count ++;
$csv->combine(@data);
print $CSV_DATA $csv->string();
# progress output (verbose >= 1): percent values without line breaks,
# printed only when the rounded value changes
$percent = sprintf("%d", ($count / $result_rows) * 100);
if ($percent != $_percent) {
$_percent = $percent;
print_out("$percent% ", 1, 1);
}
} # inner per-row loop
} # outer read loop for declared cursor
# add line break after percent output
print_out("", 1);
# close all statement handles, the cursor and the DB connection
print_out("Close DB connection", 2);
$sth_read->finish;
foreach my $my_cur (keys %cur) {
$cur{$my_cur}->finish;
}
$dbh->do($close_query);
$dbh->disconnect();
# collect run statistics
$count_detail{$q_name}{'lines'} = $count;
$count_detail{$q_name}{'start_time'} = $start_run;
$count_detail{$q_name}{'end_time'} = time();
$count_detail{$q_name}{'time_run'} = $count_detail{$q_name}{'end_time'} - $count_detail{$q_name}{'start_time'};
# check if we have written anything; remove the output file when it is empty
close($CSV_DATA);
check_output_file($output_file);
# build and print the summary block
my $stats = "\n";
$stats .= "* Dump query output to csv finished.\n";
foreach my $q_name (sort keys %count_detail) {
$stats .= "< Input query: ".$query."\n";
$stats .= "> Output file: ".$output_file."\n";
$stats .= "| - Lines written : ".format_number($count_detail{$q_name}{'lines'})." (".convert_number(-s $output_file).")\n";
$stats .= "| - Data write time: ".convert_time($count_detail{$q_name}{'time_run'}, 1)."\n";
$stats .= "| - Process speed : ".sprintf("%s lines/s", format_number($count_detail{$q_name}{'lines'} / $count_detail{$q_name}{'time_run'}, 2))."\n";
$stats .= "| Overall run time : ".convert_time($count_detail{$q_name}{'end_time'} - $start_read)."\n";
}
print_out($stats, 0);
__END__
#!/usr/bin/python3
# AUTHOR: Clemens Schwaighofer
# DATE: 2015/8/6
# DSCRIPTION:
# Runs a query from a file or command line and outputs the data to a CSV file
# Runs query as a CURSOR (not async, doesn't work at the same time)
# Python version of perl script query_to_csv.pl
from math import floor
import argparse
import os
import sys
import re
import csv
import signal
from datetime import datetime
import psycopg2
import psycopg2.extensions
import psycopg2.extras
# ^C abort handler
def signal_handler(signum, frame):
    # SIGINT (^C) handler: remove a possibly empty output file, close the
    # cursor and database connection if they exist, then exit.
    # Relies on the module-level names args, cursor and dbh.
    # FIX: the original used "except cursor.cursor():" / "except dbh.close():"
    # -- an except clause needs an exception class, and cursor.cursor() does
    # not even exist, so the cleanup could never work. The parameter was also
    # renamed so it no longer shadows the signal module.
    print("Caught an abort on signal {}".format(signum))
    RemoveOutputFile(args.output_file.name)
    try:
        cursor.close()
    except Exception:
        print("No open cursor")
    try:
        dbh.close()
    except Exception:
        print("No open database connection")
    sys.exit(0)
# for argparse
# call: type=IntRange(n, m)
# custom range check for n..m; values outside raise ArgumentTypeError
class IntRange(object):
    """Argparse 'type' callable that accepts only integers in a closed range.

    IntRange(stop) validates 0..stop; IntRange(start, stop) validates
    start..stop. Raises argparse.ArgumentTypeError for out-of-range values.
    """
    def __init__(self, start, stop=None):
        # single argument form: range is 0..start
        if stop is None:
            start, stop = 0, start
        self.start, self.stop = start, stop

    def __call__(self, value):
        value = int(value)
        if (value < self.start or value > self.stop):
            # FIX: original message read "out of of range"
            raise argparse.ArgumentTypeError('value out of range between {} and {}'.format(self.start, self.stop))
        return value
# METHOD: ConvertTimestamp
# PARAMS: timestamp (seconds, may be fractional), 1/0 for fractional output
# RETURN: formatted string in days, hours, minutes, seconds, optional "ms" part
# DESC : make a time string from a seconds interval; a unit is shown as soon
#        as any larger unit is non-zero (matches the Perl convert_time)
def ConvertTimestamp(timestamp, show_micro=0):
    # round to four fractional digits, then split off the fraction
    timestamp = str(round(float(timestamp), 4))
    (timestamp, ms) = timestamp.split('.')
    timestamp = int(timestamp)
    ms = int(ms)
    output = []
    # peel off whole days, hours, minutes and seconds in turn
    for i in [86400, 3600, 60, 1]:
        output.append(int(floor(timestamp / i)))
        timestamp = timestamp % i
    # output has days|hours|min|sec
    time_string = ''
    if output[0]:
        # FIX: original used '%sd' (no trailing space), gluing days to hours
        time_string = '%sd ' % output[0]
    if output[0] or output[1]:
        time_string += '%sh ' % output[1]
    if output[0] or output[1] or output[2]:
        time_string += '%sm ' % output[2]
    time_string += '%ss' % output[3]
    if show_micro:
        time_string += ' %sms' % ms if ms else ' 0ms'
    return time_string
# METHOD: FormatBytes
# PARAMS: byte value (numeric; falsy values count as 0), optional unit suffix
# RETURN: formatted string, eg "2.0KiB"
# DESC : convert a byte count into a human readable binary-prefixed string
def FormatBytes(num, suffix='B'):
    num = num or 0
    # step through the binary prefixes until the value drops below 1024
    for prefix in ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi'):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, prefix, suffix)
        num /= 1024.0
    # anything beyond Zi is reported as Yi
    return "%.1f%s%s" % (num, 'Yi', suffix)
# METHOD: wait
# PARAMS: connection
# RETURN: none
# DESC : for a connection in ASYNC mode this must be called after connect and
#        after each query execution; loops on poll() until the connection
#        reports ready, select()ing on its socket in between
def wait(conn):
    import select
    status = ''
    print("Async Status: ", end='')
    while 1:
        # FIX: poll the connection that was passed in; the original polled the
        # module-level dbh no matter which connection was given
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            print("READY({}/{})".format(state, psycopg2.extensions.POLL_OK))
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            status = "w({})".format(state)
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            status = ("r({})".format(state))
            select.select([conn.fileno()], [], [])
        else:
            print("[!]({})".format(state))
            raise psycopg2.OperationalError("poll() returned %s" % state)
        print("{}".format(status), end='')
# METHOD: RemoveOutputFile
# PARAMS: file name
# RETURN: none
# DESC : removes the given file when it holds no data (size is 0 bytes)
def RemoveOutputFile(file_name):
    # a zero byte file carries no usable output, so drop it
    if not os.stat(file_name).st_size:
        print("[!] Empty output file: {}. Will remove this file.".format(file_name))
        os.unlink(file_name)
# install the ^C handler so aborts clean up cursor/connection/output file
signal.signal(signal.SIGINT, signal_handler)
# ==== ARGUMENT PARSE ====
# launch parser
parser = argparse.ArgumentParser(
    description='Reads query from commandline or file and processes it in a cursor based run and prints out the result to the given output file as CSV.',
    epilog='Default run uses asynchronous query. Use --no-async to turn off.'
)
# the options
# query string or file; defaults to the STDIN file object when not given
parser.add_argument(
    '-q',
    '--query',
    default=sys.stdin,
    metavar='QUERY OR FILE NAME',
    help='Can either be a file that has the query inside or the query itself in a string'
)
# the output csv file; opened for writing by argparse itself
parser.add_argument(
    '-o',
    '--output',
    required=True,
    type=argparse.FileType(mode='w', encoding='utf-8'),
    dest='output_file',
    metavar='OUTPUT FILE NAME',
    help='The data read from the query is written into this file including headers'
)
# database connect string
parser.add_argument(
    '-d',
    '--db',
    required=True,
    dest='db_connect_string',
    metavar='DATABASE CONNECT STRING',
    help='Connection data in the following format: user=;pass=;dbname=;host=;port='
)
# wait time for async checks, validated to the range 1..60 by IntRange
parser.add_argument(
    '-w',
    '--wait',
    type=IntRange(1, 60),
    default=10,
    dest='wait_time',
    metavar='TIME IN SECONDS',
    help='Override the default wait time of 10 seconds, Needs to be in range of 1 to 60'
)
# no async flag
parser.add_argument(
    '--no-async',
    action='store_true',
    dest='no_async',
    help='Don\'t run the query in async form'
)
# verbose (repeatable)
parser.add_argument (
    '-v',
    '--verbose',
    action='count',
    default=0,
    help='verbose setting'
)
# read in the arguments
args = parser.parse_args()
# ==== ARGUMENT CHECK ====
error = 0
# basic check on the connect string for at least user and dbname
# NOTE(review): with "and" this only errors when BOTH parts are missing,
# and re.match only matches at the string start -- verify intent
if not re.match('user=([^;.]*);?', args.db_connect_string) and not re.match('dbname=([^;.]*);?', args.db_connect_string):
    print("The db connection string needs at least a user and database name in the format \"user=;dbname=\"")
    error = 1
if error == 1:
    sys.exit(2)
# ==== QUERY PARAMETER CHECK ====
# check query input
# should write to new variable for work?
query_data_is_file = False
query_data_is_stdin = False
# if (os.stat(args.query).st_size == 0):
# without --query, args.query is the sys.stdin file object (which has a
# .name attribute, while a plain query string does not)
if (sys.stdin and hasattr(args.query, 'name')):
    print("* Assume query is STDIN")
    query_data_is_file = True
    query_data_is_stdin = True
elif (not os.path.isfile(args.query)):
    print("* Assume direct query: {}".format(args.query))
elif (os.path.isfile(args.query) and os.stat(args.query).st_size > 0):
    print("* Assume query is file: {}".format(args.query))
    args.query = open(args.query, encoding='utf-8')
    query_data_is_file = True
else:
    print("! Query is not defined: EXIT ({})".format(args.query))
    sys.exit(1)
# read the query data, stripping SQL "--" comments
query_data = ''
if query_data_is_file is True:
    regex_comment = re.compile('^(\s+)?--')
    regex_inline_comment = re.compile('--.*')
    # "with open(...)" would be better, but only works for a file parameter,
    # not for stdin
    for line in args.query:
        # skip any line that is nothing but a -- comment
        # if not re.match('^(\s+)?--', line) and len(line) > 0:
        if not regex_comment.match(line) and len(line) > 0:
            # remove any part after a -- in the line
            # line = re.sub(r'--.*', '', line)
            line = regex_inline_comment.sub('', line)
            # add a space between the joined lines
            if query_data:
                query_data += ' '
            # remove any trailing white space (line breaks, etc)
            query_data += line.rstrip()
        # END IF MATCH
    # close
    args.query.close()
else:
    # for direct data, as is
    query_data = args.query
# if input is file, close file
# NOTE(review): the file handle was already closed above; this second close
# is a harmless no-op
if query_data_is_file is True:
    args.query.close()
# remove any ; inside the query
query_data = re.sub(r';', '', query_data)
# check that query is a select, ignore all others for now (also with calls)
if not re.match('^(\s+)?(select|with)', query_data, flags=re.IGNORECASE):
    print("[!] Query needs to start with a SELECT or WITH statement: {}".format(query_data))
    RemoveOutputFile(args.output_file.name)
    sys.exit(1)
# print ("Query: {}".format(query_data))
# ==== CSV FILE OPEN FOR WRITE ====
# open the CSV writer on the output file handle argparse already opened
csvWrite = csv.writer(args.output_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
# ==== POSTGRESQL OPEN ====
# open postgresql connection
# prepare connect string: replace ; with space and pass= with password=
args.db_connect_string = re.sub(r';', ' ', args.db_connect_string)
args.db_connect_string = re.sub(r'pass=', 'password=', args.db_connect_string)
# try connection
try:
    print("> Logging into database: {}".format(args.db_connect_string))
    # FIX: "async" is a reserved word since Python 3.7, so
    # psycopg2.connect(..., async=False) is a SyntaxError; a synchronous
    # connection is the default, so the keyword is simply dropped
    dbh = psycopg2.connect(args.db_connect_string)
except psycopg2.Error as e:
    print("[!] Connection could not be established with: {}, Code: {}, Error: {}".format(args.db_connect_string, e.pgcode, e.pgerror))
    sys.exit(1)
# poll until the connection is ready (also prints async status info)
wait(dbh)
# ==== POSTGRESQL CURSOR ====
print("Execute query ... ", end='', flush=True)
start_time_query = datetime.now().timestamp()
# server side (named) cursor: csr_ + timestamp with '.' replaced by '_'
# FIX: the original built the name with 'csr_'.str(...), which raises
# AttributeError (str objects have no .str method); use concatenation
cursor = dbh.cursor('csr_' + str(datetime.now().timestamp()).replace('.', '_'), scrollable=True)
try:
    cursor.execute(query_data)
except psycopg2.Error as e:
    print("\n[!] Query could not be executed\nCode: {}\nError: {}".format(e.pgcode, e.pgerror))
    # close database and file
    RemoveOutputFile(args.output_file.name)
    dbh.close()
    sys.exit(1)
# wait(cursor.connection)
# scroll +1 to force the actual execution on the server
print("| ", end='', flush=True)
cursor.scroll(1)
end_time_query = datetime.now().timestamp()
print("[DONE] ({})".format(ConvertTimestamp(end_time_query - start_time_query, 1)), flush=True)
# get max row counts via scroll:
# scroll in blocks until the status message reports MOVE 0, summing up all
# moved counts = total row count
print("Reading max rows from cursor ... ", end='', flush=True)
start_time_rows = datetime.now().timestamp()
move_cursor_value = 100000
max_rows = 0
# move to top
try:
    cursor.scroll(0, mode='absolute')
except:
    print("Cannot scroll this cursor")
while not re.match('MOVE 0', cursor.statusmessage) or not max_rows:
    cursor.scroll(move_cursor_value, mode='relative')
    m = re.search('MOVE (\d+)', cursor.statusmessage)
    # if max rows is 0 and the return of the m.group is also 0, the query
    # returned nothing: mark with -1 so the loop condition lets us out
    if max_rows == 0 and int(m.group(1)) == 0:
        max_rows = -1
    else:
        max_rows += int(m.group(1))
# max_rows of -1 means the query returned no data: clean up and exit
if max_rows == -1:
    print("Could not find any data, aborting run")
    # close cursor, dbh and file
    RemoveOutputFile(args.output_file.name)
    cursor.close()
    dbh.close()
    # and exit
    sys.exit(1)
# move back to the top so the data read starts at the first row
cursor.scroll(0, mode='absolute')
end_time_rows = datetime.now().timestamp()
print("[DONE] {0:,} rows ({1})".format(max_rows, ConvertTimestamp(end_time_rows - start_time_rows, 1)), flush=True)
print("Reading data into CSV file:", flush=True)
start_time_run = datetime.now().timestamp()
# raise the iteration size (rows fetched per server round trip)
iter_cursor_value = 100000
cursor.itersize = iter_cursor_value
# row print & csv write
first_run = 0  # 0 until the CSV header line has been written
row_count = 0
_percent = 0  # last printed percent value (a string after the first print)
for row in cursor:
    # first run sets header in CSV file
    if first_run == 0:
        csv_header = []
        # create header names based on col names from the query
        for col in cursor.description:
            csv_header.append(col.name)
        # write them to the CSV file
        csvWrite.writerow(csv_header)
        first_run = 1
    # write normal data to csv file
    csvWrite.writerow(row)
    # row counter
    row_count += 1
    # percent progress output, printed only when the value changes
    # NOTE(review): the trailing ", 0" is an unused extra format() argument;
    # it was probably meant as round()'s ndigits argument
    percent = "{0:d}".format(round((row_count / max_rows) * 100), 0)
    if percent != _percent:
        _percent = percent
        print("{}% ".format(percent), end='', flush=True)
cursor.close()
end_time_run = datetime.now().timestamp()
print("\n[DONE]", flush=True)
print("- Close DB connection and CSV file handler")
# close DB connection
dbh.close()
# close csv file handler
args.output_file.close()
# ==== POST CLEAN UP ====
# post clean up
# unlink output_file if it is empty
RemoveOutputFile(args.output_file.name)
# ==== STATS OUTPUT ====
# calculate the run times for each phase
query_time = end_time_query - start_time_query
count_time = end_time_rows - start_time_rows
run_time = end_time_run - start_time_run
print("")
print("* Dump query output to csv finished.")
print("+ Run from {} to {}".format(datetime.fromtimestamp(start_time_query).strftime('%Y-%m-%d %H:%M:%S'), datetime.fromtimestamp(end_time_run).strftime('%Y-%m-%d %H:%M:%S')))
print("< Input query: {}".format(query_data if query_data_is_stdin or not query_data_is_file else args.query.name))
print("> Output file: {}".format(args.output_file.name))
# alt format(row_count, ',d')
print("|- Lines written : {0:,} ({1})".format(row_count, FormatBytes(os.stat(args.output_file.name).st_size)))
print("|- Query run time : {}".format(ConvertTimestamp(query_time, 1)))
print("|- Rows count time: {}".format(ConvertTimestamp(count_time, 1)))
print("|- Data write time: {}".format(ConvertTimestamp(run_time, 1)))
print("|- Process speed : {0:,.2f} lines/s".format(float(row_count) / float(run_time)))
print("| Overall run time: {}".format(ConvertTimestamp(query_time + count_time + run_time, 1)))
# __END__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment