Skip to content

Instantly share code, notes, and snippets.

@zokis
Created February 9, 2017 18:09
Show Gist options
  • Save zokis/b957d7a4010f7ffd6b8cccdedb84125a to your computer and use it in GitHub Desktop.
Save zokis/b957d7a4010f7ffd6b8cccdedb84125a to your computer and use it in GitHub Desktop.
# coding: utf-8
import codecs
import csv
import cStringIO
import os
import zipfile
from tempfile import NamedTemporaryFile
def force_text(s, encoding='utf-8', errors='strict'):
    """Return *s* as a byte string encoded with *encoding*.

    Despite its name, the result is always encoded bytes (``str`` on
    Python 2), never a text object.

    * Text input is encoded directly.
    * Byte-string input is decoded and re-encoded, which validates it
      against *encoding*.
    * Any other object is converted to text first: through
      ``__unicode__`` when the object defines it, otherwise through
      ``str()``.

    Args:
        s: The value to convert.
        encoding: Codec used for both decoding and encoding.
        errors: Codec error handler, applied to decoding and encoding.

    Returns:
        The encoded byte string, or an empty byte string when *s* could
        not be converted to text.

    Raises:
        UnicodeEncodeError: If the text cannot be encoded with
            *encoding* under the *errors* policy.
    """
    try:
        text_type = unicode
    except NameError:
        # Python 3 has no ``unicode`` builtin; ``str`` is the text type.
        text_type = str

    if isinstance(s, text_type):
        return s.encode(encoding, errors)

    try:
        if isinstance(s, bytes):
            text = s.decode(encoding, errors)
        elif text_type is str or hasattr(s, '__unicode__'):
            text = text_type(s)
        else:
            # Python 2 only: ``str(s)`` yields bytes that must be decoded.
            text = text_type(str(s), encoding, errors)
    except Exception:
        # Best effort: an unconvertible value becomes an empty cell.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return b''
    return text.encode(encoding, errors)
class UnicodeWriter(object):
    """CSV writer that emits UTF-8 encoded rows to the stream *f*.

    Rows are first formatted by a regular ``csv.writer`` (``excel``
    dialect) into an in-memory queue, then re-encoded and copied to the
    target stream.
    """

    def __init__(self, f):
        self.stream = f
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect='excel')
        self.encoder = codecs.getincrementalencoder("utf-8")()

    def writerow(self, row):
        """Write one row; every cell is passed through ``force_text``."""
        encoded_cells = [force_text(cell) for cell in row]
        self.writer.writerow(encoded_cells)
        # Take the formatted line out of the queue and push it, re-encoded
        # as UTF-8, to the real stream.
        formatted = self.queue.getvalue()
        self.stream.write(self.encoder.encode(formatted.decode("utf-8")))
        # Empty the queue so the next row starts from a clean buffer.
        self.queue.truncate(0)
class DictToCSVsZipFile(object):
    """Export a list of row dicts as a zip archive of CSV files.

    Every row dict becomes one line of a main CSV.  Keys whose value is a
    non-empty list of dicts ("subfields") are left out of that CSV; each
    such list is exported as its own CSV through a child
    ``DictToCSVsZipFile``, recursively.

    Args:
        rowdicts: The rows to export.  It is iterated more than once, so
            it must be re-iterable (e.g. a list).
        zipfilename: Prefix of the temporary zip file name
            (default ``'CSVs'``).
        filename: Prefix of the temporary main CSV file name
            (default ``'principal'``).
        _id: Key whose value identifies a row; it is used to name the
            CSV files of that row's subfields.
        fieldnames: Columns of the main CSV.  Detected from *rowdicts*
            when empty or ``None``.  The caller's list is not modified.
        subfieldnames: Keys holding nested rows.  Detected from
            *rowdicts* when empty or ``None``.
    """

    def __init__(self, rowdicts, zipfilename=None, filename=None, _id='_uuid', fieldnames=None, subfieldnames=None):
        self._id = _id
        self.rowdicts = rowdicts
        self.childrens = []
        self.filename = filename or 'principal'
        self.zipfilename = zipfilename or 'CSVs'

        # Each list is defaulted independently, so supplying only one of
        # them no longer leaves the other as None.  Supplied lists are
        # copied so the caller's objects are never mutated.
        detect_fields = not fieldnames
        detect_subfields = not subfieldnames
        self.fieldnames = [] if detect_fields else list(fieldnames)
        self.subfieldnames = [] if detect_subfields else list(subfieldnames)
        self._detect_fieldnames(detect_fields, detect_subfields)

        self.f = NamedTemporaryFile(delete=True, prefix=self.filename + '_', suffix='.csv')
        self.writer = UnicodeWriter(self.f)

    @staticmethod
    def _is_row_list(value):
        """Return True if *value* is a non-empty list starting with a dict."""
        return isinstance(value, list) and bool(value) and isinstance(value[0], dict)

    def _detect_fieldnames(self, detect_fields, detect_subfields):
        """Complete ``fieldnames``/``subfieldnames`` from ``rowdicts``.

        Names are appended only to the lists flagged for detection.
        Afterwards every subfield name is dropped from ``fieldnames``.
        """
        if detect_fields or detect_subfields:
            for rowdict in self.rowdicts:
                for key, value in rowdict.items():
                    # Checked for every row, so a key that is an empty list
                    # in one row and a list of dicts in a later row is still
                    # recognised as a subfield.
                    if (detect_subfields and key not in self.subfieldnames
                            and self._is_row_list(value)):
                        self.subfieldnames.append(key)
                    if detect_fields and key not in self.fieldnames:
                        self.fieldnames.append(key)
        self.fieldnames = [
            name for name in self.fieldnames if name not in self.subfieldnames
        ]

    def _child_id(self, subfieldname, rowdict):
        """Build the name of the CSV holding *subfieldname* of *rowdict*.

        The row's id value is formatted with ``%s`` so non-string ids
        (e.g. integers) work; the id key itself is used when the row has
        no id.  Slashes are replaced because the result is used as a file
        name prefix.
        """
        row_id = rowdict.get(self._id, self._id)
        return ('%s_%s' % (subfieldname, row_id)).replace('/', '_')

    def _dict_to_list(self, rowdict):
        """Yield the values of *rowdict* in ``fieldnames`` order ("" if absent)."""
        return (rowdict.get(key, "") for key in self.fieldnames)

    def create_csv(self):
        """Write header and rows to the temporary CSV and return the file."""
        self.writer.writerow(self.fieldnames)
        for rowdict in self.rowdicts:
            self.writer.writerow(self._dict_to_list(rowdict))
        # get_zip_file reads the file by name, so the data must be flushed.
        self.f.flush()
        self.f.seek(0)
        return self.f

    def get_csvs(self):
        """Return the main CSV file followed by the CSV files of all children.

        The list of children is rebuilt on every call, so calling this
        method twice does not duplicate them.
        """
        self.childrens = []
        for rowdict in self.rowdicts:
            for subfieldname in self.subfieldnames:
                nested_rows = rowdict.get(subfieldname)
                # Skip rows that lack the subfield or hold something that
                # cannot be exported as rows (anything but a list of dicts).
                if not isinstance(nested_rows, list):
                    continue
                if not all(isinstance(item, dict) for item in nested_rows):
                    continue
                child_id = self._child_id(subfieldname, rowdict)
                self.childrens.append(DictToCSVsZipFile(
                    nested_rows,
                    _id=child_id,
                    filename=child_id
                ))
        csvs = [self.create_csv()]
        for child in self.childrens:
            csvs.extend(child.get_csvs())
        return csvs

    def get_zip_file(self):
        """Return a temporary zip file containing every generated CSV.

        The CSV temporary files are closed (and thereby deleted) once
        they have been archived, or if archiving fails.  The returned
        file is positioned at its start; it is deleted when closed.
        """
        tmp = NamedTemporaryFile(delete=True, prefix=self.zipfilename + '_', suffix='.zip')
        csv_files = []
        try:
            csv_files = self.get_csvs()
            with zipfile.ZipFile(tmp, 'w') as archive:
                for csv_file in csv_files:
                    archive.write(csv_file.name, os.path.basename(csv_file.name))
        except Exception:
            # Do not leave a half-written archive behind.
            tmp.close()
            raise
        finally:
            for csv_file in csv_files:
                csv_file.close()
        tmp.seek(0)
        return tmp
if __name__ == '__main__':
    # Demo: two sample form submissions, each carrying two nested
    # repeat groups ('coisas' and 'xptos'), exported to a zip of CSVs.
    sample_rows = [
        {
            u'_attachments': [],
            u'_bamboo_dataset_id': u'',
            u'_duration': 48.0,
            u'_geolocation': [-23.157054, -45.790946999999996],
            u'_id': 2,
            u'_notes': [],
            u'_status': u'submitted_via_web',
            u'_submission_time': u'2016-09-29T18:18:07',
            u'_submitted_by': u'zokis',
            u'_tags': [],
            u'_uuid': u'787db099-f985-4f9d-a467-272445ca11be',
            u'_version': u'22',
            u'_xform_id_string': u'form_test_coisas',
            u'coisas': [
                {
                    u'coisas/coisas_coisadas': u'1 2',
                    u'coisas/coisas_nome': u'Coisa 1',
                    u'coisas/coisas_qtd': 5,
                },
                {
                    u'coisas/coisas_coisadas': u'3 5',
                    u'coisas/coisas_nome': u'Coisa 2',
                    u'coisas/coisas_qtd': 3,
                },
            ],
            u'xptos': [
                {
                    u'xptos/xptos_coisadas': u'1 2 3',
                    u'xptos/xptos_nome': u'XPTO 1',
                    u'xptos/xptos_qtd': 10,
                },
                {
                    u'xptos/xptos_coisadas': u'3 8 5',
                    u'_uuid': '45',
                    u'xptos/xptos_nome': u'XPTO 2',
                    u'xptos/xptos_qtd': 15,
                },
            ],
            u'data': u'2016-09-29',
            u'deviceid': u'866393024090146',
            u'endtime': u'2016-09-29T15:18:02.566-03',
            u'estado': u'52',
            u'formhub/uuid': u'8e9ac6cb4e1a422bbfa992e41b0cdbe0',
            u'genero': u'1',
            u'localizacao/ponto': u'-23.157054 -45.790946999999996 0.0 0.0',
            u'meta/instanceID': u'uuid:787db099-f985-4f9d-a467-272445ca11be',
            u's_or_n': u'nao',
            u'simid': u'89550312000017399974',
            u'starttime': u'2016-09-29T15:17:14.113-03',
            u'subscriberid': u'724031201739997',
            u'texto_simples': u'私はこの分野で書くことかわかりません',
        },
        {
            u'_attachments': [],
            u'_bamboo_dataset_id': u'',
            u'_duration': 48.0,
            u'_geolocation': [-28.157054, -41.234234],
            u'_id': 3,
            u'_notes': [],
            u'_status': u'submitted_via_web',
            u'_submission_time': u'2016-09-30T18:18:07',
            u'_submitted_by': u'zokis',
            u'_tags': [],
            u'_uuid': u'787db099-7890-4f9d-1593-a2b445ca11be',
            u'_version': u'22',
            u'_xform_id_string': u'form_test_coisas',
            u'coisas': [
                {
                    u'coisas/coisas_coisadas': u'3 9',
                    u'coisas/coisas_nome': u'Coisa 5',
                    u'coisas/coisas_qtd': 51,
                },
                {
                    u'coisas/coisas_coisadas': u'8',
                    u'coisas/coisas_nome': u'Coisa 6',
                    u'coisas/coisas_qtd': 31,
                },
            ],
            u'xptos': [
                {
                    u'xptos/xptos_coisadas': u'11 12 13',
                    u'xptos/xptos_nome': u'XPTO 11',
                    u'xptos/xptos_qtd': 110,
                },
                {
                    u'xptos/xptos_coisadas': u'13 18 15',
                    u'_uuid': '145',
                    u'xptos/xptos_nome': u'XPTO 12',
                    u'xptos/xptos_qtd': 115,
                },
            ],
            u'data': u'2016-30-29',
            u'deviceid': u'165798420114788',
            u'endtime': u'2016-30-29T15:18:02.566-03',
            u'estado': u'25',
            u'formhub/uuid': u'8e9ac6cb4e1a422ccfa992e41b0cdbe0',
            u'genero': u'2',
            u'localizacao/ponto': u'-23.157054 -45.790946999999996 0.0 0.0',
            u'meta/instanceID': u'uuid:787db099-7890-4f9d-1593-a2b445ca11be',
            u's_or_n': u'sim',
            u'simid': u'98788564700213054672',
            u'starttime': u'2016-09-30T15:17:14.113-03',
            u'subscriberid': u'2258769831400154',
            u'texto_simples': u'私はイタリア語で話していますか?',
        },
    ]

    # Build the exporter with auto-detected columns and produce the archive.
    exporter = DictToCSVsZipFile(sample_rows)
    zip_file = exporter.get_zip_file()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment