Skip to content

Instantly share code, notes, and snippets.

@darkjh
Created August 31, 2014 22:32
Show Gist options
  • Save darkjh/5662c7583f444849a653 to your computer and use it in GitHub Desktop.
Save darkjh/5662c7583f444849a653 to your computer and use it in GitHub Desktop.
import dpark
import logging
import os
import re
# Configure the root logger to emit records of level DEBUG and above through
# basicConfig's default handler (a StreamHandler on stderr); presumably meant
# to surface dpark's debug logging while testing — TODO confirm. Note that
# basicConfig is a no-op if the root logger already has handlers.
logging.basicConfig(level=logging.DEBUG)
def list_files(dirpath, full_path=True, regexp=None):
    """List the entries of a local directory, optionally filtered by regexp.

    Only the top level of ``dirpath`` is listed (no recursion). Every entry
    returned by ``os.listdir`` is a candidate, so sub-directories whose names
    match are returned as well.

    :param dirpath: path to the directory
    :type dirpath: str
    :param full_path: if True, return ``os.path.join(dirpath, name)`` for each
        entry; otherwise return the bare entry name
    :type full_path: bool
    :param regexp: pattern(s) an entry name must match, tested with
        ``re.match`` (anchored at the start of the name only). A list, tuple
        or set keeps an entry if ANY of its patterns matches. Any other truthy
        value (``str``, ``unicode`` on Python 2, or a compiled pattern) is
        used as a single pattern. A falsy value disables filtering.
    :type regexp: list | tuple | set | str | compiled pattern | None
    :return: matching entries, in ``os.listdir`` order (which is arbitrary)
    :rtype: list
    :raises re.error: if a pattern is not a valid regular expression
    """
    if not regexp:
        patterns = None
    elif isinstance(regexp, (list, tuple, set, frozenset)):
        patterns = [re.compile(r) for r in regexp]
    else:
        # Previously only ``str`` was accepted here: a ``unicode`` pattern
        # (Python 2) or a pre-compiled regex silently filtered out everything.
        patterns = [re.compile(regexp)]

    files = []
    for name in os.listdir(dirpath):
        # os.listdir already yields bare names, so no basename() is needed.
        if patterns is not None and not any(p.match(name) for p in patterns):
            continue
        files.append(os.path.join(dirpath, name) if full_path else name)
    return files
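# Functions passed to dpark transformations (map / mapPartitions)
# Cast function
# NOTE: the bound method below cannot be used in its place because it causes
# a serialization error (presumably when dpark serializes the function for
# its tasks), hence the plain module-level cast_content:
# cast = Caster(ContentsStreamDef.HEADERS).cast_line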
def cast_content(splits):
    """Convert the fields of one tab-split record to typed values.

    The first three fields are parsed with ``int`` and the fourth is passed
    through ``str``; any fields after the fourth are dropped.

    :param splits: the fields of one input line
    :type splits: list
    :return: ``[int, int, int, str]``
    :rtype: list
    :raises ValueError: if one of the first three fields is not an integer
    :raises IndexError: if fewer than four fields are present
    """
    casted = [int(field) for field in splits[:3]]
    casted.append(str(splits[3]))
    return casted
def print_partition_size(par):
    """Count the records in one partition.

    Despite its name this function prints nothing: it consumes the iterable
    ``par`` and returns the count wrapped in a one-element list, intended for
    use as a ``mapPartitions`` callback so the per-partition counts can be
    collected.

    :param par: the records of one partition
    :type par: iterable
    :return: ``[number_of_records]``
    :rtype: list
    """
    return [sum(1 for _ in par)]
# Calculation


def main(input_dir='./par_test/gzip', output_path='/tmp/result',
         split_size=2 << 20, num_splits=32):
    """Read the gzipped urlcontents shards, cast and repartition them, save as TSV.

    :param input_dir: directory holding ``urlcontents.txt.<N>.gz`` files
    :type input_dir: str
    :param output_path: destination passed to ``saveAsCSVFile``
    :type output_path: str
    :param split_size: ``splitSize`` passed to ``dpark.textFile``; the default
        ``2 << 20`` equals 2 * 2**20 = 2097152 (presumably bytes, i.e. 2 MiB —
        confirm against the dpark docs)
    :type split_size: int
    :param num_splits: number of partitions requested from ``repartition``
    :type num_splits: int
    :raises IOError: if ``input_dir`` contains no matching input file
    """
    content_pattern = r'urlcontents\.txt\.[0-9]+\.gz'
    content_paths = list_files(input_dir, regexp=content_pattern)
    if not content_paths:
        # Fail with a clear message instead of handing dpark an empty path list.
        raise IOError('no file matching %r in %r' % (content_pattern, input_dir))

    contents = dpark.textFile(content_paths, splitSize=split_size)
    contents = contents.map(lambda l: l.split('\t'))
    contents = contents.map(cast_content).map(lambda l: (l[0], l))
    contents = contents.repartition(numSplits=num_splits)
    # NOTE(review): each record is now a (key, [f0, f1, f2, f3]) tuple, so a
    # CSV writer would presumably emit the inner list as one repr'd column —
    # confirm this is the intended output format.
    contents.saveAsCSVFile(output_path, dialect='excel-tab')

    # Debugging aid: count records per partition instead of saving.
    # par_size = contents.mapPartitions(print_partition_size)
    #
    # sizes = par_size.collect()
    # print(sizes)
    # print(sum(sizes))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment