PostgreSQL_XML_XSLT
#!/usr/bin/env python3
#!/usr/bin/python
from collections import namedtuple
import psycopg2
#from psycopg2 import sql
# note
# that we have to import the Psycopg2 extras library !
#import psycopg2.extras
from psycopg2 import extras
from psycopg2.extras import NamedTupleConnection
#from psycopg2.extras import execute_values
import sys
import os
import posix
#
import xml.etree.ElementTree as ET
# or import the ElementTree class directly (a class from the module above, not the same object)
from xml.etree.ElementTree import ElementTree
import xml.parsers.expat
#
from pprint import pprint, pformat
import array
#
import cProfile, pstats, io
#
#
def main(
items_Total_n: int,
connection_Str: str = "",
is_DeBug_Mode: bool = False,
show_Limit: int = 80
):
"""
"""
task_Goal = """ @toDo:
- check DB table
- clear table if non-empty
- populate table|insert nItemsTotal items into it
(- read actual data from DB )
- store table (data) to|as XML file (1.xml)
<?xml version="1.0"?>
<entries>
<entry>
<field>field column value</field>
</entry>
...
</entries>
- read and modify the stored file using XSLT
(see the commented XSLT sketch right after this docstring;
the code below does this step with ElementTree instead)
<?xml version="1.0"?>
<entries>
<entry field="значение поля field">
...
</entries>
- save modified data in a separate file (2.xml)
- read attributes from last file
- output attributes sum to a console as a result
Restrictions:
(overall) running time
for nItemsTotal <= pow( 10, 6 )
must be|expected to be <=|less than 5 minutes
"""
#print( "posix.environ:{}".format( pformat( posix.environ ) ) )
pr = None
if is_DeBug_Mode:
pr = cProfile.Profile()
#
# Identity element neutral to addition
field_Attribute_Sum = 0
#
if connection_Str == "":
# Define|initialize to default connection string
user = os.getenv(
#key,
# { b'USER', b'LOGNAME' }
"LOGNAME",
default = None
)
# DSN
connection_Str = (
(
#"host='localhost' " +
"dbname={} user={}"# +
#" password={}"
).format(
user, user
#, "indent"
)
)
#
# print the connection string we will use to connect
print( "Connecting to database\n\t>{}".format( connection_Str ) )
#
# How do I interrupt a long-running query in an interactive shell ?
# Starting from psycopg 2.6.2,
# the `wait_select callback` can handle a 'Ctrl-C' correctly.
### but: psycopg2.ProgrammingError: copy_from cannot be used with an asynchronous callback.
#psycopg2.extensions.set_wait_callback( psycopg2.extras.wait_select )
#
if is_DeBug_Mode:
pr.enable()
# get a connection,
# if a connect cannot be made
# an exception will be raised here
#
# Starting from version 2.5,
# psycopg2’s connections and cursors are context managers
# and can be used with the with statement:
with psycopg2.connect( connection_Str ) as connection:
#
# set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None)
connection.set_session( autocommit = True )
#
with connection.cursor(
cursor_factory = (
#psycopg2.extras.DictCursor
psycopg2.extras.NamedTupleCursor
)
) as cursor:
#
#cursor.execute( SQL )
#
# psycopg2.OperationalError: fe_sendauth: no password supplied
# psycopg2.OperationalError: FATAL: password authentication failed for user "?!?"
# connect(ion)
# The function connect() creates a new database session
# and returns a new connection instance.
#connection = psycopg2.connect( connection_Str )
#
# conn.cursor will return a cursor object,
# you can use this cursor to perform queries
#cursor = connection.cursor()
#
if is_DeBug_Mode and 1 == 0:
pprint( dir( psycopg2 ) )
if is_DeBug_Mode and 1 == 0:
pprint( dir( psycopg2.extras ) )
if is_DeBug_Mode and 1 == 0:
pprint( dir( psycopg2.extensions ) )
if is_DeBug_Mode and 1 == 0:
pprint( dir( psycopg2.extensions.cursor ) )
# note that
# we pass a `cursor_factory` argument
# as dictionary cursor
# so COLUMNS will be returned as a dictionary
# to give access to columns by their name instead of index.
skip_4 = """
cursor = connection.cursor(
cursor_factory = (
#psycopg2.extras.DictCursor
psycopg2.extras.NamedTupleCursor
)
)
"""
print( "Connected ! type( cursor ):{}".format( type( cursor ) ) )
# Connected ! type( cursor ):<class 'psycopg2.extensions.cursor'>
#
if 1 == 0:
print( "COPY test TO STDOUT WITH CSV HEADER:" )
# psycopg2.ProgrammingError: copy_expert cannot be used with an asynchronous callback.
cursor.copy_expert( "COPY test TO STDOUT WITH CSV HEADER", sys.stdout )
# field # <- header for empty table
#
query_Select_All = "SELECT * FROM test"
query_Count_All = "SELECT Count(*) FROM test"
# count: 0, (1 row) for empty table
query_Select_Top_1 = "SELECT * FROM test LIMIT 1"
# execute our Query
cursor.execute(
#query_Select_All
query_Select_Top_1
)
print( "cursor.query:", cursor.query )
#
### Note: fetchone() consumes|depletes one row of the result set
### and advances the cursor's current row pointer.
# Fetch the next row
# of a query result set,
# returning a single tuple,
# or None when no more data is available:
first_Record = cursor.fetchone()
print( "first_Record|cursor.fetchone():" )
print( first_Record )
# None # < for empty
if first_Record is not None:
#
cursor.execute( "DELETE FROM test" )
print( "cursor.query:", cursor.query )
#
print( "cursor is closed:", cursor.closed )
#
cursor.execute( query_Select_All )
print( "cursor.query:", cursor.query )
# fetchmany([size=cursor.arraysize])
# Fetch the next set of rows
# of a query result,
# returning a list of tuples.
# An empty list is returned
# when no more rows are available.
# arraysize
# This read/write attribute
# specifies the number of rows to fetch at a time with fetchmany().
# It defaults to 1 meaning to fetch a single row at a time.
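# An untested sketch of chunked fetching with fetchmany()
# (not used in this script; assumes `cursor` already holds an executed SELECT):
#>>> cursor.arraysize = 1000
#>>> while True:
#...     rows = cursor.fetchmany()
#...     if not rows:
#...         break
#...     for row in rows:
#...         pass # process one Record at a time here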
#
# retrieve the records from the database
# Fetch all (remaining) rows of a query result,
# returning them as a list of tuples.
# An empty list is returned if there is no more record to fetch.
# psycopg2.ProgrammingError: no results to fetch
records = cursor.fetchall()
#
# print out the records using pretty print
# note that
# the NAMES of the columns are not shown,
# instead just indexes.
print( "records|cursor.fetchall():" )
pprint( records )
# [] # for empty
#
# New in version 2.7
# psycopg2.extras.execute_batch(cur, sql, argslist, page_size=100)
# Execute groups of statements in fewer server roundtrips.
# Execute sql several times,
# against all parameters set (sequences or mappings) found in argslist.
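# An untested sketch of a plain execute_batch() call (psycopg2 >= 2.7 only;
# the generator expression here is just an illustration of an argslist):
#>>> psycopg2.extras.execute_batch(
#...     cursor,
#...     "INSERT INTO test (field) VALUES (%s)",
#...     ( ( i % 32767, ) for i in range( items_Total_n ) ),
#...     page_size = 1000
#... )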
query_Insert_Into = "INSERT INTO test (field) VALUES($1)"
args_Tuple = ( ( 1, ), ) * items_Total_n
args_List = [ 1 ] * 3#items_Total_n
# field type
# smallint 2 bytes small-range integer -32768 to +32767
smallint_Max = 32767
byte_Max = 256
# Generator
args_Iter = ( i % smallint_Max for i in range( 0, items_Total_n, 1 ) )
if is_DeBug_Mode:
### iterating over a generator exhausts it (it cannot be rewound or copied)
# AttributeError: 'generator' object has no attribute 'copy'
expected_Sum = sum(
#args_Iter
#tuple( args_Iter )
( i % smallint_Max for i in range( 0, items_Total_n, 1 ) )
)
# Bytes objects are immutable sequences of single bytes.
# Since many major binary protocols are based on the ASCII text encoding,
# bytes objects offer several methods
# that are only valid when working with ASCII compatible data
# and are closely related to string objects in a variety of other ways.
# for 10'000 * smallint, file.size: 47.7 KiB
# for 100'000 * smallint, file.size: 550.6 KiB
# for 1'000'000 * smallint, file.size: 5.4 MiB
bytes_Data = bytes( '\n'.join( map( str, args_Iter ) ), encoding = "utf-8" )
if is_DeBug_Mode and 1 == 1:
print( "bytes_Data:" )
pprint( bytes_Data[:show_Limit] )
# for big items_Total_n:
# ValueError: byte must be in range(0, 256)
args_ByteArray = bytearray(
b'\x00'
# it works
#bytes( '\n'.join( map( str, args_Iter ) ), encoding = "utf-8" )
)
# bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08')
# \x1f !"#$%&\'()*+,-./0123456789:;<=>?@
# ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f
#
# for size: 1000 bytes, file.size: 3.9 KiB
# for size: 100000 bytes, file.size: 390.6 KiB
# for size: 1000000 bytes, file.size: 3.8 MiB
# NB: args_Iter was already exhausted by the join() above, so this tuple is empty
args_Array = array.array( 'I', tuple( args_Iter )[:show_Limit] )
if is_DeBug_Mode and 1 == 1:
#pprint( args_Tuple[:show_Limit] )
print( "args_Array.tobytes():" )
pprint( args_Array.tobytes()[:show_Limit] )
# >>> bytes('\n'.join(map(str,(257,258))), encoding="utf-8")
# b'257\n258'
# TypeError: sequence item 0: expected a bytes-like object, int found
#args_ByteArray = b'\n'.join( args_Array.tobytes() )
# TypeError: 'generator' object is not subscriptable
# -> cannot slice args_Iter[:3]
args_Iter = ( ( n, ) for n in range( 0, 3, 1 ) )
# array.tobytes()
# Convert the array to an array of machine values
# and return the bytes representation
# (the same sequence of bytes that would be written to a file by the tofile() method.)
# array.tofile(f)
# Write all items (as machine values) to the file object f.
#
#>>> m = memoryview(b"abc")
#>>> m.tobytes()
#b'abc'
#>>> bytes(m)
#b'abc'
#import array
#>>> a = array.array('I', [1, 2, 3, 4, 5])
#>>> b = array.array('d', [1.0, 2.0, 3.0, 4.0, 5.0])
#>>> c = array.array('b', [5, 3, 1])
#>>> x = memoryview(a)
# m.release()
# or
#>>> with memoryview(b'abc') as m:
#... m[0]
if is_DeBug_Mode and 1 == 1:
#pprint( args_Tuple[:show_Limit] )
print( "args_ByteArray:" )
pprint( args_ByteArray[:show_Limit] )
print( "args_Array:" )
pprint( args_Array[:show_Limit] )
# from: "https://www.postgresql.org/docs/current/static/sql-prepare.html"
# PREPARE name [ ( data_type [, ...] ) ] AS statement
# Create a prepared statement for an INSERT statement, and then execute it:
# PREPARE fooplan (int, text, bool, numeric) AS
# INSERT INTO foo VALUES($1, $2, $3, $4);
# EXECUTE fooplan(1, 'Hunter Valley', 't', 200.00);
# psycopg2.ProgrammingError: syntax error at or near "INTO"
skip_1 = """
cursor.execute( "PREPARE populate (int) AS {}".format( query_Insert_Into ) )
# AttributeError: 'module' object has no attribute 'execute_batch'
psycopg2.extras.execute_batch(
cursor,
"EXECUTE populate( %s, )",
#params_list
args_Tuple
)
cursor.execute( "DEALLOCATE populate" )
"""
# psycopg2.extras.execute_values(cur, sql, argslist, template=None, page_size=100)
# Execute a statement using VALUES with a sequence of parameters.
# argslist – sequence of sequences or dictionaries
# with the arguments to send to the query.
# The type and content must be consistent with template.
skip_1 = """
psycopg2.extras.execute_values(
cursor,
"INSERT INTO test (field) VALUES %s",
args_Tuple
)
"""
#cursor.mogrify( "INSERT INTO test (num, data) VALUES (%s, %s)", ( 42, 'bar' ) )
if is_DeBug_Mode and 1 == 0:
# TypeError: not all arguments converted during string formatting
print( cursor.mogrify( "INSERT INTO test (field) VALUES (%s)", args_Tuple ) )
#
###
### WARN: ridiculously slow even for|when inserting only 32767 smallint
###
if is_DeBug_Mode and 1 == 1:
print( "cursor.executemany ({}) INSERTs INTO test".format( items_Total_n ) )
# cur.executemany(sql, argslist)
# TypeError: not all arguments converted during string formatting
skip_2 = """
cursor.executemany(
"INSERT INTO test (field) VALUES (%s)",
# psycopg2.ProgrammingError: syntax error at or near "1"
# LINE 1: INSERT INTO test (field) VALUES 1
#"INSERT INTO test (field) VALUES %s",
#args_Tuple
#args_List
args_Iter
)
print( "cursor.query:", cursor.query )
"""
if 1 == 0:
step = 1
#
###
### WARN: noticeably slow even for|when inserting only 300 smallint
###
cursor.execute( "PREPARE populate (int) AS {}".format( query_Insert_Into ) )
#for i in range( 0, items_Total_n, step ):
for next_Val in args_Iter:
pass
skip_3 = """
cursor.execute(
"INSERT INTO test (field) VALUES (%s)",
#args_Tuple
args_List
)
"""
cursor.execute( "EXECUTE populate( %s )", next_Val )
cursor.execute( "DEALLOCATE populate" )
#
# ValueError: must have exactly one of create/read/write/append mode
with open( 'test_Data', 'wb') as f:
#
if 1 == 1:
if is_DeBug_Mode:
print( "about to write({}) bytes to 'test_Data' file", len( bytes_Data ) )
f.write(
#b'0123456789abcdef'
# it works
#args_ByteArray
bytes_Data
)
if 1 == 0:
args_Array.tofile(f)
#
# copy_from()
# copy_from(file, table, sep='\t', null='\\N', size=8192, columns=None)
# Reads data from a file-like object
# appending them to a database table (COPY table FROM file syntax).
# The source file must provide
# both read()
# and readline() method.
#f = StringIO("42\tfoo\n74\tbar\n")
#cur.copy_from(f, 'test', columns=('num', 'data'))
with open( 'test_Data', 'rb') as f:
#
# psycopg2.ProgrammingError: copy_from cannot be used with an asynchronous callback.
cursor.copy_from( f, 'test', columns = ( 'field', ) )
#
# (100) INSERTs INTO test
# Record(count=100)
# 311 function calls in 0.063 seconds
# (1000) INSERTs INTO test
# Record(count=1000)
# 1211 function calls in 0.056 seconds
# (10000) INSERTs INTO test
# Record(count=10000)
# 10211 function calls in 0.082 seconds
# (100000) INSERTs INTO test
# Record(count=100000)
# 100211 function calls in 0.393 seconds
# (1000000) INSERTs INTO test
# Record(count=1000000)
# 1000211 function calls in 2.520 seconds
#pr.disable()
#
# If the dataset is too large
# to be practically handled on the client side,
# it is possible to create a `server side cursor`.
# Using this kind of cursor
# it is possible
# to transfer to the client only a controlled amount of data,
# so that
# a large dataset can be examined
# without keeping it entirely in memory.
# Psycopg wraps the database `server side cursor` in `named cursors`.
# A named cursor is created using the cursor() method
# specifying the `name` parameter.
# iterating over a `named cursor`
# fetches `itersize` records at time from the backend.
# The default is 2000.
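# An untested sketch of a server-side (named) cursor (not what this script uses);
# `withhold = True` is assumed to be needed here because autocommit is enabled above:
#>>> with connection.cursor( name = 'test_reader', withhold = True ) as server_cursor:
#...     server_cursor.itersize = 2000
#...     server_cursor.execute( query_Select_All )
#...     for row in server_cursor:
#...         pass # each iteration pulls from a 2000-row batch fetched from the backend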
cursor.execute(
#query_Count_All
query_Select_All
)
print( "cursor.query:", cursor.query )
row_count = 0
root_Entries = ET.Element( 'entries' )
# Note: cursor objects are iterable,
# so, instead of calling explicitly fetchone() in a loop,
# the object itself can be used:
for row in cursor:
#
row_count += 1
#print( row )
# Record(field=0)
entry = ET.SubElement( root_Entries, 'entry')
field = ET.SubElement( entry, 'field')
field.text = str( row.field )
if is_DeBug_Mode:
print( "{} added field.text:{}", row_count, field.text )
#
# class xml.etree.ElementTree.TreeBuilder(element_factory=None)¶
# Generic element structure builder.
# This builder converts a sequence of
# start, data, and end method calls
# to a well-formed element structure.
# You can use this class
# to build an element structure
# using a custom XML parser,
# or a parser for some other XML-like format.
# element_factory, when given, must be a callable
# accepting two positional arguments:
# a tag and a dict of attributes.
# It is expected to return a new element instance.
# xml.etree.ElementTree.SubElement(parent, tag, attrib={}, **extra)
# Subelement factory.
# This function creates an element instance,
# and appends it to an existing element.
#>>> a = ET.Element('a')
#>>> b = ET.SubElement(a, 'b')
#>>> c = ET.SubElement(a, 'c')
#>>> d = ET.SubElement(c, 'd')
#>>> ET.dump(a)
#<a><b /><c><d /></c></a>
# class xml.etree.ElementTree.ElementTree(element=None, file=None)
# ElementTree wrapper class.
# This class represents an entire element hierarchy,
# and adds some extra support
# for serialization to and from standard XML.
# element is the root element.
# The tree is initialized with the contents of the XML file if given.
# AttributeError: 'xml.etree.ElementTree.Element' object has no attribute 'write'
tree = ET.ElementTree( element = root_Entries )
tree.write( '1.xml' )
#
# Close communication with the database
#cursor.close()
#connection.close()
#
# class xml.sax.xmlreader.XMLReader
# Base class which can be inherited by SAX parsers.
# class xml.sax.xmlreader.IncrementalParser
# class xml.sax.saxutils.XMLGenerator(out=None, encoding='iso-8859-1', short_empty_elements=False)
#tree = ET.parse('country_data.xml')
#root = tree.getroot()
# Or directly from a string:
#root = ET.fromstring(country_data_as_string)
#>>> root.tag
#'data'
#>>> root.attrib
#{}
#>>> for child in root:
#... print(child.tag, child.attrib)
# 20.5.1.5. Modifying an XML File
# ===
# ElementTree provides
# a simple way to build XML documents
# and write them to files.
# The ElementTree.write() method serves this purpose.
# Once created,
# an Element object may be manipulated
# by directly changing its fields (such as Element.text),
# adding and modifying attributes (Element.set() method),
# as well as adding new children (for example with Element.append()).
# Let’s say we want
# to add one to each country’s `rank`,
# and add an `updated` attribute to the `rank` element:
#>>> for rank in root.iter('rank'):
#... new_rank = int(rank.text) + 1
#... rank.text = str(new_rank)
#... rank.set('updated', 'yes')
#>>> tree.write('output.xml')
#
# We can remove elements using Element.remove().
# Let’s say we want
# to remove all `countries`
# with a `rank` higher than 50:
#>>> for country in root.findall('country'):
#... rank = int(country.find('rank').text)
#... if rank > 50:
#... root.remove(country)
#>>> tree.write('output.xml')
if 1 == 1:
tree = ElementTree()
tree.parse( "1.xml" )
root = tree.getroot()
#
#for entry in root.findall('entry'):
for entry in root.iter( 'entry' ):
#
# Children are nested,
# and we can access specific child nodes by index:
#>>> root[0][1].text
field = entry[0]
entry.set( 'field', field.text )
entry.remove( field )
#
tree.write( '2.xml' )
#
if 1 == 0:
root = None
entry = None
parser = (
ET.iterparse( source = "1.xml", events = [ 'start', 'end' ] )
)
#
for ( event, elem ) in parser:
#
#print( elem.tag, 'text=', elem.text )
if elem.tag == "entry":
#
pass
#entry = ET.SubElement( root, 'entry' )
elif elem.tag == "field" and event == 'end':
# xml.etree.ElementTree.SubElement(parent, tag, attrib={}, **extra)
pass
#entry.set( 'field', elem.text )
entry = ET.SubElement( root, 'entry', attrib = { 'field': elem.text } )
else:
if root is None:
# 'entries'
root = ET.Element( elem.tag )
else:
pass
#
tree = ET.ElementTree( element = root )
tree.write( '2.xml' )
# timing with both `iterparse`
# (1'000) INSERTs INTO test 70'569 function calls (67569 primitive calls) in 0.129 seconds
# (10'000) INSERTs INTO test 700'691 function calls (670691 primitive calls) in 0.838 seconds
# (100'000) INSERTs INTO test 7'001'955 function calls (6701955 primitive calls) in 8.330 seconds
# (1'000'000) INSERTs INTO test 70'014'547 function calls (67014547 primitive calls) in 85.291 seconds
#
if 1 == 1:
#tree = ElementTree()
tree.parse( "2.xml" )
root = tree.getroot()
#
#for entry in root.findall('entry'):
for entry in root.iter( 'entry' ):
#
if is_DeBug_Mode:
pass
#print( "field_Attribute_Sum was:", field_Attribute_Sum )
field_Attribute_Sum += int(
entry.attrib["field"]
)
if is_DeBug_Mode:
print( "field_Attribute_Sum updated to:", field_Attribute_Sum )
#
# timing without `iterparse`
# (100) INSERTs INTO test 5122 function calls (4822 primitive calls) in 0.116 seconds
# (10'000) INSERTs INTO test 460'522 function calls (430522 primitive calls) in 0.619 seconds
# (100'000) INSERTs INTO test 4'600'524 function calls (4300524 primitive calls) in 5.577 seconds
# (1'000'000) INSERTs INTO test 46'000'524 function calls (43000524 primitive calls) in 55.355 seconds
#pr.disable()
#
if 1 == 0:
# xml.etree.ElementTree.iterparse(source, events=None, parser=None)
# Parses an XML section into an element tree incrementally,
# and reports what’s going on
# to the user.
# source is a filename or file object
# containing XML data.
# events is a sequence of events to report back.
parser = (
#ET.XMLPullParser(['start', 'end'])
# AttributeError: type object 'ElementTree' has no attribute 'iterparse'
#ElementTree
ET.iterparse(
source = "2.xml",
events = [
'start', #'end'
]
)
)
#
# AttributeError: '_IterParseIterator' object has no attribute 'read_events'
for ( event, elem ) in parser:#.read_events():
#
#print( event )
#print( elem.tag, 'text=', elem.text )
if elem.tag == "entry":
#
field_Attribute_Sum += int(
elem.attrib["field"]
)
#
# timing with last `iterparse` only
# (100) INSERTs INTO test 6'142 function calls (5842 primitive calls) in 0.048 seconds
# (1'000) INSERTs INTO test 56'546 function calls (53546 primitive calls) in 0.152 seconds
# (10'000) INSERTs INTO test 560'594 function calls (530594 primitive calls) in 0.699 seconds
# (100'000) INSERTs INTO test 5'601'096 function calls (5301096 primitive calls) in 6.670 seconds
if is_DeBug_Mode:
pr.disable()
if is_DeBug_Mode:
#s = io.StringIO()
sortby = 'cumulative'
ps = pstats.Stats(
pr,
stream = (
#s
#sys.stdout
sys.stderr
)
).sort_stats( sortby )
ps.print_stats()
#
if is_DeBug_Mode:
assert expected_Sum == field_Attribute_Sum, (
"expected_Sum:{} != {}:field_Attribute_Sum".format( expected_Sum, field_Attribute_Sum )
)
print( "field_Attribute_Sum:", field_Attribute_Sum )
#
#
return field_Attribute_Sum#cursor
#
###*******************###
###*/// unit test ///*###
###*******************###
if __name__ == "__main__":
#
items_Total_n = int( input( "Type 'items_Total_n' as int:" ) )
# @toDo: request 'connection_Str' from stdIn
# The connection parameters can be specified as
# a `libpq connection string`
# using the dsn parameter:
# conn = psycopg2.connect("dbname=test user=postgres password=secret")
# or using a set of keyword arguments:
# conn = psycopg2.connect(dbname="test", user="postgres", password="secret")
# The basic connection parameters are:
# dbname – the database name (database is a deprecated alias)
# user – user name used to authenticate
# password – password used to authenticate
# host – database host address (defaults to UNIX socket if not provided)
# port – connection port number (defaults to 5432 if not provided)
connection_Str = input(
(
"Type Keyword/Value DB connection parameters as \n" +
"host='localhost' dbname='my_database' user='postgres' password='secret'\n" +
"(for some cases user='user_name' will be enough)\n" +
"or just hit `Enter` to use defaults (for current system user):"
)
)
#
main( items_Total_n, connection_Str )