Skip to content

Instantly share code, notes, and snippets.

@Iftimie
Last active January 23, 2022 19:00
Show Gist options
  • Save Iftimie/a2d4130ada987458846509adfec546de to your computer and use it in GitHub Desktop.
Save Iftimie/a2d4130ada987458846509adfec546de to your computer and use it in GitHub Desktop.
from concurrent.futures import process
import os
from typing import Iterable, Iterator
import time
from dask.distributed import Client
def main():
    """Run the filepaths -> lines -> words pipeline on ``data`` and print the result."""
    # Each stage consumes the iterator produced by the stage before it.
    words = get_words(get_lines(get_filepaths("data")))
    print(list(words))
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the path of each file under ``dir``, descending into subfolders."""
    for parent, _subdirs, names in os.walk(dir):
        yield from (os.path.join(parent, name) for name in names)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Read every file through a dask ``Client`` and yield the lines one by one.

    Args:
        filepaths: Paths of the files to read; the iterable is fully consumed
            before any work is submitted.

    Yields:
        Each line returned by ``process_file``, file after file.
    """
    # Using the client as a context manager closes it once the results have
    # been gathered, instead of leaving it open for the life of the process.
    with Client() as client:
        futures = client.map(process_file, list(filepaths))
        list_of_lists_of_lines = client.gather(futures)
    for lines in list_of_lists_of_lines:
        yield from lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield every whitespace-separated word of every line in ``lines``.

    Words are yielded individually (a flat stream of ``str``), as the
    ``Iterator[str]`` annotation promises, rather than as one list per line.
    """
    for line in lines:
        yield from line.split()
def process_file(filepath, delay=1):
    """Return the lines of ``filepath`` after sleeping for ``delay`` seconds.

    Args:
        filepath: Path of the text file to read.
        delay: Seconds to sleep before reading. Defaults to 1, the value that
            was previously hard-coded (presumably a stand-in for expensive
            per-file work); pass 0 to skip the wait.

    Returns:
        The list produced by ``readlines()`` (line endings are kept).
    """
    time.sleep(delay)
    with open(filepath, 'r') as f:
        return f.readlines()
# Entry point: call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
# Accumulates every word found in every file below the folder.
all_words = []
folder_path = "data"
# Visit each file of the folder tree
for root, dirs, files in os.walk(folder_path):
    for file in files:
        filepath = os.path.join(root, file)
        # Split each line on whitespace as it is read from the file
        with open(filepath, 'r') as f:
            for line in f:
                all_words.extend(line.split())
print(all_words)
def main():
    """Print each item produced by the filepaths -> lines -> words pipeline."""
    pipeline = get_words(get_lines(get_filepaths("data")))
    for word in pipeline:
        print(word)
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield a path for each file under ``dir``, dropping falsy results."""
    for root, _dirs, files in os.walk(dir):
        candidates = (create_filepath(root, name) for name in files)
        # A falsy path (such as the fallback of a failed helper call) is skipped.
        yield from (path for path in candidates if path)
@monitor_error()
def create_filepath(root, file):
    """Join the folder ``root`` and the name ``file`` into one path."""
    joined = os.path.join(root, file)
    return joined
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield, file by file, the lines that ``process_file`` returns."""
    for path in filepaths:
        for line in process_file(path):
            yield line
@monitor_error(return_value=[])
def process_file(filepath):
    """Read ``filepath`` and return its lines as a list (line endings kept)."""
    with open(filepath, 'r') as handle:
        return handle.readlines()
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield, line by line, the words that ``process_line`` returns."""
    for text in lines:
        for word in process_line(text):
            yield word
@monitor_error(return_value=[])
def process_line(line):
    """Split ``line`` on runs of whitespace and return the pieces."""
    pieces = line.split()
    return pieces
def monitor_error(return_value=None):
    """Build a decorator that logs exceptions instead of propagating them.

    Args:
        return_value: Value the decorated function returns when it raises.
            The same object is handed back on every failure, so a mutable
            fallback such as ``[]`` should be treated as read-only.

    Returns:
        A decorator. The wrapped function returns its normal result, or
        ``return_value`` after the error and its traceback have been logged.
    """
    # Imported locally: these names are not imported at module level before
    # this point, and ``wraps`` is needed as soon as a function is decorated.
    import logging
    import traceback
    from functools import wraps

    def inner_decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                logging.error(f"Error during {func.__name__}")
                logging.error(traceback.format_exc())
                return return_value
        return wrap
    return inner_decorator
def main():
    """Stream the words found under ``data`` and print them one per line."""
    source_folder = "data"
    for item in get_words(get_lines(get_filepaths(source_folder))):
        print(item)
@monitor_generator
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the result of ``create_filepath`` for each file under ``dir``."""
    for root, _dirs, files in os.walk(dir):
        yield from (create_filepath(root, name) for name in files)
@monitor_error()
def create_filepath(root, file):
    """Return the path made of the folder ``root`` and the file name ``file``."""
    path = os.path.join(root, file)
    return path
@monitor_generator
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield the lines ``process_file`` returns for each path, in order."""
    for path in filepaths:
        for line in process_file(path):
            yield line
@monitor_error(return_value=[])
def process_file(filepath):
    """Return the list of lines read from ``filepath`` (line endings kept)."""
    with open(filepath, 'r') as source:
        content = source.readlines()
    return content
@monitor_generator
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the words ``process_line`` returns for each line, in order."""
    for text in lines:
        for word in process_line(text):
            yield word
@monitor_error(return_value=[])
def process_line(line):
    """Return the whitespace-separated pieces of ``line``."""
    words = line.split()
    return words
def monitor_error(return_value=None):
    """Create a decorator that turns exceptions into a logged fallback value.

    Args:
        return_value: What the decorated function returns when it raises.
            The very same object is returned on each failure, so callers
            should not mutate a mutable fallback such as ``[]``.

    Returns:
        A decorator whose wrapped function returns the normal result, or
        ``return_value`` once the error and its traceback have been logged.
    """
    # Local imports keep the helper self-contained: ``wraps`` in particular
    # is not imported at module level and is needed at decoration time.
    import logging
    import traceback
    from functools import wraps

    def inner_decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                logging.error(f"Error during {func.__name__}")
                logging.error(traceback.format_exc())
                return return_value
        return wrap
    return inner_decorator
def monitor_generator(func):
    """Wrap a generator function so the items it streams are monitored.

    Falsy items are reported with a warning and dropped. An exception raised
    while fetching the next item is logged and the fetch is attempted again;
    for a regular generator that next attempt ends the stream.

    Args:
        func: Generator function (or any callable returning an iterator).

    Returns:
        A generator function with the same signature that yields only the
        truthy items of the wrapped one.
    """
    # Local imports keep the helper self-contained: ``wraps`` in particular
    # is not imported at module level and is needed at decoration time.
    import logging
    import traceback
    from functools import wraps

    @wraps(func)
    def wrap(*args, **kwargs):
        original_generator = func(*args, **kwargs)
        while True:
            # Only the fetch is guarded, so an error raised by the consumer
            # of this generator is not logged as an error of ``func``.
            try:
                item = next(original_generator)
            except StopIteration:
                break
            except Exception:
                logging.error(f"Error during processing of {func.__name__}")
                logging.error(traceback.format_exc())
                continue
            if not item:
                logging.warning(f"Item from {original_generator} evaluated to False")
                continue
            # do some monitoring/counting/metric publishing/logging/statistics
            yield item
    return wrap
import os
from typing import Iterable, Iterator
def main():
    """Drain the lazy filepaths -> lines -> words pipeline into a list."""
    folder_path = "data"
    words_generator = get_words(get_lines(get_filepaths(folder_path)))
    # The generator chain only does its work while it is being consumed here.
    all_words = list(words_generator)
def get_filepaths(dir: str) -> Iterator[str]:
    """Walk ``dir`` recursively and yield one path per file found."""
    for folder, _, filenames in os.walk(dir):
        for filename in filenames:
            yield os.path.join(folder, filename)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Stream the lines of each file in ``filepaths``, one line at a time."""
    for path in filepaths:
        with open(path, 'r') as handle:
            # A text file object iterates over its lines.
            yield from handle
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the whitespace-separated words of each line, one at a time."""
    for text in lines:
        for word in text.split():
            yield word
# Entry point: call main() only when this file is executed as a script.
if __name__=="__main__":
    main()
from concurrent.futures import ProcessPoolExecutor
import os
from typing import Iterable, Iterator
import time
def main():
    """Run the pipeline on the ``data`` folder and print its output as a list."""
    data_folder = "data"
    collected = list(get_words(get_lines(get_filepaths(data_folder))))
    print(collected)
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the full path of every file in the tree rooted at ``dir``."""
    for root, _dirs, files in os.walk(dir):
        # ``root`` is bound as a default so each lambda keeps its own folder.
        yield from map(lambda name, root=root: os.path.join(root, name), files)
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Read the files with a pool of three worker processes and yield their lines."""
    with ProcessPoolExecutor(max_workers=3) as pool:
        for file_lines in pool.map(process_file, filepaths):
            yield from file_lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield each whitespace-separated word of each line in ``lines``.

    The stream is flat: individual ``str`` words are yielded, as declared by
    the ``Iterator[str]`` annotation, not one list of words per line.
    """
    for line in lines:
        yield from line.split()
def process_file(filepath, delay=1):
    """Sleep for ``delay`` seconds, then return the lines of ``filepath``.

    Args:
        filepath: Path of the text file to read.
        delay: Seconds to wait before reading. The default of 1 keeps the
            previously hard-coded pause (presumably a stand-in for slow
            per-file work); pass 0 to read immediately.

    Returns:
        The list produced by ``readlines()`` (line endings are kept).
    """
    time.sleep(delay)
    with open(filepath, 'r') as f:
        return f.readlines()
# Entry point: call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
from typing import List
def main():
    """Print the list of words built from the files under ``data``."""
    # Stages, innermost first; each returns a complete list:
    #   get_filepaths -> ["/home/file1.txt", "/home/file2.txt", …]
    #   get_lines     -> ["Lorem ipsum dolor sit amet", "Consectetur adipiscing elit", …]
    #   get_words     -> ["Lorem", "ipsum", "dolor", "sit", …]
    print(get_words(get_lines(get_filepaths("data"))))
def get_filepaths(dir: str) -> List[str]:
    """Return the paths of all files under ``dir``, subfolders included."""
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(dir)
        for name in files
    ]
def get_lines(filepaths: List[str]) -> List[str]:
    """Return the lines of all the given files as one list, in file order."""
    collected: List[str] = []
    for path in filepaths:
        with open(path, 'r') as handle:
            collected += handle.readlines()
    return collected
def get_words(lines: List[str]) -> List[str]:
    """Return every whitespace-separated word of every line, in order."""
    return [word for line in lines for word in line.split()]
# Entry point: call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
from typing import Iterable, Iterator
from itertools import tee, chain
def main():
    """Print the words of every file under ``data``, normal and reversed.

    The stream of file paths is split in two: one branch reads the lines as
    they are, the other reads them reversed, and both are chained before
    being split into words.
    """
    folder_path = "data"
    filepaths = get_filepaths(folder_path)
    # tee() gives two independent iterators over the single path generator.
    filepaths_1, filepaths_2 = tee(filepaths, 2)
    normal_lines = get_lines(filepaths_1)
    # The second branch must use the reversing reader; calling get_lines
    # again would only duplicate the normal lines.
    reversed_lines = get_reversed_lines(filepaths_2)
    lines = chain(normal_lines, reversed_lines)
    words = get_words(lines)
    print(list(words))
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield, folder by folder, the path of each file below ``dir``."""
    for current, _children, entries in os.walk(dir):
        paths = [os.path.join(current, entry) for entry in entries]
        yield from paths
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield the lines of each file; a file is read in full before its lines are yielded."""
    for path in filepaths:
        with open(path, 'r') as handle:
            content = handle.readlines()
        for line in content:
            yield line
def get_reversed_lines(filepaths: Iterable[str]):
    """Yield each line of each file with its characters in reverse order.

    The whole line is reversed, so a trailing newline ends up at the front.
    """
    for path in filepaths:
        with open(path, 'r') as handle:
            content = handle.readlines()
        yield from (text[::-1] for text in content)
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the words of ``lines``, splitting each line on whitespace."""
    for text in lines:
        pieces = text.split()
        yield from pieces
# Entry point: call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
import traceback
from typing import Iterable, Iterator
import logging
def main():
    """Print, one per line, what the pipeline produces for the ``data`` folder."""
    root_folder = "data"
    paths = get_filepaths(root_folder)
    for item in get_words(get_lines(paths)):
        print(item)
def get_filepaths(dir: str) -> Iterator[str]:
    """Yield the path of every file under ``dir``, logging and skipping failures.

    Args:
        dir: Folder to walk recursively.

    Yields:
        One path per file. A file whose path cannot be built is logged and
        skipped, so only real paths ever reach the consumer.
    """
    for root, dirs, files in os.walk(dir):
        for file in files:
            try:
                filepath = os.path.join(root, file)
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still propagate.
            except Exception:
                logging.error("Exception during get_filepaths")
                logging.error(traceback.format_exc())
                continue
            yield filepath
def get_lines(filepaths: Iterable[str]) -> Iterator[str]:
    """Yield the lines of each file, logging and skipping files that fail.

    Args:
        filepaths: Paths of the text files to read.

    Yields:
        The lines of each readable file (line endings kept). A file that
        cannot be opened or read contributes no lines.
    """
    for filepath in filepaths:
        lines = []
        try:
            with open(filepath, 'r') as f:
                lines = f.readlines()
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        except Exception:
            logging.error("Exception during get_lines")
            logging.error(traceback.format_exc())
        yield from lines
def get_words(lines: Iterable[str]) -> Iterator[str]:
    """Yield the words of each line, logging and skipping lines that fail.

    Args:
        lines: Lines to split on whitespace.

    Yields:
        Each word of each line. An item that cannot be split (for example a
        non-string) is logged and contributes no words.
    """
    for line in lines:
        words = []
        try:
            words = line.split()
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        except Exception:
            logging.error("Exception during get_words")
            logging.error(traceback.format_exc())
        yield from words
# Entry point: call main() only when this file is executed as a script.
if __name__=="__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment