Skip to content

Instantly share code, notes, and snippets.

@Gabryxx7
Last active December 22, 2023 17:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Gabryxx7/b66ff1e437fc1a82ff23827656d49efd to your computer and use it in GitHub Desktop.
Save Gabryxx7/b66ff1e437fc1a82ff23827656d49efd to your computer and use it in GitHub Desktop.
OneDrive automated files downloader
{
"main_folder_link": "https://unimelbcloud-my.sharepoint.com/:f:/g/personal/marinig_student_unimelb_edu_au/xxxxxxx....",
"root_folder_list_link": "https://unimelbcloud-my.sharepoint.com/personal/deys_student_unimelb_edu_au/_api/web/GetListUsingPath(DecodedUrl=@a1)/RenderListDataAsStream?@a1=%27%2Fpersonal%2Ffolder%5Fgabry...",
"downloads_folderBAK": "./downloads/",
"downloads_folder": "/home/marinig/downloads/",
"zip_main_link": "https://australiaeast1-mediap.svc.ms/transform/zip?cs=",
"zip_headers": {
"authority": "australiaeast1-mediap.svc.ms",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-AU,en;q=0.9",
"cache-control": "max-age=0",
"content-type": "application/x-www-form-urlencoded",
"dnt": "1",
"origin": "https://unimelbcloud-my.sharepoint.com",
"sec-ch-ua": "\"Chromium\";v=\"112\", \"Microsoft Edge\";v=\"112\", \"Not:A-Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "iframe",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "cross-site",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.23"
},
"set_cookie": {
"request": {
"type": "GET",
"url": "https://unimelbcloud-my.sharepoint.com/:f:/g/personal/deys_student_unimelb_edu_au/ErtQSc3ZoEBHohPSxKPvY0QBH_GFzZTyCgLQdawmJ3xxAg?e=aGwOec",
"headers": {
"authority": "unimelbcloud-my.sharepoint.com",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-AU,en;q=0.9",
"dnt": "1",
"sec-ch-ua": "\"Chromium\";v=\"112\", \"Microsoft Edge\";v=\"112\", \"Not:A-Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.23"
},
"payload": {}
}
},
"get_files_list": {
"request": {
"type": "POST",
"url": "https://unimelbcloud-my.sharepoint.com/personal/marinig_student_unimelb_edu_au/_api/web/GetListUsingPath(DecodedUrl=@a1)/RenderListDataAsStream?@a1=%27%2Fpersonal%2Fmarini...",
"parameters": "&TryNewExperienceSingle=TRUE",
"headers": {
"authority": "unimelbcloud-my.sharepoint.com",
"accept": "application/json;odata=verbose",
"content-type": "application/json;odata=verbose",
"origin": "https://unimelbcloud-my.sharepoint.com",
"referrer": "https://unimelbcloud-my.sharepoint.com/personal/deys_student_unimelb_edu_au/_layouts/15/onedrive.aspx?id=%2Fpersonal%2Fdeys%5Fstudent%5Funimelb%5Fedu%5Fau%2FDocuments%2F1%2E%20ARDC%20Pathzz%2FJess&ga=1",
"accept-language": "en-AU,en;q=0.9",
"dnt": "1",
"sec-ch-ua": "\"Chromium\";v=\"112\", \"Microsoft Edge\";v=\"112\", \"Not:A-Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.23"
},
"payload": "{\"parameters\":{\"__metadata\":{\"type\":\"SP.RenderListDataParameters\"},\"RenderOptions\":5445383,\"AllowMultipleValueFilterForTaxonomyFields\":true,\"AddRequiredFields\":true,\"FilterOutChannelFoldersInDefaultDocLib\":true}}"
}
}
}
import requests
import json
import zipfile
import io
import os
from tqdm import tqdm
import math
import urllib.parse
def convert_size(size_bytes):
    """Return *size_bytes* as a human-readable string using 1024-based units.

    The value is scaled to the largest unit in which it is at least 1 and
    rounded to two decimals, e.g. ``1536`` -> ``"1.5 KB"``. Zero is returned
    as ``"0B"`` (no space).

    Args:
        size_bytes: Size in bytes (int or float). Negative values are
            formatted by magnitude with a leading minus sign.

    Returns:
        The formatted size, e.g. ``"500.0 B"`` or ``"1.0 GB"``. Values larger
        than the biggest known unit are expressed in ``YB``.
    """
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    sign = "-" if size_bytes < 0 else ""
    magnitude = float(abs(size_bytes))
    i = 0
    # Repeated division keeps the unit index inside the table (no negative or
    # out-of-range index) and avoids relying on a floating-point logarithm
    # landing exactly on an integer at unit boundaries.
    while magnitude >= 1024 and i < len(size_name) - 1:
        magnitude /= 1024
        i += 1
    s = round(magnitude, 2)
    return "%s%s %s" % (sign, s, size_name[i])
class OneDriveDownloader:
    """Interactive downloader for entries of a shared OneDrive/SharePoint folder.

    The request templates (URLs, headers, payloads) come from a JSON config
    file. Listing a folder stores the values the zip endpoint needs
    (``.driveAccessToken``, ``.callerStack``, ``.correlationId``) on the
    instance; each selected entry is then requested from ``zip_main_link``
    and streamed to ``<downloads_path>/<name>.zip``.

    Every request sent through :meth:`send_request` is also logged as JSON
    under ``requests_path`` and kept in ``self.responses``.
    """

    def __init__(self, config_file):
        """Load the configuration and make sure the output folders exist.

        Args:
            config_file: Path to the JSON configuration file.
        """
        self.session = requests.Session()
        self.drive_access_token = None
        self.callerStack = None
        self.correlationId = None
        self.config = {}
        self.responses = []
        # Row fields copied from every entry of the list response.
        self.files_keys = ['ID', 'UniqueId', 'SMTotalSize', 'FileLeafRef', '.spItemUrl', 'ItemChildCount', 'FolderChildCount']
        self.files_list = None
        self.downloads_path = "./downloads/"
        self.requests_path = "./requests_log/"
        # Folder path relative to the configured root; None means the root.
        self.current_folder = None
        with open(config_file, encoding="utf-8") as f:
            self.config = json.load(f)
        self.downloads_path = self.config.get('downloads_folder', self.downloads_path)
        os.makedirs(self.requests_path, exist_ok=True)
        os.makedirs(self.downloads_path, exist_ok=True)

    def get_session_cookies(self):
        """Return the cookies currently held by the session as a dict."""
        return self.session.cookies.get_dict()

    def send_default_request(self, req_data, req_key):
        """Send a request described by a config-style dict.

        Args:
            req_data: Dict with ``url`` and optional ``parameters`` (appended
                to the URL), ``type`` (default ``GET``), ``headers``,
                ``payload`` and ``stream``.
            req_key: Name used for the request log file and response record.

        Returns:
            The response returned by :meth:`send_request`.
        """
        url = req_data['url'] + req_data.get('parameters', '')
        req_type = req_data.get('type', 'GET')
        headers = req_data.get('headers', {})
        payload = req_data.get('payload', {})
        stream = req_data.get('stream', False)
        return self.send_request(req_type, url, headers, payload, stream, req_key)

    def send_request(self, req_type, url, headers, payload, stream, extra=None, file_size=-1):
        """Send a request through the session, log it, and return the response.

        When ``stream`` is true the body is saved as a zip file in the
        downloads folder while a progress bar is shown.

        Args:
            req_type: HTTP method.
            url: Full request URL.
            headers: Request headers.
            payload: Request body, passed as ``data``.
            stream: Whether to stream the body to disk.
            extra: Key for the log file / response record; for streamed
                requests it is also the zip file's base name. Defaults to
                ``File_req_<n>`` for the file and ``req_type`` for the log.
            file_size: Progress-bar total in bytes; when negative the
                ``content-length`` header is used instead.

        Returns:
            The response object.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        response = self.session.request(req_type, url, headers=headers, data=payload, stream=stream, allow_redirects=True)
        response.raise_for_status()
        if stream:
            filename = extra if extra is not None else f"File_req_{len(self.responses)}"
            if file_size < 0:
                total_size_in_bytes = int(response.headers.get('content-length', 0))
            else:
                total_size_in_bytes = file_size
            self._save_stream(response, filename, total_size_in_bytes)
        if extra is None:
            extra = req_type
        req_data = {"req_type": req_type, "url": url, "headers": headers, "payload": payload, "stream": stream, "req_key": extra}
        self.responses.append({"key": extra, "response": response, "request": req_data})
        with open(os.path.join(self.requests_path, f"{extra}.json"), "w") as f:
            f.write(json.dumps(req_data))
        return response

    def _save_stream(self, response, filename, total_size_in_bytes):
        """Stream *response* into ``<downloads_path>/<filename>.zip``."""
        zip_path = os.path.join(self.downloads_path, f"{filename}.zip")
        # Write to a temporary name first: send_download_file_request() skips
        # any entry whose .zip already exists, so a partial file left behind
        # by an interrupted download must never carry the final name.
        part_path = f"{zip_path}.part"
        block_size = 1024 * 1024  # 1 MiB
        progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)
        try:
            with open(part_path, "wb") as f:
                for data in response.iter_content(block_size):
                    progress_bar.update(len(data))
                    f.write(data)
        except BaseException:
            # Also covers Ctrl-C; drop the partial file and re-raise.
            if os.path.exists(part_path):
                os.remove(part_path)
            raise
        finally:
            progress_bar.close()
        os.replace(part_path, zip_path)
        print(f"Total Downloaded {convert_size(progress_bar.n)}")

    def set_cookie(self):
        """Send the configured ``set_cookie`` request and return its response."""
        req_data = self.config['set_cookie']['request']
        return self.send_default_request(req_data, 'set_cookie')

    def print_files_list(self):
        """Print the current folder's entries as a numbered menu (0 = ALL)."""
        print(f"\n\tCURRENT FOLDER: {self.current_folder if self.current_folder is not None else 'root'}")
        print(f"\t{0:>3}.\t{'ALL':<50}")
        # files_list may still be None if no listing has been retrieved yet.
        for number, entry in enumerate(self.files_list or [], start=1):
            f_type = "D" if entry['isFolder'] else "F"
            print(f"\t{number:>3}.\t{entry['name']:<35}\t{convert_size(entry['size']):<10} [{f_type}]")

    def pick_file(self):
        """Prompt until the user selects something to download.

        Commands: ``ls N`` enters entry N, ``ls ..`` goes up one level,
        ``d N`` selects entry N (``d 0`` selects everything, ``d -1`` exits).

        Returns:
            The zero-based index of the chosen entry, ``-1`` for "ALL", or
            ``None`` when the user asked to exit.
        """
        while True:
            self.print_files_list()
            choice = str(input("\n - Navigate: 'ls N' to enter the folder or 'ls ..' to go back)\n - Download: 'd N' to download the selected option \n> "))
            command, _, argument = choice.strip().partition(" ")
            argument = argument.strip()
            entries = self.files_list or []
            if command == "ls":
                if argument == "..":
                    self.current_folder = self._parent_folder(self.current_folder)
                else:
                    number = self._parse_int(argument)
                    if number is None or not 1 <= number <= len(entries):
                        print(f"Invalid folder number: '{argument}'")
                        continue
                    name = entries[number - 1]['name']
                    # Keep the parent path so nested folders resolve from the root.
                    self.current_folder = f"{self.current_folder}/{name}" if self.current_folder else name
                self.get_files_list()
            elif command == "d":
                number = self._parse_int(argument)
                if number is None:
                    print(f"Invalid file number: '{argument}'")
                elif 0 <= number <= len(entries):
                    return number - 1
                elif number == -1:
                    return None

    @staticmethod
    def _parse_int(text):
        """Return *text* as an int, or None when it is not a valid integer."""
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parent_folder(folder):
        """Return the parent of a '/'-separated folder path (None for the root)."""
        if not folder or "/" not in folder:
            return None
        return folder.rsplit("/", 1)[0]

    def download_file(self, file_choice=None, folder=None):
        """Download one entry, several entries, or whatever the user picks.

        Args:
            file_choice: ``None`` to prompt interactively, a zero-based index
                into ``files_list``, a list/tuple of such indices, or an
                entry name.
            folder: Folder to list when no listing has been retrieved yet.
        """
        if self.files_list is None:
            print("Retrieving files list...")
            self.get_files_list(folder)
        if file_choice is None:
            file_choice = self.pick_file()
            if file_choice is None:
                print("Exiting...")
                return
            if file_choice < 0:
                # "ALL" was selected in the menu.
                file_choice = list(range(len(self.files_list or [])))
        # Read the listing only now: pick_file() may have navigated elsewhere.
        entries = self.files_list or []
        file_data = None
        if isinstance(file_choice, (list, tuple)):
            print(f"Downloading {len(file_choice)} files...")
            for position, item in enumerate(file_choice, start=1):
                print(f"\nDownload {position}/{len(file_choice)}:")
                self.download_file(int(item))
            return
        if isinstance(file_choice, int):
            if 0 <= file_choice < len(entries):
                file_data = entries[file_choice]
            else:
                print(f"File choice {file_choice} outside of bounds (0-{len(entries) - 1})")
        else:
            matches = [entry for entry in entries if entry['name'] == file_choice]
            if matches:
                file_data = matches[0]
            else:
                print(f"No file found with name: {file_choice}")
        if file_data is None:
            print("\nDownload canceled...")
            return
        try:
            self.send_download_file_request(file_data)
        except Exception as e:
            # Best effort: report the failure and let a batch carry on.
            print(f"Error downloading file {file_choice}")
            print(e)

    def send_download_file_request(self, file_data):
        """Request *file_data* from the zip endpoint and stream it to disk.

        Does nothing when ``<downloads_path>/<name>.zip`` already exists.

        Args:
            file_data: An entry of ``files_list`` (uses ``name``, ``size``,
                ``.spItemUrl`` and ``isFolder``).
        """
        zip_file_path = os.path.join(self.downloads_path, f"{file_data['name']}.zip")
        if os.path.exists(zip_file_path):
            print(f"File already downloaded in: {zip_file_path}")
            return
        print(f"Downloading file '{file_data['name']}' to {self.downloads_path} Uncompressed size: {convert_size(file_data['size'])}")
        url = f"{self.config['zip_main_link']}{self.callerStack}"
        file_payload = {
            'name': file_data['name'],
            'size': 0,
            'docid': f"{file_data['.spItemUrl']}&{self.drive_access_token}",
            'isFolder': str(file_data['isFolder']).lower(),
        }
        payload = {
            'zipFileName': f"{file_data['name']}.zip",
            'guid': self.correlationId,
            'provider': 'spo',
            'files': {'items': [file_payload]},
            'oAuthToken': '',
        }
        # NOTE(review): urlencode() serialises the nested 'files' dict with
        # str(), i.e. as a Python repr rather than JSON. Kept unchanged because
        # the format the endpoint expects cannot be confirmed from here.
        txt_payload = urllib.parse.urlencode(payload)
        self.send_request('POST', url, self.config['zip_headers'], txt_payload, True, extra=f"{file_data['name']}", file_size=file_data['size'])

    def get_files_list(self, folder=None):
        """Retrieve the listing of *folder* and store it in ``files_list``.

        Sends the ``set_cookie`` request first when the session has no
        cookies. Also refreshes ``drive_access_token``, ``callerStack`` and
        ``correlationId`` from the response's ``ListSchema``.

        Args:
            folder: Folder path relative to the configured root; defaults to
                ``current_folder`` (``None`` lists the root itself).
        """
        if len(self.get_session_cookies()) <= 0:
            print("\nSetting cookie first...")
            self.set_cookie()
        if folder is None:
            folder = self.current_folder
        else:
            # Keep the menu header and 'ls ..' in sync with what is listed.
            self.current_folder = folder
        print(f"Listing files in '{folder}'")
        self.files_list = None
        # Work on a copy so the request template in self.config is not modified.
        req_data = dict(self.config['get_files_list']['request'])
        url = self.config['root_folder_list_link']
        if folder is not None:
            url += urllib.parse.quote("/" + folder, safe='')
        req_data['url'] = url.replace("_", "%5F")
        resp = self.send_default_request(req_data, 'get_files_list')
        files_data = resp.json()
        self.drive_access_token = files_data["ListSchema"][".driveAccessToken"]
        self.callerStack = files_data["ListSchema"][".callerStack"]
        self.correlationId = files_data["ListSchema"][".correlationId"]
        # An empty folder yields an empty list, not None, so the menu still renders.
        self.files_list = [self._parse_row(row) for row in files_data["ListData"]["Row"]]

    def _parse_row(self, row):
        """Build a ``files_list`` entry from one row of the list response."""
        file_meta = {key: row.get(key) for key in self.files_keys}
        # Missing or empty counters/sizes are treated as 0.
        children = int(file_meta['ItemChildCount'] or 0) + int(file_meta['FolderChildCount'] or 0)
        # An entry counts as a folder only when it has children, so an empty
        # folder is flagged as a file.
        file_meta['isFolder'] = children > 0
        file_meta['size'] = int(file_meta['SMTotalSize'] or 0)
        file_meta['name'] = file_meta['FileLeafRef']
        return file_meta

    def get_last_response(self):
        """Return the most recent response record, dumping its body to disk.

        A non-empty body is written to ``last_response.txt``.

        Returns:
            The last ``{"key", "response", "request"}`` record, or ``None``
            when no request has been sent yet.
        """
        if not self.responses:
            return None
        res = self.responses[-1]
        if res['response'].content:
            with open("last_response.txt", "wb") as f:
                f.write(res['response'].content)
        return res
# Module-level instance: created as soon as this module is imported, which
# opens and parses "config.json" (path relative to the working directory).
downloader = OneDriveDownloader("config.json")
# Example usage
# from onedrive_download import downloader
# files = [5, 1, 4, 6]  # zero-based indices into downloader.files_list
# downloader.download_file(files)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment