Skip to content

Instantly share code, notes, and snippets.

@darvell
Created August 23, 2020 18:57
Show Gist options
  • Save darvell/bc0a672237c1759785ed914a8d8177c0 to your computer and use it in GitHub Desktop.
Save darvell/bc0a672237c1759785ed914a8d8177c0 to your computer and use it in GitHub Desktop.
Starting point for a scraper for use with FirstVoices. For archival purposes only.
#!/usr/bin/python3
# Standard-library modules used by the scraper.
import csv
import json
import os

# requests is mandatory: every API call goes through a requests.Session.
try:
    import requests
except ImportError as err:
    # ImportError is still an Exception, so callers catching the old generic
    # Exception keep working; chaining preserves the original failure.
    raise ImportError(
        "Requests library not installed. Please run 'easy_install requests' or 'pip3 install requests'"
    ) from err

# mutagen is optional. modify_mp3 records whether it could be imported so the
# rest of the module can skip MP3 tagging when it is missing.
modify_mp3 = False
try:
    import mutagen
    from mutagen.easyid3 import EasyID3
    from mutagen.id3 import ID3
    modify_mp3 = True
except ImportError:
    print("Mutagen not installed, MP3's won't have comments.")
class FirstVoicesApi():
    """Small client for the FirstVoices web API, built on one requests.Session.

    Attributes:
        language_name: Value passed to the constructor; stored but not used
            by any method of this class.
        session: The requests.Session every request is sent through.
        language_objects: Cached list of language definitions, filled by
            get_all_languages().
        languages: Dict keyed by "family/subgroup/dialect" holding, per
            language, a "words" list, a "language" description dict and
            (after query_words) the raw "entries".
    """

    def __init__(self, language_name=None):
        """Open a session, send two warm-up requests and load the language list.

        Note that constructing the object performs network I/O.
        """
        self.language_name = language_name
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "*/*",
            "enrichers.document": "ancestry,dialect,character,word,media,book",
            "Accept-Encoding": "gzip, deflate, br",
            "properties": "*",
            "Origin": "https://www.firstvoices.com",
            "Nuxeo-transaction-timeout": "60000",
            "Accept-Language": "en-US,en;q=0.9",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36",
        })
        # Warm-up requests: the bodies are never read (stream=True plus an
        # immediate close). NOTE(review): presumably these exist to collect
        # the anonymous-session cookies -- confirm against the site.
        with self.session.get("https://www.firstvoices.com", stream=True):
            pass
        with self.session.get("https://www.firstvoices.com/nuxeo/login.jsp?nxtimeout=true&forceAnonymousLogin=true", stream=True):
            pass
        self.languages = {}
        self.language_objects = []
        for language in self.get_all_languages():
            self._language_record(language)

    def _language_record(self, language):
        """Return the self.languages entry for *language*, creating it if absent."""
        return self.languages.setdefault(str(language), {
            "words": [],
            "language": {
                "family": language.language_family,
                "subgroup": language.subgroup,
                "dialect": language.dialect,
            },
        })

    def get_all_languages(self):
        """Return every language definition, fetching the list once and caching it."""
        if self.language_objects:
            return self.language_objects
        print("Getting all languages...")
        response = self.session.get("https://www.firstvoices.com/nuxeo/api/v1/query/get_dialects?pageProvider=get_dialects&queryParams=sections")
        # Fail with a clear HTTP error instead of a confusing JSON/KeyError.
        response.raise_for_status()
        result = []
        for entry in response.json()["entries"]:
            # Family, subgroup and dialect are taken from the 4th-, 3rd- and
            # 2nd-to-last path segments respectively.
            path_split = entry["path"].split("/")
            result.append(LanguageDefinition(path_split[-4], path_split[-3], path_split[-2], entry["uid"], entry["path"]))
        print(f"Retrieved {len(result)} languages.")
        self.language_objects = result
        return result

    def get_language_by_parts(self, family, group, dialect):
        """Return the first language whose three name parts occur in the arguments.

        This is the three-argument lookup that used to be declared as
        ``get_language`` and was silently replaced by the one-argument
        definition that followed it. It also read ``language.group``, which
        the language objects do not have; ``subgroup`` is used instead.

        Returns None when nothing matches.
        """
        for language in self.get_all_languages():
            if (language.language_family in family
                    and language.subgroup in group
                    and language.dialect in dialect):
                return language
        return None

    def get_language(self, dialect):
        """Return the first language matching *dialect*, case-insensitively.

        For each language in turn the text is searched for as a substring of
        the dialect, then the subgroup, then the language family.

        Raises:
            LookupError: no language matches (a subclass of Exception, so
                callers catching Exception are unaffected).
        """
        needle = dialect.lower()
        for language in self.get_all_languages():
            if (needle in language.dialect.lower()
                    or needle in language.subgroup.lower()
                    or needle in language.language_family.lower()):
                return language
        raise LookupError("Can't find language.")

    def get_language_family(self, family):
        """Return all languages whose family name occurs in *family* (case-insensitive)."""
        wanted = family.lower()
        return [
            language for language in self.get_all_languages()
            if language.language_family.lower() in wanted
        ]

    def query_words(self, language):
        """Fetch every FVWord entry for *language* and cache the result.

        Args:
            language: A LanguageDefinition, or text resolved via get_language().

        Returns:
            The list of Word objects, also stored under self.languages together
            with the raw entries.
        """
        if not isinstance(language, LanguageDefinition):
            language = self.get_language(language)
        print("Setting up session for full word query.")
        # Only the request itself matters here; the response is discarded.
        self.session.get(f"https://www.firstvoices.com/explore/FV/sections/Data/{str(language)}/learn/words/")
        entries = self._fetch_all_entries("FVWord", language, page_size=2000)
        words = [Word(entry) for entry in entries]
        record = self._language_record(language)
        record["entries"] = entries
        record["words"] = words
        return words

    def _fetch_all_entries(self, db_name, language, page_size=2000):
        """Collect the "entries" of every result page of a query into one list."""
        print("Getting page 1 of ?")
        first_page = self.query(db_name, language, page_size=page_size)
        entries = list(first_page["entries"])
        total_pages = first_page["numberOfPages"]
        # The first request used page index 0, so the remaining indices are
        # 1 .. total_pages - 1. Going up to total_pages asked for a page past
        # the end.
        for page in range(1, total_pages):
            print("Getting page {0} of {1}".format(page + 1, total_pages))
            entries.extend(self.query(db_name, language, page_size=page_size, page=page)["entries"])
        return entries

    def query(self, db_name="FVWord", language=None, **kwargs):
        """Run one page of an NXQL query and return the decoded JSON response.

        Args:
            db_name: Document type to select from. The original author's note
                lists FVWord, FVCharacter, FVDialect and FVPhrase.
            language: Language whose str() is appended to the path filter.
            **kwargs: ``page_size`` (default 10) and ``page`` (default 0).
        """
        base_url = "https://www.firstvoices.com/nuxeo/api/v1/automation/Document.EnrichedQuery"
        # Escape single quotes so the name cannot terminate the NXQL literal.
        escaped_lang = str(language).replace("'", "\\'")
        query = f"SELECT * FROM {db_name} WHERE ecm:path STARTSWITH '/FV/sections/Data/{escaped_lang}' AND ecm:isVersion = 0 AND ecm:isTrashed = 0 "
        page_size = kwargs.get("page_size", 10)
        current_page = kwargs.get("page", 0)
        payload = {
            "params": {
                "language": "NXQL",
                "dialectId": "undefined",
                "sortBy": "fv:custom_order",
                "pageSize": page_size,
                "sortOrder": "asc",
                "currentPageIndex": f"{current_page}",
                "query": query,
            },
            "context": {},
        }
        response = self.session.post(base_url, json=payload)
        response.raise_for_status()
        return response.json()

    def dump_language(self, language, target_directory=None):
        """Write a language's words to disk as CSV, raw JSON and audio files.

        Args:
            language: A LanguageDefinition, or text resolved via get_language().
            target_directory: Output directory; defaults to
                ./<family>/<subgroup>/<dialect> under the current directory.
        """
        if not isinstance(language, LanguageDefinition):
            language = self.get_language(language)
        if target_directory is None:
            target_directory = os.path.join(os.path.abspath("."), language.language_family, language.subgroup, language.dialect)
        os.makedirs(target_directory, exist_ok=True)
        if not self._language_record(language)["words"]:
            print(f"Retrieving words for {language}.")
            self.query_words(language)
        words = self._language_record(language)["words"]
        print("Generating CSV.")
        self._write_csv(words, target_directory)
        print("Generating JSON dump.")
        self._write_raw_json(words, target_directory)
        print("Downloading recordings.")
        self._download_recordings(words, target_directory)

    @staticmethod
    def _recording_link(recording_url):
        """Return the URL for the CSV, adding the site prefix only to relative paths."""
        if not recording_url:
            return ""
        if recording_url.startswith(("http://", "https://")):
            # Already absolute; prefixing again would nest one URL in another.
            return recording_url
        return "https://www.firstvoices.com/nuxeo/" + recording_url

    def _write_csv(self, words, target_directory):
        """Write language.csv with one row per word."""
        # newline='' is what the csv module requires of the file object, and
        # an explicit encoding keeps non-ASCII words portable across platforms.
        with open(os.path.join(target_directory, 'language.csv'), 'w', newline='', encoding='utf8') as csvf:
            writer = csv.writer(csvf)
            writer.writerow(["Word", "Plural", "Translation", "Category", "Lexical Category", "Notes", "Pronunciation", "Recording", "Original Recording URL"])
            for word in words:
                writer.writerow([
                    word.word,
                    word.plural,
                    word.translation,
                    word.category,
                    word.lexical_category,
                    word.notes,
                    word.pronunciation,
                    # Same name the recording is saved under (real extension),
                    # or "" when the word has no recording.
                    word.get_sanitized_filename(),
                    self._recording_link(word.recording_url),
                ])

    def _write_raw_json(self, words, target_directory):
        """Write language_raw.json mapping each word's text to its raw entry."""
        # Words sharing the same text overwrite each other in this mapping.
        words_raw = {word.word: word.raw_entry for word in words}
        with open(os.path.join(target_directory, "language_raw.json"), "w", encoding="utf8") as f:
            f.write(json.dumps(words_raw, indent=2, sort_keys=True, ensure_ascii=False))

    def _download_recordings(self, words, target_directory):
        """Download each word's recording; a failure is reported and skipped."""
        for word in words:
            audio = word.get_audio_dl()
            if not audio:
                continue
            # Computed before the try so the error message can always use it.
            filename = word.get_sanitized_filename()
            destination = os.path.join(target_directory, filename)
            try:
                response = self.session.get(audio[1])
                response.raise_for_status()
                # The file is created only after a successful response, so a
                # failed request leaves nothing behind.
                with open(destination, "wb") as f:
                    f.write(response.content)
            except Exception as exc:
                # Deliberately broad: one bad recording must not stop the dump.
                print(f"Unable to download {audio[0]} {audio[1]} -> {filename}: {exc}")
                continue
            if modify_mp3 and filename.lower().endswith(".mp3"):
                self._tag_mp3(destination, word)

    def _tag_mp3(self, mp3_path, word):
        """Best-effort: set the title and comment tags of a downloaded MP3."""
        try:
            # Load and re-save through the low-level API with v2_version=4
            # before using the EasyID3 wrapper.
            ID3(mp3_path, translate=True, v2_version=4).save()
            # NOTE(review): EasyID3 appears to ship without a "comment" key,
            # which would have made the assignment below fail every time.
            # Register it only when missing -- confirm against mutagen docs.
            if "comment" not in EasyID3.valid_keys:
                EasyID3.RegisterTextKey("comment", "COMM")
            tags = EasyID3(mp3_path)
            tags["title"] = word.word
            comment = f"{word.word} -> {word.translation}\n({word.original_filename})"
            # Comments of 28+ bytes fall back to the file name alone.
            # NOTE(review): 28 looks like the ID3v1.1 comment size -- confirm.
            if len(comment.encode("utf8")) >= 28:
                comment = word.original_filename
            tags["comment"] = comment
            tags.save()
        except Exception as exc:
            # Tagging is optional; report the problem instead of hiding it.
            print(f"Unable to tag {mp3_path}: {exc}")

    def download_word_data(self, word):
        """Placeholder; not implemented."""
        pass
class Word():
    """One word entry from a FirstVoices query, flattened into plain attributes.

    Attributes set by the constructor: raw_entry, word, lexical_category,
    notes, plural, cultural_note, category, translation, pronunciation,
    picture_url, recording_url, original_filename, filetype and
    recording_source. Text attributes default to "" and the URL/file
    attributes to None when the entry has no such data.
    """

    # Prefix the constructor puts in front of the audio path.
    _SITE_ROOT = "https://www.firstvoices.com/"

    def __init__(self, word_obj):
        """Extract the fields of interest from the raw entry dict *word_obj*."""
        self.raw_entry = word_obj
        property_obj = word_obj["properties"]
        self.word = word_obj["title"]
        self.lexical_category = property_obj.get("fv-word:part_of_speech")

        # "or []" treats a missing key and an explicit null the same way.
        self.notes = "\n".join(property_obj.get("fv-word:notes") or [])
        # Notes made only of newlines carry no content. The original test was
        # inverted and erased every non-empty note instead.
        if not self.notes.strip("\n"):
            self.notes = ""
        self.plural = ", ".join(property_obj.get("fv-word:plural") or []).strip()
        self.cultural_note = ", ".join(property_obj.get("fv:cultural_note") or [])

        context = word_obj["contextParameters"]["word"]
        self.category = ", ".join(x["dc:title"] for x in context.get("categories") or []).strip()
        # endswith() is safe on an empty string, unlike indexing [-1].
        if self.category.endswith(","):
            self.category = self.category[:-1]

        self.picture_url = None
        pictures = context.get("related_pictures") or []
        if pictures:
            self.picture_url = pictures[0]["path"]

        # Defined up front so they exist even when the word has no audio.
        self.recording_url = None
        self.original_filename = None
        self.filetype = None
        recordings = context.get("related_audio") or []
        if recordings:
            self.recording_url = self._SITE_ROOT + recordings[0]["path"]
            self.original_filename = self.recording_url.split("/")[-1]
            self.filetype = self.original_filename.split(".")[-1]

        self.recording_source = ", ".join(x["dc:title"] for x in context.get("sources") or [])
        if self.recording_source[-2:] == ", ":
            self.recording_source = self.recording_source[:-2]

        self.translation = ""
        for definition in property_obj.get("fv:definitions") or []:
            # A prefix test, because the substring "en" also occurs inside
            # "french". When several definitions match, the last one wins.
            if (definition.get("language") or "").lower().startswith("en"):
                self.translation = definition.get("translation") or ""

        self.pronunciation = property_obj.get("fv-word:pronunciation") or ""

    def get_audio_dl(self):
        """Return (filename, download_url) for the recording, or None without one."""
        if self.recording_url is None:
            return None
        return self.parse_dl(self.recording_url)

    def get_sanitized_filename(self):
        """Return the local file name for the recording, or "" without one.

        The name is the word with "/" replaced by "%2F" and "\\" by "_",
        followed by the extension of the original file.
        """
        if self.recording_url is None:
            return ""
        return self.word.replace("/", "%2F").replace("\\", "_") + "." + self.filetype.replace(".", "")

    def get_picture_dl(self):
        """Return (filename, download_url) for the first picture, or None without one."""
        if self.picture_url is None:
            return None
        return self.parse_dl(self.picture_url)

    def parse_dl(self, path):
        """Split *path* into its file name and a download URL under /nuxeo/.

        Args:
            path: A site-relative path, or a URL that already starts with the
                site root (as recording_url does).

        Returns:
            (real_filename, download_url), where download_url is built from
            every segment except the final file name; None for empty input.
        """
        if path is None or not path.strip():
            return None
        # recording_url already carries the site root. Drop it first so the
        # result does not contain one URL nested inside another.
        if path.startswith(self._SITE_ROOT):
            path = path[len(self._SITE_ROOT):]
        segments = path.split("/")
        real_filename = segments[-1]
        download_url = "www.firstvoices.com/nuxeo/" + "/".join(segments[:-1])
        # Collapse doubled slashes before adding the scheme back.
        download_url = "https://" + download_url.replace("//", "/")
        return (real_filename, download_url)
class LanguageDefinition():
    """Identifies one language by its family, subgroup and dialect names.

    Attributes:
        language_family: Name of the language family.
        subgroup: Name of the subgroup within the family.
        dialect: Name of the dialect.
        portal_id: Optional identifier supplied by the caller.
        path: Optional path string supplied by the caller.
    """

    def __init__(self, language_family, subgroup, dialect, portal_id=None, path=None):
        self.language_family = language_family
        self.subgroup = subgroup
        self.dialect = dialect
        self.portal_id = portal_id
        self.path = path

    def __str__(self):
        """Return the "family/subgroup/dialect" form of the language."""
        return f"{self.language_family}/{self.subgroup}/{self.dialect}"

    def __repr__(self):
        """Return an unambiguous representation for debugging and logs."""
        return (
            f"{type(self).__name__}({self.language_family!r}, {self.subgroup!r}, "
            f"{self.dialect!r}, portal_id={self.portal_id!r}, path={self.path!r})"
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment