Last active
January 23, 2022 19:00
-
-
Save Iftimie/a2d4130ada987458846509adfec546de to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import process | |
import os | |
from typing import Iterable, Iterator | |
import time | |
from dask.distributed import Client | |
def main():
    """Run the file -> lines -> words pipeline on "data" and print the result.

    The final iterator is materialized into a list before printing.
    """
    folder_path = "data"
    pipeline = get_words(get_lines(get_filepaths(folder_path)))
    print(list(pipeline))
def get_filepaths(dir: str) -> Iterator[str]:
    """Lazily yield the path of every file found below *dir*.

    Directories are traversed with ``os.walk``; each yielded path is the
    walked directory joined with the file name.
    """
    for parent, _subdirs, names in os.walk(dir):
        yield from (os.path.join(parent, name) for name in names)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Read every file through a Dask client and yield the lines one by one.

    All paths are submitted at once with ``client.map(process_file, ...)``
    and the per-file line lists are collected with ``client.gather`` before
    the first line is yielded.

    Args:
        filepaths: paths of the files to read; fully consumed up front.

    Yields:
        Each line of each file, in the order the paths were given.
    """
    # Use the client as a context manager so it is closed once the results
    # have been gathered; the previous version never closed it.
    with Client() as client:
        futures = client.map(process_file, list(filepaths))
        list_of_lists_of_lines = client.gather(futures)
    for lines in list_of_lists_of_lines:
        yield from lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield every whitespace-separated word of every line.

    Args:
        lines: iterable of text lines.

    Yields:
        Individual words, in order of appearance.
    """
    # The previous version yielded one *list* per line (the raw result of
    # ``line.split()``); flatten so the output matches ``Iterator[str]``.
    for line in lines:
        yield from line.split()
def process_file(filepath):
    """Return all lines of *filepath* as a list, after sleeping one second."""
    # NOTE(review): the sleep presumably simulates slow per-file work.
    time.sleep(1)
    with open(filepath, 'r') as handle:
        content = handle.readlines()
    return content
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
# Accumulates every word found in every file below ``folder_path``.
all_words = []

# Get all files
folder_path = "data"
for root, dirs, files in os.walk(folder_path):
    for file in files:
        filepath = os.path.join(root, file)

        # Read all lines from the file
        with open(filepath, 'r') as f:
            lines = f.readlines()

        # Split each line on whitespace and collect the pieces
        all_words.extend(word for line in lines for word in line.split())

print(all_words)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main():
    """Print, one per line, each item produced by the pipeline over "data"."""
    folder_path = "data"
    pipeline = get_words(get_lines(get_filepaths(folder_path)))
    for item in pipeline:
        print(item)
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the path of every file below *dir*, skipping falsy results.

    Paths are built by ``create_filepath``; a falsy return value from it
    is dropped instead of being yielded.
    """
    for root, _dirs, files in os.walk(dir):
        candidates = (create_filepath(root, file) for file in files)
        yield from (path for path in candidates if path)
@monitor_error()
def create_filepath(root, file):
    """Join directory *root* and file name *file* into a single path."""
    joined = os.path.join(root, file)
    return joined
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield, file after file, every line returned by ``process_file``."""
    for path in filepaths:
        yield from process_file(path)
@monitor_error(return_value=[])
def process_file(filepath):
    """Return the lines of *filepath* as a list."""
    with open(filepath, 'r') as handle:
        return handle.readlines()
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield each word that ``process_line`` extracts from each line."""
    for text in lines:
        for word in process_line(text):
            yield word
@monitor_error(return_value=[])
def process_line(line):
    """Split *line* on whitespace and return the resulting list."""
    pieces = line.split()
    return pieces
def monitor_error(return_value=None):
    """Build a decorator that logs exceptions instead of propagating them.

    Args:
        return_value: value returned by the decorated function whenever the
            wrapped call raises an ``Exception``.

    Returns:
        A decorator; the function it produces forwards all arguments to the
        original and returns either its result or *return_value*.
    """
    def inner_decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            try:
                outcome = func(*args, **kwargs)
            except Exception:
                # Log the function name first, then the full traceback.
                logging.error(f"Error during {func.__name__}")
                logging.error(traceback.format_exc())
                outcome = return_value
            return outcome
        return wrap
    return inner_decorator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main():
    """Walk "data", feed it through the pipeline and print each item."""
    folder_path = "data"
    lines = get_lines(get_filepaths(folder_path))
    for item in get_words(lines):
        print(item)
@monitor_generator
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield ``create_filepath(root, file)`` for every file below *dir*."""
    for root, _dirs, files in os.walk(dir):
        yield from (create_filepath(root, name) for name in files)
@monitor_error()
def create_filepath(root, file):
    """Return *root* and *file* combined with ``os.path.join``."""
    full_path = os.path.join(root, file)
    return full_path
@monitor_generator
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield every line that ``process_file`` returns for each path."""
    for path in filepaths:
        yield from process_file(path)
@monitor_error(return_value=[])
def process_file(filepath):
    """Open *filepath* for reading and return its lines as a list."""
    with open(filepath, 'r') as handle:
        return handle.readlines()
@monitor_generator
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the words ``process_line`` returns for each incoming line."""
    for text in lines:
        for word in process_line(text):
            yield word
@monitor_error(return_value=[])
def process_line(line):
    """Return the whitespace-separated parts of *line* as a list."""
    parts = line.split()
    return parts
def monitor_error(return_value=None):
    """Create a decorator that turns exceptions into a logged fallback value.

    Args:
        return_value: result to hand back when the decorated function raises
            an ``Exception``.

    Returns:
        The decorator to apply to a function.
    """
    def inner_decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                # Two records: which function failed, then the traceback.
                logging.error(f"Error during {func.__name__}")
                logging.error(traceback.format_exc())
                return return_value
            return result
        return wrap
    return inner_decorator
def monitor_generator(func):
    """Wrap a generator function so its items are filtered and errors logged.

    The returned generator function pulls items from ``func(*args, **kwargs)``
    with ``next``. Truthy items are re-yielded. Falsy items are skipped with
    a warning. An ``Exception`` raised while pulling or yielding is logged
    and the loop continues until ``StopIteration`` is seen.
    """
    @wraps(func)
    def wrap(*args, **kwargs):
        source = func(*args, **kwargs)
        running = True
        while running:
            try:
                item = next(source)
                if item:
                    # do some monitoring/counting/metric publishing/logging/statistics
                    yield item
                else:
                    logging.warning(f"Item from {source} evaluated to False")
            except StopIteration:
                running = False
            except Exception:
                logging.error(f"Error during processing of {func.__name__}")
                logging.error(traceback.format_exc())
    return wrap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import Iterable, Iterator | |
def main():
    """Drain the generator pipeline over "data" into a local list.

    Nothing is printed or returned; the collected words stay in a local.
    """
    folder_path = "data"
    filepaths_generator = get_filepaths(folder_path)
    lines_generator = get_lines(filepaths_generator)
    words_generator = get_words(lines_generator)

    all_words = []
    all_words.extend(words_generator)
def get_filepaths(dir: str) -> Iterator[str]:
    """Generate the joined path of each file ``os.walk`` finds under *dir*."""
    for folder, _subfolders, filenames in os.walk(dir):
        for filename in filenames:
            yield os.path.join(folder, filename)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Stream the lines of each file in *filepaths*, one line at a time.

    Each file is iterated directly, so it stays open while its lines are
    being yielded.
    """
    for path in filepaths:
        with open(path, 'r') as handle:
            yield from handle
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the whitespace-separated words of each line, in order."""
    for text in lines:
        for word in text.split():
            yield word
if __name__=="__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ProcessPoolExecutor | |
import os | |
from typing import Iterable, Iterator | |
import time | |
def main():
    """Run the pipeline on "data" and print its output as one list."""
    folder_path = "data"
    result = get_words(get_lines(get_filepaths(folder_path)))
    print(list(result))
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield one joined path per file discovered by ``os.walk(dir)``."""
    for base, _children, entries in os.walk(dir):
        yield from (os.path.join(base, entry) for entry in entries)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Read files in a pool of three worker processes and yield their lines.

    ``process_file`` is mapped over *filepaths* with
    ``ProcessPoolExecutor.map``; the per-file results are flattened into a
    single stream of lines.
    """
    # NOTE(review): the source lost its indentation; the result loop is
    # assumed to sit inside the ``with`` block — confirm against the original.
    with ProcessPoolExecutor(max_workers=3) as pool:
        for file_lines in pool.map(process_file, filepaths):
            yield from file_lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield every whitespace-separated word of every line.

    Args:
        lines: iterable of text lines.

    Yields:
        Individual words, in order of appearance.
    """
    # ``map(lambda line: line.split(), lines)`` produced one list per line,
    # so callers received lists rather than words; flatten each split.
    for line in lines:
        yield from line.split()
def process_file(filepath):
    """Sleep for one second, then return the lines of *filepath* as a list."""
    # NOTE(review): the sleep presumably stands in for expensive work.
    time.sleep(1)
    with open(filepath, 'r') as handle:
        content = handle.readlines()
    return content
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import List | |
def main():
    """Collect all words from the files under "data" and print the list."""
    folder_path = "data"
    # e.g. ["/home/file1.txt", "/home/file2.txt", …]
    filepaths = get_filepaths(folder_path)
    # e.g. ["Lorem ipsum dolor sit amet", "Consectetur adipiscing elit", …]
    lines = get_lines(filepaths)
    # e.g. ["Lorem", "ipsum", "dolor", "sit", …]
    words = get_words(lines)
    print(words)
def get_filepaths(dir: str) -> List[str]:
    """Return a list with the path of every file found below *dir*."""
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(dir)
        for name in files
    ]
def get_lines(filepaths: List[str]) -> List[str]:
    """Return the lines of all given files concatenated into one list."""
    collected = []
    for path in filepaths:
        with open(path, 'r') as handle:
            collected += handle.readlines()
    return collected
def get_words(lines: List[str]) -> List[str]:
    """Return every whitespace-separated word of every line as one list."""
    return [word for line in lines for word in line.split()]
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import Iterable, Iterator | |
from itertools import tee, chain | |
def main():
    """Print the words of all files under "data", forwards then reversed.

    The file-path iterator is duplicated with ``tee`` so that it can feed
    two independent branches: one reading lines as they are, one reading
    them reversed. Both branches are chained and split into words.
    """
    folder_path = "data"
    filepaths = get_filepaths(folder_path)
    filepaths_1, filepaths_2 = tee(filepaths, 2)

    normal_lines = get_lines(filepaths_1)
    # Previously this branch also called get_lines, so the normal lines were
    # emitted twice and get_reversed_lines was never used.
    reversed_lines = get_reversed_lines(filepaths_2)

    lines = chain(normal_lines, reversed_lines)
    words = get_words(lines)
    print(list(words))
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the full path of each file located anywhere below *dir*."""
    for current, _nested, filenames in os.walk(dir):
        yield from (os.path.join(current, filename) for filename in filenames)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield all lines of each file; a file is read fully before yielding."""
    for path in filepaths:
        with open(path, 'r') as handle:
            content = handle.readlines()
        for line in content:
            yield line
def get_reversed_lines(filepaths: Iterable[str]):
    """Yield each line of each file with its characters in reverse order.

    The whole line is reversed, including any trailing newline, which
    therefore ends up at the start of the yielded string.
    """
    for path in filepaths:
        with open(path, 'r') as handle:
            content = handle.readlines()
        for line in content:
            yield line[::-1]
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Split each line on whitespace and yield the words one at a time."""
    for text in lines:
        for word in text.split():
            yield word
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import traceback | |
from typing import Iterable, Iterator | |
import logging | |
def main():
    """Print each word produced by the pipeline over "data", one per line."""
    folder_path = "data"
    word_stream = get_words(get_lines(get_filepaths(folder_path)))
    for item in word_stream:
        print(item)
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the path of every file below *dir*, logging and skipping failures.

    Args:
        dir: root directory handed to ``os.walk``.

    Yields:
        The joined path of each file. An entry whose path cannot be built is
        logged and skipped; the previous version yielded ``None`` for it.
    """
    for root, _dirs, files in os.walk(dir):
        for file in files:
            try:
                filepath = os.path.join(root, file)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt and SystemExit still propagate.
                logging.error("Exception during get_filepaths")
                logging.error(traceback.format_exc())
                continue
            yield filepath
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield the lines of each file, logging and skipping unreadable files.

    Args:
        filepaths: paths of the files to read.

    Yields:
        Every line of every file that could be read. A file that fails to
        open or read contributes no lines and the error is logged.
    """
    for filepath in filepaths:
        lines = []
        try:
            with open(filepath, 'r') as f:
                lines = f.readlines()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit are not swallowed.
            logging.error("Exception during get_lines")
            logging.error(traceback.format_exc())
        yield from lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the words of each line, logging and skipping lines that fail.

    Args:
        lines: iterable of text lines.

    Yields:
        Whitespace-separated words. An item whose ``split`` call raises is
        logged and contributes no words.
    """
    for line in lines:
        words = []
        try:
            words = line.split()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit are not swallowed.
            logging.error("Exception during get_words")
            logging.error(traceback.format_exc())
        yield from words
if __name__=="__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment