Last active
February 14, 2020 12:40
-
-
Save mengyyy/39e30a8318d4b3ae9c07d13edb4b5724 to your computer and use it in GitHub Desktop.
aria2 rpc with aiohttp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import base64 | |
import os | |
import aiohttp | |
import stackprinter | |
from loguru import logger | |
# Status keys requested by default by the tell* calls in this file
# (tellStatus / tellActive / tellWaiting / tellStopped).
# Keys deliberately left out of the default selection:
#   bitfield, connections, downloadSpeed, numPieces, numSeeders,
#   pieceLength, uploadLength, uploadSpeed
DEFAULT_KEYS = (
    "bittorrent",
    "completedLength",
    "dir",
    "files",
    "following",
    "gid",
    "infoHash",
    "seeder",
    "status",
    "totalLength",
)
# Accepted values for the ``how`` argument of changePosition.
HOWS = (
    # If how is POS_SET, it moves the download to a position relative to
    # the beginning of the queue.
    "POS_SET",
    # If how is POS_CUR, it moves the download to a position relative to
    # the current position.
    "POS_CUR",
    # If how is POS_END, it moves the download to a position relative to
    # the end of the queue.
    "POS_END",
)
class MengyAria2AsyncAPI:
    """Builder for aria2 JSON-RPC 2.0 request payloads.

    Every public method assembles one request dict and passes it to
    ``_handle_fun``. In this base class that hook only logs the payload and
    returns it; a subclass may override the hook to deliver the request, in
    which case each method returns whatever the overridden hook returns.

    Methods that validate their input (``addTorrent``, ``addMetalink``,
    ``changePosition``) log an error and return ``None`` without calling the
    hook when validation fails.
    """

    def __init__(self, token):
        """Store *token* in the ``token:<secret>`` form used as RPC param."""
        self.token = f"token:{token}"

    def _handle_fun(self, data):
        """Log the request payload and return it unchanged."""
        logger.debug(f"get data | {data}")
        return data

    def _do(self, method, params=tuple(), multi=False):
        """Wrap *method* and *params* in a JSON-RPC envelope and dispatch it.

        When ``multi`` is ``True`` the params are passed through as a single
        nested list, because each nested call already carries the token.
        Otherwise the token is prepended to *params*.
        """
        rpc_params = [params] if multi is True else [self.token, *params]
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": "mengy",
            "params": rpc_params,
        }
        return self._handle_fun(payload)

    @staticmethod
    def _read_base64(file_path):
        """Return the content of *file_path* as base64 text.

        The file is opened in a ``with`` block so the handle is always closed.
        """
        with open(file_path, "rb") as fh:
            return base64.b64encode(fh.read()).decode()

    def addUri(self, uris, options=None, position=0):
        """Request ``aria2.addUri`` for *uris* with *options* at *position*."""
        options = {} if options is None else options
        return self._do("aria2.addUri", [uris, options, position])

    def addTorrent(self, torrent_file_path, uris=tuple(), options=None, position=0):
        """Request ``aria2.addTorrent`` with the base64-encoded torrent file.

        Returns ``None`` (after logging an error) if the file does not exist.
        """
        if not os.path.isfile(torrent_file_path):
            logger.error(f"torrent file not found {torrent_file_path}")
            return None
        options = {} if options is None else options
        torrent = self._read_base64(torrent_file_path)
        return self._do("aria2.addTorrent", [torrent, uris, options, position])

    def addMetalink(self, metalink_file_path, options=None, position=0):
        """Request ``aria2.addMetalink`` with the base64-encoded metalink file.

        Returns ``None`` (after logging an error) if the file does not exist.
        """
        if not os.path.isfile(metalink_file_path):
            logger.error(f"metalink file not found {metalink_file_path}")
            return None
        options = {} if options is None else options
        metalink = self._read_base64(metalink_file_path)
        return self._do("aria2.addMetalink", [metalink, options, position])

    def remove(self, gid):
        """Request ``aria2.remove`` for *gid*."""
        return self._do("aria2.remove", params=[gid])

    def forceRemove(self, gid):
        """Request ``aria2.forceRemove`` for *gid*."""
        return self._do("aria2.forceRemove", params=[gid])

    def pause(self, gid):
        """Request ``aria2.pause`` for *gid*."""
        return self._do("aria2.pause", params=[gid])

    def pauseAll(self):
        """Request ``aria2.pauseAll``."""
        return self._do("aria2.pauseAll")

    def forcePause(self, gid):
        """Request ``aria2.forcePause`` for *gid*."""
        return self._do("aria2.forcePause", params=[gid])

    def forcePauseAll(self):
        """Request ``aria2.forcePauseAll``."""
        return self._do("aria2.forcePauseAll")

    def unpause(self, gid):
        """Request ``aria2.unpause`` for *gid*."""
        return self._do("aria2.unpause", params=[gid])

    def unpauseAll(self):
        """Request ``aria2.unpauseAll``."""
        return self._do("aria2.unpauseAll")

    def tellStatus(self, gid, keys=DEFAULT_KEYS):
        """Request ``aria2.tellStatus`` for *gid*, limited to *keys*."""
        return self._do("aria2.tellStatus", params=[gid, keys])

    def getUris(self, gid):
        """Request ``aria2.getUris`` for *gid*."""
        return self._do("aria2.getUris", params=[gid])

    def getFiles(self, gid):
        """Request ``aria2.getFiles`` for *gid*."""
        return self._do("aria2.getFiles", params=[gid])

    def getPeers(self, gid):
        """Request ``aria2.getPeers`` for *gid*."""
        return self._do("aria2.getPeers", params=[gid])

    def getServers(self, gid):
        """Request ``aria2.getServers`` for *gid*."""
        return self._do("aria2.getServers", params=[gid])

    def tellActive(self, keys=DEFAULT_KEYS):
        """Request ``aria2.tellActive``, limited to *keys*."""
        return self._do("aria2.tellActive", params=[keys])

    def tellWaiting(self, offset=0, num=300, keys=DEFAULT_KEYS):
        """Request ``aria2.tellWaiting`` with *offset*, *num* and *keys*."""
        return self._do("aria2.tellWaiting", params=[offset, num, keys])

    def tellStopped(self, offset=0, num=300, keys=DEFAULT_KEYS):
        """Request ``aria2.tellStopped`` with *offset*, *num* and *keys*."""
        return self._do("aria2.tellStopped", params=[offset, num, keys])

    def changePosition(self, gid, pos=0, how="POS_END"):
        """Request ``aria2.changePosition`` for *gid*.

        *how* must be one of ``HOWS``; otherwise an error is logged and
        ``None`` is returned without building a request.
        """
        if how not in HOWS:
            logger.error(f"param how is invalid | {how}")
            return None
        return self._do("aria2.changePosition", params=[gid, pos, how])

    def changeUri(self, gid, fileIndex=1, delUris=tuple(), addUris=tuple(), position=0):
        """Request ``aria2.changeUri`` for one file of *gid*."""
        return self._do(
            "aria2.changeUri",
            params=[gid, fileIndex, delUris, addUris, position],
        )

    def getOption(self, gid):
        """Request ``aria2.getOption`` for *gid*."""
        return self._do("aria2.getOption", params=[gid])

    def changeOption(self, gid, options=None):
        """Request ``aria2.changeOption`` for *gid* with *options*."""
        options = {} if options is None else options
        return self._do("aria2.changeOption", params=[gid, options])

    def getGlobalOption(self):
        """Request ``aria2.getGlobalOption``."""
        return self._do("aria2.getGlobalOption")

    def changeGlobalOption(self, options=None):
        """Request ``aria2.changeGlobalOption`` with *options*."""
        options = {} if options is None else options
        return self._do("aria2.changeGlobalOption", params=[options])

    def getGlobalStat(self):
        """Request ``aria2.getGlobalStat``."""
        return self._do("aria2.getGlobalStat")

    def purgeDownloadResult(self):
        """Request ``aria2.purgeDownloadResult``."""
        return self._do("aria2.purgeDownloadResult")

    def removeDownloadResult(self, gid):
        """Request ``aria2.removeDownloadResult`` for *gid*."""
        return self._do("aria2.removeDownloadResult", params=[gid])

    def getVersion(self):
        """Request ``aria2.getVersion``."""
        return self._do("aria2.getVersion")

    def getSessionInfo(self):
        """Request ``aria2.getSessionInfo``."""
        return self._do("aria2.getSessionInfo")

    def shutdown(self):
        """Request ``aria2.shutdown``."""
        # Method name fixed: the original sent the misspelled "arai2.shutdown".
        return self._do("aria2.shutdown")

    def forceShutdown(self):
        """Request ``aria2.forceShutdown``."""
        # Method name fixed: the original sent "arai2.forceShutdown".
        return self._do("aria2.forceShutdown")

    def saveSession(self):
        """Request ``aria2.saveSession``."""
        # Method name fixed: the original sent "arai2.saveSession".
        return self._do("aria2.saveSession")

    def sys_multicall(self, method_params_pairs):
        """Request ``system.multicall`` for several calls at once.

        *method_params_pairs* is an iterable of ``(method_name, params)``
        pairs; the token is prepended to the params of every nested call.
        """
        calls = [
            {"methodName": name, "params": [self.token, *call_params]}
            for name, call_params in method_params_pairs
        ]
        return self._do("system.multicall", params=calls, multi=True)

    def sys_listMethods(self):
        """Request ``system.listMethods``."""
        return self._do("system.listMethods")

    def sys_listNotifications(self):
        """Request ``system.listNotifications``."""
        return self._do("system.listNotifications")
class MengyAria2AsyncHttpPost(MengyAria2AsyncAPI):
    """Send the request payloads built by the parent class over HTTP POST.

    Every API method returns an awaitable. Awaiting it yields the ``result``
    member of the JSON-RPC reply, or ``None`` when the request failed or the
    reply carried no ``result``.
    """

    def __init__(self, server_url, token, timeout=180):
        """Remember the RPC endpoint URL and the total timeout in seconds."""
        super().__init__(token)
        self.server_url = server_url
        self.timeout = timeout

    def _client_timeout(self):
        """Return ``self.timeout`` as an ``aiohttp.ClientTimeout``.

        ``ClientSession`` expects a ``ClientTimeout`` object rather than a
        bare number, so a numeric value is wrapped as the total timeout.
        """
        if isinstance(self.timeout, aiohttp.ClientTimeout):
            return self.timeout
        return aiohttp.ClientTimeout(total=self.timeout)

    async def _handle_fun(self, data):
        """POST *data* as JSON and return the reply's ``result`` (or ``None``).

        A non-200 status is logged but the reply is still inspected. Any
        exception is logged with a formatted traceback and turned into
        ``None`` so callers never see transport errors.
        """
        try:
            async with aiohttp.ClientSession(timeout=self._client_timeout()) as session:
                async with session.post(self.server_url, json=data) as resp:
                    # content_type=None skips aiohttp's strict content-type
                    # check; JSON-RPC servers may not reply with exactly
                    # "application/json".
                    resp_dict = await resp.json(content_type=None)
                    status_code = resp.status
                    if status_code != 200:
                        logger.error(f"_request failed | status code {status_code} | {resp_dict}")
                    return resp_dict.get("result")
        except Exception:
            # Fixed: the original called the non-existent stackprinter.fromat(),
            # which raised AttributeError from inside this handler.
            logger.error(f"reaquest failed | {data}\n{stackprinter.format()}")
            return None

    @staticmethod
    async def _settle(pending):
        """Await *pending* unless the parent returned ``None`` (validation failed)."""
        return None if pending is None else await pending

    # The parent returns a plain None from these three methods when input
    # validation fails; wrapping them keeps every call awaitable so that
    # ``await client.addTorrent(...)`` never raises TypeError.
    async def addTorrent(self, *args, **kwargs):
        """Awaitable ``addTorrent``; yields ``None`` if the file is missing."""
        return await self._settle(super().addTorrent(*args, **kwargs))

    async def addMetalink(self, *args, **kwargs):
        """Awaitable ``addMetalink``; yields ``None`` if the file is missing."""
        return await self._settle(super().addMetalink(*args, **kwargs))

    async def changePosition(self, *args, **kwargs):
        """Awaitable ``changePosition``; yields ``None`` if ``how`` is rejected."""
        return await self._settle(super().changePosition(*args, **kwargs))
if __name__ == '__main__':
    # Demo: talk to an aria2 RPC endpoint and log the results of a few calls.
    client = MengyAria2AsyncHttpPost("https://example.com:6800/jsonrpc", "my_token")

    async def test():
        """Exercise tellActive, addUri and addTorrent against the endpoint."""
        tellActive_result = await client.tellActive()
        logger.debug(f"tellActive_result | {tellActive_result}")

        addUri_result = await client.addUri(
            ["http://releases.ubuntu.com/16.04/ubuntu-16.04.6-server-i386.iso.torrent"]
        )
        logger.debug(f"addUri_result | {addUri_result}")

        # addTorrent may hand back None instead of an awaitable when the
        # torrent file does not exist, so only await a real awaitable.
        pending = client.addTorrent("test.torrent")
        addTorrent_result = await pending if pending is not None else None
        logger.debug(f"addTorrent_result | {addTorrent_result}")

    # The original defined test() but never ran it, so the script did nothing.
    asyncio.run(test())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment