Skip to content

Instantly share code, notes, and snippets.

@cj2001
Created May 9, 2019 22:38
Show Gist options
  • Save cj2001/1224ff882ab352cdf2a667d7466d22ec to your computer and use it in GitHub Desktop.
Save cj2001/1224ff882ab352cdf2a667d7466d22ec to your computer and use it in GitHub Desktop.
Dask for Michal
import json
import argparse
import yaml
from typing import Any, Dict
import networkx as nx
import pandas as pd
from tqdm import tqdm
import pygraphviz
import graphviz
from octopy import Presto
from pprint import pprint
from dask.distributed import Client, progress
import dask
import dask.dataframe as dd
import dask.bag as dbag
# Presto table holding the (source, target) star edges.
STAR_TABLE = 'hive.cj2001.cj_star_list_v010'
# Number of users sampled for the ego-graph experiment below.
TEST_USER_COUNT = 1000
# Dask partitions for the sampled-user frame.
TEST_USER_PARTITIONS = 64

# Single Presto connection, reused for every query in this script.
db = Presto()
# Local Dask cluster; swap in the commented address to use the remote scheduler.
client = Client()
# client = Client("analytics-kubelet-055005d.private-us-east-1.github.net:32395")
client  # bare expression: only displays the client summary in a notebook

query = '''
SELECT source, target
FROM {}
'''
# Reuse `db` rather than opening a second Presto() connection. The table name is
# a trusted constant, so str.format is acceptable; never format user input into SQL.
res_bag = dbag.from_sequence(db.execute(query.format(STAR_TABLE)))
df = res_bag.to_dataframe(meta=None, columns=['source', 'target'])

# Prefix each id with its node type so a user id and a repo id with the same
# value become distinct graph nodes. Vectorized concatenation replaces the
# per-row lambdas, which also forced Dask to guess the output metadata.
edges_dd = df.assign(
    source='User ' + df.source.astype(str),
    target='Repo ' + df.target.astype(str),
).drop_duplicates()
pd_df = edges_dd.compute(scheduler='processes')

G = nx.from_pandas_edgelist(pd_df, source='source', target='target')
print('Number of edges: {}'.format(nx.number_of_edges(G)))
print('Number of nodes: {}'.format(nx.number_of_nodes(G)))

# User nodes are the ones tagged with the 'User ' prefix above; take a
# fixed-size sample of them and spread it over a Dask frame.
nodes = list(G.nodes())
user_ls = [node for node in nodes if node.startswith('User')]
test_user_ls = user_ls[:TEST_USER_COUNT]
test_user_df = pd.DataFrame(test_user_ls, columns=['user'])
test_user_dd = dd.from_pandas(test_user_df, npartitions=TEST_USER_PARTITIONS)
def pad_partition(numerized_doc, max_len):
    """Pad or truncate a single numerized document to ``max_len`` entries.

    Args:
        numerized_doc: A list of token ids. Any value that is not a list is
            treated as absent.
        max_len: Target length, forwarded to ``pad_sequences`` as ``maxlen``.
            Longer documents are cut at the end (``truncating='post'``).

    Returns:
        The single padded row produced by ``pad_sequences``, or ``None`` when
        ``numerized_doc`` is not a list.
    """
    # NOTE(review): `pad_sequences` is not imported anywhere in this file --
    # presumably Keras' sequence-padding helper; import it before calling this.
    if not isinstance(numerized_doc, list):
        return None
    # pad_sequences operates on a batch: wrap the doc, then unwrap its only row.
    return pad_sequences([numerized_doc], maxlen=max_len, truncating='post')[0]
def ego_graph(row, graph, radius):
    """Return the edges of the ego graph centred on ``row.user``.

    Meant for use as ``DataFrame.apply(ego_graph, axis=1, graph=..., radius=...)``.

    Args:
        row: A row object exposing the centre node id as ``row.user``.
        graph: The networkx graph to search. This argument is now actually
            used, instead of a module-level global.
        radius: Maximum hop distance from ``row.user`` to include.

    Returns:
        A list of ``(u, v)`` edge tuples of the ego graph, or ``None`` when
        ``row.user`` is not a node of ``graph``.
    """
    user = row.user
    # Placeholder rows are not graph nodes; return None instead of letting
    # nx.ego_graph raise NodeNotFound. One example is the 'foo' row that Dask
    # presumably fabricates for meta inference when apply() gets no `meta=`.
    if user not in graph:
        return None
    # Materialize the EdgeView into a plain list so the result pickles cleanly
    # between worker processes and does not keep the ego subgraph alive.
    return list(nx.ego_graph(graph, user, radius).edges())
# Hop distance from each sampled user included in their ego graph.
radius = 2

# Per-user ego-graph edge lists. Declaring `meta` as an object Series means
# Dask does not have to call ego_graph on a placeholder row to infer it.
michal_dd = test_user_dd.apply(
    ego_graph, axis=1, meta=('ego graph', 'object'), graph=G, radius=radius
)
# Dask selects the scheduler through the `scheduler=` keyword; the previous
# `schedule=` spelling was not recognized for that purpose.
ego_edges = michal_dd.compute(scheduler='processes')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment