@ryderwishart
Created July 8, 2024 20:32
Statistical gloss predictions - Markov Chain Monte Carlo
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Markov Chain Monte Carlo Glossing Tool"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Corpus files already exist. Loading from disk...\n",
"Training on 31055 sentence pairs\n",
"Number of stop words: 100\n",
"Top 10 stop words: ['la', 'son', 'this', 'has', 'des', 'sa', 'he', 'which', 'n', 'à']\n",
"Source sentence: The big cat loves to eat pizza on the mat\n",
"Target sentence: Le gros chat aime manger de la pizza sur le tapis\n",
"\n",
"Debugging information:\n",
"Source tokens: ['big', 'cat', 'loves', 'eat', 'pizza', 'mat']\n",
"Target tokens: ['gros', 'chat', 'aime', 'manger', 'pizza', 'tapis']\n",
"Source n-grams: ['big cat loves', 'cat loves eat', 'loves eat pizza', 'eat pizza mat']\n",
"Target n-grams: ['gros chat aime', 'chat aime manger', 'aime manger pizza', 'manger pizza tapis']\n",
"\n",
"Alignments:\n",
"big cat loves -> [('gros chat aime', 10021.631306160578), ('chat aime manger', 7516.223479620434), ('aime manger pizza', 5010.815653080289)]\n",
"cat loves eat -> [('chat aime manger', 10021.631306160578), ('gros chat aime', 7516.223479620434), ('aime manger pizza', 7516.223479620434)]\n",
"loves eat pizza -> [('aime manger pizza', 10021.631306160578), ('chat aime manger', 7516.223479620434), ('manger pizza tapis', 7516.223479620434)]\n",
"eat pizza mat -> [('manger pizza tapis', 10021.631306160578), ('aime manger pizza', 7516.223479620434), ('chat aime manger', 5010.815653080289)]\n"
]
}
],
"source": [
"from collections import defaultdict, Counter\n",
"import re\n",
"import requests\n",
"import os\n",
"import math\n",
"import numpy as np\n",
"from typing import List, Tuple, Dict, Set\n",
"\n",
"# Download a corpus file if it is not already cached on disk and return its lines\n",
"def download_corpus(url, filename):\n",
"    if not os.path.exists(filename):\n",
"        print(f\"Downloading {filename}...\")\n",
"        response = requests.get(url)\n",
"        with open(filename, 'w', encoding='utf-8') as f:\n",
"            f.write(response.text)\n",
"\n",
"    with open(filename, 'r', encoding='utf-8') as f:\n",
"        return f.read().split('\\n')\n",
"\n",
"class ImprovedStatisticalGlosser:\n",
"    \"\"\"Glosses parallel sentences by scoring n-gram pairs with co-occurrence\n",
"    counts, TF-IDF-style weighting, and a positional penalty.\"\"\"\n",
"\n",
"    def __init__(self, n=3):\n",
"        self.n = n\n",
"        self.co_occurrences = defaultdict(lambda: defaultdict(int))\n",
"        self.source_counts = defaultdict(int)\n",
"        self.target_counts = defaultdict(int)\n",
"        self.source_doc_freq = defaultdict(int)\n",
"        self.target_doc_freq = defaultdict(int)\n",
"        self.total_docs = 0\n",
"        self.stop_words: Set[str] = set()\n",
"\n",
"    def train(self, source_sentences: List[str], target_sentences: List[str]):\n",
"        \"\"\"Collect n-gram co-occurrence, frequency, and document-frequency counts from parallel sentences.\"\"\"\n",
"        # Calculate stop words before training\n",
"        self.calculate_stop_words(source_sentences + target_sentences)\n",
"\n",
"        self.total_docs = len(source_sentences)\n",
"        for idx, (source, target) in enumerate(zip(source_sentences, target_sentences)):\n",
"            source_tokens = self.tokenize(source)\n",
"            target_tokens = self.tokenize(target)\n",
"\n",
"            source_ngrams = self.get_ngrams(source_tokens)\n",
"            target_ngrams = self.get_ngrams(target_tokens)\n",
"\n",
"            source_set = set(source_ngrams)\n",
"            target_set = set(target_ngrams)\n",
"\n",
"            for s_ngram in source_ngrams:\n",
"                for t_ngram in target_ngrams:\n",
"                    self.co_occurrences[s_ngram][t_ngram] += 1\n",
"                self.source_counts[s_ngram] += 1\n",
"\n",
"            for t_ngram in target_ngrams:\n",
"                self.target_counts[t_ngram] += 1\n",
"\n",
"            for s_ngram in source_set:\n",
"                self.source_doc_freq[s_ngram] += 1\n",
"            for t_ngram in target_set:\n",
"                self.target_doc_freq[t_ngram] += 1\n",
"\n",
"    def calculate_stop_words(self, sentences: List[str], max_stop_words: int = 100):\n",
"        \"\"\"Treat the most frequent tokens (up to the frequency curve's elbow, capped at max_stop_words) as stop words.\"\"\"\n",
"        word_counts = Counter(word for sentence in sentences for word in self.tokenize(sentence))\n",
"        sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)\n",
"\n",
"        # Calculate the elbow point\n",
"        x = np.arange(1, len(sorted_words) + 1)\n",
"        y = np.array([count for _, count in sorted_words])\n",
"\n",
"        # Calculate the angle between consecutive points\n",
"        angles = np.diff(np.arctan2(np.diff(y), np.diff(x)))\n",
"        elbow_index = np.argmax(angles) + 1\n",
"\n",
"        # Use the elbow point or max_stop_words, whichever is smaller\n",
"        num_stop_words = min(elbow_index, max_stop_words)\n",
"        self.stop_words = set(word for word, _ in sorted_words[:num_stop_words])\n",
"\n",
"    def tokenize(self, sentence: str) -> List[str]:\n",
"        \"\"\"Lowercase, split on word characters, and drop stop words.\"\"\"\n",
"        tokens = re.findall(r'\\w+', sentence.lower())\n",
"        return [token for token in tokens if token not in self.stop_words]\n",
"\n",
"    def gloss(self, source_sentence, target_sentence):\n",
"        \"\"\"Return, for each source n-gram, its top-scoring target n-grams (falling back to single source tokens).\"\"\"\n",
"        source_tokens = self.tokenize(source_sentence)\n",
"        target_tokens = self.tokenize(target_sentence)\n",
"\n",
"        source_ngrams = self.get_ngrams(source_tokens)\n",
"        target_ngrams = self.get_ngrams(target_tokens)\n",
"\n",
"        mappings = []\n",
"\n",
"        for i, s_ngram in enumerate(source_ngrams):\n",
"            ngram_mappings = []\n",
"            for j, t_ngram in enumerate(target_ngrams):\n",
"                score = self.calculate_score(s_ngram, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n",
"                if score > 0:\n",
"                    ngram_mappings.append((t_ngram, score))\n",
"\n",
"            if not ngram_mappings:  # If no n-gram matches, try individual tokens\n",
"                s_tokens = s_ngram.split()\n",
"                for s_token in s_tokens:\n",
"                    for j, t_ngram in enumerate(target_ngrams):\n",
"                        score = self.calculate_score(s_token, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n",
"                        if score > 0:\n",
"                            ngram_mappings.append((t_ngram, score))\n",
"\n",
"            ngram_mappings.sort(key=lambda x: x[1], reverse=True)\n",
"            mappings.append((s_ngram, ngram_mappings[:3]))  # Keep top 3 mappings\n",
"\n",
"        return mappings\n",
"\n",
"    def calculate_score(self, source_ngram, target_ngram, source_pos, target_pos, source_len, target_len):\n",
"        \"\"\"TF-IDF-style co-occurrence score, scaled by how close the n-grams' relative positions are.\"\"\"\n",
"        epsilon = 1e-10  # Smoothing factor\n",
"\n",
"        co_occur = sum(self.co_occurrences[s_token][t_token] for s_token in source_ngram.split() for t_token in target_ngram.split())\n",
"        co_occur += epsilon  # Add smoothing\n",
"\n",
"        source_count = sum(self.source_counts[s_token] for s_token in source_ngram.split()) + epsilon\n",
"        target_count = sum(self.target_counts[t_token] for t_token in target_ngram.split()) + epsilon\n",
"\n",
"        source_idf = sum(math.log((self.total_docs + epsilon) / (self.source_doc_freq[s_token] + epsilon)) for s_token in source_ngram.split())\n",
"        target_idf = sum(math.log((self.total_docs + epsilon) / (self.target_doc_freq[t_token] + epsilon)) for t_token in target_ngram.split())\n",
"\n",
"        tfidf_score = (co_occur / source_count) * source_idf * (co_occur / target_count) * target_idf\n",
"\n",
"        position_score = 1 - abs((source_pos / source_len) - (target_pos / target_len))\n",
"\n",
"        return tfidf_score * position_score\n",
"\n",
"    def reassemble_alignments(self, mappings, source_tokens, target_tokens):\n",
"        \"\"\"Greedily keep the best non-overlapping alignments; unaligned source tokens get an empty gloss.\"\"\"\n",
"        final_alignments = []\n",
"        covered_source = set()\n",
"        covered_target = set()\n",
"\n",
"        for s_ngram, t_mappings in mappings:\n",
"            if not t_mappings:\n",
"                continue\n",
"\n",
"            best_t_ngram, score = t_mappings[0]\n",
"            s_indices = self.find_ngram_indices(source_tokens, s_ngram)\n",
"            t_indices = self.find_ngram_indices(target_tokens, best_t_ngram)\n",
"\n",
"            if not (set(s_indices) & covered_source) and not (set(t_indices) & covered_target):\n",
"                final_alignments.append((s_ngram, best_t_ngram, score))\n",
"                covered_source.update(s_indices)\n",
"                covered_target.update(t_indices)\n",
"\n",
"        # Handle unaligned tokens\n",
"        for i, token in enumerate(source_tokens):\n",
"            if i not in covered_source:\n",
"                final_alignments.append((token, \"\", 0))\n",
"\n",
"        final_alignments.sort(key=lambda x: source_tokens.index(x[0].split()[0]))\n",
"        return final_alignments\n",
"\n",
"    def get_ngrams(self, tokens):\n",
"        return [' '.join(tokens[i:i+self.n]) for i in range(len(tokens)-self.n+1)]\n",
"\n",
"    @staticmethod\n",
"    def find_ngram_indices(tokens, ngram):\n",
"        ngram_tokens = ngram.split()\n",
"        return list(range(tokens.index(ngram_tokens[0]), tokens.index(ngram_tokens[0]) + len(ngram_tokens)))\n",
"\n",
"# Test the implementation\n",
"if __name__ == \"__main__\":\n",
"    # Define corpus URLs and filenames\n",
"    french_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/fra-fraLSG.txt\"\n",
"    english_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/eng-eng-web.txt\"\n",
"    french_filename = \"french_corpus.txt\"\n",
"    english_filename = \"english_corpus.txt\"\n",
"\n",
"    # Check if corpus files already exist\n",
"    if os.path.exists(french_filename) and os.path.exists(english_filename):\n",
"        print(\"Corpus files already exist. Loading from disk...\")\n",
"        with open(french_filename, 'r', encoding='utf-8') as f:\n",
"            french_corpus = f.readlines()\n",
"        with open(english_filename, 'r', encoding='utf-8') as f:\n",
"            english_corpus = f.readlines()\n",
"    else:\n",
"        print(\"Downloading corpus files...\")\n",
"        french_corpus = download_corpus(french_url, french_filename)\n",
"        english_corpus = download_corpus(english_url, english_filename)\n",
"\n",
"    # Remove empty lines and ensure corpora are aligned\n",
"    french_corpus = [line.strip() for line in french_corpus if line.strip()]\n",
"    english_corpus = [line.strip() for line in english_corpus if line.strip()]\n",
"    min_length = min(len(french_corpus), len(english_corpus))\n",
"    french_corpus = french_corpus[:min_length]\n",
"    english_corpus = english_corpus[:min_length]\n",
"\n",
"    print(f\"Training on {len(french_corpus)} sentence pairs\")\n",
"\n",
"    # Create and train the glosser\n",
"    glosser = ImprovedStatisticalGlosser()\n",
"\n",
"    glosser.train(english_corpus, french_corpus)\n",
"\n",
"    print(f\"Number of stop words: {len(glosser.stop_words)}\")\n",
"    print(f\"Top 10 stop words: {list(glosser.stop_words)[:10]}\")\n",
"\n",
"    # Test the glosser on an English sentence and its French translation\n",
"    source_test = \"The big cat loves to eat pizza on the mat\"\n",
"    target_test = \"Le gros chat aime manger de la pizza sur le tapis\"\n",
"\n",
"    print(\"Source sentence:\", source_test)\n",
"    print(\"Target sentence:\", target_test)\n",
"\n",
"    # Print debugging information\n",
"    print(\"\\nDebugging information:\")\n",
"    source_tokens = glosser.tokenize(source_test)\n",
"    target_tokens = glosser.tokenize(target_test)\n",
"    print(\"Source tokens:\", source_tokens)\n",
"    print(\"Target tokens:\", target_tokens)\n",
"\n",
"    source_ngrams = glosser.get_ngrams(source_tokens)\n",
"    target_ngrams = glosser.get_ngrams(target_tokens)\n",
"    print(\"Source n-grams:\", source_ngrams)\n",
"    print(\"Target n-grams:\", target_ngrams)\n",
"    alignments = glosser.gloss(source_test, target_test)\n",
"    print(\"\\nAlignments:\")\n",
"\n",
"    # Group alignments by source phrase; gloss() yields (source n-gram, [(target n-gram, score), ...]) pairs,\n",
"    # so the 2-tuple branch below is the one normally taken\n",
"    grouped_alignments = {}\n",
"    for alignment in alignments:\n",
"        if len(alignment) == 3:\n",
"            source, target, score = alignment\n",
"            if isinstance(source, tuple):\n",
"                source = ' '.join(source) if isinstance(source[0], str) else ' '.join([' '.join(s) for s in source])\n",
"            if isinstance(target, tuple):\n",
"                target = ' '.join(target) if isinstance(target[0], str) else ' '.join([' '.join(t) for t in target])\n",
"\n",
"            if source not in grouped_alignments:\n",
"                grouped_alignments[source] = []\n",
"            grouped_alignments[source].append((target, score))\n",
"        elif len(alignment) == 2:\n",
"            source, target = alignment\n",
"            if isinstance(source, tuple):\n",
"                source = ' '.join(source) if isinstance(source[0], str) else ' '.join([' '.join(s) for s in source])\n",
"            if isinstance(target, tuple):\n",
"                target = ' '.join(target) if isinstance(target[0], str) else ' '.join([' '.join(t) for t in target])\n",
"\n",
"            if source not in grouped_alignments:\n",
"                grouped_alignments[source] = []\n",
"            grouped_alignments[source].append((target, None))\n",
"        else:\n",
"            print(f\"Unexpected alignment format: {alignment}\")\n",
"\n",
"    # Print grouped alignments\n",
"    for source, targets in grouped_alignments.items():\n",
"        if len(targets) == 1:\n",
"            target, score = targets[0]\n",
"            if score is not None:\n",
"                print(f\"{source} -> {target} (Score: {score:.4f})\")\n",
"            else:\n",
"                print(f\"{source} -> {target}\")\n",
"        else:\n",
"            print(f\"{source} -> {targets}\")"
]
}
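,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal usage sketch of the (currently unused) `reassemble_alignments` helper: assuming `glosser`, `source_test`, and `target_test` exist as defined in the previous cell, it reduces the n-gram mappings to greedy, non-overlapping alignments."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: reduce the n-gram mappings to non-overlapping alignments.\n",
"# Assumes the previous cell has been run so that glosser, source_test, and target_test exist.\n",
"source_tokens = glosser.tokenize(source_test)\n",
"target_tokens = glosser.tokenize(target_test)\n",
"mappings = glosser.gloss(source_test, target_test)\n",
"for s_ngram, t_ngram, score in glosser.reassemble_alignments(mappings, source_tokens, target_tokens):\n",
"    print(f\"{s_ngram} -> {t_ngram} ({score:.2f})\")"
]
}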
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}