Skip to content

Instantly share code, notes, and snippets.

@garanews
Created June 11, 2019 07:58
Show Gist options
  • Save garanews/d93a8c91dc06d75db93b952977e59756 to your computer and use it in GitHub Desktop.
Save garanews/d93a8c91dc06d75db93b952977e59756 to your computer and use it in GitHub Desktop.
takagi_dask2.py
import asyncio
import asyncpg
import cupy as cp
import numpy as np
from dask import dataframe as dd
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import time
async def read_async():
    """Fetch the ``data`` column of one row from the ``malwares`` table.

    Opens an asyncpg connection, runs a ``SELECT ... limit 1`` query and
    returns the ``data`` value of every returned row (so at most one
    element, and an empty list when the table has no rows).

    Returns:
        list: the ``data`` field of each fetched record.
    """
    conn = await asyncpg.connect('postgresql://user:password@ip_db/malwares')
    try:
        rows = await conn.fetch('SELECT data FROM malwares limit 1')
    finally:
        # Always release the connection, even when the query fails.
        await conn.close()
    return [row['data'] for row in rows]
def ruzicka(matrix_a, vector_b):
    """Score every row of a partition against ``vector_b``.

    For each row the score is ``sum(min(a, b)) / sum(max(a, b))`` taken
    element-wise over the weighted bit vectors (the Ruzicka similarity).

    Args:
        matrix_a: object exposing ``.values`` as a 2-D array of rows; column 0
            is used as the row label and column 1 as a bytes-like buffer.
            Each buffer must unpack to 1024 bits so that it broadcasts against
            the 1024 positional weights.
        vector_b: array that broadcasts against the ``(rows, 1024)`` weighted
            matrix built from ``matrix_a``.

    Returns:
        str: comma-separated ``"<label>><score>"`` entries, one per row, or an
        empty string when the partition has no rows.
    """
    mm = matrix_a.values
    # An empty partition would produce a shape-(0,) array that cannot be
    # broadcast against the weights; the join below would yield "" anyway.
    if len(mm) == 0:
        return ""

    info = [row[0] for row in mm]
    # Unpack each buffer into individual bits, then weight bit i by (1023 - i).
    weights = cp.arange(1023, -1, -1, dtype=np.int32)
    mat_a = cp.array(
        [np.unpackbits(np.frombuffer(row[1], dtype=np.uint8)) for row in mm]
    ) * weights

    numerator = cp.sum(cp.minimum(mat_a, vector_b), axis=1)
    denominator = cp.sum(cp.maximum(mat_a, vector_b), axis=1)
    ruz = numerator / denominator

    # NOTE(review): "%2f" is a minimum field width of 2 with the default six
    # decimals; "%.2f" may have been intended. Kept as-is to preserve output.
    return ",".join(["%s>%2f" % (x, y) for (x, y) in zip(info, cp.asnumpy(ruz))])
if __name__ == "__main__":
    # Script entry point: load the malwares table into a Dask dataframe on a
    # local CUDA cluster, fetch one reference row, score every partition
    # against it with ruzicka() and print the timings of each phase.
    total_start = time.time()
    phase_start = total_start

    # The context managers shut down the workers and the client connection
    # even if a later step raises.
    with LocalCUDACluster() as cluster, Client(cluster) as client:
        # NOTE(review): the password here ("passwordV") differs from the one
        # used in read_async ("password") — confirm which one is correct.
        malpedia = dd.read_sql_table(
            "malwares",
            "postgresql+psycopg2://user:passwordV@ip_db/malwares",
            npartitions=8,
            index_col="id",
            columns=["sha256", "data"],
        )
        # NOTE(review): persist presumably returns before the data is fully
        # loaded, so "DB MAIN" may not cover the whole load — confirm.
        malpedia = client.persist(malpedia)
        print("DB MAIN", time.time() - phase_start)
        phase_start = time.time()

        # asyncio.run creates and closes its own event loop; relying on
        # get_event_loop() with no running loop is deprecated.
        vector_new = asyncio.run(read_async())
        if not vector_new:
            raise SystemExit("no reference row returned from the malwares table")
        # Same bit unpacking and positional weighting that ruzicka() applies
        # to every row of a partition.
        vector_b_np = cp.array(
            [np.unpackbits(np.frombuffer(x, dtype=np.uint8)) for x in vector_new]
        ) * cp.arange(1023, -1, -1, dtype=np.int32)
        print("DB SINGOLO", time.time() - phase_start)
        phase_start = time.time()

        res = malpedia.map_partitions(
            lambda df: ruzicka(df, vector_b_np),
            meta=('result', str)
        ).compute()
        print(res)
        print("GPU computation time", time.time() - phase_start)
        print("total time", time.time() - total_start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment