from functools import reduce
from typing import Iterable, Dict, List, Union
from collections import Counter

import numpy as np

from data_provider import get_talks


class TokenizedDocument(Counter):
    """A bag-of-words document supporting several term-frequency weightings."""

    def __init__(self, items: Union[str, List[str]], *args, **kwargs):
        if isinstance(items, str):
            # `split()` handles runs of whitespace, unlike `split(' ')`
            items = items.lower().split()
        super().__init__(items, *args, **kwargs)
        self.n_words = len(items)
        self.__max_term_frequency = None

    def boolean_term_frequency(self, term: str) -> float:
        return float(self[term] > 0)

    def term_frequency(self, term: str) -> float:
        return self[term]

    def relative_term_frequency(self, term: str) -> float:
        return self[term] / self.n_words

    def sublinear_tf_scaling(self, term: str) -> float:
        return (1 + np.log(self[term])) if self[term] else 0.0

    def maximum_tf_normalization(self, term: str, k: float = 0.4) -> float:
        # `k` is a smoothing parameter, typically set to 0.4
        if self.__max_term_frequency is None:
            self.__max_term_frequency = max(map(self.term_frequency, self.keys()))
        return k + (1 - k) * self.term_frequency(term) / self.__max_term_frequency
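

# A quick illustrative check of the term-frequency variants on a toy document:
_doc = TokenizedDocument('the cat sat on the mat')
assert _doc.term_frequency('the') == 2
assert _doc.boolean_term_frequency('dog') == 0.0
assert _doc.relative_term_frequency('the') == 2 / 6
# sublinear_tf_scaling('the') = 1 + ln(2) ≈ 1.69
# maximum_tf_normalization('the') = 0.4 + 0.6 * 2/2 = 1.0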


def jaccard_similarity(collection_1: Iterable[str], collection_2: Iterable[str]) -> float:
    collection_1, collection_2 = set(collection_1), set(collection_2)
    return len(collection_1 & collection_2) / len(collection_1 | collection_2)
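

# Example: two collections sharing one of three distinct tokens
# (|intersection| / |union| = 1/3):
assert jaccard_similarity(['a', 'b'], ['b', 'c']) == 1 / 3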


def inverse_document_frequencies(
        tokenized_documents: List[TokenizedDocument],
) -> Dict[str, float]:
    idf_values = {}
    all_tokens = reduce(lambda acc, doc: acc | set(doc), tokenized_documents, set())
    n_documents = len(tokenized_documents)
    for token in all_tokens:
        contains_token = map(lambda doc: token in doc, tokenized_documents)
        # Smoothed IDF: 1 + log(N / (1 + document frequency))
        idf_values[token] = 1 + np.log(n_documents / (1 + sum(contains_token)))
    return idf_values
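

# Example: with the smoothing above, a token occurring in every document
# scores lower than one occurring in a single document:
_idf = inverse_document_frequencies(
    [TokenizedDocument('a b'), TokenizedDocument('a c')]
)
assert _idf['b'] > _idf['a']  # 'b': 1 + log(2/2) = 1.0, 'a': 1 + log(2/3) ≈ 0.59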


def tfidf(documents: List[str]) -> np.ndarray:
    tokenized_documents = [TokenizedDocument(document) for document in documents]
    idf = inverse_document_frequencies(tokenized_documents)
    # One row per document, one column per term, weighted by sublinear TF * IDF
    return np.array([
        [document.sublinear_tf_scaling(term) * idf[term] for term in idf.keys()]
        for document in tokenized_documents
    ], dtype=float)
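

# Example: the matrix has one row per document and one column per distinct
# term in the corpus (column order follows `idf.keys()`):
assert tfidf(['a b b', 'a c']).shape == (2, 3)  # vocabulary: {a, b, c}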


def pairwise_cosine_similarity(matrix: np.ndarray) -> np.ndarray:
    """Compute the pairwise cosine similarity of the rows in a given matrix.

    References
    ----------
    https://stackoverflow.com/questions/17627219/whats-the-fastest-way-in-python-to-calculate-cosine-similarity-given-sparse-mat

    """
    # Compute the base similarity (dot products between all pairs of rows)
    similarity = matrix.dot(matrix.T)
    # The diagonal holds the squared magnitude of each row vector
    square_magnitude = np.diag(similarity)
    inverse_squared_magnitude = 1 / square_magnitude
    # Zero-magnitude rows would produce infinities; treat them as zeros instead
    inverse_squared_magnitude[np.isinf(inverse_squared_magnitude)] = 0
    inverse_magnitude = np.sqrt(inverse_squared_magnitude)
    # Normalize along both the rows and the columns
    cosine = similarity * inverse_magnitude
    cosine = cosine.T * inverse_magnitude
    return cosine
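

# Example: identical rows have similarity 1, orthogonal rows 0, and the
# diagonal (each row compared with itself) is all ones:
_sims = pairwise_cosine_similarity(np.array([[1., 0.], [0., 2.], [1., 0.]]))
assert np.allclose(np.diag(_sims), 1.0)
assert np.isclose(_sims[0, 2], 1.0) and np.isclose(_sims[0, 1], 0.0)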


data = get_talks()
documents = data['transcript'].iloc[:300].tolist()
embeddings = tfidf(documents)
print(pairwise_cosine_similarity(embeddings))