-
-
Save mikob/24a908471e38370f40d302b1cb1b41fb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
''' | |
Update 10/4/2021 | |
* Download audio for the longest word in Spanish. | |
Update 9/22/2020 | |
* Allow looking up through multiple lang codes by priority order on Forvo (eg. es_LATAM, es) | |
* Strip nbsp; when looking up words on Forvo | |
Update 9/5/2020 | |
* Fixed to work with newer Anki versions 2.1.33+ (possibly earlier) | |
--- | |
Make sure to adjust BACKUP_LOC to a directory of your choosing. | |
You will need to adjust CARD_TYPE and possibly DECK_NAME or create your own class that extends | |
AudioDownloader for other languages. | |
Adjust get_idxs and get_modded_fields_for_card as needed. | |
You might need to adjust the fields for kana and kanji for Japanese. | |
''' | |
import sqlite3 | |
from abc import ABC, abstractmethod | |
from hashlib import sha1 | |
# from anki.utils import fieldChecksum | |
from datetime import datetime | |
import re | |
import base64 | |
from functools import reduce | |
import sys | |
import shutil | |
import time | |
import requests | |
import json | |
import os | |
import click | |
# Where a copy of collection.anki2 is written before any modification.
# Adjust this to a directory of your choosing.
BACKUP_LOC = '/home/mikob/.local/share/Anki2/Miko/custom-backups/'
# Anki stores all of a note's fields in one DB column, separated by 0x1f.
CARD_FIELD_SEPARATOR = '\x1f'
# Headers that make requests look like a desktop Chrome browser (used for
# Forvo lookups and audio downloads).
FAKE_BROWSER_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
    "accept-language": "en-US,en;q=0.9,ja;q=0.8",
    "cache-control": "no-cache",
    "pragma": "no-cache",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1"
}
# Matches ASCII characters (U+0000-U+007F).  Despite the name, it is used to
# REMOVE ASCII, keeping only non-ASCII (e.g. Japanese) text — see
# remove_non_unicode_characters() below.
unicode_regx = re.compile('[\x00-\x7F]')
# Splits on punctuation/ASCII separator characters when extracting a word.
# NOTE(review): the trailing '.' outside the class makes each separator
# consume the character after it as well — confirm that is intended.
word_sep_regx = re.compile('[\[\].,0-9a-zA-Z`\/「」、。:;0-9ー()()\s].')
def strip_special_chars(s):
    """Remove characters that are unsafe in file names: ? : ; / \\ " < > ."""
    unsafe = re.compile(r'[?:;/\\"<>.]+')
    return unsafe.sub('', s)
class AudioDownloader(ABC):
    """Base pipeline: back up the Anki DB, find notes of CARD_TYPE that were
    recently modified, ask the subclass for updated fields (e.g. with audio
    added), and write them back.

    NOTE(review): `stats` is a mutable class attribute, so every instance of
    every subclass shares and mutates the same dict — counts accumulate
    across downloader runs within one process.  Confirm this is intended.
    """
    # Counters reported at the end of a run (subclasses may add keys).
    stats = {
        'not_found': 0,
        'found': 0,
        'existing_sound': 0,
        'invalid_audio': 0,
    }
    # Path to the profile's collection.media folder; set in __init__.
    media_loc = None
    # Optional deck name to restrict processing to; '' means all decks.
    DECK_NAME = ''

    def __init__(self, dry, anki_user_folder_loc, last_mod):
        """Run the whole download/update pass.

        dry -- if truthy, do everything except committing the DB changes
        anki_user_folder_loc -- Anki profile folder (contains collection.anki2)
        last_mod -- look-back window in seconds; only notes modified within
                    the last `last_mod` seconds are processed
        """
        db_loc = os.path.join(anki_user_folder_loc, 'collection.anki2')
        self.media_loc = os.path.join(anki_user_folder_loc, 'collection.media')
        # Copy the collection file aside before touching it.
        backup_loc = os.path.join(
            BACKUP_LOC, 'anki_backup_%s.anki2' % datetime.now())
        shutil.copy(db_loc, backup_loc)
        self.log("Created backup: %s" % backup_loc)
        conn = sqlite3.connect(db_loc)
        cur = conn.cursor()
        # Notes with mod (unix seconds) greater than this are processed.
        mod_time = int(time.time()) - last_mod
        # The `notetypes` table exists in the newer Anki 2.1.x schema.
        cur.execute('SELECT id,name FROM notetypes')
        note_types = cur.fetchall()
        # find deck_id
        deck_id = None
        if self.DECK_NAME:
            cur.execute('SELECT id,name FROM decks')
            decks = cur.fetchall()
            # get deck ids by name
            for deck in decks:
                _deck_id = deck[0]
                deck_name = deck[1]
                if deck_name == self.DECK_NAME:
                    deck_id = _deck_id
                    break
            if not deck_id:
                self.log(
                    f'Could not find deck {self.DECK_NAME}. Continuing with all decks...')
            else:
                self.log(f'{self.DECK_NAME} deck id: {deck_id}')
        cards = []
        for note_type in note_types:
            note_id = note_type[0]
            note_name = note_type[1]
            if note_name == self.CARD_TYPE:
                cards = self.get_cards(cur, note_id, mod_time, deck_id)
                cur.execute(
                    'SELECT ord,name,ntid FROM fields WHERE ntid=?', (int(note_id),))
                note_fields = cur.fetchall()
                # The subclass maps field names to the index tuple it needs.
                idxs = self.get_idxs(note_fields)
                if len(idxs) == 0:
                    raise AssertionError("did not get any idxs")
                break
        # to restrict doing each note just once
        note_set = set()
        for card in cards:
            id = card[0]
            nid = card[1]
            if not (nid in note_set):
                note_set.add(nid)
                modded_note_fields = self.get_modded_fields_for_card(
                    card, idxs)
                if modded_note_fields is not None and len(modded_note_fields) > 0:
                    self.write_back(cur, id, modded_note_fields)
                    # csum = fieldChecksum(new_fields[0])
        if not dry:
            conn.commit()
        conn.close()
        self.log("Finished!\n\n%s" % self.stats)

    def get_cards(self, cur, note_type_id, mod_time, deck_id=None):
        '''
        Get the cards that don't have audio and have been updated after the
        mod time.
        mid = note type id
        '''
        q = 'SELECT * FROM notes WHERE mid=? AND mod>?'
        qa = (int(note_type_id), mod_time)
        if deck_id:
            # Join through `cards` so the result can be limited to one deck.
            q = 'SELECT * FROM notes INNER JOIN cards ON notes.id=cards.nid WHERE notes.mid=? AND notes.mod>? AND cards.did=?'
            qa += (deck_id,)
        cur.execute(q, qa)
        cards = cur.fetchall()
        self.log("Found %d cards" % (len(cards)))
        return cards

    def write_back(self, cur, id, modded_note_fields):
        """Join the modified field list and UPDATE the note row in place."""
        try:
            modded_fields_str = str.join(
                CARD_FIELD_SEPARATOR, modded_note_fields)
            mod = int(time.time())
            # need to set update sequence number (usn) to -1 to tell server we have updates
            cur.execute("UPDATE notes SET flds=?,mod=?,usn=? WHERE id=?",
                        (modded_fields_str, mod, -1, id))
        except Exception as e:
            self.log("Error with card %s" % e)

    def log(self, *msg):
        """Print a message prefixed with this downloader's card type."""
        print(self.CARD_TYPE, *msg)

    @abstractmethod
    def get_idxs(self, note_fields):
        '''
        note_fields is a tuple with _ord (field idx), name, and note_type_id
        Returns a tuple of idxs that get_modded_fields_for_card can use.
        '''
        pass

    @abstractmethod
    def get_modded_fields_for_card(self, cards, idxs):
        '''
        Takes idxs from get_idxs
        '''
        pass
class ForvoDownloader(AudioDownloader):
    """Downloads pronunciations from forvo.com for the longest word on the
    front of a card, trying each language code in LANG_CODES in priority
    order (e.g. ['es_latam', 'es']).  Subclasses define LANG_CODES and
    usually DECK_NAME.
    """
    CARD_TYPE = 'WAudio'

    def strip_bad_chars(self, word):
        # Per the changelog this strips non-breaking spaces before lookup.
        # NOTE(review): confirm the literal below is U+00A0, not a plain space.
        return word.replace(' ', '')

    def get_forvo_pronunciation(self, longest_word, processed_word):
        """Return the mp3 URL of the first Forvo pronunciation found for
        `longest_word`, trying each LANG_CODES entry in order, or None."""
        for lang_code in self.LANG_CODES:
            webPageUrl = "https://forvo.com/search/%s/%s/" % (
                longest_word, lang_code)
            # s = requests.Session()
            # res = s.get('https://forvo.com')
            # cookies = dict(res.cookies)
            webPageText = requests.get(
                webPageUrl, headers=FAKE_BROWSER_HEADERS).text
            pageTextList = re.findall(
                r'<article class="search_words.*?</article>', webPageText, re.DOTALL)
            if len(pageTextList) == 0:
                continue
            # first result might be search_words empty
            pageText = pageTextList[-1]
            # Forvo embeds base64-encoded mp3 paths in its Play(...) calls.
            pronunciations = re.findall(r"Play\(\d+,'(.*?)'", pageText)
            if pronunciations:
                pronunciations = [base64.b64decode(p).decode()
                                  for p in pronunciations]
                words = '"%s"' % longest_word
                if processed_word and longest_word != processed_word:
                    words = '"%s" ("%s")' % (longest_word, processed_word)
                self.log('Found %d pronunciations for %s' %
                         (len(pronunciations), words))
                return 'https://forvo.com/mp3/%s' % pronunciations[0]
        return None

    def get_idxs(self, note_fields):
        """Return a 1-tuple with the index of the 'Audio' field, or () when
        the note type has no such field (which aborts the run upstream)."""
        for _ord, name, note_type_id in note_fields:
            if name == 'Audio':
                return (_ord,)
        return ()

    def get_modded_fields_for_card(self, card, idxs):
        """Fill the Audio field of `card` (a raw `notes` row) with a Forvo
        pronunciation of the longest word on the front.  Returns the full
        field list on success, or None when nothing should be written."""
        audio_idx = idxs[0]
        front_field_idx = 0
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        existing_sound = fields[audio_idx].strip()
        if existing_sound != '':
            self.stats['existing_sound'] += 1
            return
        processed_word = self.strip_bad_chars(fields[front_field_idx])
        # Look up only the longest word of the phrase (ties keep the first).
        longest_word = max(processed_word.split(' '), key=len)
        audio_url = self.get_forvo_pronunciation(longest_word, processed_word)
        if audio_url:
            try:
                r = requests.get(audio_url, headers=FAKE_BROWSER_HEADERS)
                if r.headers['content-type'] != 'audio/mpeg':
                    self.stats['invalid_audio'] += 1
                    return
                self.stats['found'] += 1
                sound_file_name = '%s_%s.mp3' % (
                    strip_special_chars(fields[front_field_idx]), 'forvo')
                sound_file_path = os.path.join(self.media_loc, sound_file_name)
                with open(sound_file_path, 'wb') as f:
                    for chunk in r:
                        f.write(chunk)
                fields[audio_idx] = '[sound:%s]' % sound_file_name
            except Exception as e:
                self.log("Error with updating card with spec. idx %s" % e)
            return fields
        else:
            words = '"%s"' % longest_word
            if longest_word != processed_word:
                words = '"%s" ("%s")' % (longest_word, processed_word)
            self.log('Could not find pronunciation for %s' % words)
            self.stats['not_found'] += 1
class Spanish(ForvoDownloader):
    """Forvo downloader for the Spanish deck.

    LANG_CODES are tried in priority order: Latin-American Spanish first,
    then generic Spanish.
    """
    DECK_NAME = 'Español'
    LANG_CODES = ['es_latam', 'es']
class Russian(ForvoDownloader):
    """Forvo downloader for the Russian deck."""
    DECK_NAME = 'По-русски'
    LANG_CODES = ['ru']

    def strip_accents(self, word):
        # Replace stressed (accented) vowels with their bare forms.
        # NOTE(review): nothing in this file calls strip_accents — confirm
        # whether it was meant to be wired into the lookup path.
        return word.replace('а́', 'а').replace('е́', 'е')
class Japanese(AudioDownloader):
    """Downloads audio from languagepod101 using a note's kanji + kana
    fields, falling back to guessing the dictionary form of -masu verbs
    when the conjugated form has no recording."""
    CARD_TYPE = 'Japanese'
    DL_URL = 'http://assets.languagepod101.com/dictionary/japanese/audiomp3.php'

    def __init__(self, *args, **kwargs):
        # FIX: build a per-instance copy instead of update()-ing the shared
        # class-level stats dict, so Japanese-specific counters don't leak
        # into other downloaders and separate runs don't pollute each other.
        self.stats = {**self.stats,
                      'found_conjugated': 0,
                      'no_kanji_or_kana': 0}
        super().__init__(*args, **kwargs)

    @classmethod
    def find_word(cls, s):
        """Return the first non-ASCII token of `s` after splitting on
        punctuation/ASCII separators, or '' when none is found."""
        splitted = word_sep_regx.split(s.strip())
        for w in splitted:
            cleaned = remove_non_unicode_characters(w).strip()
            if cleaned != '':
                return cleaned
        return ''

    @classmethod
    def to_dict_form(cls, kanji, kana):
        """Guess dictionary-form candidates for a polite (-masu) verb.

        Returns a list of (kanji, kana) tuples, best guess first, or []
        when the word doesn't look like a -masu form.
        """
        kana_root = kana[:-3]
        kanji_root = kanji[:-3]

        def _filled(poss):
            # Attach each candidate suffix to both roots.
            return [('%s%s' % (kanji_root, suff), '%s%s' % (kana_root, suff)) for suff in poss]

        if kanji[-3:] == 'します' and kana[-3:] == 'します':
            # group 3 (suru verbs): e.g. 勉強します -> 勉強する / 勉強
            return _filled(['する', ''])
        elif kanji[-2:] == 'ます' and kana[-2:] == 'ます':
            # group 1 (godan): map the -masu stem's final kana to the
            # corresponding dictionary-form -u ending.
            godan_endings = {'き': 'く', 'ぎ': 'ぐ', 'み': 'む', 'り': 'る',
                             'い': 'う', 'し': 'す', 'ち': 'つ'}
            ending = godan_endings.get(kana[-3])
            if ending is not None:
                return _filled([ending])
            # group 2 (ichidan): e.g. 食べます -> 食べる
            return [('%sる' % kanji[:-2], '%sる' % kana[:-2])]
        return []

    def get_idxs(self, note_fields):
        """Locate the audio, kanji (vocab/front/text) and hiragana field
        indices.  Raises AssertionError when any of the three is missing."""
        audio_idx = None
        kanji_idx = None
        hiragana_idx = None
        for _ord, name, note_type_id in note_fields:
            fieldname = name.lower()
            if 'audio' == fieldname:
                audio_idx = _ord
            elif 'hiragana' == fieldname:
                hiragana_idx = _ord
            elif fieldname == 'front' or ('vocab' in fieldname and 'hiragana' not in fieldname and 'back' not in fieldname):
                kanji_idx = _ord
            elif fieldname == 'text':
                # needs work (cloze deletion)
                kanji_idx = _ord
        if audio_idx is None or kanji_idx is None or hiragana_idx is None:
            raise AssertionError("missing kanji, hiragana or audio field(s)")
        return (audio_idx, kanji_idx, hiragana_idx)

    def get_modded_fields_for_card(self, card, idxs):
        """Fill the audio field with a languagepod101 recording.

        Returns the full field list (possibly unchanged on download error),
        or None when nothing should be written back."""
        audio_idx = idxs[0]
        kanji_idx = idxs[1]
        hiragana_idx = idxs[2]
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        raw_kanji = fields[kanji_idx]
        raw_kana = fields[hiragana_idx]
        kanji = self.find_word(raw_kanji)
        kana = self.find_word(raw_kana)
        if kana == '':
            if kanji == '':
                if raw_kana != '' or raw_kanji != '':
                    print("No kanji/kana for %s %s" % (raw_kanji, raw_kana))
                self.stats['no_kanji_or_kana'] += 1
                return
            # Kana field was empty: use the kanji field for both parameters.
            kana = kanji
        existing_sound = fields[audio_idx].strip()
        if existing_sound != '':
            # print("already has audio %s %s" % (kanji, kana))
            self.stats['existing_sound'] += 1
            return
        conjugated = False
        try:
            r = requests.get(self.DL_URL, params={
                'kanji': kanji, 'kana': kana})
            # A Content-length of 52288 appears to be the site's
            # "recording not available" clip — treat it as a miss.
            if r.headers['Content-length'] == '52288':
                # now try conjugating
                attempts = self.to_dict_form(kanji, kana)
                for attempt in attempts:
                    conjugated_kanji, conjugated_kana = attempt
                    r = requests.get(self.DL_URL, params={
                        'kanji': conjugated_kanji, 'kana': conjugated_kana})
                    print("Trying %s %s for %s %s" %
                          (conjugated_kanji, conjugated_kana, kanji, kana))
                    if r.headers['Content-length'] != '52288':
                        conjugated = True
                        # BUGFIX: stop at the first hit so `r` still holds
                        # the successful response when the file is written
                        # below (previously later failed attempts could
                        # overwrite it with the "not available" clip).
                        break
                if not conjugated:
                    print("NOT found %s %s" % (kanji, kana))
                    self.stats['not_found'] += 1
                    return
            if conjugated:
                print("Found conjugated form %s %s" % (kanji, kana))
                self.stats['found_conjugated'] += 1
            else:
                print("Found %s %s" % (kanji, kana))
                self.stats['found'] += 1
            sound_file_name = '%s_%s.mp3' % (kanji, kana)
            sound_file_path = os.path.join(self.media_loc, sound_file_name)
            with open(sound_file_path, 'wb') as f:
                for chunk in r:
                    f.write(chunk)
            fields[audio_idx] = '[sound:%s]' % sound_file_name
        except Exception as e:
            self.log("Error with updating card with spec. idx %s" % e)
        return fields
class JapaneseCloze(Japanese):
    """Same pipeline as Japanese, but for the 'Japanese Cloze' note type."""
    CARD_TYPE = 'Japanese Cloze'
class JapaneseKanji(Japanese):
    """Same pipeline as Japanese, but for the 'Kanji' note type."""
    CARD_TYPE = 'Kanji'
def remove_non_unicode_characters(s):
    """Strip every ASCII character (U+0000-U+007F) from `s`, keeping only
    non-ASCII (e.g. Japanese) text."""
    return ''.join(ch for ch in s if ord(ch) > 0x7F)
@click.command()
@click.argument('anki-user-folder-loc', type=click.Path(exists=True))
@click.option('--language', '-l', type=click.Choice(['japanese', 'russian', 'spanish']), required=True, multiple=True)
# NOTE: the default is evaluated once at import time; a full time.time()
# look-back window effectively means "process all notes".
@click.option('--last-mod', default=time.time())
@click.option('--dry', is_flag=True)
def do(anki_user_folder_loc, language, last_mod, dry):
    """CLI entry point: run every downloader for each selected language.

    anki_user_folder_loc -- Anki profile folder containing collection.anki2
    language -- may be given multiple times (-l japanese -l spanish)
    last_mod -- look-back window in seconds for recently-modified notes
    dry -- do everything except the final DB commit
    """
    if 'japanese' in language:
        Japanese(dry, anki_user_folder_loc, last_mod)
        JapaneseCloze(dry, anki_user_folder_loc, last_mod)
        JapaneseKanji(dry, anki_user_folder_loc, last_mod)
    if 'russian' in language:
        Russian(dry, anki_user_folder_loc, last_mod)
    if 'spanish' in language:
        Spanish(dry, anki_user_folder_loc, last_mod)


if __name__ == '__main__':
    do()
for mid, card_type in models.items():
if card_type['name'] in CARD_TYPES:
for idx, field in enumerate(card_type['flds']):
if field['name'] == 'Audio':
audio_field_idx = idx
if audio_field_idx:
audio_field_indices.append(idx)
cur.execute('SELECT * FROM notes WHERE mid=? AND mod>?', (int(mid), mod_time))
cards.append(cur.fetchall())
audio_field_indices.append(idx)
should probably be audio_field_indices.append(audio_field_idx)
. Right now this will only work if Audio
is the last field.
This doesn't work (I'm on 2.1). It needs click (perhaps in the same folder?) to run, and anki_user_folder_loc doesn't seem to be defined. How does this even work for you?
Doesn't work on 2.1, any update on this?
There's some issues with the code:
- 98: this for loop should run for each note, not pointlessly each and every card of it (or alternatively: only for one card)
- for the same reason, 386&387 should be commented out by default, with a note in the preface
- Japanese(AudioDownloader) needs a default DECK_NAME
- tests @ 290 & 292 should be == ffs, and the preface needs a note that the fields there (and 294 & 296) may need adjustment
- actually I have no idea what 294 was about, so I killed it
As for Anki itself, with newer 2.1 versions you must run "Downgrade & Quit" from (bottom right) of the File→"Switch Profile" dialog (C-S-p), otherwise the profile will be in a new format this script can't read. Anki will automatically upgrade it again the next time you open the profile normally.
@nwwt Thanks! I actually just noticed some of the issues with newer Anki. I will fix and update this script, it would be helpful to have your change requests in a fork - if you're so inclined.
@nwwt updated, FYI
This would be really convenient as an add-on. Thanks for sharing your personal work.
How do you make this work? I added the file to the forvo src but I'm not sure what to do next
Hi, I tried running the script but it's telling me "Missing argument : 'ANKI_USER_FOLDER_LOC'.
Also, I don't really know how to use this thing. Is there a wiki or something? I want to download pronunciations from japanesepod101 for a big list of words.
@languagemaniac
JP101 is available at media.digitalwords.net/anki/lp/Japanese.apkg
Could this be adapted for other courses on Innovative Language sites? I'm manually downloading the sentences from Frenchpod101 right now, but would love an automated way to do it.