# 网易云音乐远程控制 (NetEase Cloud Music remote control)
#!/usr/bin/env python
#coding:utf-8
# Author: Sg4Dylan -- <sg4dylan#gmail.com>
# Update: 10/13/2017
'''
透过 Chrome DevTools Protocol (CDP) 读取网易云音乐当前播放状态

启动播放器时附带启动参数:
    --remote-debugging-port=8080

安装依赖:
    pip install requests cproto BeautifulSoup4
'''
import json | |
import requests | |
from bs4 import BeautifulSoup | |
from cproto import CProto | |
def get_neteast_status(remote_host, remote_port):
    """Read the playback state of a NetEase Cloud Music client over CDP.

    The player has to be started with ``--remote-debugging-port=<port>`` so
    that its embedded browser exposes the Chrome DevTools Protocol.

    Args:
        remote_host: Host name or IP address of the debugging endpoint.
        remote_port: TCP port of the debugging endpoint.

    Returns:
        A dict with the keys ``is_pause`` (bool), ``time_now`` and
        ``time_all`` (first child of the matched ``now`` / ``all`` elements,
        e.g. ``'00:11'`` and ``'03:41'``).

    Raises:
        RuntimeError: if the page DOM contains no BODY node.
        IndexError: if the expected player elements are not in the page.
    """

    def get_node_id(children, name):
        """Depth-first search for the first node whose nodeName contains *name*.

        Returns the matching ``nodeId`` or ``None`` when nothing matches.
        """
        for node in children:
            if name in node.get('nodeName', ''):
                return node['nodeId']
            if 'children' in node:
                found = get_node_id(node['children'], name)
                # Only stop when this subtree really contained a match;
                # otherwise keep scanning the remaining siblings.
                if found is not None:
                    return found
        return None

    def detect_if_fm():
        """Return True when the first debuggable page URL ends with 'fm/'."""
        raw_api = requests.get(
            'http://{0}:{1}/json'.format(remote_host, remote_port),
            timeout=5,  # do not hang forever when the player is not reachable
        )
        page_url = json.loads(raw_api.content.decode('UTF-8'))[0]['url']
        return page_url.endswith('fm/')

    def play_status(cp):
        """Scrape pause state and the two time labels from the page body."""
        status_result = {
            'is_pause': False,
            'time_now': '',
            'time_all': ''
        }
        # Get body nodeId
        raw_dom_info = cp.DOM.getDocument()
        root_children = raw_dom_info['result']['root']['children']
        body_nodeid = get_node_id(root_children, 'BODY')
        if body_nodeid is None:
            raise RuntimeError('BODY node not found in the player page DOM')
        # Get body HTML
        body_html = cp.DOM.getOuterHTML(nodeId=body_nodeid)['result']['outerHTML']
        # Get play status
        body_soup = BeautifulSoup(body_html, "html.parser")
        target_dom = body_soup.select("[class~=btnp]")
        # On the FM page the second match of each selector is used
        index_id = 1 if detect_if_fm() else 0
        status_result['is_pause'] = 'btnp-play' in target_dom[index_id].get('class')
        status_result['time_now'] = body_soup.select("[class~=now]")[index_id].contents[0]
        status_result['time_all'] = body_soup.select("[class~=all]")[index_id].contents[0]
        return status_result

    # Create CProto instance and connect to a browser over CDP
    cp = CProto(host=remote_host, port=remote_port)
    try:
        return play_status(cp)
    finally:
        # Always release the debugging connection, even when scraping failed
        cp.close()
if __name__ == '__main__':
    # Demo run; guarded so that importing this module (as the controller
    # script does) no longer connects to the player and prints at import time.
    print(get_neteast_status('127.0.0.1', 8080))
    # Example output:
    # {'is_pause': True, 'time_now': '00:11', 'time_all': '03:41'}
# NeteaseMusicController - impl from fb2kcontroller
# version: alpha 0.7
"""
依赖:
    pip install cherrypy psutil requests pymediainfo cproto BeautifulSoup4
    pip install https://github.com/AndreMiras/pycaw/archive/master.zip
    pywin32     <- 这个自己找预编译安装包搞定
    pymediainfo <- 这个库需要自己放一个 MediaInfo.dll (与 Python 的编译架构相同) 放在环境变量目录里

设定:
    启动播放器时附带启动参数
        --remote-debugging-port=8088
    手机下载 foobar2000controller 并与电脑在同一局域网
        填写电脑的 IP
        端口是 8080
        用户名密码空着

Changelog:
    alpha 0.7
        - 透过 CDP 协议读播放状态
    alpha 0.6
        - 修复一个读取歌曲信息的遗漏 BUG
        - 加入播放列表切换功能 (仅支持顺序循环播放功能)
    alpha 0.5
        - 完善播放列表 (修复 BUG)
    alpha 0.4
        - 实现播放列表显示 (仍有 BUG)
    alpha 0.3
        - 修复了私人 FM 状态读取
    alpha 0.2
        - 修复了本地音乐读取错误的问题
    alpha 0.1
        - 初次完成编写

TODO:
    - 实现进度条读取 (完成)
    - 实现播放状态检测 (完成)
    - 实现点击播放列表切换歌曲 (咕咕一半)
"""
# 基础库 | |
import cherrypy | |
import json | |
# 键盘动作控制 | |
import win32api | |
import time | |
# 音量控制 | |
from ctypes import cast, POINTER | |
from comtypes import CLSCTX_ALL | |
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume | |
import math | |
# 读取播放信息 | |
import os | |
import json | |
import string | |
import psutil | |
from ctypes import wintypes | |
import win32con | |
import win32gui | |
import win32process | |
from pymediainfo import MediaInfo | |
from get_play_status_from_netease_music import get_neteast_status | |
# 封面图 | |
import requests | |
import uuid | |
class FoorbarAPI(object):
    """CherryPy handler answering foobar2000controller requests for NetEase Cloud Music.

    Every request first refreshes the cached player state (song metadata,
    play status, play queue) and then either returns the requested document
    or sends a media key / volume change to the local machine.
    """

    # Cached player state, refreshed by api_update_info()
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    play_status = 1   # 1 = playing, 0 = paused
    eta_time = ""     # remaining seconds of the current track, as text
    total_time = ""   # length of the current track in seconds, as text
    cover_link = ""
    now_volume = ""
    playlist_now = ""
    now_foc = 0       # position of the current song inside the play queue
    # Request headers sent when downloading cover art
    headers = {
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4',
        'DNT': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2873.0 Safari/537.36',
    }
    # Master-volume endpoint of the default speakers (pycaw)
    devices = AudioUtilities.GetSpeakers()
    interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
    volume = cast(interface, POINTER(IAudioEndpointVolume))

    @cherrypy.expose
    def index(self):
        """Liveness probe."""
        return "OK"

    @cherrypy.expose
    def foobar2000controller(self, cmd="", param3="", param1=""):
        """Dispatch one foobar2000controller request.

        Args:
            cmd: Command name sent by the client (e.g. ``PlayOrPause``).
            param3: Name of the requested document (e.g. ``info.json``).
            param1: Command argument (title format, playlist index, volume).

        Returns:
            The requested document, cover image bytes, or ``"OK"`` after an
            action command.
        """
        self.api_update_info()
        # Connection check
        if cmd == "FormatTitles" and param1 in ("", "%path%"):
            return "[]"
        # Queue info
        if cmd == "GetQueue" and param3 == "NoResponse":
            return ""
        # Library / genre info
        if param3 == "library.json":
            return """{
    "queryInfo": [],
    "query": [""]
}"""
        # Track length
        if cmd == "FormatTitles" and param1 == "%length_seconds%":
            return "[\"%s\"]" % self.total_time
        # Current state
        if param3 == "info.json":
            return self.api_info()
        # Play queue
        if param3 == "playlist.json":
            return self.playlist_now
        # File browser
        if param3 == "browser.json":
            return """{
    "path": [],
    "pathcurrent": "",
    "parent": "%20",
    "browser": []
}"""
        # Version
        if cmd == "Version":
            return ""
        # Playlist page size
        if cmd == "PlaylistItemsPerPage":
            return ""
        # Cover art of the current song
        request_url = cherrypy.url()
        if "albumart" in request_url:
            cover_name = request_url.partition("albumart_")[2]
            # The name ends up in a file path: accept only the characters
            # api_info() can generate (a UUID string) to block path traversal.
            allowed = "0123456789abcdefABCDEF-"
            if not cover_name or any(ch not in allowed for ch in cover_name):
                raise cherrypy.HTTPError(404, "Unknown cover")
            return self.api_cover("covers/" + cover_name)
        # Play / pause
        if cmd == "PlayOrPause":
            self.api_startorpause()
        # Jump inside the play queue
        if cmd == "Start":
            if not param1:
                # No index given: restart handling of the current song
                self.api_replay()
            else:
                try:
                    target_pos = int(param1)
                except ValueError:
                    target_pos = None  # ignore a malformed index
                if target_pos is not None:
                    self.api_change_now_play(target_pos)
        # Previous song
        if cmd == "StartPrevious":
            self.api_play_previous()
        # Next song
        if cmd == "StartNext":
            self.api_play_next()
        # Volume
        if cmd == "Volume":
            if param1:
                self.api_volume_control(param1)
        return "OK"

    def api_cover(self, download_img_name):
        """Return the cover image stored at *download_img_name*, downloading it first if needed.

        The image is fetched from ``self.cover_link`` with ``requests``
        instead of a shell-built ``curl`` command, so neither the file name
        nor the link is ever interpreted by a shell.

        Raises:
            cherrypy.HTTPError: 404 when no cover link is known, 502 when the
                download fails.
        """
        if not os.path.isfile(download_img_name):
            if not self.cover_link:
                raise cherrypy.HTTPError(404, "No cover available")
            try:
                reply = requests.get(self.cover_link, headers=self.headers, timeout=10)
                reply.raise_for_status()
            except requests.RequestException:
                raise cherrypy.HTTPError(502, "Cover download failed")
            cover_dir = os.path.dirname(download_img_name)
            if cover_dir:
                os.makedirs(cover_dir, exist_ok=True)
            with open(download_img_name, 'wb') as cover_file:
                cover_file.write(reply.content)
        cherrypy.response.headers['Content-Type'] = "image/jpg"
        with open(download_img_name, 'rb') as cover_file:
            return cover_file.read()

    def api_change_now_play(self, target_pos):
        """Step forwards/backwards with media keys until *target_pos* is reached."""
        distance = target_pos - self.now_foc
        step = self.api_play_next if distance >= 0 else self.api_play_previous
        for _ in range(abs(distance)):
            step()
            # Give the player time to react to each key press
            time.sleep(0.15)

    def api_startorpause(self):
        """Send the play/pause media key (virtual key 0xB3)."""
        win32api.keybd_event(0xB3, win32api.MapVirtualKey(0xB3, 0))

    def api_replay(self):
        """Send 'previous track', wait briefly, then 'next track'."""
        win32api.keybd_event(0xB1, win32api.MapVirtualKey(0xB1, 0))
        time.sleep(0.05)
        win32api.keybd_event(0xB0, win32api.MapVirtualKey(0xB0, 0))

    def api_play_previous(self):
        """Send the 'previous track' media key (virtual key 0xB1)."""
        win32api.keybd_event(0xB1, win32api.MapVirtualKey(0xB1, 0))

    def api_play_next(self):
        """Send the 'next track' media key (virtual key 0xB0)."""
        win32api.keybd_event(0xB0, win32api.MapVirtualKey(0xB0, 0))

    def api_volume_control(self, value):
        """Set the master volume from the client's volume value.

        The value is mapped to decibels with ``34.5 * log10(value / 200)``
        (the inverse of the formula in api_detect_play_info) and clamped to
        the range reported by the audio endpoint. Non-numeric values are
        ignored; values <= 0 select the minimum level.
        """
        try:
            requested = int(value)
        except (TypeError, ValueError):
            return
        min_db, max_db = self.volume.GetVolumeRange()[:2]
        if requested <= 0:
            # log10 is undefined here; treat it as "as quiet as possible"
            target_volume = min_db
        else:
            target_volume = 34.5 * math.log(requested / 200, 10)
            target_volume = max(min_db, min(max_db, target_volume))
        self.volume.SetMasterVolumeLevel(target_volume, None)

    def api_detect_play_info(self):
        """Refresh song metadata and the current volume."""
        # Window title identifies the song; open file handles identify the file
        (window_title, file_path, self.song_name, self.song_artist,
         self.song_album, self.total_time, self.cover_link,
         self.codec_info) = ReadTitlePath().get_info()
        # Current volume, converted back from decibels
        self.now_volume = str(int((10 ** (self.volume.GetMasterVolumeLevel() / 34.5)) * 200))

    def api_detect_play_status(self):
        """Refresh play/pause state and the remaining time via CDP."""
        status_dict = get_neteast_status('127.0.0.1', 8088)
        self.play_status = 0 if status_dict['is_pause'] else 1
        try:
            # Accept "mm:ss" as well as "h:mm:ss"
            elapsed = 0
            for part in str(status_dict['time_now']).split(':'):
                elapsed = elapsed * 60 + int(part)
            self.eta_time = str(int(self.total_time) - elapsed)
        except ValueError:
            # Track length or position unknown (e.g. song not found in the queue)
            self.eta_time = "0"

    def api_update_info(self):
        """Refresh every cached value used to answer a request."""
        self.api_detect_play_info()
        self.api_detect_play_status()
        self.playlist_now, self.now_foc = ConvertPlaylist(
            self.song_name, self.song_artist, self.song_album).parse_netease_json()

    def api_info(self):
        """Build the ``info.json`` document as UTF-8 encoded JSON bytes."""
        json_dict = {"codec":"","helper2":"","isPlaying":"","isPaused":"","elapsedTime":"0","trackLength":"","volume":"100","volumeDB":"0","order":"1","albumArt":"","itemplaying":"0","page":"1","itemsPage":"60","search":"","sac":"","nowPlayingRating":""}
        # Codec description of the current song
        json_dict["codec"] = self.codec_info
        # "artist - title - album"; hyphens inside the fields are replaced so
        # that they cannot be confused with the " - " separators
        json_dict["helper2"] = self.song_artist.replace("-","–") + " - " + self.song_name.replace("-","–") + " - " + self.song_album.replace("-","–")
        # Play state
        json_dict["isPlaying"] = "1" if self.play_status == 1 else "0"
        json_dict["isPaused"] = "1" if self.play_status == 0 else "0"
        # NOTE(review): this field receives the *remaining* time computed in
        # api_detect_play_status() - confirm that is what the client expects.
        json_dict["elapsedTime"] = self.eta_time
        # Track length
        json_dict["trackLength"] = self.total_time
        # Cover name derived deterministically from the song description
        json_dict["albumArt"] = "/foobar2000controller/albumart_" + str(uuid.uuid5(uuid.NAMESPACE_URL, json_dict["helper2"]))
        # Volume
        json_dict["volume"] = self.now_volume
        # Serialise
        response_json = json.dumps(json_dict, ensure_ascii=False)
        return response_json.encode('utf-8')
class ConvertPlaylist:
    """Convert the NetEase Cloud Music play queue into foobar2000controller playlist JSON."""

    json_str = ""
    # The last 7 characters of %APPDATA% are dropped (presumably "Roaming")
    # before appending the player's queue file path. An unset variable no
    # longer breaks the class definition; opening the file then fails instead.
    file_name = (os.getenv('APPDATA') or "")[:-7] + "Local\\Netease\\CloudMusic\\webdata\\file\\queue"
    song_name = ""
    song_artist = ""
    song_album = ""

    def __init__(self, song_name, song_artist, song_album):
        """Remember the current song and load the queue file.

        Args:
            song_name: Title of the song that is playing.
            song_artist: Artist of the song that is playing.
            song_album: Album of the song that is playing.

        Raises:
            OSError: if the queue file cannot be read.
            ValueError: if the queue file is not valid JSON.
        """
        self.song_name, self.song_artist, self.song_album = song_name, song_artist, song_album
        with open(self.file_name, "rb") as target:
            self.json_str = json.loads(target.read().decode("utf-8"))

    def parse_netease_json(self):
        """Build the playlist document.

        Returns:
            A ``(json_text, index)`` tuple: the serialised playlist and the
            position of the current song in it (0 when it is not found; the
            last matching entry wins when several match).
        """
        playlist = {'playlist':{'js':[],"t":"16:48:20","pa":"1","pp":"1","page":"0","pages":"1","foc":"1","ip":"1","ipp":"999","ic":"249"},
                    "order":"1",
                    "playlists":{"js":[{"name":"网易云音乐","count":"0"}],"ac":"1","pl":"1","page":"0","t":""}
                    }
        entries = playlist['playlist']['js']
        total_ms = 0
        song_foc = 0
        for position, item in enumerate(self.json_str):
            track = item['track']
            duration_ms = int(track['duration'])
            total_ms += duration_ms
            artists = track['artists']
            minutes, seconds = divmod(duration_ms // 1000, 60)
            entry = {
                # Some tracks carry no artist entry at all
                'artist': artists[0]['name'] if artists else "",
                'album': track['album']['name'],
                'track': track['name'],
                # Zero-pad the seconds: "3:05", not "3:5"
                'len': "%d:%02d" % (minutes, seconds),
                'rating': "?",
                'queued': "",
                'playCount': "1",
            }
            # Remember the position of the song that is playing
            if (entry['artist'] == self.song_artist
                    and entry['album'] == self.song_album
                    and entry['track'] == self.song_name):
                song_foc = position
            entries.append(entry)
        count_text = str(len(entries))
        playlist['playlist']['ic'] = count_text
        playlist['playlists']['js'][0]['count'] = count_text
        playlist['playlist']['foc'] = str(song_foc)
        playlist['playlist']['ip'] = str(song_foc)
        # Durations are in milliseconds; convert before formatting, and format
        # by hand so that queues longer than 24 hours do not wrap around.
        hours, rest = divmod(total_ms // 1000, 3600)
        minutes, seconds = divmod(rest, 60)
        playlist['playlist']['t'] = "%02d:%02d:%02d" % (hours, minutes, seconds)
        playlist['playlists']['t'] = playlist['playlist']['t']
        return json.dumps(playlist, ensure_ascii=False), song_foc
class ReadTitlePath:
    """Collect information about the song NetEase Cloud Music is playing.

    The song is identified from the title of the player's visible window and
    then looked up in the player's queue (or history) file.
    """

    title = ""
    target_path = ""
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    song_len = ""
    cover_link = ""
    playlist = ""

    def enumWindowsProc(self, hwnd, lParam):
        """EnumWindows callback: remember the title of a visible, titled window.

        When *lParam* is not None only windows owned by that process id are
        considered.
        """
        if lParam is not None and win32process.GetWindowThreadProcessId(hwnd)[1] != lParam:
            return
        text = win32gui.GetWindowText(hwnd)
        if not text:
            return
        if win32api.GetWindowLong(hwnd, win32con.GWL_STYLE) & win32con.WS_VISIBLE:
            self.title = text

    def enumProcWnds(self, pid=None):
        """Run enumWindowsProc over all top-level windows, filtered by *pid*."""
        win32gui.EnumWindows(self.enumWindowsProc, pid)

    def getProcName(self, procName):
        """Return the pids (as strings) of all processes named *procName*."""
        pid_list = []
        for proc in psutil.process_iter():
            try:
                matches = proc.name() == procName
            except psutil.Error:
                # The process vanished or is not accessible; skip it
                continue
            if matches:
                # Use the pid attribute instead of parsing it out of str(proc)
                pid_list.append(str(proc.pid))
        return pid_list

    def get_info(self):
        """Gather title, file path and metadata of the current song.

        Returns:
            A tuple ``(title, target_path, song_name, song_artist,
            song_album, song_len, cover_link, codec_info)``. The fields stay
            empty when no player window is found.
        """
        audio_markers = ("mp3", "aac", "flac", "ape")
        for pid_text in self.getProcName("cloudmusic.exe"):
            pid = int(pid_text)
            self.enumProcWnds(pid)
            if self.title == '':
                continue
            try:
                handles = psutil.Process(pid).open_files()
            except psutil.Error:
                handles = []
            for handle in handles:
                # Substring match kept from the original on purpose
                # NOTE(review): this also matches directory names - confirm
                if any(marker in handle.path for marker in audio_markers):
                    self.target_path = handle.path
        if self.title != '':
            # One lookup is enough; the title does not change per process
            self.get_detail_info()
        return (self.title, self.target_path, self.song_name, self.song_artist,
                self.song_album, self.song_len, self.cover_link, self.codec_info)

    def parse_netease_json(self):
        """Fill artist, album, cover, length and codec info from ``self.playlist``.

        Every entry whose track name contains ``self.song_name`` is applied
        in order, so the last match wins.
        """
        for item in self.playlist:
            track = item['track']
            if self.song_name not in track['name']:
                continue
            self.song_artist = track['artists'][0]['name']
            self.song_album = track['album']['name']
            self.cover_link = track['album']['picUrl']
            self.song_len = str(int(int(track['duration']) / 1000))
            try:
                self._read_codec_info(item)
            except Exception:
                # Best effort: keep going, but show the entry that failed
                print(item)

    def _read_codec_info(self, item):
        """Set ``self.codec_info`` from a local file, cloud-disk data or a quality tier."""
        track = item['track']
        if "playFile" in item:
            # Local file: ask MediaInfo
            media_info = MediaInfo.parse(item['playFile'])
            audio = media_info.tracks[1]
            codec_t = media_info.tracks[0].codec
            self.codec_info = codec_t.upper() + " | " + str(int(audio.bit_rate / 1000)) + "kbps | " + str(int(audio.sampling_rate)) + "Hz"
            return
        if track.get("yunSong"):
            cloud = track['yunSong']
            self.codec_info = cloud['fileExt'][1:].upper() + " | " + str(int(cloud['bitrate'])) + "kbps"
            return
        # Quality tiers, later entries take precedence; "hMusic" is the default
        target_mode = "hMusic"
        for mode in ("bMusic", "lMusic", "mMusic", "hMusic"):
            if track.get(mode):
                target_mode = mode
        tier = track[target_mode]
        description = ""
        if "extension" in tier:
            description = tier['extension'].upper() + " | "
        # Integer division: "320kbps", not "320.0kbps"
        description += str(int(tier['bitrate']) // 1000) + "kbps | "
        if "sr" in tier:
            description += str(tier['sr']) + "Hz"
        else:
            description += "44100Hz"
        self.codec_info = description

    @staticmethod
    def _split_title(title):
        """Split a "song - artist" window title at its last " - ".

        Returns ``(song_name, song_artist)``; both equal *title* when the
        separator is missing.
        """
        name, separator, artist = title.rpartition(" - ")
        if not separator:
            return title, title
        return name, artist

    @staticmethod
    def _load_playlist(file_name):
        """Load one JSON file from the player's ``webdata\\file`` directory."""
        path = (os.getenv('APPDATA') or "")[:-7] + "Local\\Netease\\CloudMusic\\webdata\\file\\" + file_name
        with open(path, 'rb') as playlist_text:
            return json.loads(playlist_text.read().decode("utf-8"))

    def get_detail_info(self):
        """Derive song name/artist from the window title and look up the details."""
        self.song_name, self.song_artist = self._split_title(self.title)
        # Regular play queue
        self.playlist = self._load_playlist("queue")
        self.parse_netease_json()
        # No codec info means the song was not found; per the original note
        # this is expected for FM playback, so the history file is tried next.
        if not self.codec_info:
            self.playlist = self._load_playlist("history")
            self.parse_netease_json()
if __name__ == '__main__':
    # Listen on every interface so that a phone on the same LAN can connect
    cherrypy.config.update({'server.socket_host': '0.0.0.0'})
    controller = FoorbarAPI()
    cherrypy.quickstart(controller)
# (Web page footer captured together with the gist; not part of the program.)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment