Skip to content

Instantly share code, notes, and snippets.

@adamsmith
Last active March 22, 2024 22:16
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamsmith/80cea13871aaafb0acf6e7658af537a3 to your computer and use it in GitHub Desktop.
Save adamsmith/80cea13871aaafb0acf6e7658af537a3 to your computer and use it in GitHub Desktop.
Automated Apple Voice Memos transcription to Markdown file
import os
import datetime
import json
import pathlib
import secrets
import sqlite3
import subprocess
import time
# Description: This code continuously polls for new Apple Voice Memos. When a new one is found, it is transcribed using
# Whisper, and the transcript is prepended to a Markdown file, e.g. an Obsidian note document. In practice this works
# pretty quickly, i.e. on the order of seconds. With iCloud enabled, new memos recorded on any of your devices will
# be processed.
#
# Quick tip: If you use an Apple Watch, add the Voice Memo "complication" to your watch face, so you can record new
# Voice Memos quickly (only two taps!)
#
# Requirements:
# - if you want to transcribe Apple Voice Memos from other devices, iCloud sync of voice memos must be ENABLED on all
# devices you record from, and on the device running this code
# - to transcribe, we use the whisper.cpp executable/binary, from https://github.com/ggerganov/whisper.cpp, please
# set `WHISPER_CPP_BINARY_FILEPATH` and `MODEL_FILEPATH` as appropriate. I use the `ggml-large.bin` model file (~3 GB).
# - requires the `sqlite3` module, which ships with Python's standard library (nothing to install)
# - ffmpeg must be installed to `"/opt/homebrew/bin/ffmpeg"`, or you must change the path to the right one
#
# Notes:
# - transcripts of new voice memos are prepended to the Markdown file path in `NOTE_TO_SELF_FILEPATH_TO_PREPEND`;
# you must populate this to some Markdown file on your system, or modify the code that uses this variable to use whatever
# destination for transcribed notes that you'd like
# - all previously-existing voice memos will be processed the first time you run this code!
# - this code is written to run continuously in the background, but you can modify it to run once to process all new
# voice memos, and then quit by dropping the `while True` loop
# - check out the constants defined below to see the files created / used; some need to be configured, some you
# may change if you like
#
# Improvement ideas:
# - Use an LLM to clean up the formatting, fix transcription mistakes, and remove extra back-and-forth in dialogs (excessive
# "yeah"s when turn-taking), etc
# - There is probably a better way to remove non-speech tokens (e.g. `(silence)`) from Whisper output
# - A good bit of the latency of processing voice memos comes from loading the Whisper model, which we do for each
# audio file; we could load it once and keep it in memory to reduce latency, but increase memory usage and complexity
# YOU *MUST* CHANGE THESE FILEPATH VARIABLES
# Markdown file that new transcripts are prepended to; `main()` raises at startup if this path does not exist
NOTE_TO_SELF_FILEPATH_TO_PREPEND = "FILL ME IN"
# whisper.cpp executable that `transcribe()` runs
WHISPER_CPP_BINARY_FILEPATH = "/Users/adam/Dropbox/dev/assets/adam_whisper/whisper.cpp-1.3.0/whisper_main"
# model file handed to whisper.cpp through its `--model` argument
MODEL_FILEPATH = "/Users/adam/Dropbox/dev/assets/adam_whisper/models/ggml-large.bin"
# you *may* change these filepath variables, if you want to
# JSON list with one entry per voice memo that has already been processed (created by `main()` if missing)
PROCESSED_JSON_FILE_PATH = os.path.join(os.path.expanduser("~"), ".apple_voice_memos_processed.json")
# scratch directory for temp files: atomic-write staging files, converted WAVs, and whisper.cpp JSON output
TEMP_DIR = os.path.join(os.path.expanduser("~"), ".tmp")
def _format_memo_markdown(recorded_at_ts, duration, path, transcription_speaker_segments):
    """Build the Markdown bullet (or nested bullets) for one transcribed voice memo.

    Args:
        recorded_at_ts: Unix timestamp (seconds) of when the memo was recorded.
        duration: `datetime.timedelta` with the memo's length.
        path: absolute filepath of the recording; rendered as a `file://` link.
        transcription_speaker_segments: non-empty list of transcript strings, one per speaker turn.

    Returns:
        The Markdown text to prepend to the note (without trailing newlines).
    """
    # if there is a single sentence, strip off the trailing "."
    # "single sentence" heuristic: exactly one "." and it is the last character; multi-sentence
    # segments keep their final period, and leading periods are never touched
    segments = [
        segment[:-1] if segment.count(".") == 1 and segment.endswith(".") else segment
        for segment in transcription_speaker_segments
    ]
    # `%-I` (hour without zero padding) is a platform strftime extension and is not portable to every OS
    note_timestamp_markdown = datetime.datetime.fromtimestamp(recorded_at_ts).strftime("%Y-%m-%d %-I:%M%p").lower()
    total_seconds = duration.total_seconds()
    # only mention the duration for memos longer than 30 seconds
    duration_markdown = f", {timedelta_human_readable(total_seconds)}" if total_seconds > 30 else ""
    link_and_duration_markdown = f"([recording]({pathlib.Path(path).as_uri()}){duration_markdown})"
    if len(segments) == 1:
        return f"* {note_timestamp_markdown}: {segments[0]} {link_and_duration_markdown}"
    # several speaker turns: one header bullet, then one nested bullet per turn
    return "\n\t* ".join([f"* {note_timestamp_markdown} transcript {link_and_duration_markdown}"] + segments)


def main():
    """Poll the Apple Voice Memos database forever and transcribe every memo not seen before.

    Each new memo is transcribed with `transcribe()`, its transcript is prepended to
    `NOTE_TO_SELF_FILEPATH_TO_PREPEND`, and the memo is recorded in `PROCESSED_JSON_FILE_PATH`
    so that it is skipped on later polls. This function never returns.

    Raises:
        Exception: if the Markdown note does not exist, the Voice Memos database is not readable,
            or a recording's path does not end in `.m4a`.
    """
    if not os.path.exists(NOTE_TO_SELF_FILEPATH_TO_PREPEND):
        raise Exception(f"The filepath in `NOTE_TO_SELF_FILEPATH_TO_PREPEND` does not exist, if it's right then `touch` it: {NOTE_TO_SELF_FILEPATH_TO_PREPEND}")

    # the temp dir must exist BEFORE the first `atomic_file_write` call, which raises when it is missing
    os.makedirs(TEMP_DIR, exist_ok=True)
    if not os.path.exists(PROCESSED_JSON_FILE_PATH):
        atomic_file_write(PROCESSED_JSON_FILE_PATH, json.dumps([]))

    voice_memos_sqlite_path = os.path.join(os.path.expanduser("~"), "Library", "Application Support",
                                           "com.apple.voicememos", "Recordings", "CloudRecordings.db")
    if not os.access(voice_memos_sqlite_path, os.R_OK):
        raise Exception(f"No permission to read database file: {voice_memos_sqlite_path}")

    # offset between datetime starts to count (1.1.1970) and Apple starts to count (1.1.2001)
    # NOTE(review): the whole-second difference between those epochs is 978307200; the fractional part is
    # kept unchanged because altering it would shift `int(recorded_at_ts)` for some memos and make
    # already-processed memos look new
    APPLE_TIME_OFFSET = 978307200.825232

    while True:
        # `with sqlite3.connect(...)` only manages the transaction and does NOT close the connection,
        # so close it explicitly; we poll every second and must not accumulate open handles
        conn = sqlite3.connect(voice_memos_sqlite_path)
        try:
            rows = conn.execute(
                "SELECT ZDATE, ZDURATION, ZCUSTOMLABEL, ZPATH FROM ZCLOUDRECORDING ORDER BY ZDATE").fetchall()
        finally:
            conn.close()

        # `atomic_file_write` stores text as UTF-8, so read with the same encoding
        with open(PROCESSED_JSON_FILE_PATH, "rt", encoding="utf-8") as f:
            processed = json.load(f)
        # set for O(1) "already processed?" checks instead of scanning the list for every row
        processed_timestamps = {p["timestamp_int"] for p in processed}

        now_ts = time.time()
        for row in rows:
            recorded_at_ts = row[0] + APPLE_TIME_OFFSET
            duration = datetime.timedelta(seconds=row[1])
            # label = row[2]
            path = row[3]

            # skip really old, or currently-downloading, recordings
            if not path:
                continue
            if not path.endswith(".m4a"):
                raise Exception(f"Voice memo filepath does not end in `.m4a`: {path}")

            # have we "processed" this voice memo already?
            if int(recorded_at_ts) in processed_timestamps:
                continue

            # transcribe with Whisper, add to top of Markdown file
            print(f"Transcribing Apple voice memo and prepending to Markdown file (ts={int(recorded_at_ts)})...")
            transcription_speaker_segments = transcribe(path)
            if "".join(transcription_speaker_segments):
                markdown = _format_memo_markdown(recorded_at_ts, duration, path, transcription_speaker_segments)
                with open(NOTE_TO_SELF_FILEPATH_TO_PREPEND, "rt", encoding="utf-8") as f:
                    original_contents = f.read()
                new_contents = f"{markdown}\n\n{original_contents}"
                atomic_file_write(NOTE_TO_SELF_FILEPATH_TO_PREPEND, new_contents)
            else:
                print("Transcript was empty, so not prepending into file")

            # write `processed.json` **after each completed file**
            # this used to not be atomic, and once I found this file wiped, so we really should use an
            # atomic write
            processed.append({"timestamp_int": int(recorded_at_ts), "processed_at": now_ts,
                              "total_seconds": duration.total_seconds()})
            processed_timestamps.add(int(recorded_at_ts))
            atomic_file_write(PROCESSED_JSON_FILE_PATH, json.dumps(processed))

        time.sleep(1)
# --------------------------------------------------------------------
# Adam library functions
# --------------------------------------------------------------------
def pad_number(n, num_digits_before_fraction, num_seconds_fraction_digits=0):
    """Format `n` zero-padded to at least `num_digits_before_fraction` integer digits.

    Args:
        n: the number to format.
        num_digits_before_fraction: minimum number of digits before the decimal point.
        num_seconds_fraction_digits: digits after the decimal point; when 0, `n` is truncated to an
            int and no decimal point is emitted.

    Returns:
        The formatted string, e.g. `pad_number(5, 2) == "05"` and `pad_number(5.5, 2, 1) == "05.5"`.
    """
    if num_seconds_fraction_digits == 0:
        return format(int(n), f"0{num_digits_before_fraction}")
    # total field width = integer digits + the "." + fraction digits
    total_width = num_digits_before_fraction + num_seconds_fraction_digits + 1
    return f"{n:0{total_width}.{num_seconds_fraction_digits}f}"


def timedelta_human_readable(total_s, num_seconds_fraction_digits=0, show_seconds=True):
    """Render a duration in seconds as a compact string such as `"1h 01m 01s"` or `"-1m 35s"`.

    Args:
        total_s: duration in seconds (int or float, may be negative), or None.
        num_seconds_fraction_digits: number of fractional digits shown on the seconds component.
        show_seconds: when False the result is rounded to the nearest minute and the seconds
            component is omitted, except for durations under 30 seconds, which are still
            rendered as `"Xs"` rather than an empty string.

    Returns:
        The formatted string, or None when `total_s` is None.

    Raises:
        Exception: if fractional seconds are requested while `show_seconds` is False.
    """
    if not show_seconds and num_seconds_fraction_digits > 0:
        raise Exception("Non-sensical args")
    if total_s is None:
        return None

    sign = "-" if total_s < 0 else ""
    total_s = abs(total_s)

    # if total_s is < 30, we return seconds, even if show_seconds is False
    seconds_shown = show_seconds or total_s < 30
    if not seconds_shown:
        # round to the nearest whole minute BEFORE splitting into units, so a round-up carries into
        # hours/days (3599s -> "1h 00m", not "60m") and 30-59s becomes "1m" instead of ""
        whole_minutes, leftover_s = divmod(total_s, 60)
        total_s = (int(whole_minutes) + (1 if leftover_s >= 30 else 0)) * 60

    m, s = divmod(total_s, 60)
    m = int(m)
    h, m = divmod(m, 60)
    d, h = divmod(h, 24)

    parts = [sign]
    if d:
        parts.append(f"{d}d ")
    # a unit is zero-padded to two digits only when a larger unit precedes it
    if total_s >= 3600:
        parts.append(f"{pad_number(h, 2 if total_s >= 24*3600 else 1)}h ")
    if total_s >= 60:
        parts.append(f"{pad_number(m, 2 if total_s >= 3600 else 1)}m ")
    if seconds_shown:
        parts.append(f"{pad_number(s, 2 if total_s >= 60 else 1, num_seconds_fraction_digits)}s")
    return "".join(parts).strip()
def atomic_file_write(filepath, contents, m_and_a_times_ns=None):
    """Write `contents` to `filepath` by writing a temp file in `TEMP_DIR` and renaming it into place.

    Readers of `filepath` therefore see either the old contents or the complete new contents,
    never a partially written file.
    NOTE(review): `os.replace` can only rename within one filesystem, so `TEMP_DIR` presumably has
    to live on the same volume as `filepath` -- confirm for your setup.

    Args:
        filepath: destination path; its containing folder must already exist.
        contents: `bytes`, or `str` (encoded as UTF-8).
        m_and_a_times_ns: optional tuple of two int nanosecond timestamps to stamp on the file; going
            by the parameter name the order is taken to be `(mtime_ns, atime_ns)`.

    Raises:
        Exception: if the containing folder or `TEMP_DIR` is missing, `contents` has the wrong type,
            or `m_and_a_times_ns` is malformed.
    """
    # a bare filename has an empty dirname, which means "the current directory"
    if not os.path.exists(os.path.dirname(filepath) or os.curdir):
        raise Exception(f"Containing folder does not exist: {os.path.dirname(filepath)}")
    if not isinstance(contents, (bytes, str)):
        raise Exception("Expecting `bytes` or `str` contents")
    if not os.path.exists(TEMP_DIR):
        raise Exception("Temp dir does not exist")
    temp_filepath = os.path.join(TEMP_DIR, get_unique_temp_filename())
    if os.path.exists(temp_filepath):
        raise Exception("Very unlikely since time.time() is a float")

    # check `m_and_a_times_ns` for errors before writing the temp file to disk
    if m_and_a_times_ns is not None:
        if not (isinstance(m_and_a_times_ns, tuple) and len(m_and_a_times_ns) == 2):
            raise Exception("Expecting `m_and_a_times_ns` to be a tuple with two items")
        if not all(isinstance(t, int) for t in m_and_a_times_ns):
            raise Exception("Expecting int timestamps in `m_and_a_times_ns`")
        if min(m_and_a_times_ns) < (datetime.datetime(1979, 1, 1).timestamp() * 1e9):
            raise Exception("`m_and_a_times_ns` are earlier than 1979, when expressed as nanoseconds")

    payload = contents if isinstance(contents, bytes) else contents.encode("utf-8")
    try:
        with open(temp_filepath, "wb") as f:
            f.write(payload)
            # push the bytes to disk before the rename, so a crash cannot leave an empty file
            # sitting under the final name
            f.flush()
            os.fsync(f.fileno())
        if m_and_a_times_ns is not None:
            # `os.utime` expects `ns=(atime_ns, mtime_ns)`: two ints, access time first
            mtime_ns, atime_ns = m_and_a_times_ns
            os.utime(temp_filepath, ns=(atime_ns, mtime_ns))
        os.replace(temp_filepath, filepath)
    finally:
        # best-effort cleanup: after a successful replace the temp file is already gone
        try:
            os.remove(temp_filepath)
        except OSError:
            pass
def subprocess_run(to_run, raise_exception_on_nonzero_exit_code=True):
    """Run a command to completion and return its stdout as text.

    Args:
        to_run: the command and its arguments as a list (never a shell string).
        raise_exception_on_nonzero_exit_code: when True, a non-zero exit code prints the captured
            stdout/stderr and raises; when False, stdout is returned regardless of the exit code.

    Returns:
        The command's stdout decoded as UTF-8; undecodable bytes become U+FFFD.

    Raises:
        Exception: if `to_run` is not a list, or on a non-zero exit code when requested.
    """
    if not isinstance(to_run, list):
        raise Exception("Expected command input as a list")

    def _decode(raw):
        # external tools can emit bytes that are not valid UTF-8; replace them instead of letting a
        # UnicodeDecodeError hide the command's real output or its real failure
        return raw.decode("utf-8", errors="replace") if raw else ""

    completed_process = subprocess.run(to_run, capture_output=True)
    if raise_exception_on_nonzero_exit_code and completed_process.returncode != 0:
        print(f"""stdout\n------\n{_decode(completed_process.stdout)}\n\n\n\nstderr\n------\n{_decode(completed_process.stderr)}""")
        raise Exception(f"Non-zero return code from command {completed_process.args}: {completed_process.returncode}")
    return _decode(completed_process.stdout)
def get_unique_temp_filename(filename_extension=None):
    """Return a fresh filename made of the current local time plus 16 random hex characters.

    Args:
        filename_extension: optional suffix (including its leading dot) appended verbatim.

    Returns:
        A name shaped like `2024-03-22--22-16-05--0123456789abcdef`, followed by the extension if given.
    """
    current_time = datetime.datetime.now()
    time_part = current_time.strftime("%Y-%m-%d--%H-%M-%S")
    random_part = secrets.token_hex(8).lower()
    suffix = filename_extension if filename_extension else ""
    return "--".join((time_part, random_part)) + suffix
# --------------------------------------------------------------------
# Transcription
# --------------------------------------------------------------------
def _collect_speaker_segments(rows, print_transcript_lines_to_log=False):
    """Merge whisper.cpp transcription rows into one cleaned-up string per speaker turn."""
    speaker_segments = [""]
    for row in rows:
        if print_transcript_lines_to_log:
            print(f"""Raw transcript line: `{row["text"]}`""")
        line = row["text"].strip('" ')
        # strip again after removing the marker, so a row that held only `[BLANK_AUDIO]` plus
        # whitespace is treated as empty rather than as speech
        line = line.replace("[BLANK_AUDIO]", "").strip()
        # it seems there's some historical context around non-speech tokens from Whisper, see:
        # `non_speech_tokens()` @ https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#LL236C16-L236C16
        # https://github.com/rock3125/whisper.cpp/commit/a6bac7c32a7a7815291c0f3e67cd7fa56d0b7166
        if line.startswith(">> "):
            # not sure what this "means" from Whisper
            line = line[len(">> "):]
        if line.startswith("- "):
            # new speaker segment
            speaker_segments.append("")
            line = line[len("- "):]
            if print_transcript_lines_to_log:
                print("Starting new speaker segment")
        if line:
            if speaker_segments[-1]:
                speaker_segments[-1] = speaker_segments[-1] + " " + line
            else:
                speaker_segments[-1] = line  # don't prepend a space
            if print_transcript_lines_to_log:
                print(f"Processed transcript line: `{line}`")
        elif print_transcript_lines_to_log:
            print("Processed line is empty, so not adding anything to the transcript")
    # drop empty turns, then upper-case the first character of each remaining turn
    return [ss[0].capitalize() + ss[1:] for ss in speaker_segments if ss]


def transcribe(input_filepath, print_transcript_lines_to_log=False):
    """Transcribe an audio file with whisper.cpp and return its text split by speaker turn.

    The input is first converted with ffmpeg to a 16 kHz mono 16-bit WAV in `TEMP_DIR`, then
    whisper.cpp is run on it with JSON output. Both temp files are removed afterwards, including
    when either tool fails.

    Args:
        input_filepath: path of the audio file to transcribe.
        print_transcript_lines_to_log: when True, print raw and processed transcript lines.

    Returns:
        A list of strings where each string is a new speaker speaking, roughly. Empty when no
        speech was found.

    Raises:
        Exception: if the input file does not exist, or ffmpeg / whisper.cpp exits non-zero.
    """
    if not os.path.exists(input_filepath):
        raise Exception("Input file does not exist")

    tmp_str = get_unique_temp_filename()
    wav_filepath = os.path.join(TEMP_DIR, tmp_str + ".wav")
    output_extension = ".json"
    output_filepath = os.path.join(TEMP_DIR, tmp_str + output_extension)
    output_filepath_without_extension = output_filepath[:-len(output_extension)]  # oddly whisper.cpp adds the extension

    # a single try/finally around both tools, so a failed ffmpeg run cannot leave a partial WAV behind
    # and a failed whisper run cannot leave its JSON behind
    try:
        # convert to 16-bit wav
        ffmpeg_command = ["/opt/homebrew/bin/ffmpeg",
                          "-i", input_filepath, "-ar", "16000", "-ac", "1", "-af", "dynaudnorm",
                          "-c:a", "pcm_s16le", wav_filepath]
        subprocess_run(ffmpeg_command)

        # run whisper.cpp
        # I tried running with 12 threads on my 10-core mbp M1 Max and it froze, so just using 8 threads
        # you could probably get away with more on the Mac Studio
        whisper_command = [WHISPER_CPP_BINARY_FILEPATH,
                           "--threads", "8", "--output-json", "--model", MODEL_FILEPATH, "--file", wav_filepath,
                           "--output-file", output_filepath_without_extension]
        subprocess_run(whisper_command)

        # read the output file; explicit encoding so the result does not depend on the process locale
        with open(output_filepath, "rt", encoding="utf-8") as f:
            rows = json.load(f)["transcription"]
    finally:
        for temp_filepath in (wav_filepath, output_filepath):
            try:
                os.remove(temp_filepath)
            except OSError:
                pass  # best effort: the file may never have been created

    speaker_segments = _collect_speaker_segments(rows, print_transcript_lines_to_log)
    if print_transcript_lines_to_log:
        print(f"Processed transcript: `{speaker_segments}`")
    return speaker_segments  # list of strings where each string is a new speaker speaking, roughly
# --------------------------------------------------------------------
# Call main
# --------------------------------------------------------------------
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment