Created
September 18, 2023 20:35
-
-
Save jreadey/30749af72f70a5b914b33dedba878e6b to your computer and use it in GitHub Desktop.
process_pool_reader.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import h5pyd | |
import time | |
from concurrent.futures import ProcessPoolExecutor | |
# --- Tunables -------------------------------------------------------------
# Path handed to h5pyd.File and the dataset path looked up inside that file.
test_file = "/home/test_user1/snp500_link.h5"
h5path = "/dset"
# Upper bound on worker processes given to ProcessPoolExecutor.
max_workers = 20
# Value the "symbol" field of each row is compared against.
search_key = b'AAPL'
# Number of rows each get_data() call reads and scans.
block_size = 10_000

# --- Dataset handle -------------------------------------------------------
# These names are module globals because get_data() reads them directly.
# NOTE(review): this runs at import time; under the "spawn" start method each
# worker re-imports the module and so would repeat the open and the prints
# below -- confirm that is acceptable for the platform this runs on.
f = h5pyd.File(test_file)
dset = f[h5path]
extent = dset.shape[0]  # total number of rows along the first axis

# Quick sanity output: row count, first row, its symbol, and the row dtype.
print(extent)
print(dset[0])
print(dset[0]["symbol"])
print(dset.dtype)
def get_data(start_row):
    """Count rows in one block of the dataset whose symbol matches search_key.

    Reads rows ``start_row`` up to (but not including)
    ``start_row + block_size`` from the module-level ``dset``, clamped to
    ``extent``, and compares each row's ``"symbol"`` field with the
    module-level ``search_key``.

    Args:
        start_row: Index of the first row of the block to scan.

    Returns:
        A ``(elapsed_seconds, count)`` tuple: the wall-clock time spent
        reading and scanning the block, and the number of matching rows.
    """
    t0 = time.time()
    # The last block may be short; never read past the end of the dataset.
    end_row = min(start_row + block_size, extent)
    arr = dset[start_row:end_row]
    # Deliberately a per-row Python scan (not a vectorised compare) so the
    # work done per block stays the same as what this script has always timed.
    count = sum(1 for row in arr if row["symbol"] == search_key)
    t_elap = time.time() - t0
    return (t_elap, count)
if __name__ == "__main__":
    # Fan the dataset out to a process pool, one get_data() task per block of
    # block_size rows, then add up the per-block hit counts and report the
    # total wall-clock time.
    t0 = time.time()
    print("main")
    total_hits = 0
    with ProcessPoolExecutor(max_workers=max_workers) as exe:
        # Submit every block up front so the pool can keep all workers busy.
        futures = [
            exe.submit(get_data, start_row)
            for start_row in range(0, extent, block_size)
        ]
        # Collect in submission order; result() blocks until that task is
        # done and re-raises any exception raised inside the worker.
        for future in futures:
            _block_elapsed, count = future.result()
            total_hits += count
    t_elap = time.time() - t0
    print(f"Total time: {t_elap:.2f} seconds")
    print(f"number of hits: {total_hits}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment