@dineshdharme
Last active March 19, 2024 17:02
Preprocessing example of a file using Dask.
https://stackoverflow.com/questions/78162865/handling-column-breaks-in-pipe-delimited-file/78182964#78182964
You can use `dask` for this preprocessing task. The following code processes the 50 GB file in blocks of 500 MB and writes the output out in 5 partitions. Everything is a delayed / lazy operation, just like in Spark. Let me know how it goes. You may have to remove the header line from the data and then supply the header yourself when you load the result into your Spark DataFrame (see the sketch after the output below).
Install Dask with:
`pip install dask[complete]`
import dask.bag as db
from dask.distributed import Client
from dask import delayed


def preprocess_line(line):
    # Wrap every field in double quotes and re-join with the pipe delimiter.
    processed_line = '|'.join([f'"{field}"' for field in line.split('|')])
    return processed_line


if __name__ == '__main__':
    input_file = "../data/pipedelimited.csv"

    client = Client(n_workers=8, threads_per_worker=4)

    b = db.read_text(input_file, blocksize="500MB")  # blocksize=None for streaming; see the read_text docs for more options
    line_count = b.count()
    line_count_computed = line_count.compute()
    print(f"count of lines in whole file = {line_count_computed}")

    delayed_partitions = b.to_delayed()

    first_flag = True
    first_line = None
    second_line = None
    processed_lines = []

    # Pull each partition back to the driver and stitch the broken records
    # back together sequentially. Assumes every record is split across
    # exactly two physical lines.
    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()
        lines = iter(partition)
        print(f"first line = {lines}")
        try:
            while True:
                if first_flag:
                    first_line = next(lines)
                    first_flag = False
                    continue
                else:
                    second_line = next(lines)
                    # Strip the stray newlines so the two halves form one clean record.
                    final_line = first_line.strip() + second_line.strip()
                    processed_line = preprocess_line(final_line)
                    processed_lines.append(processed_line)
                    # print(processed_line)
                    first_flag = True
        except StopIteration:
            print("Reached the end of the list.")

    processed_bag = db.from_sequence(processed_lines, npartitions=5)

    output_path = "../dask_output/processed_corrected.csv"
    processed_bag.to_textfiles(output_path)
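
For a quick sanity check, this is the kind of output `preprocess_line` produces once the two halves of a broken record have been glued back together (the field values here are made up, purely for illustration):

```python
>>> sample = "101|John Doe|New York"   # made-up record, not from the real data
>>> preprocess_line(sample)
'"101"|"John Doe"|"New York"'
```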
The output is as follows:
count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
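
Once the corrected files are written, the last step is reading them back into Spark. Here is a minimal sketch of that read, assuming the header line was removed during preprocessing; the column names (`col1`, `col2`, `col3`) are placeholders for illustration, so substitute your real header/schema.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-preprocessed").getOrCreate()

# Dask's to_textfiles wrote one part file per partition into this directory;
# Spark can read the whole directory at once.
df = (
    spark.read
    .option("sep", "|")         # pipe-delimited
    .option("quote", '"')       # fields were wrapped in double quotes by preprocess_line
    .option("header", "false")  # header assumed removed during preprocessing
    .csv("../dask_output/processed_corrected.csv")
    .toDF("col1", "col2", "col3")  # placeholder names -- must match your actual column count
)

df.show(5, truncate=False)
```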