Skip to content

Instantly share code, notes, and snippets.

@leoherzog
Last active April 23, 2024 21:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leoherzog/f6ecb6d470ac24741ca9072a069f1b93 to your computer and use it in GitHub Desktop.
Save leoherzog/f6ecb6d470ac24741ca9072a069f1b93 to your computer and use it in GitHub Desktop.
Bulk Download or Delete Zoom Recordings
#!/usr/bin/python3
#
# call main.py --user <user email> --download --delete
#
import os
import argparse
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
import time
import requests
from requests_oauthlib import OAuth2Session
import oauthlib.oauth2.rfc6749.errors
CLIENT_ID = 'clientidclientid'
CLIENT_SECRET = 'clientsecretclientsecret'
REDIRECT_URI = 'https://localhost:8000/callback'
TOKEN_FILE = 'token.json'
AUTHORIZATION_URL = 'https://zoom.us/oauth/authorize'
TOKEN_URL = 'https://zoom.us/oauth/token'
END = datetime.now()
START = END - timedelta(days=5*365)
def get_authorization_url():
oauth2_session = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI)
authorization_url, _ = oauth2_session.authorization_url(AUTHORIZATION_URL)
return authorization_url
def fetch_zoom_token(authorization_response):
oauth2_session = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI)
token = oauth2_session.fetch_token(TOKEN_URL, authorization_response=authorization_response, client_secret=CLIENT_SECRET)
with open(TOKEN_FILE, 'w') as f:
json.dump(token, f)
return token
def load_token():
try:
with open(TOKEN_FILE, 'r') as f:
return json.load(f)
except FileNotFoundError:
return None
def refresh_zoom_token(token):
extra = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}
oauth2_session = OAuth2Session(CLIENT_ID, token=token)
new_token = oauth2_session.refresh_token(TOKEN_URL, **extra)
with open(TOKEN_FILE, 'w') as f:
json.dump(new_token, f)
return new_token
def is_token_about_to_expire(token):
expires_at = token.get("expires_at", 0) - 60 # within 60 seconds
current_time = time.time()
return current_time >= expires_at
def list_meetings_with_recordings(access_token, user_id, start_date=START, end_date=END, page_size=300):
all_meetings = []
current_date = start_date
while current_date <= end_date:
next_month_date = current_date + timedelta(days=30)
if next_month_date > end_date:
next_month_date = end_date
from_date_str = current_date.strftime('%Y-%m-%d')
to_date_str = next_month_date.strftime('%Y-%m-%d')
next_page_token = ''
while True:
base_url = f"https://api.zoom.us/v2/users/{user_id}/recordings?page_size={page_size}"
params = {
"from": from_date_str,
"to": to_date_str,
"next_page_token": next_page_token
}
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(base_url, headers=headers, params=params)
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 10))
print(f"Rate limit exceeded. Retrying in {retry_after} seconds.")
time.sleep(retry_after)
continue
json_response = response.json()
this_page_meetings = json_response.get('meetings', [])
all_meetings.extend(this_page_meetings)
next_page_token = json_response.get('next_page_token', None)
if not next_page_token:
break
current_date = next_month_date + timedelta(days=1)
return all_meetings
def download_single_recording(access_token, meeting_path, meeting_topic, file_info):
download_url = file_info.get('download_url')
recording_start = file_info.get('recording_start', '').replace(':', '-')
recording_type = file_info.get('recording_type')
file_extension = file_info.get('file_extension')
if download_url:
file_name = f"{meeting_topic} ({recording_start} {recording_type}).{file_extension}"
file_path = os.path.join(meeting_path, file_name)
headers = {
"Authorization": f"Bearer {access_token}"
}
if os.path.exists(file_path):
r_head = requests.head(download_url, headers=headers, allow_redirects=True)
reported_size = int(r_head.headers.get('Content-Length', 0))
existing_file_size = os.path.getsize(file_path)
if existing_file_size == reported_size:
print(f"File {file_name} already exists. Verified.")
return
else:
print(f"Existing file {file_name} mismatch ({existing_file_size}/{reported_size}). Redownloading...")
with requests.get(download_url, headers=headers, stream=True) as r:
if r.status_code == 200:
with open(file_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Download complete: {file_name}")
else:
print(f"Failed to download {file_name}")
def trash_all_recordings(access_token, recordings):
for meeting in recordings:
meeting_name = meeting.get('topic', 'Unknown Topic')
meeting_id = meeting.get('uuid')
if meeting_id:
trash_recording_url = f"https://api.zoom.us/v2/meetings/{meeting_id}/recordings?action=trash"
headers = {
"Authorization": f"Bearer {access_token}"
}
while True:
response = requests.delete(trash_recording_url, headers=headers)
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 10))
print(f"Rate limit exceeded while trying to trash '{meeting_name}'. Retrying in {retry_after} seconds.")
time.sleep(retry_after)
continue
if response.status_code == 200 or response.status_code == 204:
print(f"Successfully trashed recordings for '{meeting_name}'")
break
else:
print(f"Failed to trash recordings for '{meeting_name}'. Status Code: {response.status_code}")
break
def parse_args():
parser = argparse.ArgumentParser(description='Zoom Cloud Recordings management.')
parser.add_argument('--user', type=str, help='The email ID of the user.')
parser.add_argument('--download', action='store_true', help='Flag to download recordings.')
parser.add_argument('--delete', action='store_true', help='Flag to delete recordings.')
args = parser.parse_args()
if args.user is None:
args.user = input("Please enter the user email: ")
while not args.user.strip():
args.user = input("Email cannot be empty. Please enter the user email: ")
return args
def main():
args = parse_args()
print("Refreshing token...")
token = load_token()
if token is None:
print(f"Visit this URL to authorize the application: {get_authorization_url()}")
authorization_response = input("Enter the full callback URL: ")
token = fetch_zoom_token(authorization_response)
else:
try:
token = refresh_zoom_token(token)
except oauthlib.oauth2.rfc6749.errors.InvalidGrantError:
print(f"Visit this URL to authorize the application: {get_authorization_url()}")
authorization_response = input("Enter the full callback URL: ")
fetch_zoom_token(authorization_response)
print("Fetching recordings...")
meetings = list_meetings_with_recordings(token['access_token'], args.user)
total_meetings = len(meetings)
print(f"Found {total_meetings} meetings with {sum([len(meeting.get('recording_files', [])) for meeting in meetings])} recordings.")
if args.download:
print('Starting download of recordings...')
with ThreadPoolExecutor(max_workers=4) as executor:
for meeting in meetings:
if is_token_about_to_expire(token):
print("Token is about to expire. Refreshing...")
token = refresh_zoom_token(token)
recording_files = meeting.get('recording_files', [])
meeting_topic = meeting.get('topic', 'Unknown Topic').replace('/', '-')
meeting_date = meeting.get('start_time', '').split('T')[0]
meeting_folder = f"{meeting_topic} ({meeting_date})"
meeting_path = os.path.join(args.user, meeting_folder)
if not os.path.exists(meeting_path):
os.makedirs(meeting_path)
executor.map(lambda recording_info, mp=meeting_path, mt=meeting_topic: download_single_recording(token['access_token'], mp, mt, recording_info), recording_files)
if args.delete:
confirm = input('Are you sure you want to trash all recordings? This action is irreversible. (y/n): ')
if confirm.lower() == 'y':
print('Starting trashing of recordings...')
trash_all_recordings(token['access_token'], meetings)
else:
print('Trashing of recordings cancelled.')
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment