Skip to content

Instantly share code, notes, and snippets.

@st4ycool
Forked from Dront/vkcom_audio_download.py
Last active July 22, 2017 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save st4ycool/b15fdafdd571ecf00289c27ff5cae58f to your computer and use it in GitHub Desktop.
Save st4ycool/b15fdafdd571ecf00289c27ff5cae58f to your computer and use it in GitHub Desktop.
Python: vkontakte.ru (vk.com) audio music downloader
#! /usr/bin/env python3
"""
Скрипт для скачивания музыки с сайта vkontakte.ru (vk.com)
Запуск:
python vkcom_audio_download.py
Принцип работы:
Скрипт проверяет сохраненный access_token. Если его нет или срок истек,
то открывается страница в браузере с запросом на доступ к аккаунту.
После подтверждения идет редирект на https://oauth.vk.com/blank.htm#... .
Нужно скопировать весь url, на который вас редиректнуло и вставить его
в консоль скрипта.
Далее будут скачиваться все ваши аудиозаписи. Если аудиозапись уже есть на
диске - то скачивания не происходит.
Будут запрошены ваши данные приложением с app_id = 3358129
Можно создать свое Standalone-приложение с доступом к аудио здесь:
http://vk.com/editapp?act=create
И заменить APP_ID на ваше.
"""
import webbrowser
import pickle
import json
import urllib
import requests
import html.parser
import re
import os
import urllib.parse
from datetime import datetime, timedelta
import asyncio
from contextlib import closing
import aiohttp
# id of vk.com application, that has access to audio
APP_ID = '3358129'
# if None, then save mp3 in current folder
MUSIC_FOLDER = 'music'
# file, where auth data is saved
AUTH_FILE = '.auth_data'
# simultaneous requests count
REQUESTS_COUNT = 10
# chars to exclude from filename
FORBIDDEN_CHARS = '/\\\?%*:|"<>!'
def get_saved_auth_params():
access_token = None
user_id = None
try:
with open(AUTH_FILE, 'rb') as pkl_file:
token = pickle.load(pkl_file)
expires = pickle.load(pkl_file)
uid = pickle.load(pkl_file)
if datetime.now() < expires:
access_token = token
user_id = uid
except IOError:
pass
return access_token, user_id
def save_auth_params(access_token, expires_in, user_id):
expires = datetime.now() + timedelta(seconds=int(expires_in))
with open(AUTH_FILE, 'wb') as output:
pickle.dump(access_token, output)
pickle.dump(expires, output)
pickle.dump(user_id, output)
def get_auth_params():
auth_url = ("https://oauth.vk.com/authorize?client_id={app_id}"
"&scope=audio&redirect_uri=http://oauth.vk.com/blank.html"
"&display=page&response_type=token".format(app_id=APP_ID))
webbrowser.open_new_tab(auth_url)
redirected_url = input("Paste here url you were redirected:\n")
aup = urllib.parse.parse_qs(redirected_url)
aup['access_token'] = aup.pop(
'https://oauth.vk.com/blank.html#access_token')
save_auth_params(aup['access_token'][0], aup['expires_in'][0],
aup['user_id'][0])
return aup['access_token'][0], aup['user_id'][0]
def get_tracks_metadata(access_token, user_id):
url = ("https://api.vk.com/method/audio.get.json?"
"uid={uid}&access_token={atoken}".format(
uid=user_id, atoken=access_token))
return requests.get(url).json()['response']
def get_track_full_name(t_data):
html_parser = html.parser.HTMLParser()
full_name = u"{0}_{1}".format(
html_parser.unescape(t_data['artist'][:100]).strip(),
html_parser.unescape(t_data['title'][:100]).strip(),
)
full_name = re.sub('[' + FORBIDDEN_CHARS + ']', "", full_name)
full_name = re.sub(' +', ' ', full_name)
return full_name + ".mp3"
async def download(t_url, t_name, session, semaphore):
t_path = os.path.join(MUSIC_FOLDER or "", t_name)
if os.path.exists(t_path):
return
with (await semaphore):
print('Downloading', t_name)
response = await session.get(t_url)
with closing(response), open(t_path, 'wb') as file:
while True:
chunk = await response.content.read()
if not chunk:
break
file.write(chunk)
print('Saved file', t_name)
return t_name
def main():
access_token, user_id = get_saved_auth_params()
if not access_token or not user_id:
access_token, user_id = get_auth_params()
tracks = get_tracks_metadata(access_token, user_id)
if MUSIC_FOLDER and not os.path.exists(MUSIC_FOLDER):
os.makedirs(MUSIC_FOLDER)
t_info = ((t['url'], get_track_full_name(t)) for t in tracks)
with closing(asyncio.get_event_loop()) as loop, \
closing(aiohttp.ClientSession()) as session:
semaphore = asyncio.Semaphore(REQUESTS_COUNT)
download_tasks = (download(url, name, session, semaphore) for url, name in t_info)
result = loop.run_until_complete(asyncio.gather(*download_tasks))
print("All music is up to date")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment