-
-
Save BertrandBordage/9892556 to your computer and use it in GitHub Desktop.
# coding: utf-8
"""
Converts Paradox databases to Python objects or CSV.
You don't need any dependency (except Python) to make this module work.
This module is incomplete but reads most Paradox `.DB` files.
If this module is not fast or complete enough for you, consider using pxview.
"""
from __future__ import division

import csv
import os
from datetime import date, datetime, timedelta
from struct import unpack
# Text type used to stringify values: ``unicode`` where that builtin exists
# (Python 2), plain ``str`` where looking it up raises ``NameError``.
try:
    force_text = unicode
except NameError:
    force_text = str
# Module metadata.
__author__ = 'Bertrand Bordage'
__copyright__ = 'Copyright © 2014 Bertrand Bordage'
__license__ = 'MIT'
# Maps the numeric field-type code found in the table header to the
# one-character type symbol that ``decode_data`` dispatches on.
field_types = {
    1: 'A',   # string
    2: 'D',   # date
    3: 'S',   # short integer
    4: 'I',   # integer
    5: '$',   # money
    6: 'N',   # number
    9: 'L',   # logical (boolean)
    12: 'M',  # memo
    20: 'T',  # time
    21: '@',  # timestamp
    22: '+',  # autoincrement
}
def unpack_signed(fmt, s, complement):
    """
    Decodes one integer whose sign bit is stored inverted.

    ``fmt`` is a ``struct`` format describing a single signed integer,
    ``s`` holds the raw bytes and ``complement`` is the weight of the sign
    bit for that width, i.e. ``1 << (bits - 1)``.

    Returns ``None`` when the raw value is zero (presumably a blank field),
    otherwise the decoded integer.
    """
    raw = unpack(fmt, s)[0]
    if raw == 0:
        return None
    # Undo the inverted sign bit: a raw negative reading is a stored
    # non-negative value and vice versa.
    if raw < 0:
        return raw + complement
    return raw - complement
# Weight of the sign bit for 4-, 2- and 1-byte integers; passed to
# ``unpack_signed`` as its ``complement`` argument.
I_COMPLEMENT = 1 << (4*8 - 1)
H_COMPLEMENT = 1 << (2*8 - 1)
B_COMPLEMENT = 1 << (1*8 - 1)
def unpack_i(s):
    """
    Decodes a 4-byte big-endian integer field.

    Returns ``None`` when all four bytes are zero.
    """
    return unpack_signed('>i', s, I_COMPLEMENT)
def unpack_h(s):
    """
    Decodes a 2-byte big-endian integer field.

    Returns ``None`` when both bytes are zero.
    """
    return unpack_signed('>h', s, H_COMPLEMENT)
def unpack_b(s):
    """
    Decodes a 1-byte logical field into a ``bool``.

    A zero byte, for which ``unpack_signed`` returns ``None``, comes out as
    ``False`` because the result is passed through ``bool()``; it cannot be
    told apart from a stored ``False``.
    """
    return bool(unpack_signed('>b', s, B_COMPLEMENT))
def unpack_d(s):
    """
    Decodes an 8-byte number field (big-endian IEEE 754 double).

    The stored form is modified so that raw bytes sort numerically:

    * non-negative values have only their sign bit flipped, so the first
      byte has its top bit set;
    * negative values have every bit inverted, so the top bit is clear;
    * eight zero bytes mean there is no value; ``0.0`` is returned for them.

    Raises ``struct.error`` if ``s`` is not exactly 8 bytes long.
    """
    stored = unpack('>d', s)[0]
    raw = bytearray(s)
    if raw[0] & 0x80:
        # Only the sign bit was flipped: negating restores the value.
        return -stored
    if not any(raw):
        return 0.0
    # Negative number: undo the bitwise inversion of the whole field.
    restored = bytearray(byte ^ 0xFF for byte in raw)
    return unpack('>d', bytes(restored))[0]
def to_date(s):
    """
    Decodes a 4-byte date field into a ``datetime.date``.

    The stored integer is used as a proleptic Gregorian ordinal
    (``date.fromordinal``). Returns ``None`` for a blank field or for an
    ordinal that ``date`` cannot represent.
    """
    ordinal = unpack_i(s)
    if ordinal is None:
        return None
    try:
        return date.fromordinal(ordinal)
    except ValueError:
        return None
def to_time(s):
    """
    Decodes a 4-byte time field into a ``datetime.time``.

    The stored integer is divided by 1000 and the result is used as a
    number of seconds added to ``datetime.min``; the sub-second part is
    dropped. Returns ``None`` for a blank field or for a value (such as a
    negative one) that cannot be turned into a time.
    """
    milliseconds = unpack_i(s)
    if milliseconds is None:
        return None
    seconds = milliseconds // 1000
    try:
        return (datetime.min + timedelta(seconds=seconds)).time()
    except OverflowError:
        # A negative offset falls before ``datetime.min``.
        return None
# Number of seconds in one day, used to split a timestamp into date and time.
SECONDS_PER_DAY = 60*60*24
def to_datetime(s):
    """
    Decodes an 8-byte timestamp field into a ``datetime.datetime``.

    The stored number is divided by 1000 to get whole seconds, which are
    then split into a day ordinal and the seconds elapsed in that day.
    Returns ``None`` when the value cannot be represented (a zero ordinal,
    an out-of-range date, NaN or infinity).
    """
    try:
        # ``int()`` raises ValueError on NaN and OverflowError on infinity,
        # so the conversion has to sit inside the ``try`` as well.
        total_seconds = int(unpack_d(s) // 1000)
        ordinal, seconds = divmod(total_seconds, SECONDS_PER_DAY)
        return datetime.fromordinal(ordinal) + timedelta(seconds=seconds)
    except (ValueError, OverflowError):
        return None
def decode_data(t, v, input_encoding):
    """
    Converts the raw bytes ``v`` of one field into a Python value.

    ``t`` is the one-character type symbol (a value of ``field_types``) and
    ``input_encoding`` the codec used for string fields. Memo fields are
    not supported yet and always give ``''``. Bytes of an unknown type are
    returned untouched.
    """
    if t == 'A':
        # A bytes literal keeps this working on Python 3, where stripping
        # bytes with a text argument raises TypeError.
        return v.strip(b'\x00').decode(input_encoding)
    if t == 'S':
        return unpack_h(v)
    if t == 'L':
        return unpack_b(v)
    if t in ('I', '+'):
        return unpack_i(v)
    if t == 'D':
        return to_date(v)
    if t == 'T':
        return to_time(v)
    if t == '@':
        return to_datetime(v)
    if t in ('N', '$'):
        return unpack_d(v)
    if t == 'M':
        return ''  # TODO: handle memos
    return v
def read(filename, dict_output=False, input_encoding='iso-8859-1'):
    """
    Converts a Paradox .DB file to Python objects.

    ``filename`` is the path of the table. Its base name must occur exactly
    once in the file header, because that occurrence is used as the marker
    between the field definitions and the field names.
    ``input_encoding`` is the codec of string fields.

    Returns ``(fields, rows)``. ``fields`` is a list of
    ``(name, type_symbol, length)`` tuples. ``rows`` is a list with one
    entry per record: a list of ``(name, value)`` pairs, or a ``dict`` when
    ``dict_output`` is true. Field names are native ``str`` objects.

    Raises ``ValueError`` when the table name is not found exactly once in
    the header or when the header declares a zero block size, and
    ``KeyError`` for a field type missing from ``field_types``.
    """
    table_name = os.path.basename(filename)
    if not isinstance(table_name, bytes):
        table_name = table_name.encode(input_encoding)

    # Binary mode is required: text mode decodes (Python 3) or translates
    # newlines (Windows), so reads do not return the requested byte counts.
    with open(filename, 'rb') as db_file:
        # The header size is a little-endian byte count. Reading it
        # big-endian and multiplying by 1024 // 4 gave the same number only
        # because the low byte of that size is normally zero.
        header_size = unpack('<H', db_file.read(4)[2:4])[0]
        db_file.seek(0)
        header = db_file.read(header_size)

        # Indexing a bytearray yields integers on Python 2 and 3 alike.
        header_bytes = bytearray(header)
        block_size = header_bytes[5]
        n_fields = header_bytes[33]
        if block_size == 0:
            raise ValueError('%r declares a block size of zero.' % filename)

        parts = header.split(table_name)
        if len(parts) != 2:
            raise ValueError(
                'Expected the table name %r exactly once in the header of '
                '%r, found it %d time(s).'
                % (table_name, filename, len(parts) - 1))
        headers, data = parts
        defs = bytearray(headers[120:])
        columns = data.strip(b'\x00').split(b'\x00')[:n_fields]

        fields = []
        for i, column in enumerate(columns):
            t = field_types[defs[i*2]]
            length = defs[i*2 + 1]
            # ``str is bytes`` only holds on Python 2, where the raw name
            # already is a native string.
            name = column if str is bytes else column.decode(input_encoding)
            fields.append((name, t, length))

        record_size = sum(length for _, _, length in fields)
        blank_record = b'\x00' * record_size
        data_offset = header_size
        block_length = block_size * 1024

        rows = []
        block_index = 0
        previous_record = None
        # NOTE(review): blocks are assumed to be stored one after another in
        # index order; the "next block" number is only compared with zero.
        while True:
            block_start = data_offset + block_index * block_length
            block_end = block_start + block_length
            db_file.seek(block_start)
            block_header = db_file.read(6)
            if len(block_header) < 6:
                # Truncated file: stop instead of failing in ``unpack``.
                break
            offset = block_start + 6
            # ``<=`` because a record may end exactly on the block boundary.
            while offset + record_size <= block_end:
                record = db_file.read(record_size)
                if len(record) < record_size or record == blank_record:
                    break
                offset += record_size
                if record == previous_record:
                    # A record identical to the previous one is skipped.
                    continue
                previous_record = record
                row = []
                position = 0
                for name, t, size in fields:
                    raw_value = record[position:position + size]
                    position += size
                    row.append(
                        (name, decode_data(t, raw_value, input_encoding)))
                rows.append(row)
            is_last_block = unpack('<H', block_header[0:2])[0] == 0
            if is_last_block:
                break
            block_index += 1

    if dict_output:
        # A real list on every Python version (``map`` is lazy on Python 3).
        rows = [dict(row) for row in rows]
    return fields, rows
def to_csv(filename, output_filename=None):
    """
    Converts a Paradox .DB to a CSV file.

    The table is loaded with ``read`` and written as UTF-8 with a header
    row. ``output_filename`` defaults to ``filename + '.csv'``. Every value
    goes through ``force_text``, so a ``None`` value is written as the text
    ``None``.
    """
    fields, data = read(filename)
    fieldnames = [name for name, _, _ in fields]
    if output_filename is None:
        output_filename = filename + '.csv'

    if str is bytes:
        # Python 2: ``csv`` expects byte strings and a binary file.
        def to_cell(value):
            return force_text(value).encode('utf-8')
        output = open(output_filename, 'wb')
    else:
        # Python 3: ``csv`` expects text and a file opened with newline=''.
        to_cell = force_text
        output = open(output_filename, 'w', newline='', encoding='utf-8')

    # The ``with`` block closes the file even if writing fails.
    with output:
        writer = csv.DictWriter(output, fieldnames)
        writer.writeheader()
        writer.writerows(
            {key: to_cell(value) for key, value in row} for row in data)
According to the specification, the field type for floats is N. How I dealt with this data type is written on line 69. I didn’t use unpack_signed in this case because of some inconsistency with the floating-point standard. If I remember correctly, the sign was the opposite of what it should be, or something like that. I ended up with a bad workaround, but since there was no negative float in the only Paradox DB I have seen in my life, I left it like this.
So you will have to find a solution yourself; line 69 is where everything happens.
The solution is explained here: http://stackoverflow.com/questions/27360892/python2-7-difficulty-to-convert-a-binary-signed-double-to-ascii/27416776#27416776. Thank you.
Thanks for this solution, unfortunately it didn't work for me. I ended up implementing complete Python bindings to the pxlib library: https://github.com/mherrmann/pypxlib
Install with
pip install pypxlib
Then you can for instance do:
from pypxlib import Table
table = Table('my-paradox.db')
try:
    for row in table:
        print(row)
finally:
    table.close()
I think line 131 should be changed to db_file = open(filename, 'rb'), so that the file is read as binary. This solves the problem of reads not returning the requested length; otherwise the contents of some rows are lost (the break at line 165).
Thanks for the gist. I had to extract data from paradox database this helped a lot. I applied the previous fix suggested by @zicowarn and that worked in my case.
Hey guys! Inspired by @mherrmann:
https://github.com/javifr/paradoxdbsplitter
@BertrandBordage, can you help me? My Language Driver is set to ANSII850. I tried changing line 127 of your code to
def read(filename, dict_output=False, input_encoding='ansii850'):
but it does not work. Do you know what could be wrong?
@danielpcamara It’s a very old script that I probably wrote for Python 2, which opened files in binary mode by default, if I remember correctly. Pass 'rb' to open and that should do it.
Thanks @BertrandBordage, this still didn't work for me, but I was able to find out another way to extract my DataBase in CSV using the app Paradox Data Editor.
How can I write the rows of the table into a pandas DataFrame?
@BertrandBordage any idea why I get the following error
"TypeError: ord() expected string of length 1, but int found"
for line 136? I already added the "rb" as suggested above.
Thanks for this solution, unfortunately it didn't work for me. I ended up implementing complete Python bindings to the pxlib library: https://github.com/mherrmann/pypxlib
Install with
pip install pypxlib
Then you can for instance do:
from pypxlib import Table
table = Table('my-paradox.db')
try:
    for row in table:
        print(row)
finally:
    table.close()
This is what worked for me
Hi,
Thanks for this useful code. But what if the Paradox value is a negative double such as -111,00 (whereas 111,00 reads fine)? I did not manage to write a fix.
Thank you