-
-
Save rictjo/7c34f4e2ce77c49e75526f31915fb62c to your computer and use it in GitHub Desktop.
command title for C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
import umap | |
def identify_axes( ax_dict:dict, fontsize:int=48 , color:str="darkgrey") : | |
kw = dict(ha="center",va="center",fontsize=fontsize,color=color) | |
for k,ax in ax_dict.items() : | |
ax.text(.85,.5,k,transform=ax.transAxes,**kw) | |
def distance_calculation ( coordinates:np.array , | |
distance_type:str , bRemoveCurse:bool=False , | |
nRound:int = None ) -> np.array : | |
# FROM IMPETUOUS-GFA | |
crds = coordinates | |
if 'correlation' in distance_type : | |
from impetuous.quantification import spearmanrho,pearsonrho | |
if 'pearson' in distance_type : | |
corr = pearsonrho( crds , crds ) | |
else : | |
corr = spearmanrho( crds , crds ) | |
corr = 0.5 * ( corr + corr.T ) # THIS SHOULD BE REMOVED ONCE DISC. LOCATED | |
if 'absolute' in distance_type : | |
corr = np.abs( corr ) | |
if 'square' in distance_type : | |
corr = corr**2 | |
distm = 1 - corr | |
else : | |
from scipy.spatial.distance import pdist,squareform | |
distm = squareform( pdist( crds , metric = distance_type )) | |
if bRemoveCurse : # EXPERIMENTAL | |
from impetuous.reducer import remove_curse | |
distm = remove_curse ( distm , nRound = nRound ) | |
return ( distm ) | |
#from impetuous.quantification import distance_calculation | |
def cluster_appraisal( x:pd.Series , garbage_n = 0 , Sfunc = lambda x:np.mean(x,0)) : | |
""" | |
Clustering Optimisation Method for Highly Connected Data | |
Richard Tjörnhammar | |
https://arxiv.org/abs/2208.04720v2 | |
""" | |
# FROM IMPETUOUS-GFA | |
from collections import Counter | |
decomposition = [ tuple((item[1],item[0])) for item in Counter(x.values.tolist()).items() if item[1] | |
> garbage_n ] | |
N = len ( x ) | |
A , B = 1 , N | |
if len(decomposition) > 0 : | |
A = Sfunc(decomposition)[0] | |
B = len(decomposition) | |
else : | |
decomposition = [ tuple( (0,0) ) ] | |
decomposition .append( tuple( ( B ,-garbage_n )) ) | |
decomposition .append( tuple( ( A*B/(A+B) , None ) ) ) | |
return ( decomposition ) | |
#from impetuous.clustering import clustering_appraisal | |
def generate_clustering_labels ( distm:np.array , cmd:str='min' , labels:list[str]=None , | |
bExtreme:bool=False , n_clusters:int = None) -> tuple : | |
# FROM IMPETUOUS-GFA | |
from impetuous.clustering import sclinkages | |
clabels_n , clabels_o = None , None | |
res = sclinkages( distm , cmd )['F'] | |
index = list( res.keys() ) | |
if labels is None : | |
labels = range(len(distm)) | |
hierarch_df = pd.DataFrame ( res.values() , index=list(res.keys()) , columns = labels ) | |
cluster_df = hierarch_df .T .apply( lambda x: cluster_appraisal(x,garbage_n = 0) ) | |
clabels_o , clabels_n = None , None | |
screening = np.array( [ v[-1][0] for v in cluster_df.values ] ) | |
level_values = np.array( list(res.keys()) ) | |
if bExtreme : | |
imax = np.argmax( screening ) | |
clabels_o = hierarch_df.iloc[imax,:].values.tolist() | |
if not n_clusters is None : | |
jhit = np.argmin([ np.abs(len(cluster_df.iloc[i])-2-n_clusters)\ | |
for i in range(len(cluster_df)) ]) | |
clabels_n = hierarch_df.iloc[jhit,:].values.tolist() | |
print ( len(set(clabels_n)) , jhit ) | |
return ( clabels_n , clabels_o , hierarch_df , np.array( [level_values,screening] ) ) | |
def recast_distance_to_similarity_matrix ( distm:np.array, transform=lambda x:np.exp(1.0/x) ,bDirect:bool=True )->np.array : | |
sdm = transform( distm ) * ( 1 - np.eye(len(distm)) ) | |
if bDirect: | |
return ( sdm ) | |
for i in range(len(sdm)) : | |
sdm[i,i] = 0 | |
ldm = sdm + np.diag(np.sum(sdm,0)) | |
return( ldm ) | |
def generate_clustering_solutions ( distm:np.array , cmd:str='min' , labels:list[str]=None , | |
bExtreme:bool=False , n_clusters:int = None) -> tuple : | |
# FROM IMPETUOUS-GFA | |
bUseSVD = True | |
bUseHierarchy = False | |
from impetuous.clustering import sclinkages | |
clabels_n , clabels_o = None , None | |
res = sclinkages( distm , cmd )['F'] | |
index = list( res.keys() ) | |
if labels is None : | |
labels = range(len(distm)) | |
hierarch_df = pd.DataFrame ( res.values() , index=list(res.keys()) , columns = labels ) | |
cluster_df = hierarch_df .T .apply( lambda x: cluster_appraisal(x,garbage_n = 0) ) | |
similarity = np.eye(len(hierarch_df.columns)) | |
from impetuous.quantification import dirac_matrix | |
if bUseHierarchy: | |
L_ = len(hierarch_df.index) | |
for i in range(len(hierarch_df.index)) : | |
print ( i/L_*100 ) | |
similarity += dirac_matrix( hierarch_df.iloc[i].values.tolist() ) | |
print ( similarity ) | |
sdm = similarity * ( 1 - np.eye(len(hierarch_df.columns)) ) | |
else : | |
sdm = recast_distance_to_similarity_matrix( distm ) | |
for i in range(len(sdm)) : | |
sdm[i,i] = 0 | |
ldm = sdm + np.diag(np.sum(sdm,0)) | |
print ( ldm ) | |
if bUseSVD : | |
sol = np.linalg.svd(ldm) | |
vectors = sol[0][:2] | |
vals = sol[1][:2] | |
else : | |
sol = np.linalg.eig(ldm) | |
vectors = sol[1][:2] | |
vals = sol[0][:2] | |
print ( vectors,vals ) | |
import matplotlib.pyplot as plt | |
plt.subplot(121) | |
plt.plot( np.sqrt(vals[0])*vectors[0],np.sqrt(vals[1])*vectors[1],'k*' ) | |
plt.subplot(122) | |
plt.plot( vectors[0],vectors[1],'k*' ) | |
plt.show() | |
exit( 1 ) | |
clabels_o , clabels_n = None , None | |
screening = np.array( [ v[-1][0] for v in cluster_df.values ] ) | |
level_values = np.array( list(res.keys()) ) | |
if bExtreme : | |
imax = np.argmax( screening ) | |
clabels_o = hierarch_df.iloc[imax,:].values.tolist() | |
if not n_clusters is None : | |
jhit = np.argmin([ np.abs(len(cluster_df.iloc[i])-2-n_clusters)\ | |
for i in range(len(cluster_df)) ]) | |
clabels_n = hierarch_df.iloc[jhit,:].values.tolist() | |
print ( len(set(clabels_n)) , jhit ) | |
return ( clabels_n , clabels_o , hierarch_df , np.array( [level_values,screening] ) ) | |
def reformD( D:np.array , N:int ) -> np.array : | |
from scipy.spatial.distance import pdist,squareform | |
D = squareform(D) | |
if N != len( D ) : | |
print ('MALFORMED INPUT') | |
exit( 1 ) | |
return ( D ) | |
from impetuous.quantification import dirac_matrix | |
from impetuous.clustering import immersiveness | |
def complete_happiness ( ldf:pd.DataFrame , ndf:pd.DataFrame=None , clustercol:int=0 , namecol=0 , neighborcol=1 ) -> list[float] : | |
if ndf is None : | |
N = float( len(set(ldf.iloc[:,clustercol].values.tolist())) ) | |
N = len(ldf.index) | |
return( (np.sum( dirac_matrix(ldf.iloc[:,clustercol].values ), 0 )/N).tolist() ) | |
lookup = { i:(c,j) for i,c,j in zip(ldf.index.values,ldf.iloc[:,clustercol].values,range(len(ldf)) ) } | |
total_happiness_ = [] | |
for i_ in range( len(ldf) ) : | |
total_happiness_ .append( happiness( i_ , ldf , ndf , lookup=lookup ) ) | |
return ( total_happiness_ ) | |
from impetuous.clustering import complete_immersiveness as comim | |
def complete_immersiveness ( labels:list , D:np.array , fraction:float = 0.1 , bins:int = None , title_cmd:str='' , | |
bVerbose:bool=False , bInvert:bool=False , bAlternative:bool = True ) -> np.array : | |
# HERE | |
if bAlternative : | |
bSanityCheck = bVerbose | |
if bins is None : | |
bins = len(D) | |
if bins<10 : | |
bins = 100 | |
bins += 10 | |
grouped = dirac_matrix( labels ) | |
ungrouped = 1 - grouped | |
if bSanityCheck : | |
in_d = [ d for d in ( D * grouped ) .reshape(-1) if d>0 ] | |
out_d = [ d for d in (D * ungrouped) .reshape(-1) if d>0 ] | |
all_positives = np.histogram(in_d , range=(0,np.max(D.reshape(-1))) , bins=bins )[0] | |
all_positives = all_positives/np.sum(all_positives) | |
all_negatives , common_range = np.histogram(out_d, range=(0,np.max(D.reshape(-1))) , bins=bins ) | |
all_negatives = all_negatives/np.sum(all_negatives) | |
domain = 0.5*(common_range[1:]+common_range[:-1]) | |
# | |
tpr = np.cumsum(all_positives) | |
tpr = tpr/tpr[-1] | |
fpr = np.cumsum(all_negatives) | |
fpr = fpr/fpr[-1] | |
print ( 'FULL AUC:' , np.trapz(tpr,fpr) ) | |
in_d = D * grouped | |
out_d = D * ungrouped | |
dr = np.max(D.reshape(-1))/bins | |
domain = np.array( [ dr*0.5+i*dr for i in range( bins ) ] ) | |
cP,cN = [],[] | |
for d in domain : | |
cP.append ( np.sum( (in_d>0 ) * (in_d<=d) , 0 ) ) | |
cN.append ( np.sum( (out_d>0) * (out_d<=d) , 0 ) ) | |
cPdf = pd.DataFrame(cP) + 1 | |
cNdf = pd.DataFrame(cN) + 1 | |
TPR = (cPdf/cPdf.iloc[-1,:]).values | |
FPR = (cNdf/cNdf.iloc[-1,:]).values | |
del cPdf,cNdf | |
all_immersions = [] | |
for tpr,fpr in zip( TPR.T,FPR.T ) : | |
all_immersions.append( ( np.trapz( tpr,fpr ), np.abs(np.trapz(np.diff(tpr),np.diff(fpr)) ) ) ) | |
if bSanityCheck: | |
import matplotlib.pyplot as plt | |
mosaic = """ | |
AC | |
BC | |
""" | |
axd = plt.figure(1).subplot_mosaic( mosaic ) | |
identify_axes(axd) | |
#plt.subplot(311) | |
axd['A'].plot( domain[1:] , np.diff(TPR[:,0]) , 'y' , label = 'Grouped, first instance, immersion' ) | |
axd['A'].plot( domain[1:] , np.diff(FPR[:,0]) , 'c' , label = 'Ungrouped, first instance, immersion' ) | |
axd['A'].plot( domain , all_positives , 'r' , label = 'Grouped, all instances, Immersiveness' ) | |
axd['A'].plot( domain , all_negatives , 'b' , label = 'Ungrouped, all instances, Immersiveness' ) | |
axd['A'].set_title( 'Immersiveness and Immersion' ) | |
axd['A'].set_xlabel( 'Distance [Arb Unit]' ) | |
axd['A'].set_ylabel( 'Histogram' ) | |
axd['A'].legend() | |
#plt.subplot(312) | |
mFPR = np.mean( FPR.T , 0 ) | |
mTPR = np.mean( TPR.T , 0 ) | |
sFPR = np.std(FPR.T,0) | |
sTPR = np.std(TPR.T,0) | |
# | |
axd['B'].plot( mFPR - sFPR , mTPR + sTPR , ':g' , label = 'mean + error' ) | |
axd['B'].plot( mFPR , mTPR , 'g' , label='mean' ) | |
axd['B'].plot( mFPR + sFPR , mTPR - sTPR , ':g' , label = 'mean - error' ) | |
# | |
axd['B'].set_xlabel( 'FPR' ) | |
axd['B'].set_ylabel( 'TPR' ) | |
axd['B'].legend() | |
str_conv = lambda num: str(np.round(num*1000)/1000) | |
cal_title = 'Full AUC and error estimate: ' + str_conv( np.trapz( mTPR , mFPR )) +\ | |
' +- ' + str_conv ( np.trapz( mTPR + sTPR , mFPR - sFPR ) -\ | |
np.trapz( mTPR - sTPR , mFPR + sFPR ) ) | |
axd['B'].set_title( cal_title ) | |
#plt.subplot(313) | |
axd['C'].plot( domain/np.max(domain) , sTPR**2 , ':g' , label = 'mean + error' ) | |
axd['C'].set_ylabel( 'Information Capacity $C_{I}$ [ Arb. Unit ]' ) # [$\quad E \cdot M^{-1}\quad$ ]') | |
axd['C'].set_xlabel( 'Cluster fraction $T$ [ Arb. Unit ]' ) # [$\quadM^{-1}\quad$]' ) | |
axd['C'].set_title( title_cmd[0].upper()+title_cmd[1:]+' Linkage Agglomerative Transition' ) | |
#identify_axes(axd) | |
plt.show() | |
print ( cal_title ) | |
return ( np.array( all_immersions ) ) | |
exit( 1 ) | |
# | |
total_immersiveness = [] | |
N = len(labels) | |
for i in range ( N ) : | |
immersed = immersiveness ( i , labels , D ) | |
if bVerbose : | |
print ( 'IMERSIVENESS = ' , i , immersed ) | |
total_immersiveness .append( np.array(immersed) ) | |
return ( np.array ( total_immersiveness ) ) | |
def create_coordinates( data_spread:float = 0.10 , | |
N_centers:int = 5 , | |
N_points:int = 100 , | |
bMergeInternal:bool = False ) -> np.array : | |
# | |
print ( "USING" , N_centers , "CENTERS FOR ASSESSMENT" ) | |
omega = np.pi/float(N_centers)*2.0 | |
X,Y = [],[] | |
for ic in range(N_centers) : | |
Y .append( np.sin(omega*ic) + np.random.randn(N_points )*data_spread ) | |
X .append( np.cos(omega*ic) + np.random.randn(N_points )*data_spread ) | |
X = np.array(X).reshape(-1) | |
Y = np.array(Y).reshape(-1) | |
return ( np.array([ X,Y ]).T ) | |
coordinates = np.array([ *np.random.randn(500,100) , *(np.random.randn(500,100) + 10) ]) | |
#print ( coordinates ) | |
print ( np.shape(create_coordinates(N_points=1000)) ) | |
print ( np.shape(coordinates)) | |
coordinates = create_coordinates() | |
# | |
#exit(1) | |
print ( coordinates , np.shape(coordinates) ) | |
dm = distance_calculation( coordinates , | |
distance_type = 'euclidean', #'correlation,spearman' , | |
bRemoveCurse = False ) | |
print ( dm ) | |
print ( np.sum(np.diag(dm)), np.shape(dm) ) | |
#generate_clustering_solutions( dm , cmd='max' , bExtreme=True , n_clusters=80 ) | |
print ( 'DOING CLUSTERING' ) | |
cmd='min' | |
cln,clo,hdf,sol = generate_clustering_labels( dm , cmd=cmd , bExtreme=True , n_clusters=80 ) | |
imax = np.argmax( sol[1] ) | |
print ( 'HAVE SOLUTION' ) | |
#print ( 'H>' , complete_happiness( hdf.iloc[[imax]].T ) ) | |
import time | |
t0 = time.time() | |
#res1 = complete_immersiveness ( hdf.iloc[imax].values.tolist() , | |
# dm , bAlternative=False ) # 1062.102365732193 | |
t1 = time.time() | |
print ( t1-t0 ) | |
res2 = complete_immersiveness ( hdf.iloc[imax].values.tolist() , dm , title_cmd = {'min':'single' , | |
'max':'maximum' , 'ward':'ward', 'median':'median' }[cmd] , | |
bAlternative=True , bVerbose=True , bins = 1000 ) # 1223.762861251831 w 1000 bins | |
t2 = time.time() | |
#print ( 'I>' , res1 , t1-t0 ) | |
print ( 'I>' , res2 , t2-t1 ) | |
# | |
exit( 1 ) | |
print ( clo ) | |
print ( sol[0] ) | |
print ( sol[1] ) | |
print ( imax , len( set(hdf.iloc[imax].values.tolist()) ) ) | |
import matplotlib.pyplot as plt | |
plt.plot(sol[0],sol[1],'k') | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment