# Gist by @rictjo ( created January 10 , 2024 )
# https://gist.github.com/rictjo/7c34f4e2ce77c49e75526f31915fb62c
import pandas as pd
import numpy as np
import umap
def identify_axes ( ax_dict:dict , fontsize:int = 48 , color:str = "darkgrey" ) :
    kw = dict( ha="center" , va="center" , fontsize=fontsize , color=color )
    for k , ax in ax_dict.items() :
        ax.text( .85 , .5 , k , transform=ax.transAxes , **kw )
def distance_calculation ( coordinates:np.array ,
                           distance_type:str , bRemoveCurse:bool = False ,
                           nRound:int = None ) -> np.array :
    # FROM IMPETUOUS-GFA
    crds = coordinates
    if 'correlation' in distance_type :
        from impetuous.quantification import spearmanrho , pearsonrho
        if 'pearson' in distance_type :
            corr = pearsonrho( crds , crds )
        else :
            corr = spearmanrho( crds , crds )
        corr = 0.5 * ( corr + corr.T ) # THIS SHOULD BE REMOVED ONCE DISC. LOCATED
        if 'absolute' in distance_type :
            corr = np.abs( corr )
        if 'square' in distance_type :
            corr = corr**2
        distm = 1 - corr
    else :
        from scipy.spatial.distance import pdist , squareform
        distm = squareform( pdist( crds , metric = distance_type ) )
    if bRemoveCurse : # EXPERIMENTAL
        from impetuous.reducer import remove_curse
        distm = remove_curse ( distm , nRound = nRound )
    return ( distm )
#from impetuous.quantification import distance_calculation
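# Illustrative usage sketch ( commented out ; not part of the original run ) : any scipy
# pdist metric name works directly , while strings containing 'correlation' are routed
# through the impetuous rank / product-moment correlation estimators .
#_demo_crds = np.random.randn( 10 , 3 )                                        # HYPOTHETICAL TOY DATA
#_demo_dm   = distance_calculation( _demo_crds , distance_type = 'euclidean' ) # 10 x 10 , ZERO DIAGONAL
#_demo_cm   = distance_calculation( _demo_crds , distance_type = 'correlation,spearman' )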
def cluster_appraisal ( x:pd.Series , garbage_n = 0 , Sfunc = lambda x : np.mean(x,0) ) :
    """
    Clustering Optimisation Method for Highly Connected Data
    Richard Tjörnhammar
    https://arxiv.org/abs/2208.04720v2
    """
    # FROM IMPETUOUS-GFA
    from collections import Counter
    decomposition = [ tuple( (item[1],item[0]) ) for item in Counter( x.values.tolist() ).items() if item[1] > garbage_n ]
    N = len ( x )
    A , B = 1 , N
    if len( decomposition ) > 0 :
        A = Sfunc( decomposition )[0]
        B = len( decomposition )
    else :
        decomposition = [ tuple( (0,0) ) ]
    decomposition .append( tuple( ( B , -garbage_n ) ) )
    decomposition .append( tuple( ( A*B/(A+B) , None ) ) )
    return ( decomposition )
#from impetuous.clustering import clustering_appraisal
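# Illustrative sketch ( hypothetical labels ) : the last tuple element carries the score
# A*B/(A+B) , with A the mean retained-cluster size and B the cluster count , which
# generate_clustering_labels below maximises over the linkage hierarchy .
#_demo_series = pd.Series( [ 0,0,0,1,1,2 ] )
#print ( cluster_appraisal( _demo_series )[-1] ) # ( 1.2 , None ) FOR THESE LABELS : A = 2 , B = 3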
def generate_clustering_labels ( distm:np.array , cmd:str = 'min' , labels:list[str] = None ,
                                 bExtreme:bool = False , n_clusters:int = None ) -> tuple :
    # FROM IMPETUOUS-GFA
    from impetuous.clustering import sclinkages
    clabels_n , clabels_o = None , None
    res = sclinkages( distm , cmd )['F']
    index = list( res.keys() )
    if labels is None :
        labels = range( len(distm) )
    hierarch_df = pd.DataFrame ( res.values() , index = index , columns = labels )
    cluster_df = hierarch_df .T .apply( lambda x : cluster_appraisal( x , garbage_n = 0 ) )
    screening = np.array( [ v[-1][0] for v in cluster_df.values ] )
    level_values = np.array( index )
    if bExtreme :
        imax = np.argmax( screening )
        clabels_o = hierarch_df.iloc[imax,:].values.tolist()
    if not n_clusters is None :
        jhit = np.argmin( [ np.abs( len(cluster_df.iloc[i]) - 2 - n_clusters )\
                            for i in range(len(cluster_df)) ] )
        clabels_n = hierarch_df.iloc[jhit,:].values.tolist()
        print ( len(set(clabels_n)) , jhit )
    return ( clabels_n , clabels_o , hierarch_df , np.array( [level_values,screening] ) )
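# Illustrative usage ( commented out ; reuses _demo_dm from the sketch above ) : with
# bExtreme=True the screening optimum is returned in clabels_o , and n_clusters selects
# the hierarchy level whose appraisal length is closest to the requested count ( clabels_n ) .
#_cln , _clo , _hdf , _sol = generate_clustering_labels( _demo_dm , cmd = 'min' ,
#                                                        bExtreme = True , n_clusters = 5 )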
def recast_distance_to_similarity_matrix ( distm:np.array , transform = lambda x : np.exp(1.0/x) ,
                                           bDirect:bool = True ) -> np.array :
    # THE DEFAULT TRANSFORM DIVERGES AT d = 0 , SO THE DIAGONAL IS MASKED AND THEN
    # EXPLICITLY RESET TO ZERO TO AVOID inf * 0 = nan SELF-SIMILARITIES
    sdm = transform( distm ) * ( 1 - np.eye(len(distm)) )
    np.fill_diagonal( sdm , 0 )
    if bDirect :
        return ( sdm )
    ldm = sdm + np.diag( np.sum(sdm,0) )
    return ( ldm )
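# Sketch of the mapping ( hypothetical 3 x 3 input ) : exp(1/d) turns small distances into
# large similarities while the diagonal stays zero ; with bDirect=False the column sums
# are added on the diagonal , giving a Laplacian-like matrix S + diag( sum(S) ) .
#_demo_D = np.array( [ [0.,1.,2.] , [1.,0.,1.] , [2.,1.,0.] ] )
#print ( recast_distance_to_similarity_matrix( _demo_D ) )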
def generate_clustering_solutions ( distm:np.array , cmd:str = 'min' , labels:list[str] = None ,
                                    bExtreme:bool = False , n_clusters:int = None ) -> tuple :
    # FROM IMPETUOUS-GFA
    bUseSVD = True
    bUseHierarchy = False
    from impetuous.clustering import sclinkages
    clabels_n , clabels_o = None , None
    res = sclinkages( distm , cmd )['F']
    index = list( res.keys() )
    if labels is None :
        labels = range( len(distm) )
    hierarch_df = pd.DataFrame ( res.values() , index = index , columns = labels )
    cluster_df = hierarch_df .T .apply( lambda x : cluster_appraisal( x , garbage_n = 0 ) )
    similarity = np.eye( len(hierarch_df.columns) )
    from impetuous.quantification import dirac_matrix
    if bUseHierarchy :
        L_ = len( hierarch_df.index )
        for i in range( L_ ) :
            print ( i/L_*100 )
            similarity += dirac_matrix( hierarch_df.iloc[i].values.tolist() )
        print ( similarity )
        sdm = similarity * ( 1 - np.eye(len(hierarch_df.columns)) )
    else :
        sdm = recast_distance_to_similarity_matrix( distm )
    for i in range( len(sdm) ) :
        sdm[i,i] = 0
    ldm = sdm + np.diag( np.sum(sdm,0) )
    print ( ldm )
    if bUseSVD :
        sol = np.linalg.svd( ldm )
        vectors = sol[0].T[:2] # LEFT SINGULAR VECTORS LIVE IN THE COLUMNS OF U
        vals = sol[1][:2]
    else :
        sol = np.linalg.eig( ldm )
        vectors = sol[1].T[:2] # EIGENVECTORS LIVE IN THE COLUMNS ; NOTE THEY ARE NOT SORTED
        vals = sol[0][:2]
    print ( vectors , vals )
    import matplotlib.pyplot as plt
    plt.subplot(121)
    plt.plot( np.sqrt(vals[0])*vectors[0] , np.sqrt(vals[1])*vectors[1] , 'k*' )
    plt.subplot(122)
    plt.plot( vectors[0] , vectors[1] , 'k*' )
    plt.show()
    exit( 1 ) # NOTE : DEVELOPMENT STOP ; EVERYTHING BELOW IN THIS FUNCTION IS UNREACHABLE
    clabels_o , clabels_n = None , None
    screening = np.array( [ v[-1][0] for v in cluster_df.values ] )
    level_values = np.array( index )
    if bExtreme :
        imax = np.argmax( screening )
        clabels_o = hierarch_df.iloc[imax,:].values.tolist()
    if not n_clusters is None :
        jhit = np.argmin( [ np.abs( len(cluster_df.iloc[i]) - 2 - n_clusters )\
                            for i in range(len(cluster_df)) ] )
        clabels_n = hierarch_df.iloc[jhit,:].values.tolist()
        print ( len(set(clabels_n)) , jhit )
    return ( clabels_n , clabels_o , hierarch_df , np.array( [level_values,screening] ) )
def reformD ( D:np.array , N:int ) -> np.array :
    # EXPAND A CONDENSED ( pdist-STYLE ) DISTANCE VECTOR INTO AN N x N SQUARE MATRIX
    from scipy.spatial.distance import squareform
    D = squareform( D )
    if N != len( D ) :
        print ( 'MALFORMED INPUT' )
        exit( 1 )
    return ( D )
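# Illustrative round trip ( hypothetical points , commented out ) : pdist yields the
# condensed upper-triangle vector and reformD expands it back into square form .
#from scipy.spatial.distance import pdist
#print ( reformD( pdist( np.random.randn(4,2) ) , 4 ) ) # 4 x 4 SYMMETRIC , ZERO DIAGONAL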
from impetuous.quantification import dirac_matrix
from impetuous.clustering import immersiveness
def complete_happiness ( ldf:pd.DataFrame , ndf:pd.DataFrame = None , clustercol:int = 0 ,
                         namecol = 0 , neighborcol = 1 ) -> list[float] :
    if ndf is None :
        # FRACTION OF ALL POINTS THAT SHARE EACH POINT'S CLUSTER LABEL
        N = len( ldf.index )
        return ( ( np.sum( dirac_matrix( ldf.iloc[:,clustercol].values ) , 0 ) / N ).tolist() )
    # NOTE : happiness( ) IS ASSUMED TO COME FROM impetuous.clustering ; IT IS NOT DEFINED IN THIS GIST
    lookup = { i:(c,j) for i , c , j in zip( ldf.index.values , ldf.iloc[:,clustercol].values , range(len(ldf)) ) }
    total_happiness_ = []
    for i_ in range( len(ldf) ) :
        total_happiness_ .append( happiness( i_ , ldf , ndf , lookup = lookup ) )
    return ( total_happiness_ )
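# Illustrative sketch ( hypothetical frame , commented out ) : with ndf=None each point's
# happiness is simply the fraction of all points sharing its cluster label .
#_demo_ldf = pd.DataFrame( { 'cluster' : [ 0,0,1 ] } )
#print ( complete_happiness( _demo_ldf ) ) # APPROX [ 0.67 , 0.67 , 0.33 ]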
from impetuous.clustering import complete_immersiveness as comim
def complete_immersiveness ( labels:list , D:np.array , fraction:float = 0.1 , bins:int = None , title_cmd:str = '' ,
                             bVerbose:bool = False , bInvert:bool = False , bAlternative:bool = True ) -> np.array :
    # HERE
    if bAlternative :
        bSanityCheck = bVerbose
        if bins is None :
            bins = len( D )
            if bins < 10 :
                bins = 100
            bins += 10
        grouped = dirac_matrix( labels )
        ungrouped = 1 - grouped
        if bSanityCheck :
            in_d  = [ d for d in ( D * grouped   ) .reshape(-1) if d > 0 ]
            out_d = [ d for d in ( D * ungrouped ) .reshape(-1) if d > 0 ]
            all_positives = np.histogram( in_d , range = (0,np.max(D.reshape(-1))) , bins = bins )[0]
            all_positives = all_positives / np.sum( all_positives )
            all_negatives , common_range = np.histogram( out_d , range = (0,np.max(D.reshape(-1))) , bins = bins )
            all_negatives = all_negatives / np.sum( all_negatives )
            domain = 0.5 * ( common_range[1:] + common_range[:-1] )
            #
            tpr = np.cumsum( all_positives )
            tpr = tpr / tpr[-1]
            fpr = np.cumsum( all_negatives )
            fpr = fpr / fpr[-1]
            print ( 'FULL AUC:' , np.trapz( tpr , fpr ) )
        in_d = D * grouped
        out_d = D * ungrouped
        dr = np.max( D.reshape(-1) ) / bins
        domain = np.array( [ dr*0.5 + i*dr for i in range( bins ) ] )
        cP , cN = [] , []
        for d in domain :
            cP .append ( np.sum( (in_d  > 0) * (in_d  <= d) , 0 ) )
            cN .append ( np.sum( (out_d > 0) * (out_d <= d) , 0 ) )
        cPdf = pd.DataFrame( cP ) + 1 # PSEUDOCOUNT GUARDS THE FINAL-ROW DIVISION
        cNdf = pd.DataFrame( cN ) + 1
        TPR = ( cPdf / cPdf.iloc[-1,:] ).values
        FPR = ( cNdf / cNdf.iloc[-1,:] ).values
        del cPdf , cNdf
        all_immersions = []
        for tpr , fpr in zip( TPR.T , FPR.T ) :
            all_immersions .append( ( np.trapz( tpr , fpr ) , np.abs( np.trapz( np.diff(tpr) , np.diff(fpr) ) ) ) )
        if bSanityCheck :
            import matplotlib.pyplot as plt
            mosaic = """
                     AC
                     BC
                     """
            axd = plt.figure(1).subplot_mosaic( mosaic )
            identify_axes( axd )
            axd['A'].plot( domain[1:] , np.diff(TPR[:,0]) , 'y' , label = 'Grouped, first instance, immersion' )
            axd['A'].plot( domain[1:] , np.diff(FPR[:,0]) , 'c' , label = 'Ungrouped, first instance, immersion' )
            axd['A'].plot( domain , all_positives , 'r' , label = 'Grouped, all instances, Immersiveness' )
            axd['A'].plot( domain , all_negatives , 'b' , label = 'Ungrouped, all instances, Immersiveness' )
            axd['A'].set_title( 'Immersiveness and Immersion' )
            axd['A'].set_xlabel( 'Distance [Arb Unit]' )
            axd['A'].set_ylabel( 'Histogram' )
            axd['A'].legend()
            mFPR = np.mean( FPR.T , 0 )
            mTPR = np.mean( TPR.T , 0 )
            sFPR = np.std ( FPR.T , 0 )
            sTPR = np.std ( TPR.T , 0 )
            #
            axd['B'].plot( mFPR - sFPR , mTPR + sTPR , ':g' , label = 'mean + error' )
            axd['B'].plot( mFPR , mTPR , 'g' , label = 'mean' )
            axd['B'].plot( mFPR + sFPR , mTPR - sTPR , ':g' , label = 'mean - error' )
            #
            axd['B'].set_xlabel( 'FPR' )
            axd['B'].set_ylabel( 'TPR' )
            axd['B'].legend()
            str_conv = lambda num : str( np.round( num*1000 )/1000 )
            cal_title = 'Full AUC and error estimate: ' + str_conv( np.trapz( mTPR , mFPR ) ) +\
                        ' +- ' + str_conv ( np.trapz( mTPR + sTPR , mFPR - sFPR ) -\
                                            np.trapz( mTPR - sTPR , mFPR + sFPR ) )
            axd['B'].set_title( cal_title )
            axd['C'].plot( domain/np.max(domain) , sTPR**2 , ':g' , label = 'TPR variance' )
            axd['C'].set_ylabel( 'Information Capacity $C_{I}$ [ Arb. Unit ]' ) # [$\quad E \cdot M^{-1}\quad$ ]')
            axd['C'].set_xlabel( 'Cluster fraction $T$ [ Arb. Unit ]' ) # [$\quad M^{-1}\quad$]' )
            axd['C'].set_title( title_cmd[0].upper() + title_cmd[1:] + ' Linkage Agglomerative Transition' )
            plt.show()
            print ( cal_title )
        return ( np.array( all_immersions ) )
    #
    total_immersiveness = []
    N = len( labels )
    for i in range ( N ) :
        immersed = immersiveness ( i , labels , D )
        if bVerbose :
            print ( 'IMMERSIVENESS = ' , i , immersed )
        total_immersiveness .append( np.array( immersed ) )
    return ( np.array ( total_immersiveness ) )
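# Reading the result ( a sketch , not from the original gist ) : in the bAlternative
# branch each returned row is a per-point pair ( ROC-like AUC of within- versus
# between-cluster distance accumulation , |trapz of the differentials| ) ; AUC near 1
# suggests the point's within-cluster distances concentrate below its between-cluster ones .
#_demo_imm = complete_immersiveness( [ 0,0,1,1 ] , _demo_D4 , bAlternative = True ) # _demo_D4 : HYPOTHETICAL 4 x 4 distm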
def create_coordinates ( data_spread:float = 0.10 ,
                         N_centers:int = 5 ,
                         N_points:int = 100 ,
                         bMergeInternal:bool = False ) -> np.array :
    # SYNTHETIC BENCHMARK : N_centers GAUSSIAN BLOBS EVENLY SPACED ON THE UNIT CIRCLE
    print ( "USING" , N_centers , "CENTERS FOR ASSESSMENT" )
    omega = np.pi / float( N_centers ) * 2.0
    X , Y = [] , []
    for ic in range( N_centers ) :
        Y .append( np.sin( omega*ic ) + np.random.randn( N_points )*data_spread )
        X .append( np.cos( omega*ic ) + np.random.randn( N_points )*data_spread )
    X = np.array( X ).reshape(-1)
    Y = np.array( Y ).reshape(-1)
    return ( np.array( [ X , Y ] ).T )
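# Quick visual check ( commented out ; assumes matplotlib ) of the noisy blobs that
# create_coordinates places on the unit circle .
#import matplotlib.pyplot as plt
#_demo_xy = create_coordinates( N_points = 50 )
#plt.plot( _demo_xy[:,0] , _demo_xy[:,1] , 'k.' ) ; plt.show()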
# TWO ALTERNATIVE TEST SETS : TWO WELL-SEPARATED GAUSSIAN BLOBS ( SHAPE-CHECKED ONLY )
# AND THE NOISY CIRCLE FROM create_coordinates( ) , WHICH IS THE ONE ACTUALLY USED BELOW
coordinates = np.array( [ *np.random.randn(500,100) , *( np.random.randn(500,100) + 10 ) ] )
#print ( coordinates )
print ( np.shape( create_coordinates( N_points = 1000 ) ) )
print ( np.shape( coordinates ) )
coordinates = create_coordinates()
#
#exit(1)
print ( coordinates , np.shape(coordinates) )
dm = distance_calculation( coordinates ,
                           distance_type = 'euclidean' , #'correlation,spearman'
                           bRemoveCurse = False )
print ( dm )
print ( np.sum(np.diag(dm)), np.shape(dm) )
#generate_clustering_solutions( dm , cmd='max' , bExtreme=True , n_clusters=80 )
print ( 'DOING CLUSTERING' )
cmd='min'
cln,clo,hdf,sol = generate_clustering_labels( dm , cmd=cmd , bExtreme=True , n_clusters=80 )
imax = np.argmax( sol[1] )
print ( 'HAVE SOLUTION' )
#print ( 'H>' , complete_happiness( hdf.iloc[[imax]].T ) )
import time
t0 = time.time()
#res1 = complete_immersiveness ( hdf.iloc[imax].values.tolist() ,
# dm , bAlternative=False ) # 1062.102365732193
t1 = time.time()
print ( t1-t0 )
res2 = complete_immersiveness ( hdf.iloc[imax].values.tolist() , dm ,
                                title_cmd = { 'min':'single' , 'max':'maximum' ,
                                              'ward':'ward' , 'median':'median' }[cmd] ,
                                bAlternative = True , bVerbose = True , bins = 1000 ) # 1223.762861251831 w 1000 bins
t2 = time.time()
#print ( 'I>' , res1 , t1-t0 )
print ( 'I>' , res2 , t2-t1 )
#
exit( 1 ) # NOTE : DEVELOPMENT STOP ; THE DIAGNOSTIC PRINTS AND PLOT BELOW ARE SKIPPED
print ( clo )
print ( sol[0] )
print ( sol[1] )
print ( imax , len( set(hdf.iloc[imax].values.tolist()) ) )
import matplotlib.pyplot as plt
plt.plot(sol[0],sol[1],'k')
plt.show()