Last active: March 19, 2024 17:02
Preprocessing example of a file using Dask.
Original answer: https://stackoverflow.com/questions/78162865/handling-column-breaks-in-pipe-delimited-file/78182964#78182964
You can use `dask` to do this preprocessing. The following code processes the 50 GB file in blocks of 500 MB and writes the output in 5 partitions. Everything is a delayed/lazy operation, just like in Spark. Let me know how it goes. You may have to remove the header line from the data first and then supply the header to your Spark DataFrame.
Install dask with:

`pip install dask[complete]`
```python
import dask.bag as db
from dask.distributed import Client


def preprocess_line(line):
    # Wrap every pipe-delimited field in double quotes
    return "|".join(f'"{field}"' for field in line.split("|"))


if __name__ == "__main__":
    input_file = "../data/pipedelimited.csv"

    client = Client(n_workers=8, threads_per_worker=4)

    # blocksize=None streams instead; see the dask.bag.read_text docs for options
    b = db.read_text(input_file, blocksize="500MB")

    line_count = b.count()
    print(f"count of lines in whole file = {line_count.compute()}")

    delayed_partitions = b.to_delayed()

    first_flag = True
    first_line = None
    processed_lines = []

    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()
        lines = iter(partition)
        print(f"first line = {lines}")  # note: this prints the iterator object, not a line
        try:
            while True:
                if first_flag:
                    first_line = next(lines)
                    first_flag = False
                else:
                    # Each logical row was broken across two physical lines;
                    # glue the halves back together, then quote the fields.
                    second_line = next(lines)
                    final_line = first_line.rstrip("\n") + second_line.strip()
                    processed_lines.append(preprocess_line(final_line))
                    first_flag = True
        except StopIteration:
            print("Reached the end of the list.")

    processed_bag = db.from_sequence(processed_lines, npartitions=5)
    # to_textfiles expands the * into one output file per partition
    output_path = "../dask_output/processed-*.csv"
    processed_bag.to_textfiles(output_path)
```
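The core transformation, gluing each broken pair of physical lines back together and quoting every field, can be sanity-checked in isolation before running it over the full 50 GB file. A minimal sketch; the two sample lines below are made-up data, not rows from the original file:

```python
def preprocess_line(line):
    # Wrap every pipe-delimited field in double quotes
    return "|".join(f'"{field}"' for field in line.split("|"))

# A logical row that was split across two physical lines by the column break
first_line = "alice|42|new\n"       # trailing newline, as read_text keeps it
second_line = "  york|engineer\n"   # continuation line with stray whitespace

final_line = first_line.rstrip("\n") + second_line.strip()
print(preprocess_line(final_line))  # "alice"|"42"|"newyork"|"engineer"
```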
Output is as follows:

```
count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
```
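Since the goal is to load the result into a Spark DataFrame, it is worth confirming that the quoted, pipe-delimited output parses cleanly. A quick check with the stdlib `csv` reader (the sample line is hypothetical); Spark should read the same files with `.option("delimiter", "|").option("quote", '"')`:

```python
import csv
import io

# One line in the format the script writes out (sample content is made up)
processed = '"alice"|"42"|"newyork"|"engineer"'

# The quotes protect any delimiter characters embedded inside a field
row = next(csv.reader(io.StringIO(processed), delimiter="|", quotechar='"'))
print(row)  # ['alice', '42', 'newyork', 'engineer']
```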