Simple code implementation for NLP text clustering and classification tasks
# Credit: Claude-3.5-Sonnet-200k AI chatbot
import numpy as np
from transformers import AutoTokenizer, AutoModel
from transformers import LongformerModel, LongformerTokenizer
import torch
import pandas as pd
import os
import umap
import matplotlib.pyplot as plt
from time import time
from sklearn.manifold import TSNE
from sklearn.preprocessing import normalize, StandardScaler, MaxAbsScaler
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF, PCA
from sklearn.metrics import calinski_harabasz_score, silhouette_score, davies_bouldin_score, adjusted_rand_score
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.mixture import GaussianMixture
from collections import Counter, defaultdict
import ternary
import hdbscan
import spacy
import gensim
from gensim import corpora, models
from gensim.matutils import corpus2dense
# The BERT model is limited to a max context length of 512 tokens;
# set USE_BERT = 0 to use LongformerModel, which has a context length of 4096 tokens.
USE_BERT = 1
# Check available device
if torch.cuda.is_available():
device = torch.device('cuda')
print("Using CUDA GPU")
elif torch.backends.mps.is_available():
device = torch.device('mps')
print("Using Apple Silicon MPS")
else:
device = torch.device('cpu')
print("Using CPU")
# Read the CSV file
df = pd.read_csv('Copy of sg_articles_list - channelnewsasia.csv')
# Convert all values in relevant columns to lowercase for case-insensitive comparison
df['Relevant (Yes/No) Polly'] = df['Relevant (Yes/No) Polly'].str.lower()
df['Relevant (Yes/No) Fei'] = df['Relevant (Yes/No) Fei'].str.lower()
# Get filenames where any of the relevant columns contain 'yes'
article_filenames = df[
(df['Relevant (Yes/No) Polly'] == 'yes') |
(df['Relevant (Yes/No) Fei'] == 'yes')
]['Article'].tolist()
# Clean up the filenames by replacing ':' with '_'
article_filenames = [filename.replace(':', '_') for filename in article_filenames]
# Create list to store article contents
articles = []
# Read each article file
for filename in article_filenames:
file_path = os.path.join('/Users/john/covidnews/singapore', filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
articles.append(content)
except FileNotFoundError:
print(f"Could not find file: {file_path}")
except Exception as e:
print(f"Error reading file {file_path}: {str(e)}")
print(f"Number of articles read: {len(articles)}")
'''
# Sample articles
articles = [
"The stock market saw significant gains today as tech stocks rallied.",
"Scientists discover new species of marine life in the Pacific.",
"Latest smartphone release features advanced AI capabilities.",
"New environmental regulations impact industrial sector.",
"Sports team wins championship after dramatic finale."
]
'''
class SimpleArticleClusterer:
def __init__(self, n_clusters=2):
self.vectorizer = TfidfVectorizer(max_features=1000)
self.kmeans = KMeans(n_clusters=n_clusters, n_init=10)
def fit_predict(self, texts):
# Transform texts to TF-IDF vectors
vectors = self.vectorizer.fit_transform(texts)
print(f"vectors.shape = {vectors.shape}")
# Perform clustering
return self.kmeans.fit_predict(vectors)
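# Minimal usage sketch (hypothetical inputs; outputs are illustrative only):
#   clusterer = SimpleArticleClusterer(n_clusters=2)
#   labels = clusterer.fit_predict(["covid cases rise in the city", "stock market rallies on tech gains"])
#   # labels is an array such as [0, 1], one cluster id per input text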
def demonstrate_clustering():
# Initialize clusterer
clusterer = SimpleArticleClusterer(n_clusters=2)
# Get clusters
clusters = clusterer.fit_predict(articles)
# Get TF-IDF vectors for topic analysis
vectors = clusterer.vectorizer.transform(articles)  # vectorizer was already fitted inside fit_predict
# Print basic clustering results
print("Clustering Results:")
for article, cluster in zip(articles, clusters):
print(f"\nCluster {cluster}:")
print(f"Article: {article[:100]}...")
# Get cluster probabilities for each article
vectors = vectors.toarray()
cluster_probs = np.zeros((len(articles), 2))
# Calculate distance-based probabilities
for i, vec in enumerate(vectors):
distances = np.linalg.norm(vec - clusterer.kmeans.cluster_centers_, axis=1)
similarities = 1 / (1 + distances) # Convert distances to similarities
probs = similarities / np.sum(similarities) # Normalize to get probabilities
cluster_probs[i] = probs
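# Worked illustration of the conversion above (hypothetical numbers): distances [0.5, 1.5]
# become similarities [1/1.5, 1/2.5] = [0.667, 0.4], which normalize to probabilities [0.625, 0.375]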
# Print detailed analysis
print("\nDetailed TF-IDF Cluster Analysis:")
analyze_clusters(articles, clusters)
print("\nTF-IDF Sample Articles per Cluster:")
print_topic_articles(articles, cluster_probs)
# Calculate clustering metrics
# Evaluate clustering results
print("\nClustering Metrics for TF-IDF:")
evaluate_clusters(clusterer, articles, clusters, topic_distributions=vectors)
# Visualize TF-IDF results
visualize_cluster_separation(vectors, clusters, "TF-IDF")
# Print cluster distribution
print("\nCluster Distribution:")
print(pd.Series(clusters).value_counts().sort_index())
# Print top terms for each cluster
feature_names = clusterer.vectorizer.get_feature_names_out()
print("\nTop Terms per Cluster:")
cluster_centers = clusterer.kmeans.cluster_centers_
for i, center in enumerate(cluster_centers):
top_indices = center.argsort()[-10:][::-1] # Get indices of top 10 terms
top_terms = [feature_names[idx] for idx in top_indices]
print(f"\nCluster {i} Top Terms:")
print(", ".join(top_terms))
return clusters
def demonstrate_clustering_original():
# Initialize clusterer
clusterer = SimpleArticleClusterer(n_clusters=2)
# Get clusters
clusters = clusterer.fit_predict(articles)
# Print results
print("Clustering Results:")
for article, cluster in zip(articles, clusters):
print(f"\nCluster {cluster}:")
print(f"Article: {article[:100]}...")
class BERTArticleClusterer:
def __init__(self, n_clusters=2, use_hdbscan=False):
if USE_BERT:
self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
self.model = AutoModel.from_pretrained('bert-base-uncased').to(device)
else:
self.tokenizer = LongformerTokenizer.from_pretrained('allenai/longformer-base-4096')
self.model = LongformerModel.from_pretrained('allenai/longformer-base-4096').to(device)
if not use_hdbscan:
self.clustering_algorithm = KMeans(n_clusters=n_clusters, n_init=10)
#self.clustering_algorithm = AgglomerativeClustering(n_clusters=n_clusters, linkage='complete')
#self.clustering_algorithm = GaussianMixture(n_components=n_clusters)
'''
self.clustering_algorithm = GaussianMixture(
n_components=2, # Number of clusters
covariance_type='tied', # Try: 'tied', 'diag', 'spherical', 'full'
n_init=20, # Increase number of initializations
max_iter=1000, # Increase maximum iterations
init_params='kmeans', # Use KMeans for initialization
random_state=24,
reg_covar=1e-3, # Increased regularization
tol=1e-3, # Relaxed tolerance
warm_start=True # Use previous solution,
)
'''
else:
'''
Silhouette Score: 0.049
Calinski-Harabasz Score: 57.230
Davies-Bouldin Score: 5.325
'''
self.clustering_algorithm = hdbscan.HDBSCAN( # HDBSCAN does not need to know the number of clusters
min_cluster_size=100, # Try 10% of dataset size
min_samples=1, #max(3,int(len(articles)*0.05)), # Try 5% of dataset size
metric='euclidean', # Try different metrics: 'l2', 'manhattan'
cluster_selection_method='eom', # Try 'leaf' instead, 'eom' tends to find major clusters
alpha=1.3, # Higher alpha = more conservative clustering
#p=2, # Power parameter for minkowski metric
cluster_selection_epsilon=8, # Helps control cluster granularity
core_dist_n_jobs=-1 # Use all CPU cores
)
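# Note: HDBSCAN labels noise points as -1, so with use_hdbscan=True the label set may not be {0, 1};
# downstream steps that assume exactly two clusters (e.g. the Cohen's d separation analysis) will
# then bail out or need adjusting.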
def get_bert_embeddings(self, texts, batch_size=8):
if USE_BERT:
batch_size = batch_size << 3  # BERT's 512-token inputs are much shorter than Longformer's 4096, so use an 8x larger batch
embeddings = []
for i in range(0, len(texts), batch_size):
# Clear memory before processing each batch
if torch.cuda.is_available():
torch.cuda.empty_cache()
elif hasattr(torch, 'mps') and hasattr(torch.mps, 'empty_cache'):  # Check that the MPS backend exposes empty_cache
torch.mps.empty_cache()
batch_texts = texts[i:i + batch_size]
#batch_texts = [preprocess_text(text) for text in batch_texts]
if USE_BERT:
max_length = 512
else:
max_length = 4096
# Tokenize and encode text
inputs = self.tokenizer(batch_texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=max_length).to(device)
# Get BERT embeddings
with torch.no_grad():
outputs = self.model(**inputs)
batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu()  # first ([CLS]) token embedding as the document vector
embeddings.extend(batch_embeddings.numpy())
# Clear GPU memory
del outputs
del inputs
#torch.cuda.empty_cache()
return np.array(embeddings)
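# Sketch of the expected output (assuming the default bert-base/longformer-base checkpoints,
# hidden size 768): for N input texts this returns an (N, 768) array of [CLS]-token embeddings,
# e.g. clusterer.get_bert_embeddings(["text a", "text b"]).shape == (2, 768)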
def reduce_dimensions(self, embeddings, n_components=50, method='umap'):
if method == 'umap':
reducer = umap.UMAP(
n_components=n_components,
n_neighbors=15,
min_dist=0.1,
metric='cosine',
random_state=42
)
elif method == 'tsne':
# t-SNE with barnes_hut can only output 2 or 3 dimensions
if n_components > 3:
reducer = TSNE(
n_components=n_components,
method='exact', # Use exact method for high dimensions
random_state=42
)
else:
reducer = TSNE(
n_components=n_components,
method='barnes_hut', # Faster for low dimensions
perplexity=30,
random_state=42
)
else:
# Reduce dimensionality with PCA
reducer = PCA(n_components=n_components) # Keep top n_components principal components
reduced_embeddings = reducer.fit_transform(embeddings)
return reduced_embeddings
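# Usage sketch (hypothetical arrays): calling reduce_dimensions(embeddings, n_components=2, method='umap')
# maps an (N, 768) matrix down to (N, 2); 'tsne' with n_components > 3 falls back to the much slower
# 'exact' method, so 'umap' or 'pca' is usually preferable for higher target dimensions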
def fit_predict(self, texts):
# Get BERT embeddings
embeddings = self.get_bert_embeddings(texts)
# Normalize embeddings to unit length
embeddings = normalize(embeddings)
reduced_embeddings = self.reduce_dimensions(embeddings, n_components=50, method='tsne')
# Apply K-means clustering
return reduced_embeddings, self.clustering_algorithm.fit_predict(reduced_embeddings)
def calculate_cohens_d(cluster1_data, cluster2_data):
"""
Calculate Cohen's d effect size between two clusters
"""
# Calculate means
mean1 = np.mean(cluster1_data)
mean2 = np.mean(cluster2_data)
# Calculate standard deviations
std1 = np.std(cluster1_data, ddof=1)
std2 = np.std(cluster2_data, ddof=1)
# Calculate pooled standard deviation
n1 = len(cluster1_data)
n2 = len(cluster2_data)
pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
# Calculate Cohen's d
d = abs(mean1 - mean2) / pooled_std
# Interpret the effect size
if d < 0.2:
interpretation = "negligible"
elif d < 0.5:
interpretation = "small"
elif d < 0.8:
interpretation = "medium"
else:
interpretation = "large"
return d, interpretation
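# Worked example (hypothetical values): calculate_cohens_d([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])
# gives means of 2 and 5 with a pooled standard deviation of 1.0, so d = 3.0 -> "large"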
def compare_clusterings(labels1, labels2):
"""
Compare two different clustering results using Adjusted Rand Index
Parameters:
labels1, labels2: Arrays of cluster labels to compare
Returns:
float: ARI score (-1 to 1, where 1 means perfect match)
"""
ari = adjusted_rand_score(labels1, labels2)
# Interpret the score
if ari > 0.9:
interpretation = "excellent agreement"
elif ari > 0.7:
interpretation = "strong agreement"
elif ari > 0.5:
interpretation = "moderate agreement"
else:
interpretation = "weak agreement"
return ari, interpretation
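# Worked example: compare_clusterings([0, 0, 1, 1], [1, 1, 0, 0]) returns (1.0, "excellent agreement"),
# since ARI is invariant to permutations of the cluster ids; random labelings score near 0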
# Compare different clustering methods
def compare_clustering_methods(tfidf_results, bert_results, lda_results, nmf_results, gensim_results):
"""
Compare different clustering approaches using ARI
Uses pre-computed clustering results instead of recalculating
"""
# Check shapes
print(f"\nCluster array shapes:")
print(f"TF-IDF: {tfidf_results.shape if hasattr(tfidf_results, 'shape') else len(tfidf_results)}")
print(f"BERT: {bert_results.shape if hasattr(bert_results, 'shape') else len(bert_results)}")
print(f"LDA: {lda_results.shape if hasattr(lda_results, 'shape') else len(lda_results)}")
print(f"NMF: {nmf_results.shape if hasattr(nmf_results, 'shape') else len(nmf_results)}")
print(f"GENSIM: {gensim_results.shape if hasattr(gensim_results, 'shape') else len(gensim_results)}")
# Compare all pairs
comparisons = {
'TF-IDF vs BERT': compare_clusterings(tfidf_results, bert_results),
'TF-IDF vs LDA': compare_clusterings(tfidf_results, lda_results),
'TF-IDF vs NMF': compare_clusterings(tfidf_results, nmf_results),
'TF-IDF vs GENSIM': compare_clusterings(tfidf_results, gensim_results),
'BERT vs LDA': compare_clusterings(bert_results, lda_results),
'BERT vs NMF': compare_clusterings(bert_results, nmf_results),
'BERT vs GENSIM': compare_clusterings(bert_results, gensim_results),
'LDA vs NMF': compare_clusterings(lda_results, nmf_results),
'LDA vs GENSIM': compare_clusterings(lda_results, gensim_results),
'NMF vs GENSIM': compare_clusterings(nmf_results, gensim_results)
}
print("\nClustering Method Comparisons (Adjusted Rand Index):")
for comparison, (ari, interpretation) in comparisons.items():
print(f"{comparison}: {ari:.3f} ({interpretation})")
return comparisons
def evaluate_clusters(clusterer, texts, labels, topic_distributions=None):
if isinstance(clusterer, BERTArticleClusterer):
# For BERT, use embeddings for evaluation
embeddings = clusterer.get_bert_embeddings(texts)
elif topic_distributions is not None:
# For topic modeling methods (LDA, NMF, Gensim)
'''
eps = 1e-10 # Add small epsilon to avoid numerical issues
embeddings = topic_distributions + eps
# Optional: additional normalization to make scores more comparable with BERT
embeddings = normalize(embeddings)
'''
# Scale features to similar range as BERT embeddings
scaler = StandardScaler()
embeddings = scaler.fit_transform(topic_distributions)
else:
# For TF-IDF or other methods
embeddings = topic_distributions
# Calculate clustering quality metrics
silhouette = silhouette_score(embeddings, labels)
calinski = calinski_harabasz_score(embeddings, labels)
davies = davies_bouldin_score(embeddings, labels)
print(f"Silhouette Score: {silhouette:.3f}")
print(f"Calinski-Harabasz Score: {calinski:.3f}")
print(f"Davies Score: {davies:.3f}")
def analyze_clusters(texts, clusters):
cluster_results = pd.DataFrame({
'Article': texts,
'Cluster': clusters
})
# Initialize vectorizer for key terms
vectorizer = CountVectorizer(
max_features=20,
stop_words='english',
ngram_range=(1,2) # Include both unigrams and bigrams
)
for cluster_id in sorted(set(clusters)):
cluster_texts = cluster_results[cluster_results['Cluster'] == cluster_id]['Article']
# Get frequent terms
term_matrix = vectorizer.fit_transform(cluster_texts)
terms = vectorizer.get_feature_names_out()
frequencies = term_matrix.sum(axis=0).A1
top_terms = sorted(zip(terms, frequencies), key=lambda x: x[1], reverse=True)[:10]
# Get representative articles (first few sentences)
sample_articles = cluster_texts.head(3).apply(
lambda x: ' '.join(x.split('.')[:2]) + '...'
).tolist()
print(f"\nCluster {cluster_id} Analysis:")
print(f"Number of articles: {len(cluster_texts)}")
print("\nTop terms:")
for term, freq in top_terms:
print(f"- {term}: {freq}")
# Print sample articles from each cluster
unique_labels = np.unique(clusters)
for cluster_id in range(len(unique_labels)):
print(f"\nCluster {cluster_id} Sample Articles:")
cluster_texts = [text for text, label in
zip(texts, clusters) if label == cluster_id]
for text in cluster_texts[:3]:
print(f"\n{text[:200]}...")
def analyze_cluster_separation(embeddings, clusters):
# Analyze cluster separation using Cohen's d
# Cohen's d analysis if embeddings are provided
if embeddings is None or len(set(clusters)) != 2:
print("Inputs are not suitable for Cohen's d analysis (requires embeddings and exactly two clusters)")
return [] # Return empty list instead of None
print("\nCohen's d Analysis of Cluster Separation:")
# Get data for each cluster
cluster0_mask = clusters == 0
cluster1_mask = clusters == 1
# For each dimension in embeddings
n_dimensions = embeddings.shape[1]
significant_dimensions = []
print(f"n_dimensions = {n_dimensions}")
for dim in range(n_dimensions):
d, interpretation = calculate_cohens_d(
embeddings[cluster0_mask, dim],
embeddings[cluster1_mask, dim]
)
if d >= 0.8: # Only show dimensions with large effect size
significant_dimensions.append({
'dimension': dim,
'cohens_d': d,
'interpretation': interpretation
})
# Sort and display top dimensions by effect size
significant_dimensions.sort(key=lambda x: x['cohens_d'], reverse=True)
print("\nTop dimensions with large separation (Cohen's d ≥ 0.8):")
for dim in significant_dimensions[:5]: # Show top 5
print(f"Dimension {dim['dimension']}: d = {dim['cohens_d']:.3f} ({dim['interpretation']})")
# Calculate average effect size
avg_d = np.mean([dim['cohens_d'] for dim in significant_dimensions]) if significant_dimensions else 0.0  # guard against an empty list
print(f"\nAverage Cohen's d for significant dimensions: {avg_d:.3f}")
return significant_dimensions
def visualize_cluster_separation_ternary(topic_distributions, labels, method_name="LDA"):
"""
Visualize cluster separation using a ternary plot (enhanced for 2 topics).
"""
# Add a dummy topic with probability zero
dummy_topic = np.zeros((topic_distributions.shape[0], 1))
td_ternary = np.concatenate((topic_distributions, dummy_topic), axis=1)
# Create a ternary plot
fig, tax = ternary.figure(scale=1.0)
tax.boundary(linewidth=2.0)
tax.gridlines(multiple=0.2, color="black")
tax.set_title(f"{method_name}'s Cluster Separation (Ternary, 2 Topics)")
# Use a perceptually uniform colormap (viridis)
#colors = plt.cm.viridis(np.linspace(0, 1, max(labels) + 1))
colors = ["blue", "orange"]
# Plot each cluster with different colors and larger marker size
for cluster in range(max(labels) + 1):
mask = labels == cluster
points = td_ternary[mask]
tax.scatter(points, label=f'Cluster {cluster}', color=colors[cluster], alpha=0.7, s=60) # Larger marker
# Customize the plot
tax.ticks(axis='lbr', multiple=0.2, linewidth=1, offset=0.025)
tax.legend()
tax.left_axis_label("Topic 2")
tax.right_axis_label("Topic 1")
tax.bottom_axis_label("Dummy Topic")
tax.clear_matplotlib_ticks()
filename = f'cluster_visualization_ternary_enhanced_{method_name.lower()}.png'
plt.savefig(filename)
plt.close()
print(f"Enhanced ternary plot saved as {filename}")
def visualize_cluster_separation_histogram(topic_distributions, labels, method_name="LDA"):
"""
Visualize cluster separation using a histogram of topic probabilities.
"""
plt.figure(figsize=(10, 6))
# Use a perceptually uniform colormap
#colors = plt.cm.viridis(np.linspace(0, 1, max(labels) + 1))
colors = ["blue", "orange"]
for cluster in range(max(labels) + 1):
mask = labels == cluster
plt.hist(topic_distributions[mask, 0], # Probability of Topic 1
label=f'Cluster {cluster}',
alpha=0.7,
bins=20, # Adjust bins as needed
color=colors[cluster])
plt.xlabel("Probability of Topic 1")
plt.ylabel("Number of Documents")
plt.title(f"{method_name}'s Cluster Separation (Histogram, 2 Topics)")
plt.legend()
filename = f'cluster_visualization_histogram_{method_name.lower()}.png'
plt.savefig(filename)
plt.close()
print(f"Histogram plot saved as {filename}")
def visualize_cluster_separation(embeddings, labels, method_name="BERT", top_dims=2):
"""
Visualize cluster separation using dimensions with highest Cohen's d
"""
# Get separation analysis
significant_dimensions = analyze_cluster_separation(embeddings, labels)
if not significant_dimensions:
print(f"significant_dimensions = {significant_dimensions}")
return
if len(significant_dimensions) < 2:
print("Not enough significant dimensions for visualization")
return
# Get top dimensions
top_dimensions = [dim['dimension'] for dim in significant_dimensions[:top_dims]]
# Create scatter plot
plt.figure(figsize=(10, 6))
for cluster in range(2): # Assuming binary clustering
mask = labels == cluster
plt.scatter(
embeddings[mask, top_dimensions[0]],
embeddings[mask, top_dimensions[1]],
label=f'Cluster {cluster}',
alpha=0.6
)
plt.xlabel(f'Dimension {top_dimensions[0]} (d={significant_dimensions[0]["cohens_d"]:.2f})')
plt.ylabel(f'Dimension {top_dimensions[1]} (d={significant_dimensions[1]["cohens_d"]:.2f})')
plt.title(f"{method_name}'s Cluster or Topic Separation in Top 2 Dimensions by Cohen's d")
plt.legend()
#plt.show(block=True) # This will block execution until plot window is closed
#input("Press Enter to continue...") # Optional: wait for user input before continuing
# Save plot instead of showing it
# This is probably the most practical approach when the script's output is redirected to a log file
filename = f'cluster_visualization_{method_name.lower()}.png'
plt.savefig(filename)
plt.close() # Close the figure to free memory
print(f"Plot saved as {filename}")
def demonstrate_bert_clustering():
# Initialize clusterer
clusterer = BERTArticleClusterer(n_clusters=2)
try:
# Get clusters
#print(f"clusterer.fit_predict is running now")
reduced_embeddings, clusters = clusterer.fit_predict(articles)
# Evaluate clustering results
print("\nClustering Metrics for BERT:")
evaluate_clusters(clusterer, articles, clusters)
# Add cluster analysis
print("\nDetailed Cluster Analysis:")
analyze_clusters(articles, clusters)
# Optional: Save cluster assignments
cluster_results = pd.DataFrame({
'Article': articles,
'Cluster': clusters
})
# Print cluster distribution
print("\nCluster Distribution:")
print(pd.Series(clusters).value_counts().sort_index())
# Try to visualize if possible
try:
# Visualize the analysis results
visualize_cluster_separation(reduced_embeddings, clusters)
except Exception as viz_error:
print(f"Visualization error (non-critical): {str(viz_error)}")
# Return the clusters
return clusters
except Exception as e:
print(f"Critical error in clustering: {str(e)}")
return None
def extract_topics_lda(articles, num_topics=2, num_words=10):
"""
Latent Dirichlet Allocation (LDA) for topic modeling
"""
# Create TF-IDF vectors
vectorizer = TfidfVectorizer(
max_df=0.8, # Ignore terms that appear in more than 80% of documents
min_df=5, # Ignore terms that appear in fewer than 5 documents
max_features=5000,
stop_words='english',
ngram_range=(1, 2) # Consider single words and bi-grams
)
'''
# Initialize vectorizer for key terms
vectorizer = CountVectorizer(
max_features=5000,
stop_words='english',
ngram_range=(1,2) # Include both unigrams and bigrams
)
'''
X = vectorizer.fit_transform(articles)
# Scale the TF-IDF matrix before LDA
#X = MaxAbsScaler().fit_transform(X)
# Apply LDA
lda = LatentDirichletAllocation(
n_components=num_topics,
random_state=42,
max_iter=50, # Increase number of iterations
learning_method='online',
learning_decay=0.7, # Control learning rate decay
n_jobs=-1 # Use all available cores
)
lda.fit(X)
# Get feature names
feature_names = vectorizer.get_feature_names_out()
# Extract topics
topics = []
for topic_idx, topic in enumerate(lda.components_):
top_words = [feature_names[i] for i in topic.argsort()[:-num_words-1:-1]]  # top num_words terms, highest weight first
topics.append(top_words)
# Get topic distributions
topic_distributions = lda.transform(X)
# Normalize topic distributions
# Add a small constant before normalization to avoid numerical issues with zero probabilities
epsilon = 1e-5 # Small constant
topic_distributions = topic_distributions + epsilon
topic_distributions = normalize(topic_distributions, norm='l1', axis=1)
# Dimensionality reduction for visualization (using t-SNE)
#if num_topics == 2:
# tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, topic_distributions.shape[0]-1), method='barnes_hut')
# topic_distributions = tsne.fit_transform(topic_distributions)
# Project back to word space for visualization and evaluation
#topic_distributions = np.dot(topic_distributions, lda.components_)
# Get cluster assignments
cluster_labels = np.argmax(topic_distributions, axis=1)
# Calculate clustering metrics
# Evaluate clustering results
print("\nClustering Metrics for LDA:")
#evaluate_clusters(clusterer=lda, texts=articles, labels=cluster_labels, topic_distributions=topic_distributions)
evaluate_clusters(clusterer=lda, texts=articles, labels=cluster_labels, topic_distributions=X.toarray())
return topics, topic_distributions
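# Sketch of the return values: `topics` is a list of num_topics lists of top words, and
# `topic_distributions` is an (N, num_topics) row-stochastic matrix such as [[0.9, 0.1], ...],
# so np.argmax(topic_distributions, axis=1) in the main block recovers one hard label per article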
def extract_topics_nmf(articles, num_topics=2, num_words=10, alpha=0.001):
"""
Non-negative Matrix Factorization (NMF) for topic modeling
"""
# Create TF-IDF vectors
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X = vectorizer.fit_transform(articles)
# Scale the TF-IDF matrix before NMF
X = MaxAbsScaler().fit_transform(X)
# Fit NMF model
nmf = NMF(n_components=num_topics, random_state=42, alpha_W=alpha, alpha_H=alpha)
# First fit the model
nmf.fit(X)
# Then transform to get document-topic matrix
doc_topic = nmf.transform(X)
# Check for NaNs or infinite values in NMF outputs
assert not np.isnan(nmf.components_).any(), "NMF components contain NaN values."
assert not np.isnan(doc_topic).any(), "Document-topic matrix contains NaN values."
assert not np.isinf(nmf.components_).any(), "NMF components contain Inf values."
assert not np.isinf(doc_topic).any(), "Document-topic matrix contains Inf values."
# Project back to word space
#doc_topic = np.dot(doc_topic, nmf.components_)
# Normalize the document-topic distributions
#doc_topic = doc_topic / doc_topic.sum(axis=1)[:, np.newaxis]
# Get cluster assignments
cluster_labels = np.argmax(doc_topic, axis=1)
# Get feature names and extract topics
feature_names = vectorizer.get_feature_names_out()
topics = []
for topic_idx, topic in enumerate(nmf.components_):
top_words = [feature_names[i] for i in topic.argsort()[:-num_words-1:-1]]  # top num_words terms, highest weight first
topics.append(top_words)
# Calculate clustering metrics
# Evaluate clustering results
print("\nClustering Metrics for NMF:")
#evaluate_clusters(clusterer=nmf, texts=articles, labels=cluster_labels, topic_distributions=doc_topic)
evaluate_clusters(clusterer=nmf, texts=articles, labels=cluster_labels, topic_distributions=X.toarray())
return topics, doc_topic
def cluster_articles(articles, method='kmeans', n_clusters=5):
"""
Cluster articles using K-means or DBSCAN
"""
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X = vectorizer.fit_transform(articles)
if method == 'kmeans':
clustering = KMeans(n_clusters=n_clusters, random_state=42)
else:
clustering = DBSCAN(eps=0.3, min_samples=5)
labels = clustering.fit_predict(X)
return labels
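# Note: with method='dbscan', fit_predict may return -1 for noise points, so the cluster
# distribution printed in the main block can include a "-1" bucket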
def extract_keywords(articles):
"""
Extract keywords using spaCy
"""
nlp = spacy.load('en_core_web_sm')
keywords = []
for article in articles:
doc = nlp(article)
# Extract named entities and noun phrases
keywords_article = []
keywords_article.extend([ent.text for ent in doc.ents])
keywords_article.extend([chunk.text for chunk in doc.noun_chunks])
keywords.append(list(set(keywords_article)))
return keywords
def topic_modeling_gensim(articles, num_topics=2):
"""
Topic modeling using Gensim
"""
# Tokenize articles
nlp = spacy.load('en_core_web_sm')
texts = []
for article in articles:
doc = nlp(article)
tokens = [token.lemma_ for token in doc if not token.is_stop and token.is_alpha]
texts.append(tokens)
# Create dictionary and corpus
dictionary = corpora.Dictionary(texts)
corpus = [dictionary.doc2bow(text) for text in texts]
# Train LDA model
lda_model = models.LdaModel(
corpus=corpus,
id2word=dictionary,
num_topics=num_topics,
random_state=42,
passes=10
)
# Get the Gensim cluster assignments
gensim_distributions = get_gensim_distributions(
lda_model,
corpus,
lda_model.num_topics
)
gensim_clusters = np.argmax(gensim_distributions, axis=1)
# Convert corpus to document-term matrix for evaluation
doc_term_matrix = corpus2dense(corpus, num_terms=len(dictionary)).T
# Evaluate using document-term matrix
print("\nClustering Metrics for GENSIM:")
#evaluate_clusters(clusterer=lda_model, texts=articles, labels=gensim_clusters, topic_distributions=gensim_distributions)
evaluate_clusters(clusterer=lda_model, texts=articles, labels=gensim_clusters, topic_distributions=doc_term_matrix)
return lda_model, corpus, dictionary, gensim_distributions
# Get topic distributions for Gensim
def get_gensim_distributions(model, corpus, num_topics):
# Initialize distribution matrix
gensim_distributions = np.zeros((len(corpus), num_topics))
# Get topic distributions for each document
for i, doc in enumerate(corpus):
doc_topics = model.get_document_topics(doc, minimum_probability=0)
for topic_id, prob in doc_topics:
gensim_distributions[i, topic_id] = prob
# Normalize distributions
row_sums = gensim_distributions.sum(axis=1)
gensim_distributions = gensim_distributions / row_sums[:, np.newaxis]
return gensim_distributions
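# Worked example of the row normalization (hypothetical, non-normalized inputs): a row filled as
# [0.2, 0.6] is rescaled to [0.25, 0.75]; with minimum_probability=0 the raw rows already sum to
# roughly 1, so this step is mostly a safety net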
def print_topic_articles(articles, topic_distributions, n_samples=3, threshold=0.5):
"""
Print sample articles most strongly associated with each topic
Parameters:
- articles: list of article texts
- topic_distributions: array of shape (n_articles, n_topics)
- n_samples: number of sample articles to print per topic
- threshold: minimum topic probability to consider
"""
n_topics = topic_distributions.shape[1]
for topic_idx in range(n_topics):
print(f"\nTopic {topic_idx + 1} Sample Articles:")
print("-" * 80)
# Get articles most strongly associated with this topic
topic_probs = topic_distributions[:, topic_idx]
top_article_indices = np.argsort(topic_probs)[-n_samples:][::-1]
for idx, article_idx in enumerate(top_article_indices):
prob = topic_probs[article_idx]
if prob >= threshold:
# Print first 200 characters of the article
print(f"\nArticle {idx + 1} (Topic probability: {prob:.3f}):")
print(articles[article_idx][:200] + "...")
print("-" * 40)
# Example usage
def analyze_articles(articles):
t0 = time()
# 1. Topic Modeling using LDA
print("Extracting topics using LDA...")
lda_topics, article_topics_lda = extract_topics_lda(articles)
print("done in %0.3fs." % (time() - t0))
t0 = time()
# 2. Topic Modeling using NMF
print("Extracting topics using NMF...")
nmf_topics, article_topics_nmf = extract_topics_nmf(articles)
print("done in %0.3fs." % (time() - t0))
# 3. Clustering
print("Clustering articles...")
cluster_labels = cluster_articles(articles)
# 4. Keyword Extraction
print("Extracting keywords...")
article_keywords = extract_keywords(articles)
t0 = time()
# 5. Topic Modeling using Gensim
print("Performing topic modeling with Gensim...")
lda_model, corpus, dictionary, article_topics_gensim = topic_modeling_gensim(articles)
print("done in %0.3fs." % (time() - t0))
# Create results dictionary
results = {
'lda_topics': lda_topics,
'article_topics_lda': article_topics_lda,
'nmf_topics': nmf_topics,
'article_topics_nmf': article_topics_nmf,
'cluster_labels': cluster_labels,
'keywords': article_keywords,
'gensim_model': lda_model,
'gensim_corpus': corpus,
'gensim_dictionary': dictionary,
'article_topics_gensim': article_topics_gensim
}
return results
if __name__ == "__main__":
# Process and analyze articles using LDA, NMF and GENSIM
results = analyze_articles(articles)
# Get the LDA cluster assignments
lda_clusters = np.argmax(results['article_topics_lda'], axis=1)
#visualize_cluster_separation(results['article_topics_lda'], lda_clusters, "LDA")
visualize_cluster_separation_ternary(results['article_topics_lda'], lda_clusters, "LDA")
visualize_cluster_separation_histogram(results['article_topics_lda'], lda_clusters, "LDA")
# Get the NMF cluster assignments
nmf_clusters = np.argmax(results['article_topics_nmf'], axis=1)
visualize_cluster_separation(results['article_topics_nmf'], nmf_clusters, "NMF")
# Get the GENSIM cluster assignments
gensim_clusters = np.argmax(results['article_topics_gensim'], axis=1)
visualize_cluster_separation(results['article_topics_gensim'], gensim_clusters, "Gensim")
t0 = time()
# Use simple TF-IDF based clustering
print("Running TF-IDF based clustering...")
tfidf_clusters = demonstrate_clustering() # Get TF-IDF results
print("done in %0.3fs." % (time() - t0))
t0 = time()
# Use BERT based clustering
print("\nRunning BERT-based clustering...")
bert_clusters = demonstrate_bert_clustering() # Get BERT results
print("done in %0.3fs." % (time() - t0))
# Compare the results using the adjusted Rand index
clustering_comparisons = compare_clustering_methods(
tfidf_clusters,
bert_clusters,
lda_clusters,
nmf_clusters,
gensim_clusters
)
# LDA Analysis
print("\nLDA Topics:")
for idx, topic in enumerate(results['lda_topics']):
print(f"LDA Topic {idx + 1}: {', '.join(topic)}")
print("\nLDA Sample Articles:")
print_topic_articles(articles, results['article_topics_lda'])
# NMF Analysis
print("\nNMF Topics:")
for idx, topic in enumerate(results['nmf_topics']):
print(f"NMF Topic {idx + 1}: {', '.join(topic)}")
print("\nNMF Sample Articles:")
print_topic_articles(articles, results['article_topics_nmf'])
# Gensim Analysis
print("\nGensim Topics:")
for topic_id in range(results['gensim_model'].num_topics):
topic_terms = results['gensim_model'].show_topic(topic_id, 10)
terms = [term for term, _ in topic_terms]
print(f"Gensim Topic {topic_id + 1}: {', '.join(terms)}")
print("\nGensim Sample Articles:")
print_topic_articles(articles, results['article_topics_gensim'])
print("\nCluster Distribution:")
unique_labels, counts = np.unique(results['cluster_labels'], return_counts=True)
for label, count in zip(unique_labels, counts):
print(f"Cluster {label}: {count} articles")