Bulk download audio pronunciations for Anki from Forvo and/or JapanesePod101. It will automatically update your Anki deck, populating the audio field.
#!/usr/bin/env python3
'''
Update 10/4/2021
* Download audio for the longest word in Spanish.

Update 9/22/2020
* Allow looking up through multiple lang codes by priority order on Forvo (e.g. es_LATAM, es)
* Strip nbsp; when looking up words on Forvo

Update 9/5/2020
* Fixed to work with newer Anki versions 2.1.33+ (possibly earlier)

---
Make sure to adjust BACKUP_LOC to a directory of your choosing.
You will need to adjust CARD_TYPE and possibly DECK_NAME, or create your own class that extends
AudioDownloader for other languages.
Adjust get_idxs and get_modded_fields_for_card as needed.
You might need to adjust the fields for kana and kanji for Japanese.
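
Example invocation (the script filename here is illustrative; pass your own
Anki profile folder):

    python anki_audio_downloader.py ~/.local/share/Anki2/User1 -l spanish --dry

Drop --dry to actually write changes back to the collection. --last-mod limits
processing to notes modified within the last N seconds; by default every note
of the matching card type is considered.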
'''
import sqlite3
from abc import ABC, abstractmethod
from hashlib import sha1
# from anki.utils import fieldChecksum
from datetime import datetime
import re
import base64
from functools import reduce
import sys
import shutil
import time
import requests
import json
import os
import click

BACKUP_LOC = '/home/mikob/.local/share/Anki2/Miko/custom-backups/'
CARD_FIELD_SEPARATOR = '\x1f'
FAKE_BROWSER_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
    "accept-language": "en-US,en;q=0.9,ja;q=0.8",
    "cache-control": "no-cache",
    "pragma": "no-cache",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1"
}

# matches ASCII characters; remove_non_unicode_characters() uses it to strip
# Latin text so that only the Japanese remains
unicode_regx = re.compile('[\x00-\x7F]')
# splits a field on brackets, punctuation, digits, Latin letters, Japanese
# punctuation and whitespace to isolate the word itself
word_sep_regx = re.compile('[\[\].,0-9a-zA-Z`\/「」、。:;0-9ー()()\s].')


def strip_special_chars(s):
    return re.sub('[?:;/\\\"<>\.]+', '', s)


class AudioDownloader(ABC):
    stats = {
        'not_found': 0,
        'found': 0,
        'existing_sound': 0,
        'invalid_audio': 0,
    }
    media_loc = None
    DECK_NAME = ''

    def __init__(self, dry, anki_user_folder_loc, last_mod):
        db_loc = os.path.join(anki_user_folder_loc, 'collection.anki2')
        self.media_loc = os.path.join(anki_user_folder_loc, 'collection.media')
        backup_loc = os.path.join(
            BACKUP_LOC, 'anki_backup_%s.anki2' % datetime.now())
        shutil.copy(db_loc, backup_loc)
        self.log("Created backup: %s" % backup_loc)
        conn = sqlite3.connect(db_loc)
        cur = conn.cursor()
        mod_time = int(time.time()) - last_mod
        cur.execute('SELECT id,name FROM notetypes')
        note_types = cur.fetchall()
        # find deck_id
        deck_id = None
        if self.DECK_NAME:
            cur.execute('SELECT id,name FROM decks')
            decks = cur.fetchall()
            # get deck ids by name
            for deck in decks:
                _deck_id = deck[0]
                deck_name = deck[1]
                if deck_name == self.DECK_NAME:
                    deck_id = _deck_id
                    break
            if not deck_id:
                self.log(
                    f'Could not find deck {self.DECK_NAME}. Continuing with all decks...')
            else:
                self.log(f'{self.DECK_NAME} deck id: {deck_id}')
        cards = []
        for note_type in note_types:
            note_id = note_type[0]
            note_name = note_type[1]
            if note_name == self.CARD_TYPE:
                cards = self.get_cards(cur, note_id, mod_time, deck_id)
                cur.execute(
                    'SELECT ord,name,ntid FROM fields WHERE ntid=?', (int(note_id),))
                note_fields = cur.fetchall()
                idxs = self.get_idxs(note_fields)
                if len(idxs) == 0:
                    raise AssertionError("did not get any idxs")
                break
        # to restrict doing each note just once
        note_set = set()
        for card in cards:
            id = card[0]
            nid = card[1]
            if not (nid in note_set):
                note_set.add(nid)
                modded_note_fields = self.get_modded_fields_for_card(
                    card, idxs)
                if modded_note_fields is not None and len(modded_note_fields) > 0:
                    self.write_back(cur, id, modded_note_fields)
                    # csum = fieldChecksum(new_fields[0])
        if not dry:
            conn.commit()
        conn.close()
        self.log("Finished!\n\n%s" % self.stats)

    '''
    Get the cards that don't have audio and have been updated after the
    mod time.
    mid = note type id
    '''
    def get_cards(self, cur, note_type_id, mod_time, deck_id=None):
        q = 'SELECT * FROM notes WHERE mid=? AND mod>?'
        qa = (int(note_type_id), mod_time)
        if deck_id:
            q = 'SELECT * FROM notes INNER JOIN cards ON notes.id=cards.nid WHERE notes.mid=? AND notes.mod>? AND cards.did=?'
            qa += (deck_id,)
        cur.execute(q, qa)
        cards = cur.fetchall()
        self.log("Found %d cards" % (len(cards)))
        return cards

    def write_back(self, cur, id, modded_note_fields):
        try:
            modded_fields_str = str.join(
                CARD_FIELD_SEPARATOR, modded_note_fields)
            mod = int(time.time())
            # need to set update sequence number (usn) to -1 to tell server we have updates
            cur.execute("UPDATE notes SET flds=?,mod=?,usn=? WHERE id=?",
                        (modded_fields_str, mod, -1, id))
        except Exception as e:
            self.log("Error with card %s" % e)

    def log(self, *msg):
        print(self.CARD_TYPE, *msg)

    '''
    note_fields is a tuple with _ord (field idx), name, and note_type_id
    Returns a tuple of idxs that get_modded_fields_for_card can use.
    '''
    @abstractmethod
    def get_idxs(self, note_fields):
        pass

    '''
    Takes idxs from get_idxs
    '''
    @abstractmethod
    def get_modded_fields_for_card(self, cards, idxs):
        pass


class ForvoDownloader(AudioDownloader):
    CARD_TYPE = 'WAudio'

    def strip_bad_chars(self, word):
        # strip non-breaking spaces (see the 9/22/2020 update note)
        return word.replace('\xa0', '')

    def get_forvo_pronunciation(self, longest_word, processed_word):
        for lang_code in self.LANG_CODES:
            webPageUrl = "https://forvo.com/search/%s/%s/" % (
                longest_word, lang_code)
            # s = requests.Session()
            # res = s.get('https://forvo.com')
            # cookies = dict(res.cookies)
            webPageText = requests.get(
                webPageUrl, headers=FAKE_BROWSER_HEADERS).text
            pageTextList = re.findall(
                "<article class=\"search_words.*?</article>", webPageText, re.DOTALL)
            if len(pageTextList) == 0:
                continue
            # first result might be search_words empty
            pageText = pageTextList[-1]
            pronunciations = re.findall("Play\(\d+,'(.*?)'", pageText)
            if pronunciations:
                for l in range(len(pronunciations)):
                    pronunciations[l] = base64.b64decode(
                        pronunciations[l]).decode()
                words = '"%s"' % longest_word
                if processed_word and longest_word != processed_word:
                    words = '"%s" ("%s")' % (longest_word, processed_word)
                self.log('Found %d pronunciations for %s' %
                         (len(pronunciations), words))
                return 'https://forvo.com/mp3/%s' % pronunciations[0]

    def get_idxs(self, note_fields):
        audio_idx = None
        for _ord, name, note_type_id in note_fields:
            if name == 'Audio':
                return (_ord,)
        return ()

    def get_modded_fields_for_card(self, card, idxs):
        audio_idx = idxs[0]
        front_field_idx = 0
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        existing_sound = fields[audio_idx].strip()
        if existing_sound != '':
            self.stats['existing_sound'] += 1
            return
        processed_word = self.strip_bad_chars(fields[front_field_idx])
        # pick the longest space-separated token, e.g. 'buenos días' -> 'buenos'
        longest_word = reduce(lambda memo, word: word if len(
            word) > len(memo) else memo, processed_word.split(' '), '')
        audio_url = self.get_forvo_pronunciation(longest_word, processed_word)
        if audio_url:
            try:
                r = requests.get(audio_url, headers=FAKE_BROWSER_HEADERS)
                if r.headers['content-type'] != 'audio/mpeg':
                    self.stats['invalid_audio'] += 1
                    return
                self.stats['found'] += 1
                sound_file_name = '%s_%s.mp3' % (
                    strip_special_chars(fields[front_field_idx]), 'forvo')
                sound_file_path = os.path.join(self.media_loc, sound_file_name)
                with open(sound_file_path, 'wb') as f:
                    for chunk in r:
                        f.write(chunk)
                fields[audio_idx] = '[sound:%s]' % sound_file_name
            except Exception as e:
                self.log("Error with updating card with spec. idx %s" % e)
            return fields
        else:
            words = '"%s"' % longest_word
            if longest_word != processed_word:
                words = '"%s" ("%s")' % (longest_word, processed_word)
            self.log('Could not find pronunciation for %s' % words)
            self.stats['not_found'] += 1


class Spanish(ForvoDownloader):
    DECK_NAME = 'Español'
    LANG_CODES = ['es_latam', 'es']


class Russian(ForvoDownloader):
    DECK_NAME = 'По-русски'
    LANG_CODES = ['ru']

    def strip_accents(self, word):
        return word.replace('а́', 'а').replace('е́', 'е')
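

class French(ForvoDownloader):
    # Illustrative example, not part of the original script: another
    # Forvo-backed language can be added by following the Spanish/Russian
    # pattern above. The deck name and Forvo language code here are
    # assumptions; the class would also need to be called from do() below.
    DECK_NAME = 'Français'
    LANG_CODES = ['fr']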


class Japanese(AudioDownloader):
    CARD_TYPE = 'Japanese'
    DL_URL = 'http://assets.languagepod101.com/dictionary/japanese/audiomp3.php'

    def __init__(self, *args, **kwargs):
        self.stats.update({
            'found_conjugated': 0,
            'no_kanji_or_kana': 0,
        })
        super().__init__(*args, **kwargs)

    @classmethod
    def find_word(cls, s):
        splitted = word_sep_regx.split(s.strip())
        for w in splitted:
            cleaned = remove_non_unicode_characters(w).strip()
            if cleaned != '':
                return cleaned
        return ''

    @classmethod
    def to_dict_form(cls, kanji, kana):
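        '''
        Heuristically convert a polite (-masu) form back to dictionary form so
        the JapanesePod101 lookup can be retried, e.g.
        to_dict_form('飲みます', 'のみます') -> [('飲む', 'のむ')].
        Returns a list of (kanji, kana) candidates; empty if no rule matches.
        '''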
        kana_root = kana[:-3]
        kanji_root = kanji[:-3]

        def _filled(poss):
            return [('%s%s' % (kanji_root, suff), '%s%s' % (kana_root, suff)) for suff in poss]
        if kanji[-3:] == 'します' and kana[-3:] == 'します':
            # group 3
            return _filled(['する', ''])
        elif kanji[-2:] == 'ます' and kana[-2:] == 'ます':
            if kana[-3] in ('き', 'ぎ', 'み', 'り', 'い', 'し', 'ち',):
                # group 1
                if kana[-3] == 'き':
                    return _filled(['く'])
                elif kana[-3] == 'ぎ':
                    return _filled(['ぐ'])
                elif kana[-3] == 'み':
                    return _filled(['む'])
                elif kana[-3] == 'り':
                    return _filled(['る'])
                elif kana[-3] == 'い':
                    return _filled(['う'])
                elif kana[-3] == 'し':
                    return _filled(['す'])
                elif kana[-3] == 'ち':
                    return _filled(['つ'])
            else:
                # group 2
                return [('%sる' % kanji[:-2], '%sる' % kana[:-2])]
        return []

    def get_idxs(self, note_fields):
        audio_idx = None
        kanji_idx = None
        hiragana_idx = None
        for _ord, name, note_type_id in note_fields:
            fieldname = name.lower()
            if 'audio' == fieldname:
                audio_idx = _ord
            elif 'hiragana' == fieldname:
                hiragana_idx = _ord
            elif fieldname == 'front' or ('vocab' in fieldname and 'hiragana' not in fieldname and 'back' not in fieldname):
                kanji_idx = _ord
            elif fieldname == 'text':
                # needs work (cloze deletion)
                kanji_idx = _ord
        if audio_idx is None or kanji_idx is None or hiragana_idx is None:
            raise AssertionError("missing kanji, hiragana or audio field(s)")
        return (audio_idx, kanji_idx, hiragana_idx)

    def get_modded_fields_for_card(self, card, idxs):
        audio_idx = idxs[0]
        kanji_idx = idxs[1]
        hiragana_idx = idxs[2]
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        raw_kanji = fields[kanji_idx]
        raw_kana = fields[hiragana_idx]
        kanji = self.find_word(raw_kanji)
        kana = self.find_word(raw_kana)
        if kana == '':
            if kanji == '':
                if raw_kana != '' or raw_kanji != '':
                    print("No kanji/kana for %s %s" % (raw_kanji, raw_kana))
                self.stats['no_kanji_or_kana'] += 1
                return
            kana = kanji
        existing_sound = fields[audio_idx].strip()
        if existing_sound != '':
            # print("already has audio %s %s" % (kanji, kana))
            self.stats['existing_sound'] += 1
            return
        conjugated = False
        try:
            r = requests.get(self.DL_URL, params={
                'kanji': kanji, 'kana': kana})
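            # The endpoint appears to return a fixed-size "audio not available"
            # placeholder clip when it has no recording; a Content-length of
            # 52288 is treated as that placeholder here.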
            if r.headers['Content-length'] == '52288':
                # now try conjugating
                attempts = self.to_dict_form(kanji, kana)
                for attempt in attempts:
                    conjugated_kanji, conjugated_kana = attempt
                    r = requests.get(self.DL_URL, params={
                        'kanji': conjugated_kanji, 'kana': conjugated_kana})
                    print("Trying %s %s for %s %s" %
                          (conjugated_kanji, conjugated_kana, kanji, kana))
                    if r.headers['Content-length'] != '52288':
                        conjugated = True
                        # keep this response; trying further forms would overwrite it
                        break
                if not conjugated:
                    print("NOT found %s %s" % (kanji, kana))
                    self.stats['not_found'] += 1
                    return
            if conjugated:
                print("Found conjugated form %s %s" % (kanji, kana))
                self.stats['found_conjugated'] += 1
            else:
                print("Found %s %s" % (kanji, kana))
                self.stats['found'] += 1
            sound_file_name = '%s_%s.mp3' % (kanji, kana)
            sound_file_path = os.path.join(self.media_loc, sound_file_name)
            with open(sound_file_path, 'wb') as f:
                for chunk in r:
                    f.write(chunk)
            fields[audio_idx] = '[sound:%s]' % sound_file_name
        except Exception as e:
            self.log("Error with updating card with spec. idx %s" % e)
        return fields


class JapaneseCloze(Japanese):
    CARD_TYPE = 'Japanese Cloze'


class JapaneseKanji(Japanese):
    CARD_TYPE = 'Kanji'


def remove_non_unicode_characters(s):
    return unicode_regx.sub('', s)


@click.command()
@click.argument('anki-user-folder-loc', type=click.Path(exists=True))
@click.option('--language', '-l', type=click.Choice(['japanese', 'russian', 'spanish']), required=True, multiple=True)
@click.option('--last-mod', default=time.time())
@click.option('--dry', is_flag=True)
def do(anki_user_folder_loc, language, last_mod, dry):
    if 'japanese' in language:
        Japanese(dry, anki_user_folder_loc, last_mod)
        JapaneseCloze(dry, anki_user_folder_loc, last_mod)
        JapaneseKanji(dry, anki_user_folder_loc, last_mod)
    if 'russian' in language:
        Russian(dry, anki_user_folder_loc, last_mod)
    if 'spanish' in language:
        Spanish(dry, anki_user_folder_loc, last_mod)


if __name__ == '__main__':
    do()

@nwwt Thanks! I actually just noticed some of the issues with newer Anki. I will fix and update this script; it would be helpful to have your change requests in a fork, if you're so inclined.
@nwwt updated, FYI
This would be really convenient as an add-on. Thanks for sharing your personal work.
How do you make this work? I added the file to the Forvo src but I'm not sure what to do next.
Hi, I tried running the script but it's telling me "Missing argument : 'ANKI_USER_FOLDER_LOC'."
Also, I don't really know how to use this thing. Is there a wiki or something? I want to download pronunciations from JapanesePod101 for a big list of words.
@languagemaniac
JP101 is available at media.digitalwords.net/anki/lp/Japanese.apkg
There are some issues with the code:
As for Anki itself, with newer 2.1 versions you must run "Downgrade & Quit" from the bottom right of the File → "Switch Profile" dialog (Ctrl+Shift+P); otherwise the profile will be in a new format this script can't read. Anki will automatically upgrade it again the next time you open the profile normally.
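For anyone unsure which format their profile is currently in, here is a minimal sketch (this query is an assumption about the collection's internal schema, not something the script itself does): the collection.anki2 SQLite file keeps its schema version in the col table's ver column, so you can check it before and after the downgrade.

    import sqlite3
    con = sqlite3.connect('/path/to/Anki2/YourProfile/collection.anki2')  # illustrative path
    print(con.execute('SELECT ver FROM col').fetchone()[0])  # prints the profile's schema version
    con.close()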