@ngrilly
Created February 1, 2011 10:20
r"""
It looks like copy_from does not stop reading after an error. When the input file is short,
this is fine. But when the input file is very long, it is tedious to wait for the entire file
to be read just to discover that there is an error on the 10th row.

Given the same input file, the psql \copy command appears to behave correctly and stop
right after the incorrect row, without reading the entire file. I checked this only by observing
that the command's execution time seems proportional to the number of rows processed...

Here is a script to reproduce this bug (just create a database "test" and run the script).

Update:
I have run the same script with pg8000, and it does not stop reading after an error either.
Maybe this is not a bug but a limitation of the PostgreSQL protocol? Maybe the COPY FROM
protocol is not designed to return errors in the middle of a data stream, and I just have to
split my data stream into many chunks and call copy_from for each chunk?

Related ticket here:
http://psycopg.lighthouseapp.com/projects/62710-psycopg/tickets/37-copy_from-does-not-stop-reading-after-an-error
"""
import traceback

import psycopg2


def main():
    con = psycopg2.connect(database='test', user='postgres')
    cur = con.cursor()
    try:
        cur.execute("create temp table test (id integer primary key, data text)")
        rows = iter_rows()
        copy_file = CopyFile(rows)
        cur.copy_from(copy_file, 'test')
        cur.execute("select count(*) from test")
        print("{0} rows written to database".format(cur.fetchall()[0][0]))
    finally:
        cur.close()
        con.close()


def iter_rows():
    """Yield 500 tab-separated rows; row 10 has a non-integer key."""
    random_data = 'x' * 100
    for i in range(0, 500):
        if i == 10:
            i = 'Bad key'
            print("Yield incorrect data row (copy_from should stop reading after that)")
        else:
            print("Yield correct data row")
        yield '{0}\t{1}\n'.format(i, random_data)


class CopyFile(object):
    """Minimal file-like wrapper around an iterator of row strings."""

    def __init__(self, row_reader):
        self.next_row = lambda: next(row_reader)
        self.buffer = ''

    def read(self, limit=-1):
        print("Read {0} bytes".format(limit))
        try:
            buffer = self.buffer
            # Pull rows until the request can be satisfied or the rows run out
            while limit < 0 or len(buffer) < limit:
                try:
                    buffer += self.next_row()
                except StopIteration:
                    break
            if limit < 0:
                self.buffer = ''
                return buffer
            else:
                # Keep the overflow for the next call
                self.buffer = buffer[limit:]
                return buffer[:limit]
        except Exception:
            # Report unexpected errors because psycopg2 does not report them
            traceback.print_exc()
            raise

    # Method readline is required by psycopg2 but actually never called
    def readline(self, limit=-1):
        raise NotImplementedError()


if __name__ == '__main__':
    main()
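
The chunking idea raised in the update note could be sketched roughly as follows. This is only a minimal sketch under stated assumptions, not a confirmed fix: `copy_in_chunks` and `chunk_size` are hypothetical names that are not part of psycopg2, the code assumes Python 3, and it relies only on the cursor exposing `copy_from(file, table)` and raising an exception when a batch contains a bad row.

```python
import io
from itertools import islice


def copy_in_chunks(cur, rows, table, chunk_size=100):
    """Feed `rows` to cur.copy_from() in batches of `chunk_size` lines.

    Hypothetical helper: returns the number of rows sent. Any exception
    raised by copy_from() propagates immediately, so rows after the
    failing batch are never pulled from the iterator.
    """
    rows = iter(rows)
    sent = 0
    while True:
        # Take at most chunk_size rows without consuming any further ones
        batch = list(islice(rows, chunk_size))
        if not batch:
            return sent
        # Each batch is sent as its own copy_from() call, so an error
        # should surface no later than the end of the batch holding it
        cur.copy_from(io.StringIO(''.join(batch)), table)
        sent += len(batch)
```

With the script above, this would be called as `copy_in_chunks(cur, iter_rows(), 'test', chunk_size=5)` in place of the single `cur.copy_from(copy_file, 'test')` call; with 5-row batches, the bad row at index 10 should stop the load after 15 rows have been generated instead of 500. Note that an error aborts the current transaction, so batches copied earlier in the same transaction are rolled back with it unless they were committed first.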