@cnmoro
Created June 21, 2024 18:51
Semantic Chunking & Compressing
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from minivectordb.embedding_model import EmbeddingModel
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken, nltk, numpy as np, fasttext, base64
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords
nltk.download('punkt')
nltk.download('stopwords')
embedding_model = EmbeddingModel()
gpt_encoding = tiktoken.encoding_for_model("gpt-4")
portuguese_stopwords = list(set(stopwords.words('portuguese')))
english_stopwords = list(set(stopwords.words('english')))
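# Optional: fetch the fastText language-identification model if it is not already on disk.
# This is an illustrative addition, assuming the standard lid.176.ftz download URL;
# adjust the path/URL if you keep the model elsewhere.
import os, urllib.request
if not os.path.exists('lid.176.ftz'):
    urllib.request.urlretrieve(
        'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz',
        'lid.176.ftz'
    )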
langdetect_model = fasttext.load_model('lid.176.ftz')
def detect_language(text):
    detected_lang = langdetect_model.predict(text.replace('\n', ' '), k=1)[0][0]
    return 'pt' if (str(detected_lang) == '__label__pt' or str(detected_lang) == 'portuguese') else 'en'
def advanced_semantic_chunk_text(full_text, tokens_per_chunk=250, num_topics=5):
    def calculate_similarity(embed1, embed2):
        return cosine_similarity([embed1], [embed2])[0][0]

    def create_lda_model(texts, stopwords):
        vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words=stopwords)
        doc_term_matrix = vectorizer.fit_transform(texts)
        lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
        lda.fit(doc_term_matrix)
        return lda, vectorizer

    def get_topic_distribution(text, lda, vectorizer):
        vec = vectorizer.transform([text])
        return lda.transform(vec)[0]

    # Split the text into sentences
    sentences = sent_tokenize(full_text)

    # Create initial chunks based on token count
    chunks = []
    current_chunk = []
    current_chunk_length = 0
    for sentence in sentences:
        sentence_tokens = gpt_encoding.encode(sentence)
        if current_chunk_length + len(sentence_tokens) > tokens_per_chunk and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_chunk_length = 0
        current_chunk.append(sentence)
        current_chunk_length += len(sentence_tokens)
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    # Create LDA model
    text_lang = detect_language(full_text)
    lda_model, vectorizer = create_lda_model(chunks, portuguese_stopwords if text_lang == 'pt' else english_stopwords)

    # Optimize chunks
    optimized_chunks = []
    current_chunk = chunks[0]
    current_embedding = embedding_model.extract_embeddings(current_chunk)
    current_topic_dist = get_topic_distribution(current_chunk, lda_model, vectorizer)
    for next_chunk in chunks[1:]:
        next_embedding = embedding_model.extract_embeddings(next_chunk)
        next_topic_dist = get_topic_distribution(next_chunk, lda_model, vectorizer)
        # Calculate semantic similarity
        similarity = calculate_similarity(current_embedding, next_embedding)
        # Calculate topic similarity
        topic_similarity = cosine_similarity([current_topic_dist], [next_topic_dist])[0][0]
        # Combine semantic and topic similarity
        combined_similarity = (similarity + topic_similarity) / 2
        if combined_similarity > 0.6:
            # Merge chunks
            current_chunk += " " + next_chunk
            current_embedding = embedding_model.extract_embeddings(current_chunk)
            current_topic_dist = get_topic_distribution(current_chunk, lda_model, vectorizer)
        else:
            optimized_chunks.append(current_chunk)
            current_chunk = next_chunk
            current_embedding = next_embedding
            current_topic_dist = next_topic_dist
    optimized_chunks.append(current_chunk)
    return optimized_chunks
def semantic_compress_text(full_text, compression_rate=0.7, num_topics=5):
    def calculate_similarity(embed1, embed2):
        return cosine_similarity([embed1], [embed2])[0][0]

    def create_lda_model(texts, stopwords):
        vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words=stopwords)
        doc_term_matrix = vectorizer.fit_transform(texts)
        lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
        lda.fit(doc_term_matrix)
        return lda, vectorizer

    def get_topic_distribution(text, lda, vectorizer):
        vec = vectorizer.transform([text])
        return lda.transform(vec)[0]

    def sentence_importance(sentence, doc_embedding, lda_model, vectorizer, stopwords):
        sentence_embedding = embedding_model.extract_embeddings(sentence)
        semantic_similarity = calculate_similarity(doc_embedding, sentence_embedding)
        topic_dist = get_topic_distribution(sentence, lda_model, vectorizer)
        topic_importance = np.max(topic_dist)
        # Calculate lexical diversity
        words = sentence.split()
        unique_words = set(word.lower() for word in words if word.lower() not in stopwords)
        lexical_diversity = len(unique_words) / len(words) if words else 0
        # Combine factors (you can adjust weights as needed)
        importance = (0.4 * semantic_similarity) + (0.4 * topic_importance) + (0.2 * lexical_diversity)
        return importance

    # Split the text into sentences
    sentences = sent_tokenize(full_text)
    text_lang = detect_language(full_text)
    stopword_list = portuguese_stopwords if text_lang == 'pt' else english_stopwords

    # Create LDA model
    lda_model, vectorizer = create_lda_model(sentences, stopword_list)

    # Get document-level embedding
    doc_embedding = embedding_model.extract_embeddings(full_text)

    # Calculate importance for each sentence
    sentence_scores = [(sentence, sentence_importance(sentence, doc_embedding, lda_model, vectorizer, stopword_list))
                       for sentence in sentences]

    # Sort sentences by importance
    sorted_sentences = sorted(sentence_scores, key=lambda x: x[1], reverse=True)

    # Determine how many words to keep
    total_words = sum(len(sentence.split()) for sentence in sentences)
    target_words = int(total_words * compression_rate)

    # Reconstruct the compressed text
    compressed_text = []
    current_words = 0
    for sentence, _ in sorted_sentences:
        sentence_words = len(sentence.split())
        if current_words + sentence_words <= target_words:
            compressed_text.append(sentence)
            current_words += sentence_words
        else:
            break

    # Reorder sentences to maintain original flow
    compressed_text.sort(key=lambda x: sentences.index(x))
    return ' '.join(compressed_text)
# Example usage
full_text = "Your long text here..."
chunks = advanced_semantic_chunk_text(full_text, tokens_per_chunk=100)

# semantic_compress_text keeps `compression_rate` of the original words,
# so a 30% compression corresponds to keeping 70% of the text
compression_rate = 0.3  # 30% compression
compressed_text = semantic_compress_text(full_text, compression_rate=1 - compression_rate)
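# Quick look at the output (illustrative sketch only): print the merged chunks and the compressed text
print(f"{len(chunks)} chunks produced")
for i, chunk in enumerate(chunks):
    print(f"--- chunk {i} ({len(chunk.split())} words) ---")
    print(chunk)
print("--- compressed text ---")
print(compressed_text)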