Created August 31, 2014 22:32
Save darkjh/5662c7583f444849a653 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import os
import re

import dpark

# Verbose (DEBUG-level) logging for the whole script, configured at import time.
logging.basicConfig(level=logging.DEBUG)
def list_files(dirpath, full_path=True, regexp=None):
    """List the entries of a local directory, optionally filtered by name.

    Every entry returned by ``os.listdir`` is considered, so subdirectories
    are listed as well as regular files.

    :param dirpath: path to the directory
    :type dirpath: str
    :param full_path: if True, return each entry joined onto ``dirpath``;
        otherwise return the bare entry names
    :type full_path: bool
    :param regexp: pattern, or list/tuple of patterns, that an entry name
        must match (any one of them) to be listed. Matching uses
        ``re.match``, so a pattern is anchored at the start of the name
        only; end it with ``$`` to require a whole-name match. A falsy value
        (None, "", empty list) disables filtering. Compiled patterns are
        accepted too.
    :type regexp: list | tuple | str | compiled pattern | None
    :return: matching entries, sorted by name
    :rtype: list
    """
    if not regexp:
        patterns = None
    elif isinstance(regexp, (list, tuple)):
        patterns = regexp
    else:
        # Treat any other value (str, unicode on Python 2, a compiled
        # pattern) as a single pattern instead of silently matching nothing.
        patterns = [regexp]

    files = []
    # os.listdir already yields bare names in arbitrary order; sort them so
    # the result is reproducible from run to run.
    for name in sorted(os.listdir(dirpath)):
        if patterns is None or any(re.match(p, name) for p in patterns):
            files.append(os.path.join(dirpath, name) if full_path else name)
    return files
# Functions to be mapped over the RDDs below.
# Cast function for one split input line.
# The Caster-based version cannot be used here (it raises a serialization error):
# cast = Caster(ContentsStreamDef.HEADERS).cast_line
def cast_content(splits):
    """Convert a sequence of raw string fields into a typed record.

    Fields 0-2 are parsed with ``int`` and field 3 is passed through
    ``str``. Any fields beyond the fourth are dropped. A non-integer numeric
    field raises ``ValueError``; too few fields raises ``IndexError``.
    """
    row = [int(splits[i]) for i in range(3)]
    row.append(str(splits[3]))
    return row
def print_partition_size(par):
    """Count the elements of one partition.

    Despite the name, nothing is printed. The count is consumed from the
    iterable ``par`` and returned wrapped in a one-element list, presumably
    so that ``mapPartitions`` yields exactly one count per partition.
    """
    return [sum(1 for _item in par)]
# Calculation
# Input/output locations and tuning knobs for the job below.
CONTENT_DIR = './par_test/gzip'
CONTENT_PATTERN = r'urlcontents\.txt\.[0-9]+\.gz'
SPLIT_SIZE = 2 << 20  # 2 MiB per input split
NUM_SPLITS = 32
OUTPUT_PATH = '/tmp/result'

if __name__ == '__main__':
    content_paths = list_files(CONTENT_DIR, regexp=CONTENT_PATTERN)
    if not content_paths:
        # Fail fast with a readable message instead of handing dpark an
        # empty list of input files.
        raise SystemExit('no files matching %r found in %s'
                         % (CONTENT_PATTERN, CONTENT_DIR))

    contents = dpark.textFile(content_paths, splitSize=SPLIT_SIZE)
    contents = contents.map(lambda line: line.split('\t'))
    # Cast each split record, then pair it with its first (int) column as key.
    contents = contents.map(cast_content).map(lambda row: (row[0], row))
    contents = contents.repartition(numSplits=NUM_SPLITS)
    contents.saveAsCSVFile(OUTPUT_PATH, dialect='excel-tab')

    # Debug: count records per partition.
    # sizes = contents.mapPartitions(print_partition_size).collect()
    # print(sizes)
    # print(sum(sizes))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.