Skip to content

Instantly share code, notes, and snippets.

@mikob
Last active March 31, 2023 19:07
Show Gist options
  • Save mikob/24a908471e38370f40d302b1cb1b41fb to your computer and use it in GitHub Desktop.
Save mikob/24a908471e38370f40d302b1cb1b41fb to your computer and use it in GitHub Desktop.
Bulk download audio pronunciations for Anki from Forvo and/or JapanesePod101. It will automatically update your Anki deck with the populated audio field.
#!/usr/bin/env python3
'''
Update 10/4/2021
* Download audio for the longest word in Spanish.
Update 9/22/2020
* Allow looking up through multiple lang codes by priority order on Forvo (eg. es_LATAM, es)
* Strip nbsp; when looking up words on Forvo
Update 9/5/2020
* Fixed to work with newer Anki versions 2.1.33+ (possibly earlier)
---
Make sure to adjust BACKUP_LOC to a directory of your choosing.
You will need to adjust CARD_TYPE and possibly DECK_NAME or create your own class that extends
AudioDownloader for other languages.
Adjust get_idxs and get_modded_fields_for_card as needed.
You might need to adjust the fields for kana and kanji for Japanese.
'''
import sqlite3
from abc import ABC, abstractmethod
from hashlib import sha1
# from anki.utils import fieldChecksum
from datetime import datetime
import re
import base64
from functools import reduce
import sys
import shutil
import time
import requests
import json
import os
import click
# Directory that receives a timestamped copy of collection.anki2 before every
# run. Adjust this to a directory of your choosing.
BACKUP_LOC = '/home/mikob/.local/share/Anki2/Miko/custom-backups/'

# Separator between the field values packed into a note's `flds` column.
CARD_FIELD_SEPARATOR = '\x1f'

# Browser-like headers sent with the Forvo requests.
FAKE_BROWSER_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
    "accept-language": "en-US,en;q=0.9,ja;q=0.8",
    "cache-control": "no-cache",
    "pragma": "no-cache",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}

# Matches any single 7-bit ASCII character (despite the name).
unicode_regx = re.compile(r'[\x00-\x7F]')

# Matches one separator character (brackets, punctuation, ASCII letters and
# digits, whitespace, Japanese punctuation) FOLLOWED BY any one character.
# Raw string so the regex escapes are no longer invalid Python string escapes;
# the pattern text itself is unchanged.
# NOTE(review): the trailing `.` also consumes the character after the
# separator, which looks unintended (a `+` quantifier may have been meant) --
# confirm before changing, since find_word() depends on the current splitting.
word_sep_regx = re.compile(r'[\[\].,0-9a-zA-Z`\/「」、。:;0-9ー()()\s].')
# Characters that are unsafe in file names on at least one common platform
# (? : ; / \ " < > . * |). Compiled once instead of on every call.
_FILENAME_UNSAFE_REGX = re.compile(r'[?:;/\\"<>.*|]+')


def strip_special_chars(s):
    """Return *s* with characters that are unsafe in file names removed.

    The original pattern was a non-raw string whose escapes never actually
    put a backslash into the character class; backslash, ``*`` and ``|`` are
    now stripped as well so the result is a valid file name on Windows too.
    """
    return _FILENAME_UNSAFE_REGX.sub('', s)
class AudioDownloader(ABC):
    """Fill in missing audio for the notes of one Anki note type.

    Constructing a subclass runs the whole job: the collection database is
    copied to BACKUP_LOC, the notes whose note type is named ``CARD_TYPE``
    (optionally limited to the deck named ``DECK_NAME``) are loaded, each note
    is handed to ``get_modded_fields_for_card`` and every non-empty field list
    it returns is written back to the ``notes`` table.
    """

    # Counter template. Each instance works on its own copy (see __init__) so
    # the totals of different downloaders are not mixed together.
    stats = {
        'not_found': 0,
        'found': 0,
        'existing_sound': 0,
        'invalid_audio': 0,
    }
    media_loc = None
    # Name of the note type to process; concrete subclasses set this.
    CARD_TYPE = ''
    # Optional deck name; when empty (or not found) all decks are processed.
    DECK_NAME = ''

    def __init__(self, dry, anki_user_folder_loc, last_mod):
        """Back up the collection, then download audio for matching notes.

        dry: when true, the database changes are not committed.
        anki_user_folder_loc: profile folder holding ``collection.anki2`` and
            ``collection.media``.
        last_mod: age window in seconds; only notes modified later than
            ``now - last_mod`` are processed.
        """
        # Copy the counters so increments never touch the shared class dict.
        self.stats = dict(self.stats)
        db_loc = os.path.join(anki_user_folder_loc, 'collection.anki2')
        self.media_loc = os.path.join(anki_user_folder_loc, 'collection.media')

        os.makedirs(BACKUP_LOC, exist_ok=True)
        # Timestamp without spaces/colons (valid on every platform) and with
        # microseconds so back-to-back runs never overwrite an earlier backup.
        stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')
        backup_loc = os.path.join(BACKUP_LOC, 'anki_backup_%s.anki2' % stamp)
        shutil.copy(db_loc, backup_loc)
        self.log("Created backup: %s" % backup_loc)

        mod_time = int(time.time()) - last_mod
        conn = sqlite3.connect(db_loc)
        try:
            cur = conn.cursor()
            deck_id = self._find_deck_id(cur)
            cards, idxs = self._load_cards(cur, mod_time, deck_id)

            # A note can own several cards; handle each note only once.
            seen_notes = set()
            for card in cards:
                note_id = card[0]
                if note_id in seen_notes:
                    continue
                seen_notes.add(note_id)
                modded_note_fields = self.get_modded_fields_for_card(
                    card, idxs)
                if modded_note_fields:
                    self.write_back(cur, note_id, modded_note_fields)
            if not dry:
                conn.commit()
        finally:
            # Closing without commit discards the pending updates (dry run or
            # error), and the connection is released either way.
            conn.close()
        self.log("Finished!\n\n%s" % self.stats)

    def _find_deck_id(self, cur):
        """Return the id of the deck named DECK_NAME, or None if unset/unknown."""
        if not self.DECK_NAME:
            return None
        cur.execute('SELECT id,name FROM decks')
        for deck_id, deck_name in cur.fetchall():
            if deck_name == self.DECK_NAME:
                self.log(f'{self.DECK_NAME} deck id: {deck_id}')
                return deck_id
        self.log(
            f'Could not find deck {self.DECK_NAME}. Continuing with all decks...')
        return None

    def _load_cards(self, cur, mod_time, deck_id):
        """Return ``(rows, idxs)`` for the first note type named CARD_TYPE.

        Returns ``([], ())`` when no note type has that name. Raises
        AssertionError when ``get_idxs`` yields nothing for the note type.
        """
        cur.execute('SELECT id,name FROM notetypes')
        for note_type_id, note_type_name in cur.fetchall():
            if note_type_name != self.CARD_TYPE:
                continue
            cards = self.get_cards(cur, note_type_id, mod_time, deck_id)
            cur.execute(
                'SELECT ord,name,ntid FROM fields WHERE ntid=?',
                (int(note_type_id),))
            idxs = self.get_idxs(cur.fetchall())
            if len(idxs) == 0:
                raise AssertionError("did not get any idxs")
            return cards, idxs
        return [], ()

    def get_cards(self, cur, note_type_id, mod_time, deck_id=None):
        """Return the ``notes`` rows of a note type modified after *mod_time*.

        note_type_id is matched against ``notes.mid``. When *deck_id* is given,
        only notes that have a card in that deck are returned (one row per
        matching card, so a note may appear more than once). Only the columns
        of ``notes`` are selected, so the row layout is the same with and
        without the deck filter.
        """
        query = 'SELECT * FROM notes WHERE mid=? AND mod>?'
        args = (int(note_type_id), mod_time)
        if deck_id is not None:
            query = ('SELECT notes.* FROM notes INNER JOIN cards '
                     'ON notes.id=cards.nid '
                     'WHERE notes.mid=? AND notes.mod>? AND cards.did=?')
            args += (deck_id,)
        cur.execute(query, args)
        cards = cur.fetchall()
        self.log("Found %d cards" % (len(cards)))
        return cards

    def write_back(self, cur, id, modded_note_fields):
        """Store the joined field list in the note with the given id.

        Failures are logged and swallowed so one bad note does not abort the
        whole run.
        """
        try:
            modded_fields_str = CARD_FIELD_SEPARATOR.join(modded_note_fields)
            mod = int(time.time())
            # need to set update sequence number (usn) to -1 to tell server we have updates
            cur.execute("UPDATE notes SET flds=?,mod=?,usn=? WHERE id=?",
                        (modded_fields_str, mod, -1, id))
        except (sqlite3.Error, TypeError) as e:
            self.log("Error with card %s" % e)

    def log(self, *msg):
        """Print *msg* prefixed with this downloader's CARD_TYPE."""
        print(self.CARD_TYPE, *msg)

    @abstractmethod
    def get_idxs(self, note_fields):
        """Map the note type's fields to the indexes this downloader needs.

        note_fields is a sequence of ``(ord, name, note_type_id)`` tuples,
        where ``ord`` is the field index. Returns a tuple of indexes that
        ``get_modded_fields_for_card`` can use.
        """

    @abstractmethod
    def get_modded_fields_for_card(self, cards, idxs):
        """Return the updated field list for one note row, or None to skip it.

        *cards* is a single row from ``get_cards``; *idxs* comes from
        ``get_idxs``.
        """
class ForvoDownloader(AudioDownloader):
    """Download pronunciations from Forvo for notes of the 'WAudio' note type.

    The word to look up is the longest space-separated token of the note's
    first field. Subclasses provide ``LANG_CODES`` (Forvo language codes in
    priority order) and usually ``DECK_NAME``.
    """

    CARD_TYPE = 'WAudio'
    # Forvo language codes to try, in priority order; set by subclasses.
    LANG_CODES = []
    # Seconds to wait for Forvo before giving up on a single request.
    REQUEST_TIMEOUT = 30

    def strip_bad_chars(self, word):
        """Remove markup residue that would break the Forvo lookup."""
        return word.replace('&nbsp;', '')

    def get_forvo_pronunciation(self, longest_word, processed_word):
        """Return the mp3 URL of the first pronunciation found, else None.

        Each language code in LANG_CODES is searched in order; the first one
        that yields a pronunciation wins. *processed_word* is only used for
        the log message.
        """
        from urllib.parse import quote

        for lang_code in self.LANG_CODES:
            # Quote the word so '/', '?', '#' etc. cannot alter the URL path.
            web_page_url = "https://forvo.com/search/%s/%s/" % (
                quote(longest_word, safe=''), lang_code)
            web_page_text = requests.get(
                web_page_url, headers=FAKE_BROWSER_HEADERS,
                timeout=self.REQUEST_TIMEOUT).text
            articles = re.findall(
                r'<article class="search_words.*?</article>',
                web_page_text, re.DOTALL)
            if not articles:
                continue
            # first result might be search_words empty
            encoded = re.findall(r"Play\(\d+,'(.*?)'", articles[-1])
            if not encoded:
                continue
            words = '"%s"' % longest_word
            if processed_word and longest_word != processed_word:
                words = '"%s" ("%s")' % (longest_word, processed_word)
            self.log('Found %d pronunciations for %s' % (len(encoded), words))
            # The mp3 path is base64-encoded in the page; only the first
            # pronunciation is used, so only that one is decoded.
            return 'https://forvo.com/mp3/%s' % base64.b64decode(
                encoded[0]).decode()
        return None

    def get_idxs(self, note_fields):
        """Return ``(audio_idx,)`` for the field named 'Audio', or ``()``."""
        for _ord, name, _note_type_id in note_fields:
            if name == 'Audio':
                return (_ord,)
        return ()

    def get_modded_fields_for_card(self, card, idxs):
        """Return the note's fields with the audio field filled, or None.

        None is returned (nothing is written back) when the note already has
        audio, no pronunciation is found, the download is not an mp3, or the
        download/file write fails.
        """
        audio_idx = idxs[0]
        front_field_idx = 0
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        if fields[audio_idx].strip() != '':
            self.stats['existing_sound'] += 1
            return None

        processed_word = self.strip_bad_chars(fields[front_field_idx])
        # First longest token, same tie-breaking as the original reduce().
        longest_word = max(processed_word.split(' '), key=len)

        audio_url = None
        if longest_word:
            try:
                audio_url = self.get_forvo_pronunciation(
                    longest_word, processed_word)
            except (requests.RequestException, ValueError) as e:
                # Network trouble or an undecodable mp3 path must not abort
                # the whole run; treat it like a missing pronunciation.
                self.log("Error with updating card with spec. idx %s" % e)
        if not audio_url:
            words = '"%s"' % longest_word
            if longest_word != processed_word:
                words = '"%s" ("%s")' % (longest_word, processed_word)
            self.log('Could not find pronunciation for %s' % words)
            self.stats['not_found'] += 1
            return None

        sound_file_name = '%s_%s.mp3' % (
            strip_special_chars(processed_word), 'forvo')
        sound_file_path = os.path.join(self.media_loc, sound_file_name)
        try:
            r = requests.get(audio_url, headers=FAKE_BROWSER_HEADERS,
                             timeout=self.REQUEST_TIMEOUT)
            if r.headers.get('content-type') != 'audio/mpeg':
                self.stats['invalid_audio'] += 1
                return None
            with open(sound_file_path, 'wb') as f:
                f.write(r.content)
        except (requests.RequestException, OSError) as e:
            self.log("Error with updating card with spec. idx %s" % e)
            return None

        # Count and update only after the file really exists on disk.
        self.stats['found'] += 1
        fields[audio_idx] = '[sound:%s]' % sound_file_name
        return fields
class Spanish(ForvoDownloader):
    """Forvo downloader for the 'Español' deck.

    Language codes are listed in lookup order: 'es_latam' first, then 'es'.
    """

    LANG_CODES = ['es_latam', 'es']
    DECK_NAME = 'Español'
class Russian(ForvoDownloader):
    """Forvo downloader for the 'По-русски' deck."""

    DECK_NAME = 'По-русски'
    LANG_CODES = ['ru']

    def strip_accents(self, word):
        """Remove stress marks (combining acute accent, U+0301) from *word*.

        Covers every stressed vowel, not only 'а' and 'е'.
        """
        return word.replace('\u0301', '')

    def strip_bad_chars(self, word):
        """Clean *word* for lookup, additionally dropping stress marks.

        strip_accents was previously never called, so stressed words were
        sent to Forvo with their stress marks still attached.
        """
        return self.strip_accents(super().strip_bad_chars(word))
class Japanese(AudioDownloader):
    """Download audio from the languagepod101 dictionary for 'Japanese' notes.

    The word is looked up by its kanji and kana spelling. When the polite
    (…ます) form is not available, candidate dictionary forms are tried.
    """

    CARD_TYPE = 'Japanese'
    DL_URL = 'http://assets.languagepod101.com/dictionary/japanese/audiomp3.php'
    # A response with this Content-Length is treated as "no audio available".
    # NOTE(review): presumably the size of the service's placeholder clip --
    # confirm against the live service.
    NOT_FOUND_CONTENT_LENGTH = '52288'
    # Seconds to wait for the server before giving up on a single request.
    REQUEST_TIMEOUT = 30
    # Last kana of the ます-stem -> dictionary-form ending for group 1 verbs.
    _MASU_STEM_TO_DICT_ENDING = {
        'き': 'く',
        'ぎ': 'ぐ',
        'み': 'む',
        'り': 'る',
        'い': 'う',
        'し': 'す',
        'ち': 'つ',
        'び': 'ぶ',
        'に': 'ぬ',
    }

    def __init__(self, *args, **kwargs):
        # Fresh, zeroed per-instance counters (instead of mutating the dict
        # shared by every downloader class), plus the Japanese-only ones.
        self.stats = dict.fromkeys(self.stats, 0)
        self.stats.update({
            'found_conjugated': 0,
            'no_kanji_or_kana': 0,
        })
        super().__init__(*args, **kwargs)

    @classmethod
    def find_word(cls, s):
        """Return the first non-empty piece of *s* after splitting on
        word_sep_regx and removing ASCII characters, or '' if there is none."""
        for piece in word_sep_regx.split(s.strip()):
            cleaned = remove_non_unicode_characters(piece).strip()
            if cleaned:
                return cleaned
        return ''

    @classmethod
    def to_dict_form(cls, kanji, kana):
        """Return candidate dictionary forms for a polite (…ます) verb.

        The result is a list of ``(kanji, kana)`` tuples in the order they
        should be tried; it is empty when the word does not end in ます in
        both spellings or is too short to have a stem.
        """
        if not (kanji.endswith('ます') and kana.endswith('ます')):
            return []
        if len(kanji) < 3 or len(kana) < 3:
            # No stem character in front of ます; nothing to conjugate.
            return []
        kanji_root = kanji[:-3]
        kana_root = kana[:-3]

        if kanji.endswith('します') and kana.endswith('します'):
            # group 3 (…する), a group 1 verb ending in す (e.g. 話します ->
            # 話す), or the bare noun in front of します.
            return [(kanji_root + suff, kana_root + suff)
                    for suff in ('する', 'す', '')]

        # group 2: just replace ます with る
        ichidan = (kanji[:-2] + 'る', kana[:-2] + 'る')
        ending = cls._MASU_STEM_TO_DICT_ENDING.get(kana[-3])
        if ending is None:
            return [ichidan]
        # group 1 first; group 2 verbs with an i-row stem (e.g. 起きます ->
        # 起きる) look the same, so keep the group 2 form as a fallback.
        return [(kanji_root + ending, kana_root + ending), ichidan]

    def get_idxs(self, note_fields):
        """Return ``(audio_idx, kanji_idx, hiragana_idx)`` for the note type.

        Raises AssertionError when one of the three fields is missing.
        """
        audio_idx = None
        kanji_idx = None
        hiragana_idx = None
        for _ord, name, _note_type_id in note_fields:
            fieldname = name.lower()
            if fieldname == 'audio':
                audio_idx = _ord
            elif fieldname == 'hiragana':
                hiragana_idx = _ord
            elif fieldname == 'front' or (
                    'vocab' in fieldname
                    and 'hiragana' not in fieldname
                    and 'back' not in fieldname):
                kanji_idx = _ord
            elif fieldname == 'text':
                # needs work (cloze deletion)
                kanji_idx = _ord
        if audio_idx is None or kanji_idx is None or hiragana_idx is None:
            raise AssertionError("missing kanji, hiragana or audio field(s)")
        return (audio_idx, kanji_idx, hiragana_idx)

    def _request_audio(self, kanji, kana):
        """Request the audio clip for one kanji/kana spelling."""
        return requests.get(self.DL_URL,
                            params={'kanji': kanji, 'kana': kana},
                            timeout=self.REQUEST_TIMEOUT)

    def _has_audio(self, response):
        """Tell whether *response* carries real audio, not the placeholder."""
        if response.status_code != 200:
            return False
        length = response.headers.get('Content-length')
        if length is None:
            # Header missing: fall back to the actual body size.
            length = str(len(response.content))
        return length != self.NOT_FOUND_CONTENT_LENGTH

    def _fetch_audio(self, kanji, kana):
        """Return ``(response, conjugated)``; response is None if not found.

        The spelling is tried as given first, then each dictionary-form
        candidate. The search stops at the first hit so a later miss can no
        longer replace a good response.
        """
        r = self._request_audio(kanji, kana)
        if self._has_audio(r):
            return r, False
        # now try conjugating
        for conjugated_kanji, conjugated_kana in self.to_dict_form(kanji, kana):
            print("Trying %s %s for %s %s" %
                  (conjugated_kanji, conjugated_kana, kanji, kana))
            r = self._request_audio(conjugated_kanji, conjugated_kana)
            if self._has_audio(r):
                return r, True
        return None, False

    def get_modded_fields_for_card(self, card, idxs):
        """Return the note's fields with the audio field filled, or None.

        None is returned (nothing is written back) when the note has no
        usable word, already has audio, no audio is found, or the download /
        file write fails.
        """
        audio_idx, kanji_idx, hiragana_idx = idxs
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        raw_kanji = fields[kanji_idx]
        raw_kana = fields[hiragana_idx]
        kanji = self.find_word(raw_kanji)
        kana = self.find_word(raw_kana)
        if kana == '':
            if kanji == '':
                if raw_kana != '' or raw_kanji != '':
                    print("No kanji/kana for %s %s" % (raw_kanji, raw_kana))
                self.stats['no_kanji_or_kana'] += 1
                return None
            # No reading given: look the word up by its kanji field alone.
            kana = kanji

        if fields[audio_idx].strip() != '':
            self.stats['existing_sound'] += 1
            return None

        sound_file_name = '%s_%s.mp3' % (kanji, kana)
        sound_file_path = os.path.join(self.media_loc, sound_file_name)
        try:
            r, conjugated = self._fetch_audio(kanji, kana)
            if r is None:
                print("NOT found %s %s" % (kanji, kana))
                self.stats['not_found'] += 1
                return None
            with open(sound_file_path, 'wb') as f:
                f.write(r.content)
        except (requests.RequestException, OSError) as e:
            self.log("Error with updating card with spec. idx %s" % e)
            return None

        if conjugated:
            print("Found conjugated form %s %s" % (kanji, kana))
            self.stats['found_conjugated'] += 1
        else:
            print("Found %s %s" % (kanji, kana))
            self.stats['found'] += 1
        fields[audio_idx] = '[sound:%s]' % sound_file_name
        return fields
class JapaneseCloze(Japanese):
    """Same lookup as Japanese, applied to the 'Japanese Cloze' note type."""

    CARD_TYPE = 'Japanese Cloze'
class JapaneseKanji(Japanese):
    """Same lookup as Japanese, applied to the 'Kanji' note type."""

    CARD_TYPE = 'Kanji'
def remove_non_unicode_characters(s):
    """Return *s* with every character matched by unicode_regx deleted."""
    without_matches = unicode_regx.sub('', s)
    return without_matches
@click.command()
@click.argument('anki-user-folder-loc', type=click.Path(exists=True))
@click.option('--language', '-l',
              type=click.Choice(['japanese', 'russian', 'spanish']),
              required=True, multiple=True,
              help='Language to download audio for; may be given several times.')
@click.option('--last-mod', default=time.time(),
              help='Only process notes modified within the last N seconds '
                   '(default: all notes).')
@click.option('--dry', is_flag=True,
              help='Do not commit the changes to the collection database.')
def do(anki_user_folder_loc, language, last_mod, dry):
    """Download missing audio for the notes of an Anki profile.

    ANKI_USER_FOLDER_LOC is the profile folder that contains collection.anki2
    and collection.media.
    """
    # Downloaders per language, run in this order. Constructing a downloader
    # performs the whole download for its note type.
    downloaders_by_language = {
        'japanese': (Japanese, JapaneseCloze, JapaneseKanji),
        'russian': (Russian,),
        'spanish': (Spanish,),
    }
    for name, downloaders in downloaders_by_language.items():
        if name not in language:
            continue
        for downloader in downloaders:
            downloader(dry, anki_user_folder_loc, last_mod)


if __name__ == '__main__':
    do()
@twwn
Copy link

twwn commented Aug 31, 2020

There are some issues with the code:

  • 98: this for loop should run for each note, not pointlessly each and every card of it (or alternatively: only for one card)
  • for the same reason, 386&387 should be commented out by default, with a note in the preface
  • Japanese(AudioDownloader) needs a default DECK_NAME
  • tests @ 290 & 292 should be == ffs, and the preface needs a note that the fields there (and 294 & 296) may need adjustment
  • actually I have no idea what 294 was about, so I killed it

As for Anki itself, with newer 2.1 versions you must run "Downgrade & Quit" from (bottom right) of the File→"Switch Profile" dialog (C-S-p), otherwise the profile will be in a new format this script can't read. Anki will automatically upgrade it again the next time you open the profile normally.

@mikob
Copy link
Author

mikob commented Aug 31, 2020

@nwwt Thanks! I actually just noticed some of the issues with newer Anki. I will fix and update this script, it would be helpful to have your change requests in a fork - if you're so inclined.

@mikob
Copy link
Author

mikob commented Sep 5, 2020

@nwwt updated, FYI

@kanjieater
Copy link

This would be really convenient as an add-on. Thanks for sharing your personal work.

@maxgraze-zz
Copy link

How do you make this work? I added the file to the forvo src but I'm not sure what to do next.

@languagemaniac
Copy link

Hi, I tried running the script but it's telling me "Missing argument: 'ANKI_USER_FOLDER_LOC'".

Also, I don't really know how to use this thing. Is there a wiki or something? I want to download pronunciations from JapanesePod101 for a big list of words.

@rwmpelstilzchen
Copy link

@languagemaniac
JP101 is available at media.digitalwords.net/anki/lp/Japanese.apkg

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment