Skip to content

Instantly share code, notes, and snippets.

@sanjarcode
Last active November 20, 2023 17:43
Show Gist options
  • Save sanjarcode/5ee62e10ad9aabe26ae04f1f119efaaa to your computer and use it in GitHub Desktop.
Save sanjarcode/5ee62e10ad9aabe26ae04f1f119efaaa to your computer and use it in GitHub Desktop.
Get transcripts of tree of .mp4 videos
## Get English transcripts of a tree of .mp4 files
## Cost incurred: 200 videos of ~3 minutes each => around ~$5
## Other params: ~200 API calls; total ~40,000 seconds
## Cost estimate as per pricing chart: 683 min * 0.006 = ~4.098
## Ignores non mp4 files
## Adds .mp3 files for each file, and then calls Whisper API and stores the .txt
## Nothing is deleted
## Resumable
## Skips .mp3 generation if it exists
## Skips transcript call if .txt exists
## `export OPENAI_API_KEY="sk-...."` added in .zshrc file
import os
import sys
from openai import OpenAI
# Module-level OpenAI client shared by getTranscript. Per the header note, the
# API key is expected to be exported as OPENAI_API_KEY in the environment.
client = OpenAI()
def getTranscript(filePath):
    """Upload the audio file at ``filePath`` for transcription and return the result.

    The file is opened in binary mode and passed to the module-level ``client``
    with model ``whisper-1`` and ``response_format="text"``; whatever
    ``client.audio.transcriptions.create`` returns is handed back unchanged.
    """
    request_options = {
        "model": "whisper-1",
        "response_format": "text",  # commentable
    }
    with open(filePath, "rb") as audio_stream:
        return client.audio.transcriptions.create(file=audio_stream, **request_options)
## function to traverse over all .mp4 files in the folder and subfolders recursively
## each_file (file_path, folder_path)
def traverse_mp4_files(folder_path, do_each_file, sortLikeFileExplorer=False, debug=False):
    """Walk ``folder_path`` recursively and collect every file ending in ``.mp4``.

    Args:
        folder_path: Root folder handed to ``os.walk``.
        do_each_file: Optional callback invoked as
            ``do_each_file(file_path, folder_path)`` for each match; pass
            ``None`` to only collect paths.
        sortLikeFileExplorer: When true, the result is passed through
            ``sort_file_paths`` before being returned.
        debug: When true, each matching file name is printed and the callback
            is NOT invoked.

    Returns:
        List of full paths of the matching files.
    """
    collected = []
    for current_dir, _subdirs, names in os.walk(folder_path):
        for name in names:
            # Anything that is not an .mp4 is ignored entirely.
            if not name.endswith(".mp4"):
                continue
            full_path = os.path.join(current_dir, name)
            collected.append(full_path)
            if debug:
                print(name)
            elif do_each_file is not None:
                do_each_file(full_path, folder_path)
    return sort_file_paths(collected) if sortLikeFileExplorer else collected
def getMP4Transcript(filePath):
    """Return the transcript for the video at ``filePath``, creating it if needed.

    Two sibling files are derived from the video path with its extension
    stripped: ``<name>-temp.mp3`` (audio extracted with ffmpeg) and
    ``<name>.txt`` (the transcript). Each step is skipped when its output
    already exists, which is what makes an interrupted run resumable. The
    temporary .mp3 is intentionally left on disk afterwards.

    Args:
        filePath: Path to the source ``.mp4`` file.

    Returns:
        The transcript text, either freshly produced by ``getTranscript`` or
        read back from the existing ``.txt`` file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If ffmpeg cannot be started or a file cannot be read/written.
    """
    import subprocess  # local import so the block does not depend on file-level imports

    # splitext strips only the trailing extension; str.replace(".mp4", "")
    # would also mangle any ".mp4" that appears earlier in the path.
    base_path, _extension = os.path.splitext(filePath)
    mp3Name = f"{base_path}-temp.mp3"
    transcript_file_path = f"{base_path}.txt"

    # generate mp3
    if os.path.exists(mp3Name):
        print('Skipping mp3 gen for', filePath)
    else:
        print('Generating mp3 for', filePath)
        try:
            # Argument list (no shell), so quotes and spaces in names are safe.
            subprocess.run(["ffmpeg", "-i", filePath, mp3Name], check=True)
        except subprocess.CalledProcessError:
            # Drop a partial output so a later run does not mistake it for a
            # finished mp3 and skip regeneration.
            if os.path.exists(mp3Name):
                os.remove(mp3Name)
            raise

    # get and generate transcript
    if os.path.exists(transcript_file_path):
        print('Skipping transc API call for', filePath)
        with open(transcript_file_path, "r", encoding="utf-8") as transcript_file:
            return transcript_file.read()

    print('Calling transc API for', filePath)
    transcript = getTranscript(mp3Name)
    with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript)
    return transcript
# Run-wide bookkeeping at module level.
positive = total = done = 0  # `done` is incremented after each processed video
negative = []
totalmp3 = 207  # denominator shown in the "Done x / y" progress line
def writeLongStringToFile(title, content, filePath = './done.md'):
    """Append one transcript entry to the collated markdown file.

    ``content`` is re-wrapped with ``split_sentences``, blank lines are dropped,
    and the result is appended to ``filePath`` inside a fenced ``md`` block
    preceded by the ``title``.

    Args:
        title: Label written on the ``File:`` line (callers pass the video path).
        content: Transcript text to store.
        filePath: Markdown file to append to.

    This is best effort: any failure is printed rather than raised, so one bad
    entry does not stop a long batch run.
    """
    try:
        # Format first, so a formatting failure never touches the output file.
        human_sentences = split_sentences(content)
        human_transcript = '\n'.join([s.strip() for s in human_sentences if s.strip()])
        entry = f"""
File: `{title}`
Transcript:
```md
{human_transcript}
```
---
"""
        # Explicit UTF-8: transcripts may contain non-ASCII text, and the
        # platform default encoding is not guaranteed to handle it.
        with open(filePath, "a", encoding="utf-8") as storageFile:
            storageFile.write(entry)
    except Exception as e:
        print('Could not append transcript to', filePath, e)
def getAndSaveTranscriptMP4(videoFilePath, rootPath, safeFilePath = './done.md'):
    """Transcribe one video, append the transcript to ``safeFilePath``, report progress.

    Args:
        videoFilePath: Path of the ``.mp4`` file to process.
        rootPath: Not used here; accepted so the function matches the
            ``(file_path, folder_path)`` callback shape of ``traverse_mp4_files``.
        safeFilePath: Markdown file that collects all transcripts.

    Failures are printed together with the offending file and the function
    returns normally, so the remaining files of a batch are still processed.
    """
    global done
    try:
        transcript = getMP4Transcript(videoFilePath)
        writeLongStringToFile(videoFilePath, transcript, safeFilePath)
    except Exception as e:
        # Best effort: report which file failed and carry on with the batch.
        print('Failed', videoFilePath, e)
        return
    # Count the file before reporting, so the first success reads "1 / N"
    # rather than "0 / N".
    done += 1
    print('Done', f'{done} / {totalmp3}', videoFilePath)
## utils
## sort array of file paths in a way a file explorer would alphabetically
def sort_file_paths(file_paths):
    """Return ``file_paths`` sorted case-insensitively by directory, then file name.

    Each path is split with ``os.path.split`` and both parts are lower-cased to
    form the sort key. The input is not modified; a new list is returned.
    """
    return sorted(
        file_paths,
        key=lambda path: tuple(part.lower() for part in os.path.split(path)),
    )
## Given a very large paragraph (as string), divides it into max 80 chars lines
## takes care to not break words between
def split_sentences(sentence, max_length=80):
    """Split a long string into lines of at most ``max_length`` characters.

    Words (whitespace-separated) are never broken; a single word longer than
    ``max_length`` is placed on a line of its own.

    Args:
        sentence: Text to wrap.
        max_length: Maximum line length; defaults to 80.

    Returns:
        ``[sentence]`` unchanged when it already fits, otherwise the list of
        wrapped lines with words joined by single spaces.
    """
    if len(sentence) <= max_length:
        return [sentence]

    lines = []
    current_line = ""
    for word in sentence.split():
        if not current_line:
            # First word of a line always goes in, with no separator to count.
            # This avoids emitting an empty line when the word is over-long.
            current_line = word
        elif len(current_line) + 1 + len(word) <= max_length:  # 1 for the space
            current_line += " " + word
        else:
            lines.append(current_line)
            current_line = word
    if current_line:
        lines.append(current_line)
    return lines
if __name__ == "__main__":
    # Fail with a usage hint instead of an IndexError when the folder is missing.
    if len(sys.argv) < 2:
        sys.exit(f"Usage: python {sys.argv[0]} path-to-folder")
    rootPath = sys.argv[1]

    # Collect the paths first (no callback) and process them in file-explorer
    # order, so the collated transcripts end up in alphabetical (file) order.
    # Passing getAndSaveTranscriptMP4 as the callback instead would process
    # files in os.walk order.
    sorted_files = traverse_mp4_files(rootPath, None, True)

    # Show the real number of videos in the progress line rather than a count
    # hard-coded for one particular run.
    totalmp3 = len(sorted_files)

    for file in sorted_files:
        getAndSaveTranscriptMP4(file, rootPath)
## Get english transcripts of a tree of .mp4 files
## Ignores non mp4 files in the tree
## Adds .mp3 files for each file, using `ffmpeg -i`, and then calls Whisper API and stores the .txt
## Nothing is deleted
## Resumable
## Skips .mp3 generation if exists
## Skips transcript call if .txt exists
## `export OPENAI_API_KEY="sk-...."` added in .zshrc file
## Usage: `python video2Text.py path-to-folder`
## Example: `python video2Text.py ~/my-files/large-store/`
## Cost incurred: 200 videos of ~3 minutes each => around ~$5
## Other params: ~200 API calls; total ~40,000 seconds
## Cost estimate as per pricing chart: 683 min * 0.006 = ~4.098
## Time taken ~30 minutes
## Conclusion: too expensive for India, and non-dollar locales
## Workaround: try with local model
import os
from openai import OpenAI
# Module-level OpenAI client shared by getTranscript. Per the header note, the
# API key is expected to be exported as OPENAI_API_KEY in the environment.
client = OpenAI()
def getTranscript(filePath):
    """Transcribe the audio file at ``filePath`` via the module-level ``client``.

    The file is read in binary mode and submitted with model ``whisper-1`` and
    ``response_format="text"``. The value returned by
    ``client.audio.transcriptions.create`` is passed back as-is.
    """
    with open(filePath, "rb") as audio_stream:
        result = client.audio.transcriptions.create(
            file=audio_stream,
            model="whisper-1",
            response_format="text",  # commentable
        )
    return result
## function to traverse over all .mp4 files in the folder and subfolders recursively
## each_file (file_path, folder_path)
def traverse_mp4_files(folder_path, each_file, debug=False):
    """Walk ``folder_path`` recursively, visiting every file ending in ``.mp4``.

    Args:
        folder_path: Root folder handed to ``os.walk``.
        each_file: Callback invoked as ``each_file(file_path, folder_path)`` for
            each match. ``None`` is accepted and means "only collect paths".
        debug: When true, each matching file name is printed and the callback
            is NOT invoked.

    Returns:
        List of full paths of the matching files, in ``os.walk`` order.
    """
    mp4_files = []
    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            # Non-mp4 files are ignored entirely.
            if not file.endswith(".mp4"):
                continue
            file_path = os.path.join(root, file)
            mp4_files.append(file_path)
            if debug:
                print(file)
            elif each_file is not None:
                # Guarded so callers can pass None to just list the files.
                each_file(file_path, folder_path)
    return mp4_files
def getMP4Transcript(filePath, root):
    """Return the transcript for the video at ``filePath``, creating it if needed.

    Two sibling files are derived from the video path with its extension
    stripped: ``<name>-temp.mp3`` (audio extracted with ffmpeg) and
    ``<name>.txt`` (the transcript). Each step is skipped when its output
    already exists, which is what makes an interrupted run resumable. The
    temporary .mp3 is intentionally left on disk afterwards.

    Args:
        filePath: Path to the source ``.mp4`` file.
        root: Not used by this function; kept so existing callers still work.

    Returns:
        The transcript text, either freshly produced by ``getTranscript`` or
        read back from the existing ``.txt`` file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If ffmpeg cannot be started or a file cannot be read/written.
    """
    import subprocess  # local import so the block does not depend on file-level imports

    # splitext strips only the trailing extension; str.replace(".mp4", "")
    # would also mangle any ".mp4" that appears earlier in the path.
    base_path, _extension = os.path.splitext(filePath)
    mp3Name = f"{base_path}-temp.mp3"
    transcript_file_path = f"{base_path}.txt"

    # generate mp3
    if os.path.exists(mp3Name):
        print('Skipping mp3 gen for', filePath)
    else:
        print('Generating mp3 for', filePath)
        try:
            # Argument list (no shell), so quotes and spaces in names are safe.
            subprocess.run(["ffmpeg", "-i", filePath, mp3Name], check=True)
        except subprocess.CalledProcessError:
            # Drop a partial output so a later run does not mistake it for a
            # finished mp3 and skip regeneration.
            if os.path.exists(mp3Name):
                os.remove(mp3Name)
            raise

    # get and generate transcript
    if os.path.exists(transcript_file_path):
        print('Skipping transc API call for', filePath)
        with open(transcript_file_path, "r", encoding="utf-8") as transcript_file:
            return transcript_file.read()

    print('Calling transc API for', filePath)
    transcript = getTranscript(mp3Name)
    with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript)
    return transcript
# Run-wide bookkeeping at module level.
positive = total = done = 0  # `done` is incremented after each processed video
negative = []
totalmp3 = 207  # denominator shown in the "Done x / y" progress line
def f(*args):
    """Per-file callback: transcribe one video, stash the text, report progress.

    Expects exactly two positional arguments, ``(filePath, root)``, as supplied
    by ``traverse_mp4_files``. The transcript is appended to ``./api-done.txt``
    followed by a run of dashes as a separator.

    Failures during transcription or stashing are printed together with the
    offending file and the function returns normally, so the traversal
    continues with the next file.
    """
    global done
    filePath, root = args
    try:
        transcript = getMP4Transcript(filePath, root)
        # safety stash, just in case
        # Explicit UTF-8: transcripts may contain non-ASCII text.
        with open('./api-done.txt', "a", encoding="utf-8") as transcript_file:
            transcript_file.write(transcript)
            transcript_file.write('------' * 3)
    except Exception as e:
        # Best effort: report which file failed and carry on.
        print('Failed', filePath, e)
        return
    # Count the file before reporting, so the first success reads "1 / N"
    # rather than "0 / N".
    done += 1
    print('Done', f'{done} / {totalmp3}', filePath)
import sys

# Script entry: the folder to process is the first command-line argument.
# Exit with a usage hint instead of an IndexError when it is missing.
if len(sys.argv) < 2:
    sys.exit(f"Usage: python {sys.argv[0]} path-to-folder")
traverse_mp4_files(sys.argv[1], f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment