Skip to content

Instantly share code, notes, and snippets.

@jreadey
Created September 18, 2023 20:35
Show Gist options
  • Save jreadey/30749af72f70a5b914b33dedba878e6b to your computer and use it in GitHub Desktop.
Save jreadey/30749af72f70a5b914b33dedba878e6b to your computer and use it in GitHub Desktop.
process_pool_reader.py
import h5pyd
import time
from concurrent.futures import ProcessPoolExecutor
test_file = "/home/test_user1/snp500_link.h5"
h5path = "/dset"
max_workers = 20
search_key = b'AAPL'
f = h5pyd.File(test_file)
dset = f[h5path]
extent = dset.shape[0]
print(extent)
print(dset[0])
print(dset[0]["symbol"])
print(dset.dtype)
block_size = 10_000
def get_data(start_row):
t0 = time.time()
count = 0
end_row = start_row + block_size
if end_row > extent:
end_row = extent
#print(f"{start_row} - {end_row}")
arr = dset[start_row:end_row]
for i in range(arr.shape[0]):
row = arr[i]
if row["symbol"] == search_key:
count += 1
t_elap = time.time() - t0
return (t_elap, count)
if __name__ == "__main__":
futures = []
t0 = time.time()
print("main")
nrow = 0
total_hits = 0
with ProcessPoolExecutor(max_workers=max_workers) as exe:
while nrow < extent:
future = exe.submit(get_data, nrow)
futures.append(future)
nrow += block_size
for i, future in enumerate(futures):
result = future.result()
t_elap = result[0]
count = result[1]
#print(f"Future #{i} got {count} hits and took {t_elap:.2f} seconds")
total_hits += count
t_elap = time.time() - t0
print(f"Total time: {t_elap:.2f} seconds")
print(f"number of hits: {total_hits}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment