# Automated Apple Voice Memos transcription to Markdown file
import os
import datetime
import json
import pathlib
import secrets
import sqlite3
import subprocess
import time
# Description: This code continuously polls for new Apple Voice Memos. When a new one is found, it is transcribed using
# Whisper, and the transcript is prepended to a Markdown file, e.g. an Obsidian note document. In practice this works
# pretty quickly, i.e. on the order of seconds. With iCloud enabled, new memos recorded on any of your devices will
# be processed.
# Quick tip: If you use an Apple Watch, add the Voice Memo "complication" to your watch face, so you can record new
# Voice Memos quickly (only two taps!)
# Requirements:
# - if you want to transcribe Apple Voice Memos from other devices, iCloud sync of voice memos must be ENABLED on all
# devices you record from, and on the device running this code
# - to transcribe, we use the whisper.cpp executable/binary, from https://github.com/ggerganov/whisper.cpp; please
# set `WHISPER_CPP_BINARY_FILEPATH` and `MODEL_FILEPATH` as appropriate. I use the `ggml-large.bin` model file (~3 GB).
# - requires the `sqlite3` Python module (part of the standard library, so there is nothing extra to install)
# - ffmpeg must be installed to `"/opt/homebrew/bin/ffmpeg"`, or you must change the path to the right one
# Notes:
# - transcripts of new voice memos are prepended to the Markdown file path in `NOTE_TO_SELF_FILEPATH_TO_PREPEND`;
# you must populate this to some Markdown file on your system, or modify the code that uses this variable to use whatever
# destination for transcribed notes that you'd like
# - all previously-existing voice memos will be processed the first time you run this code!
# - this code is written to run continuously in the background, but you can modify it to run once to process all new
# voice memos, and then quit by dropping the `while True` loop
# - check out the constants defined below to see the files created / used; some need to be configured, some you
# may change if you like
# Improvement ideas:
# - Use an LLM to clean up the formatting, fix transcription mistakes, and remove extra back-and-forth in dialogs (excessive
# "yeah"s when turn-taking), etc
# - There is probably a better way to remove non-speech tokens (e.g. `(silence)`) from Whisper output
# - A good bit of the latency of processing voice memos comes from loading the Whisper model, which we do for each
# audio file; we could load it once and keep it in memory to reduce latency, but increase memory usage and complexity
# you MUST point these filepath variables at the right files on your own system
WHISPER_CPP_BINARY_FILEPATH = "/Users/adam/Dropbox/dev/assets/adam_whisper/whisper.cpp-1.3.0/whisper_main"
MODEL_FILEPATH = "/Users/adam/Dropbox/dev/assets/adam_whisper/models/ggml-large.bin"
# Markdown file that new transcripts are prepended to; `main()` raises at startup if this file does not exist
# NOTE(review): the value below is only a placeholder default -- change it to your own note file (and `touch` it)
NOTE_TO_SELF_FILEPATH_TO_PREPEND = os.path.join(os.path.expanduser("~"), "Documents", "Note to self.md")
# you *may* change these filepath variables, if you want to
# JSON list with one entry per voice memo that has already been handled
PROCESSED_JSON_FILE_PATH = os.path.join(os.path.expanduser("~"), ".apple_voice_memos_processed.json")
# scratch folder for staged atomic writes and for the intermediate `.wav` / `.json` transcription files
TEMP_DIR = os.path.join(os.path.expanduser("~"), ".tmp")
def _fetch_voice_memo_rows(voice_memos_sqlite_path):
    # Return one `(date, duration, label, path)` tuple per recording in the Voice Memos database.
    conn = sqlite3.connect(voice_memos_sqlite_path)
    try:
        cur = conn.cursor()
        # NOTE(review): table/column names are the ones commonly seen in `CloudRecordings.db`; confirm against
        # your macOS version. Oldest-first order means the newest memo ends up at the top of the Markdown file.
        cur.execute("SELECT ZDATE, ZDURATION, ZCUSTOMLABEL, ZPATH FROM ZCLOUDRECORDING ORDER BY ZDATE")
        return cur.fetchall()
    finally:
        # `with sqlite3.connect(...)` only manages the transaction; close explicitly since we poll forever
        conn.close()


def _format_memo_markdown(recorded_at_ts, duration, path, transcription_speaker_segments):
    # Build the Markdown bullet (or nested bullets, one per speaker segment) for a single transcribed memo.
    segments = [
        # if there is a single sentence, strip off the trailing "."
        seg[:-1] if (seg.count(".") == 1 and seg.endswith(".")) else seg
        for seg in transcription_speaker_segments
    ]
    note_timestamp_markdown = datetime.datetime.fromtimestamp(recorded_at_ts).strftime("%Y-%m-%d %-I:%M%p").lower()
    total_seconds = duration.total_seconds()
    # the duration is only worth mentioning for longer recordings
    duration_markdown = (", " + timedelta_human_readable(total_seconds)) if total_seconds > 30 else ""
    link_and_duration_markdown = f"([recording]({pathlib.Path(path).as_uri()}){duration_markdown})"
    if len(segments) == 1:
        return f"* {note_timestamp_markdown}: {segments[0]} {link_and_duration_markdown}"
    return "\n\t* ".join([f"* {note_timestamp_markdown} transcript {link_and_duration_markdown}"] + segments)


def main():
    """Poll the Apple Voice Memos database forever and prepend transcripts of new memos to the Markdown note.

    Each memo is identified by its integer recording timestamp; handled memos are recorded in
    `PROCESSED_JSON_FILE_PATH` after every file so an interruption never causes re-processing.

    Raises:
        Exception: if the Markdown note does not exist, the database is not readable, or a recording's
            path does not end in `.m4a`.
    """
    if not os.path.exists(NOTE_TO_SELF_FILEPATH_TO_PREPEND):
        raise Exception(f"The filepath in `NOTE_TO_SELF_FILEPATH_TO_PREPEND` does not exist, if it's right then `touch` it: {NOTE_TO_SELF_FILEPATH_TO_PREPEND}")
    # the temp dir must exist *before* the first `atomic_file_write`, which stages its output there
    os.makedirs(TEMP_DIR, exist_ok=True)
    if not os.path.exists(PROCESSED_JSON_FILE_PATH):
        atomic_file_write(PROCESSED_JSON_FILE_PATH, json.dumps([]))
    # NOTE(review): `com.apple.voicememos` is the Voice Memos app-support folder on the macOS versions this was
    # written for; newer releases may keep the database elsewhere -- confirm on your machine
    recordings_dir = os.path.join(os.path.expanduser("~"), "Library", "Application Support",
                                  "com.apple.voicememos", "Recordings")
    voice_memos_sqlite_path = os.path.join(recordings_dir, "CloudRecordings.db")
    if not os.access(voice_memos_sqlite_path, os.R_OK):
        raise Exception(f"No permission to read database file: {voice_memos_sqlite_path}")
    # offset between datetime starts to count (1.1.1970) and Apple starts to count (1.1.2001); the value is kept
    # exactly as-is because `timestamp_int` entries already stored in the processed file were derived from it
    APPLE_TIME_OFFSET = 978307200.825232
    poll_interval_s = 1.0  # pause between polls so the loop does not spin a CPU core
    while True:
        rows = _fetch_voice_memo_rows(voice_memos_sqlite_path)
        with open(PROCESSED_JSON_FILE_PATH, "rt", encoding="utf-8") as f:
            processed = json.load(f)
        processed_timestamps = {p["timestamp_int"] for p in processed}
        now_ts = time.time()
        for row in rows:
            path = row[3]
            # skip really old, or currently-downloading, recordings
            if not path:
                continue
            if not path.endswith(".m4a"):
                raise Exception(f"Voice memo filepath does not end in `.m4a`: {path}")
            recorded_at_ts = row[0] + APPLE_TIME_OFFSET
            duration = datetime.timedelta(seconds=row[1])
            # label = row[2]
            # have we "processed" this voice memo already?
            timestamp_int = int(recorded_at_ts)
            if timestamp_int in processed_timestamps:
                continue
            # presumably some macOS versions store only the file name; resolve it next to the database
            if not os.path.isabs(path):
                path = os.path.join(recordings_dir, path)
            # transcribe with Whisper, add to top of Markdown file
            print(f"Transcribing Apple voice memo and prepending to Markdown file (ts={timestamp_int})...")
            transcription_speaker_segments = transcribe(path)
            if "".join(transcription_speaker_segments):
                markdown = _format_memo_markdown(recorded_at_ts, duration, path, transcription_speaker_segments)
                with open(NOTE_TO_SELF_FILEPATH_TO_PREPEND, "rt", encoding="utf-8") as f:
                    original_contents = f.read()
                new_contents = f"{markdown}\n\n{original_contents}"
                atomic_file_write(NOTE_TO_SELF_FILEPATH_TO_PREPEND, new_contents)
            else:
                print("Transcript was empty, so not prepending into file")
            # write `processed.json` **after each completed file**
            # this used to not be atomic, and once I found this file wiped, so we really should use an
            # atomic write
            processed.append({"timestamp_int": timestamp_int, "processed_at": now_ts,
                              "total_seconds": duration.total_seconds()})
            processed_timestamps.add(timestamp_int)
            atomic_file_write(PROCESSED_JSON_FILE_PATH, json.dumps(processed))
        time.sleep(poll_interval_s)
# --------------------------------------------------------------------
# Adam library functions
# --------------------------------------------------------------------
def pad_number(n, num_digits_before_fraction, num_seconds_fraction_digits=0):
    """Return `n` as a zero-padded string.

    With `num_seconds_fraction_digits == 0` the value is truncated to an int and padded to
    `num_digits_before_fraction` digits; otherwise it is rendered as a fixed-point number with that many
    fraction digits, the integer part padded to `num_digits_before_fraction` digits.
    """
    if num_seconds_fraction_digits != 0:
        # total field width = integer digits + the decimal point + fraction digits
        total_width = num_digits_before_fraction + num_seconds_fraction_digits + 1
        return f"{n:0{total_width}.{num_seconds_fraction_digits}f}"
    return f"{int(n):0{num_digits_before_fraction}}"
def timedelta_human_readable(total_s, num_seconds_fraction_digits=0, show_seconds=True):
    """Format a duration in seconds as e.g. `"5s"`, `"1m 05s"`, `"1h 02m 05s"` or `"1d 01h 01m 01s"`.

    Args:
        total_s: duration in seconds (int or float); negative values get a leading `-`; `None` returns `None`.
        num_seconds_fraction_digits: number of decimals shown on the seconds field.
        show_seconds: when False the duration is rounded to the nearest minute (e.g. `45` -> `"1m"`,
            `3599` -> `"1h 00m"`); durations under 30 seconds are still shown as seconds.

    Raises:
        Exception: if fractional seconds are requested while seconds are hidden.
    """
    if not show_seconds and num_seconds_fraction_digits > 0:
        raise Exception("Non-sensical args")
    if total_s is None:
        return None
    sign = "-" if total_s < 0 else ""
    total_s = abs(total_s)
    # if total_s is < 30, we return seconds, even if show_seconds is False
    seconds_shown = show_seconds or total_s < 30
    if not seconds_shown:
        # round to whole minutes *before* splitting into units, so the rounded-up minute carries into
        # hours/days (59m30s -> "1h 00m", not "60m") and 30-59s becomes "1m" instead of an empty string
        whole_minutes = int(total_s // 60) + (1 if total_s % 60 >= 30 else 0)
        total_s = whole_minutes * 60
    m, s = divmod(total_s, 60)
    m = int(m)
    h, m = divmod(m, 60)
    d, h = divmod(h, 24)
    parts = []
    if d:
        parts.append(f"{d}d")
    # a unit is zero-padded to two digits only when a larger unit is printed in front of it
    if total_s >= 3600:
        parts.append(f"{pad_number(h, 2 if total_s >= 24*3600 else 1)}h")
    if total_s >= 60:
        parts.append(f"{pad_number(m, 2 if total_s >= 3600 else 1)}m")
    if seconds_shown:
        parts.append(f"{pad_number(s, 2 if total_s >= 60 else 1, num_seconds_fraction_digits)}s")
    return sign + " ".join(parts)
def atomic_file_write(filepath, contents, m_and_a_times_ns=None):
    """Write `contents` to `filepath` by staging a temp file in `TEMP_DIR` and renaming it into place.

    Args:
        filepath: destination path; its containing folder must already exist.
        contents: `bytes`, or `str` (encoded as UTF-8).
        m_and_a_times_ns: optional 2-tuple of int nanosecond timestamps to stamp on the file, read as
            (modification time, access time) per the parameter name.

    Raises:
        Exception: on invalid arguments, or a missing destination folder or temp dir.
        OSError: if writing or the final rename fails; the staged temp file is removed first.
    """
    # `abspath` so that a bare filename resolves to the current directory instead of an empty dirname
    containing_dir = os.path.dirname(os.path.abspath(filepath))
    if not os.path.exists(containing_dir):
        raise Exception(f"Containing folder does not exist: {containing_dir}")
    if not isinstance(contents, (bytes, str)):
        raise Exception("Expecting `bytes` or `str` contents")
    if not os.path.exists(TEMP_DIR):
        raise Exception("Temp dir does not exist")
    temp_filepath = os.path.join(TEMP_DIR, get_unique_temp_filename())
    if os.path.exists(temp_filepath):
        raise Exception("Very unlikely since time.time() is a float")
    # check `m_and_a_times_ns` for errors before writing the temp file to disk
    if m_and_a_times_ns is not None:
        if not (isinstance(m_and_a_times_ns, tuple) and len(m_and_a_times_ns) == 2):
            raise Exception("Expecting `m_and_a_times_ns` to be a tuple with two items")
        if not all(isinstance(t, int) for t in m_and_a_times_ns):
            raise Exception("Expecting int timestamps in `m_and_a_times_ns`")
        if min(m_and_a_times_ns) < (datetime.datetime(1979, 1, 1).timestamp() * 1e9):
            raise Exception("`m_and_a_times_ns` are earlier than 1979, when expressed as nanoseconds")
    data = contents if isinstance(contents, bytes) else contents.encode("utf-8")
    try:
        with open(temp_filepath, "wb") as f:
            f.write(data)
            # make sure the bytes are on disk before the rename makes them visible under the final name
            f.flush()
            os.fsync(f.fileno())
        if m_and_a_times_ns is not None:
            mtime_ns, atime_ns = m_and_a_times_ns
            # `os.utime` wants `(atime_ns, mtime_ns)`, i.e. the reverse of our tuple's order
            os.utime(temp_filepath, ns=(atime_ns, mtime_ns))
        # NOTE: `os.replace` can fail when `TEMP_DIR` and `filepath` are on different filesystems
        os.replace(temp_filepath, filepath)
    except OSError:
        # don't leave a stray temp file behind, then let the caller see the original error
        try:
            os.remove(temp_filepath)
        except OSError:
            pass
        raise
def subprocess_run(to_run, raise_exception_on_nonzero_exit_code=True):
    """Run a command (given as an argument list) and return its stdout decoded as UTF-8.

    Args:
        to_run: the command and its arguments, as a list.
        raise_exception_on_nonzero_exit_code: when True, a non-zero exit code prints the captured
            stdout/stderr and raises.

    Raises:
        Exception: if `to_run` is not a list, or on a non-zero exit code (when enabled).
    """
    if not isinstance(to_run, list):
        raise Exception("Expected command input as a list")
    completed_process = subprocess.run(to_run, capture_output=True)
    # tools may emit bytes that are not valid UTF-8; never let decoding mask the real outcome
    stdout = completed_process.stdout.decode("utf-8", errors="replace") if completed_process.stdout else ""
    if raise_exception_on_nonzero_exit_code and completed_process.returncode != 0:
        stderr = completed_process.stderr.decode("utf-8", errors="replace") if completed_process.stderr else ""
        print(f"stdout\n------\n{stdout}\n\n\n\nstderr\n------\n{stderr}")
        raise Exception(f"Non-zero return code from command {completed_process.args}: {completed_process.returncode}")
    return stdout
def get_unique_temp_filename(filename_extension=None):
    """Return a collision-resistant file name: `<YYYY-MM-DD--HH-MM-SS>--<16 hex chars><extension>`.

    Args:
        filename_extension: optional suffix appended verbatim (include the leading dot, e.g. `".wav"`).
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S")
    return timestamp + "--" + secrets.token_hex(8).lower() + (filename_extension or "")
# --------------------------------------------------------------------
# Transcription
# --------------------------------------------------------------------
def transcribe(input_filepath, print_transcript_lines_to_log=False):
    """Transcribe an audio file with whisper.cpp and return the text grouped into speaker segments.

    The audio is first converted with ffmpeg to a 16 kHz mono 16-bit WAV in `TEMP_DIR`; whisper.cpp writes
    its JSON output there too. Both temp files are removed afterwards, even if a step fails.

    Args:
        input_filepath: path of the audio file to transcribe.
        print_transcript_lines_to_log: print raw and processed transcript lines while parsing.

    Returns:
        A list of non-empty strings where each string is a new speaker speaking, roughly.

    Raises:
        Exception: if the input file does not exist, or ffmpeg / whisper.cpp exits with a non-zero code.
    """
    if not os.path.exists(input_filepath):
        raise Exception("Input file does not exist")
    tmp_str = get_unique_temp_filename()
    wav_filepath = os.path.join(TEMP_DIR, tmp_str + ".wav")
    output_extension = ".json"
    output_filepath = os.path.join(TEMP_DIR, tmp_str + output_extension)
    output_filepath_without_extension = output_filepath[:-len(output_extension)]  # oddly whisper.cpp adds the extension
    try:
        # convert to 16-bit wav
        ffmpeg_command = ["/opt/homebrew/bin/ffmpeg",
                          "-i", input_filepath, "-ar", "16000", "-ac", "1", "-af", "dynaudnorm", "-c:a", "pcm_s16le",
                          wav_filepath]
        subprocess_run(ffmpeg_command)
        # run whisper.cpp
        # I tried running with 12 threads on my 10-core mbp M1 Max and it froze, so just using 8 threads
        # you could probably get away with more on the Mac Studio
        whisper_command = [WHISPER_CPP_BINARY_FILEPATH,
                           "--threads", "8", "--output-json", "--model", MODEL_FILEPATH, "--file", wav_filepath,
                           "--output-file", output_filepath_without_extension]
        subprocess_run(whisper_command)
        # read the output file (it is deleted in the `finally` below)
        with open(output_filepath, "rt", encoding="utf-8") as f:
            transcription_rows = json.load(f)["transcription"]
    finally:
        for temp_filepath in (wav_filepath, output_filepath):
            try:
                os.remove(temp_filepath)
            except OSError:
                pass  # best-effort cleanup; the file may never have been created
    speaker_segments = [""]
    for row in transcription_rows:
        if print_transcript_lines_to_log:
            print(f"""Raw transcript line: `{row["text"]}`""")
        line = row["text"].strip('" ')
        # strip again so removing the marker does not leave stray whitespace at the edges
        line = line.replace("[BLANK_AUDIO]", "").strip()
        # it seems there's some historical context around non-speech tokens from Whisper, see:
        # `non_speech_tokens()` -- presumably the one in OpenAI Whisper's tokenizer module
        if line.startswith(">> "):
            # not sure what this "means" from Whisper
            line = line[len(">> "):]
        if line.startswith("- "):
            # new speaker segment
            line = line[len("- "):]
            speaker_segments.append("")
            if print_transcript_lines_to_log:
                print("Starting new speaker segment")
        if line:
            if len(speaker_segments[-1]) > 0:
                speaker_segments[-1] = speaker_segments[-1] + " " + line
            else:
                speaker_segments[-1] = line  # don't prepend a space
            if print_transcript_lines_to_log:
                print(f"Processed transcript line: `{line}`")
        elif print_transcript_lines_to_log:
            print("Processed line is empty, so not adding anything to the transcript")
    speaker_segments = [ss for ss in speaker_segments if ss]
    speaker_segments = [ss[0].capitalize() + ss[1:] for ss in speaker_segments]
    if print_transcript_lines_to_log:
        print(f"Processed transcript: `{speaker_segments}`")
    return speaker_segments  # list of strings where each string is a new speaker speaking, roughly
# --------------------------------------------------------------------
# Call main
# --------------------------------------------------------------------
if __name__ == '__main__':
    # only start the polling loop when executed as a script, not when imported
    main()